From dfe4613eac89233e11ea14f8cf3f30f1ac53b766 Mon Sep 17 00:00:00 2001 From: sunxyw Date: Fri, 10 Feb 2023 18:00:29 +0800 Subject: [PATCH 1/2] fix schedule swoole deadlock --- src/ZM/Schedule/Schedule.php | 83 ++++++++++++++---------------------- 1 file changed, 33 insertions(+), 50 deletions(-) diff --git a/src/ZM/Schedule/Schedule.php b/src/ZM/Schedule/Schedule.php index 3ad006eb..586efd12 100644 --- a/src/ZM/Schedule/Schedule.php +++ b/src/ZM/Schedule/Schedule.php @@ -10,12 +10,7 @@ use ZM\Annotation\Framework\Cron; class Schedule { - /** - * 排程任务列表 - * - * @var Cron[] - */ - private array $schedules = []; + private int $next_run = 0; /** * 正在执行的排程任务列表 @@ -24,24 +19,15 @@ class Schedule */ private array $executing = []; + private bool $available; + public function __construct() { $c = Adaptive::getCoroutine(); - if (!$c instanceof CoroutineInterface) { + $this->available = $c instanceof CoroutineInterface; + if (!$this->available) { logger()->error('排程任务只能在协程环境下使用'); - return; } - - // 每秒检查一次,精度为一分钟,从最近的下一分钟开始 - $c->create(function () { - /* @phpstan-ignore-next-line 协程会睡觉的,不会阻塞 */ - while (true) { - $now = time(); - $this->run(); - $sleep_time = 60 - ($now % 60); - Adaptive::sleep($sleep_time); - } - }); } /** @@ -51,37 +37,34 @@ class Schedule */ public function addSchedule(Cron $cron): void { - $this->schedules[] = $cron; - } - - /** - * 获取到期的排程任务 - * - * @return Cron[] - */ - public function due(): array - { - return array_filter($this->schedules, fn (Cron $cron) => $cron->expression->isDue()); - } - - /** - * 运行排程任务 - */ - public function run(): void - { - // 同时运行到期的排程任务 - foreach ($this->due() as $cron) { - // 检查是否重叠执行 - if ($cron->no_overlap && in_array($cron, $this->executing, true)) { - continue; - } - $this->executing[] = $cron; - // 新建一个协程运行排程任务,避免阻塞 - Adaptive::getCoroutine()->create(function () use ($cron) { - $callable = $cron->class === '' ? $cron->method : [$cron->class, $cron->method]; - container()->call($callable); - $this->executing = array_diff($this->executing, [$cron]); - }); + if (!$this->available) { + return; } + $next_run = $cron->expression->getNextRunDate()->getTimestamp(); + // 防止在同一分钟内重复执行 + if ($next_run < $this->next_run) { + $next_run = $this->next_run; + } + $this->next_run = $cron->expression->getNextRunDate()->getTimestamp(); + $wait_ms = max(0, ($next_run - time()) * 1000); + Timer::after($wait_ms, function () use ($cron) { + $this->dispatch($cron); + $this->addSchedule($cron); + }); + } + + public function dispatch(Cron $cron): void + { + // 检查是否重叠执行 + if ($cron->no_overlap && in_array($cron, $this->executing, true)) { + return; + } + $this->executing[] = $cron; + // 新建一个协程运行排程任务,避免阻塞 + Adaptive::getCoroutine()->create(function () use ($cron) { + $callable = $cron->class === '' ? $cron->method : [$cron->class, $cron->method]; + container()->call($callable); + $this->executing = array_diff($this->executing, [$cron]); + }); } } From 52030e8db3ddb956bb5a283b67a69427e104ffc9 Mon Sep 17 00:00:00 2001 From: crazywhalecc Date: Fri, 10 Feb 2023 19:13:24 +0800 Subject: [PATCH 2/2] fix swoole timer stuck --- src/ZM/Event/Listener/WorkerEventListener.php | 16 +++++++++++++++- src/ZM/Framework.php | 2 ++ 2 files changed, 17 insertions(+), 1 deletion(-) diff --git a/src/ZM/Event/Listener/WorkerEventListener.php b/src/ZM/Event/Listener/WorkerEventListener.php index 3213819d..7658bc32 100644 --- a/src/ZM/Event/Listener/WorkerEventListener.php +++ b/src/ZM/Event/Listener/WorkerEventListener.php @@ -109,7 +109,8 @@ class WorkerEventListener } // Windows 系统的 CtrlC 由于和 Select 有一定的冲突,如果没事件解析的话 CtrlC 会阻塞,所以必须添加一个空的计时器 if (PHP_OS_FAMILY === 'Windows') { - Framework::getInstance()->getDriver()->getEventLoop()->addTimer(1000, function () {}, 0); + Framework::getInstance()->getDriver()->getEventLoop()->addTimer(1000, function () { + }, 0); } // 回显 debug 日志:进程占用的内存 $memory_total = memory_get_usage() / 1024 / 1024; @@ -121,6 +122,14 @@ class WorkerEventListener logger()->debug('{is_task}Worker 进程 #{id} 已启动', ['is_task' => ProcessStateManager::isTaskWorker() ? 'Task' : '', 'id' => ProcessManager::getProcessId()]); } + public function onWorkerExit(): void + { + // 清除计时器 + Framework::getInstance()->getDriver()->getEventLoop()->clearAllTimer(); + $worker_id = ProcessManager::getProcessId(); + logger()->notice('正在结束 Worker #' . $worker_id . ' 中的任务...'); + } + /** * @throws ZMKnownException * @throws \JsonException @@ -132,6 +141,11 @@ class WorkerEventListener } logger()->debug('{is_task}Worker 进程 #{id} 正在停止', ['is_task' => ProcessStateManager::isTaskWorker() ? 'Task' : '', 'id' => ProcessManager::getProcessId()]); + if (Framework::getInstance()->getDriver()->getName() !== 'swoole') { + logger()->debug('清除计时器中'); + Framework::getInstance()->getDriver()->getEventLoop()->clearAllTimer(); + } + if (DIRECTORY_SEPARATOR !== '\\') { ProcessStateManager::removeProcessState(ProcessStateManager::isTaskWorker() ? ZM_PROCESS_TASKWORKER : ZM_PROCESS_WORKER, ProcessManager::getProcessId()); } diff --git a/src/ZM/Framework.php b/src/ZM/Framework.php index 84de090d..325961ee 100644 --- a/src/ZM/Framework.php +++ b/src/ZM/Framework.php @@ -9,6 +9,7 @@ use OneBot\Driver\Event\DriverInitEvent; use OneBot\Driver\Event\Http\HttpRequestEvent; use OneBot\Driver\Event\Process\ManagerStartEvent; use OneBot\Driver\Event\Process\ManagerStopEvent; +use OneBot\Driver\Event\Process\WorkerExitEvent; use OneBot\Driver\Event\Process\WorkerStartEvent; use OneBot\Driver\Event\Process\WorkerStopEvent; use OneBot\Driver\Event\WebSocket\WebSocketCloseEvent; @@ -235,6 +236,7 @@ class Framework ob_event_provider()->addEventListener(WorkerStartEvent::getName(), [WorkerEventListener::getInstance(), 'onWorkerStart1'], 1); ob_event_provider()->addEventListener(WorkerStopEvent::getName(), [WorkerEventListener::getInstance(), 'onWorkerStop999'], 999); ob_event_provider()->addEventListener(WorkerStopEvent::getName(), [WorkerEventListener::getInstance(), 'onWorkerStop1'], 1); + ob_event_provider()->addEventListener(WorkerExitEvent::getName(), [WorkerEventListener::getInstance(), 'onWorkerExit'], 999); // Http 事件 ob_event_provider()->addEventListener(HttpRequestEvent::getName(), [HttpEventListener::getInstance(), 'onRequest999'], 999); ob_event_provider()->addEventListener(HttpRequestEvent::getName(), [HttpEventListener::getInstance(), 'onRequest1'], 1);