From 809ac4d6b32517e9d199f0d434ef688f0b9ec789 Mon Sep 17 00:00:00 2001 From: Jerry Date: Sat, 28 Jan 2023 11:25:58 +0800 Subject: [PATCH] support task worker --- src/ZM/Event/Listener/WorkerEventListener.php | 17 +++++++------- src/ZM/Process/ProcessStateManager.php | 22 ++++++++++++++++--- 2 files changed, 28 insertions(+), 11 deletions(-) diff --git a/src/ZM/Event/Listener/WorkerEventListener.php b/src/ZM/Event/Listener/WorkerEventListener.php index e2c9dbc1..1ad886e1 100644 --- a/src/ZM/Event/Listener/WorkerEventListener.php +++ b/src/ZM/Event/Listener/WorkerEventListener.php @@ -59,9 +59,9 @@ class WorkerEventListener if (($name = Framework::getInstance()->getDriver()->getName()) === 'swoole') { /* @phpstan-ignore-next-line */ $server = Framework::getInstance()->getDriver()->getSwooleServer(); - ProcessStateManager::saveProcessState(ZM_PROCESS_WORKER, $server->worker_pid, ['worker_id' => $server->worker_id]); + ProcessStateManager::saveProcessState(ProcessStateManager::isTaskWorker() ? ZM_PROCESS_TASKWORKER : ZM_PROCESS_WORKER, $server->worker_pid, ['worker_id' => $server->worker_id]); } elseif ($name === 'workerman') { - ProcessStateManager::saveProcessState(ZM_PROCESS_WORKER, getmypid(), ['worker_id' => ProcessManager::getProcessId()]); + ProcessStateManager::saveProcessState(ProcessStateManager::isTaskWorker() ? ZM_PROCESS_TASKWORKER : ZM_PROCESS_WORKER, getmypid(), ['worker_id' => ProcessManager::getProcessId()]); } // 打印进程ID @@ -73,7 +73,7 @@ class WorkerEventListener } if (Framework::getInstance()->getArgv()['print-process-pid']) { $i = ProcessManager::getProcessId(); - logger()->info('WORKER#' . $i . ":\t" . ProcessStateManager::getProcessState(ZM_PROCESS_WORKER, $i)); + logger()->info('WORKER#' . $i . ":\t" . ProcessStateManager::getProcessState(ProcessStateManager::isTaskWorker() ? ZM_PROCESS_TASKWORKER : ZM_PROCESS_WORKER, $i)); } // 如果使用的是 LightCache,注册下自动保存的监听器 @@ -118,7 +118,7 @@ class WorkerEventListener public function onWorkerStart1(): void { - logger()->debug('Worker #' . ProcessManager::getProcessId() . ' started'); + logger()->debug('{is_task}Worker 进程 #{id} 已启动', ['is_task' => ProcessStateManager::isTaskWorker() ? 'Task' : '', 'id' => ProcessManager::getProcessId()]); } /** @@ -130,9 +130,10 @@ class WorkerEventListener if (is_a(config('global.kv.use', \LightCache::class), LightCache::class, true)) { LightCache::saveAll(); } - logger()->debug('Worker #' . ProcessManager::getProcessId() . ' stopping'); + logger()->debug('{is_task}Worker 进程 #{id} 正在停止', ['is_task' => ProcessStateManager::isTaskWorker() ? 'Task' : '', 'id' => ProcessManager::getProcessId()]); + if (DIRECTORY_SEPARATOR !== '\\') { - ProcessStateManager::removeProcessState(ZM_PROCESS_WORKER, ProcessManager::getProcessId()); + ProcessStateManager::removeProcessState(ProcessStateManager::isTaskWorker() ? ZM_PROCESS_TASKWORKER : ZM_PROCESS_WORKER, ProcessManager::getProcessId()); } // 清空 MySQL 的连接池 foreach (DBPool::getAllPools() as $name => $pool) { @@ -142,7 +143,7 @@ class WorkerEventListener public function onWorkerStop1(): void { - logger()->debug('Worker #' . ProcessManager::getProcessId() . ' stopped'); + logger()->debug('{is_task}Worker 进程 #{id} 已停止', ['is_task' => ProcessStateManager::isTaskWorker() ? 'Task' : '', 'id' => ProcessManager::getProcessId()]); } /** @@ -164,7 +165,7 @@ class WorkerEventListener // 如果在排除表就排除,否则就解析注解 if (is_dir(SOURCE_ROOT_DIR . '/' . $v) && !in_array($v, $excludes)) { // 添加解析路径,对应Base命名空间也贴出来 - $parser->addPsr4Path(SOURCE_ROOT_DIR . '/' . $v . '/', trim($k, '\\')); + $parser->addPsr4Path(SOURCE_ROOT_DIR . '/' . $v . '/', trim($k, '\\'), ['source-annotation']); } } diff --git a/src/ZM/Process/ProcessStateManager.php b/src/ZM/Process/ProcessStateManager.php index 69786e5c..91b5e09e 100644 --- a/src/ZM/Process/ProcessStateManager.php +++ b/src/ZM/Process/ProcessStateManager.php @@ -4,6 +4,7 @@ declare(strict_types=1); namespace ZM\Process; +use OneBot\Driver\Process\ProcessManager; use ZM\Exception\ZMKnownException; use ZM\Store\FileSystem; @@ -12,10 +13,25 @@ class ProcessStateManager public static array $process_mode = []; /** + * 查看是否为多 Worker 模式,插件可能用得到 + */ + public static function isMultiWorkers(): bool + { + return (self::$process_mode['worker'] ?? 1) > 1; + } + + public static function isTaskWorker(): bool + { + return (ProcessManager::getProcessType() & ONEBOT_PROCESS_TASKWORKER) !== 0; + } + + /** + * 删除进程运行状态 + * * @throws ZMKnownException * @internal */ - public static function removeProcessState(int $type, int|string $id_or_name = null) + public static function removeProcessState(int $type, int|string $id_or_name = null): void { switch ($type) { case ZM_PROCESS_MASTER: @@ -67,7 +83,7 @@ class ProcessStateManager * @throws ZMKnownException * @internal */ - public static function getProcessState(int $type, mixed $id_or_name = null) + public static function getProcessState(int $type, mixed $id_or_name = null): mixed { $file = ZM_STATE_DIR; switch ($type) { @@ -116,7 +132,7 @@ class ProcessStateManager * * @internal */ - public static function saveProcessState(int $type, int|string $pid, array $data = []) + public static function saveProcessState(int $type, int|string $pid, array $data = []): void { switch ($type) { case ZM_PROCESS_MASTER: