support task worker

This commit is contained in:
Jerry 2023-01-28 11:25:58 +08:00
parent 46243d577c
commit 809ac4d6b3
2 changed files with 28 additions and 11 deletions

View File

@ -59,9 +59,9 @@ class WorkerEventListener
if (($name = Framework::getInstance()->getDriver()->getName()) === 'swoole') {
/* @phpstan-ignore-next-line */
$server = Framework::getInstance()->getDriver()->getSwooleServer();
ProcessStateManager::saveProcessState(ZM_PROCESS_WORKER, $server->worker_pid, ['worker_id' => $server->worker_id]);
ProcessStateManager::saveProcessState(ProcessStateManager::isTaskWorker() ? ZM_PROCESS_TASKWORKER : ZM_PROCESS_WORKER, $server->worker_pid, ['worker_id' => $server->worker_id]);
} elseif ($name === 'workerman') {
ProcessStateManager::saveProcessState(ZM_PROCESS_WORKER, getmypid(), ['worker_id' => ProcessManager::getProcessId()]);
ProcessStateManager::saveProcessState(ProcessStateManager::isTaskWorker() ? ZM_PROCESS_TASKWORKER : ZM_PROCESS_WORKER, getmypid(), ['worker_id' => ProcessManager::getProcessId()]);
}
// 打印进程ID
@ -73,7 +73,7 @@ class WorkerEventListener
}
if (Framework::getInstance()->getArgv()['print-process-pid']) {
$i = ProcessManager::getProcessId();
logger()->info('WORKER#' . $i . ":\t" . ProcessStateManager::getProcessState(ZM_PROCESS_WORKER, $i));
logger()->info('WORKER#' . $i . ":\t" . ProcessStateManager::getProcessState(ProcessStateManager::isTaskWorker() ? ZM_PROCESS_TASKWORKER : ZM_PROCESS_WORKER, $i));
}
// 如果使用的是 LightCache注册下自动保存的监听器
@ -118,7 +118,7 @@ class WorkerEventListener
public function onWorkerStart1(): void
{
logger()->debug('Worker #' . ProcessManager::getProcessId() . ' started');
logger()->debug('{is_task}Worker 进程 #{id} 已启动', ['is_task' => ProcessStateManager::isTaskWorker() ? 'Task' : '', 'id' => ProcessManager::getProcessId()]);
}
/**
@ -130,9 +130,10 @@ class WorkerEventListener
if (is_a(config('global.kv.use', \LightCache::class), LightCache::class, true)) {
LightCache::saveAll();
}
logger()->debug('Worker #' . ProcessManager::getProcessId() . ' stopping');
logger()->debug('{is_task}Worker 进程 #{id} 正在停止', ['is_task' => ProcessStateManager::isTaskWorker() ? 'Task' : '', 'id' => ProcessManager::getProcessId()]);
if (DIRECTORY_SEPARATOR !== '\\') {
ProcessStateManager::removeProcessState(ZM_PROCESS_WORKER, ProcessManager::getProcessId());
ProcessStateManager::removeProcessState(ProcessStateManager::isTaskWorker() ? ZM_PROCESS_TASKWORKER : ZM_PROCESS_WORKER, ProcessManager::getProcessId());
}
// 清空 MySQL 的连接池
foreach (DBPool::getAllPools() as $name => $pool) {
@ -142,7 +143,7 @@ class WorkerEventListener
public function onWorkerStop1(): void
{
logger()->debug('Worker #' . ProcessManager::getProcessId() . ' stopped');
logger()->debug('{is_task}Worker 进程 #{id} 已停止', ['is_task' => ProcessStateManager::isTaskWorker() ? 'Task' : '', 'id' => ProcessManager::getProcessId()]);
}
/**
@ -164,7 +165,7 @@ class WorkerEventListener
// 如果在排除表就排除,否则就解析注解
if (is_dir(SOURCE_ROOT_DIR . '/' . $v) && !in_array($v, $excludes)) {
// 添加解析路径对应Base命名空间也贴出来
$parser->addPsr4Path(SOURCE_ROOT_DIR . '/' . $v . '/', trim($k, '\\'));
$parser->addPsr4Path(SOURCE_ROOT_DIR . '/' . $v . '/', trim($k, '\\'), ['source-annotation']);
}
}

View File

@ -4,6 +4,7 @@ declare(strict_types=1);
namespace ZM\Process;
use OneBot\Driver\Process\ProcessManager;
use ZM\Exception\ZMKnownException;
use ZM\Store\FileSystem;
@ -12,10 +13,25 @@ class ProcessStateManager
public static array $process_mode = [];
/**
* 查看是否为多 Worker 模式,插件可能用得到
*/
public static function isMultiWorkers(): bool
{
return (self::$process_mode['worker'] ?? 1) > 1;
}
public static function isTaskWorker(): bool
{
return (ProcessManager::getProcessType() & ONEBOT_PROCESS_TASKWORKER) !== 0;
}
/**
* 删除进程运行状态
*
* @throws ZMKnownException
* @internal
*/
public static function removeProcessState(int $type, int|string $id_or_name = null)
public static function removeProcessState(int $type, int|string $id_or_name = null): void
{
switch ($type) {
case ZM_PROCESS_MASTER:
@ -67,7 +83,7 @@ class ProcessStateManager
* @throws ZMKnownException
* @internal
*/
public static function getProcessState(int $type, mixed $id_or_name = null)
public static function getProcessState(int $type, mixed $id_or_name = null): mixed
{
$file = ZM_STATE_DIR;
switch ($type) {
@ -116,7 +132,7 @@ class ProcessStateManager
*
* @internal
*/
public static function saveProcessState(int $type, int|string $pid, array $data = [])
public static function saveProcessState(int $type, int|string $pid, array $data = []): void
{
switch ($type) {
case ZM_PROCESS_MASTER: