mirror of
https://github.com/zhamao-robot/zhamao-framework.git
synced 2026-03-18 05:04:51 +08:00
Merge pull request #286 from zhamao-robot/fix-schedule-swoole-deadlock
修复排程任务 Swoole 死锁
This commit is contained in:
commit
2133d73a32
@ -109,7 +109,8 @@ class WorkerEventListener
|
||||
}
|
||||
// Windows 系统的 CtrlC 由于和 Select 有一定的冲突,如果没事件解析的话 CtrlC 会阻塞,所以必须添加一个空的计时器
|
||||
if (PHP_OS_FAMILY === 'Windows') {
|
||||
Framework::getInstance()->getDriver()->getEventLoop()->addTimer(1000, function () {}, 0);
|
||||
Framework::getInstance()->getDriver()->getEventLoop()->addTimer(1000, function () {
|
||||
}, 0);
|
||||
}
|
||||
// 回显 debug 日志:进程占用的内存
|
||||
$memory_total = memory_get_usage() / 1024 / 1024;
|
||||
@ -121,6 +122,14 @@ class WorkerEventListener
|
||||
logger()->debug('{is_task}Worker 进程 #{id} 已启动', ['is_task' => ProcessStateManager::isTaskWorker() ? 'Task' : '', 'id' => ProcessManager::getProcessId()]);
|
||||
}
|
||||
|
||||
public function onWorkerExit(): void
|
||||
{
|
||||
// 清除计时器
|
||||
Framework::getInstance()->getDriver()->getEventLoop()->clearAllTimer();
|
||||
$worker_id = ProcessManager::getProcessId();
|
||||
logger()->notice('正在结束 Worker #' . $worker_id . ' 中的任务...');
|
||||
}
|
||||
|
||||
/**
|
||||
* @throws ZMKnownException
|
||||
* @throws \JsonException
|
||||
@ -132,6 +141,11 @@ class WorkerEventListener
|
||||
}
|
||||
logger()->debug('{is_task}Worker 进程 #{id} 正在停止', ['is_task' => ProcessStateManager::isTaskWorker() ? 'Task' : '', 'id' => ProcessManager::getProcessId()]);
|
||||
|
||||
if (Framework::getInstance()->getDriver()->getName() !== 'swoole') {
|
||||
logger()->debug('清除计时器中');
|
||||
Framework::getInstance()->getDriver()->getEventLoop()->clearAllTimer();
|
||||
}
|
||||
|
||||
if (DIRECTORY_SEPARATOR !== '\\') {
|
||||
ProcessStateManager::removeProcessState(ProcessStateManager::isTaskWorker() ? ZM_PROCESS_TASKWORKER : ZM_PROCESS_WORKER, ProcessManager::getProcessId());
|
||||
}
|
||||
|
||||
@ -9,6 +9,7 @@ use OneBot\Driver\Event\DriverInitEvent;
|
||||
use OneBot\Driver\Event\Http\HttpRequestEvent;
|
||||
use OneBot\Driver\Event\Process\ManagerStartEvent;
|
||||
use OneBot\Driver\Event\Process\ManagerStopEvent;
|
||||
use OneBot\Driver\Event\Process\WorkerExitEvent;
|
||||
use OneBot\Driver\Event\Process\WorkerStartEvent;
|
||||
use OneBot\Driver\Event\Process\WorkerStopEvent;
|
||||
use OneBot\Driver\Event\WebSocket\WebSocketCloseEvent;
|
||||
@ -235,6 +236,7 @@ class Framework
|
||||
ob_event_provider()->addEventListener(WorkerStartEvent::getName(), [WorkerEventListener::getInstance(), 'onWorkerStart1'], 1);
|
||||
ob_event_provider()->addEventListener(WorkerStopEvent::getName(), [WorkerEventListener::getInstance(), 'onWorkerStop999'], 999);
|
||||
ob_event_provider()->addEventListener(WorkerStopEvent::getName(), [WorkerEventListener::getInstance(), 'onWorkerStop1'], 1);
|
||||
ob_event_provider()->addEventListener(WorkerExitEvent::getName(), [WorkerEventListener::getInstance(), 'onWorkerExit'], 999);
|
||||
// Http 事件
|
||||
ob_event_provider()->addEventListener(HttpRequestEvent::getName(), [HttpEventListener::getInstance(), 'onRequest999'], 999);
|
||||
ob_event_provider()->addEventListener(HttpRequestEvent::getName(), [HttpEventListener::getInstance(), 'onRequest1'], 1);
|
||||
|
||||
@ -10,12 +10,7 @@ use ZM\Annotation\Framework\Cron;
|
||||
|
||||
class Schedule
|
||||
{
|
||||
/**
|
||||
* 排程任务列表
|
||||
*
|
||||
* @var Cron[]
|
||||
*/
|
||||
private array $schedules = [];
|
||||
private int $next_run = 0;
|
||||
|
||||
/**
|
||||
* 正在执行的排程任务列表
|
||||
@ -24,24 +19,15 @@ class Schedule
|
||||
*/
|
||||
private array $executing = [];
|
||||
|
||||
private bool $available;
|
||||
|
||||
public function __construct()
|
||||
{
|
||||
$c = Adaptive::getCoroutine();
|
||||
if (!$c instanceof CoroutineInterface) {
|
||||
$this->available = $c instanceof CoroutineInterface;
|
||||
if (!$this->available) {
|
||||
logger()->error('排程任务只能在协程环境下使用');
|
||||
return;
|
||||
}
|
||||
|
||||
// 每秒检查一次,精度为一分钟,从最近的下一分钟开始
|
||||
$c->create(function () {
|
||||
/* @phpstan-ignore-next-line 协程会睡觉的,不会阻塞 */
|
||||
while (true) {
|
||||
$now = time();
|
||||
$this->run();
|
||||
$sleep_time = 60 - ($now % 60);
|
||||
Adaptive::sleep($sleep_time);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
@ -51,37 +37,34 @@ class Schedule
|
||||
*/
|
||||
public function addSchedule(Cron $cron): void
|
||||
{
|
||||
$this->schedules[] = $cron;
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取到期的排程任务
|
||||
*
|
||||
* @return Cron[]
|
||||
*/
|
||||
public function due(): array
|
||||
{
|
||||
return array_filter($this->schedules, fn (Cron $cron) => $cron->expression->isDue());
|
||||
}
|
||||
|
||||
/**
|
||||
* 运行排程任务
|
||||
*/
|
||||
public function run(): void
|
||||
{
|
||||
// 同时运行到期的排程任务
|
||||
foreach ($this->due() as $cron) {
|
||||
// 检查是否重叠执行
|
||||
if ($cron->no_overlap && in_array($cron, $this->executing, true)) {
|
||||
continue;
|
||||
}
|
||||
$this->executing[] = $cron;
|
||||
// 新建一个协程运行排程任务,避免阻塞
|
||||
Adaptive::getCoroutine()->create(function () use ($cron) {
|
||||
$callable = $cron->class === '' ? $cron->method : [$cron->class, $cron->method];
|
||||
container()->call($callable);
|
||||
$this->executing = array_diff($this->executing, [$cron]);
|
||||
});
|
||||
if (!$this->available) {
|
||||
return;
|
||||
}
|
||||
$next_run = $cron->expression->getNextRunDate()->getTimestamp();
|
||||
// 防止在同一分钟内重复执行
|
||||
if ($next_run < $this->next_run) {
|
||||
$next_run = $this->next_run;
|
||||
}
|
||||
$this->next_run = $cron->expression->getNextRunDate()->getTimestamp();
|
||||
$wait_ms = max(0, ($next_run - time()) * 1000);
|
||||
Timer::after($wait_ms, function () use ($cron) {
|
||||
$this->dispatch($cron);
|
||||
$this->addSchedule($cron);
|
||||
});
|
||||
}
|
||||
|
||||
public function dispatch(Cron $cron): void
|
||||
{
|
||||
// 检查是否重叠执行
|
||||
if ($cron->no_overlap && in_array($cron, $this->executing, true)) {
|
||||
return;
|
||||
}
|
||||
$this->executing[] = $cron;
|
||||
// 新建一个协程运行排程任务,避免阻塞
|
||||
Adaptive::getCoroutine()->create(function () use ($cron) {
|
||||
$callable = $cron->class === '' ? $cron->method : [$cron->class, $cron->method];
|
||||
container()->call($callable);
|
||||
$this->executing = array_diff($this->executing, [$cron]);
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user