update to 2.4.3 version (build 403)

add config: swoole.max_wait_time (default 5)
add constant MAIN_WORKER
add getExpireTS() for LightCache
fix savePersistence() bug
add zm_go() to prevent errors
This commit is contained in:
jerry 2021-03-29 15:34:24 +08:00
parent 202c8aee77
commit 6b872c6f74
32 changed files with 1280 additions and 882 deletions

View File

@ -35,8 +35,9 @@
"zhamao/connection-manager": "^1.0" "zhamao/connection-manager": "^1.0"
}, },
"suggest": { "suggest": {
"ext-ctype": "*", "ext-ctype": "Use C/C++ extension instead of polyfill will be more efficient",
"ext-mbstring": "*" "ext-mbstring": "Use C/C++ extension instead of polyfill will be more efficient",
"league/climate": "Display columns and status in terminal"
}, },
"autoload": { "autoload": {
"psr-4": { "psr-4": {

View File

@ -29,6 +29,7 @@ $config['swoole'] = [
//'worker_num' => swoole_cpu_num(), //如果你只有一个 OneBot 实例连接到框架并且代码没有复杂的CPU密集计算则可把这里改为1使用全局变量 //'worker_num' => swoole_cpu_num(), //如果你只有一个 OneBot 实例连接到框架并且代码没有复杂的CPU密集计算则可把这里改为1使用全局变量
'dispatch_mode' => 2, //包分配原则,见 https://wiki.swoole.com/#/server/setting?id=dispatch_mode 'dispatch_mode' => 2, //包分配原则,见 https://wiki.swoole.com/#/server/setting?id=dispatch_mode
'max_coroutine' => 300000, 'max_coroutine' => 300000,
'max_wait_time' => 5
//'task_worker_num' => 4, //'task_worker_num' => 4,
//'task_enable_coroutine' => true //'task_enable_coroutine' => true
]; ];

View File

@ -21,3 +21,34 @@ ps aux | grep vendor/bin/start | grep -v grep | awk '{print $2}' | xargs kill -9
kill -INT 23643 kill -INT 23643
# 如果使用 ps aux 看不到框架相关进程,证明关闭成功,否则需要使用第一条强行杀死 # 如果使用 ps aux 看不到框架相关进程,证明关闭成功,否则需要使用第一条强行杀死
``` ```
## 出现 deadlock 字样
一般情况下,如果误操作框架可能会报如下图的错误:
```
===================================================================
[FATAL ERROR]: all coroutines (count: 1) are asleep - deadlock!
===================================================================
[Coroutine-1]
--------------------------------------------------------------------
#0 Swoole\Coroutine\System::sleep() called at [/Users/jerry/project/git-project/zhamao-framework/src/ZM/global_functions.php:232]
#1 zm_sleep() called at [/Users/jerry/project/git-project/zhamao-framework/src/Module/Example/Hello.php:38]
#2 Module\Example\Hello->onStart() called at [/Users/jerry/project/git-project/zhamao-framework/src/ZM/Event/EventDispatcher.php:205]
#3 ZM\Event\EventDispatcher->dispatchEvent() called at [/Users/jerry/project/git-project/zhamao-framework/src/ZM/Event/EventDispatcher.php:89]
#4 ZM\Event\EventDispatcher->dispatchEvents() called at [/Users/jerry/project/git-project/zhamao-framework/src/ZM/Event/SwooleEvent/OnWorkerStart.php:130]
#5 ZM\Event\SwooleEvent\OnWorkerStart->onCall() called at [/Users/jerry/project/git-project/zhamao-framework/src/ZM/Framework.php:336]
```
这种错误的出现原因一般是因为协程未结束而 Worker 进程提前退出导致的,这个错误也可手动造成(在任意 Worker 进程内的位置使用 `zm_yield()` 且不使用 `zm_resume()` 恢复,期间使用 reload 或 stop 重启或停止框架就会报错)。
还有一种情况是数据库、文件读取或下载上传还没有传送结束,时间已经超时,在关闭或重启框架时不得不强行切断协程的运行。这种情况建议根据下方的打印输出栈进行插错,建议将协程运行时间长的过程缩短或调长 `swoole` 配置项下面的 `max_wait_time` 时间2.4.3 版本起此参数默认为 5 秒。
## 使用 LightCache 关闭时无法正常保存持久化
LightCache 因为是跨内存使用的,所以每次重启和关闭框架时,都只会让其中一个进程去保存。因为在 2.4.2 版本开始,持久化的逻辑发生了更改,不再支持 `expire = -2` 进行设置持久化(因为那样会很容易让开发者写错),仅支持使用 `LightCache::addPersistence($key)` 这样的方式进行设置持久化,所以在 2.4.2 版本以后,请使用此方法进行持久化设置,保证数据不丢失。
此外2.4.2 版本起,不再支持用户手动调用 `savePersistence()` 方法,普通用户不可手动调用此方法,否则会导致数据出错。
##

View File

@ -57,7 +57,9 @@ $config['light_cache'] = [
`$value` 可存入 `bool``string``int``array` 等可被 `json_encode()` 的变量,闭包函数和对象不可存入。 `$value` 可存入 `bool``string``int``array` 等可被 `json_encode()` 的变量,闭包函数和对象不可存入。
`$expire``int`,超时时间(秒)。如果设定了大于 0 的值,则表明是在 `$expire` 秒后自动删除。如果为 -1 则什么都不做,如果框架使用了 `stop` 或 Ctrl+C 或意外退出时数据会丢失。如果为 -2则会将此数据持久化保存保存在上方配置文件指定的 json 文件中,待关闭后再次启动框架会自动加载回来,不会丢失。 `$expire``int`,超时时间(秒)。如果设定了大于 0 的值,则表明是在 `$expire` 秒后自动删除(框架中途停止不受影响)。如果为 -1 则什么都不做。框架停止后自动被清除。
**注意:如果前面使用了 set() ,后面再次使用 set() 会重置 expire 过期时间为 -1-1 是框架运行时不过期,关闭框架删除的状态),如果只需要更新值,请使用 update()。**
```php ```php
// use ZM\Store\LightCache; // use ZM\Store\LightCache;
@ -88,6 +90,14 @@ public function storeAfterRemove() {
( 内容不存在! ( 内容不存在!
</chat-box> </chat-box>
### LightCache::update()
更新值而不更新状态。如果键值对不存在,则返回 false。
定义:`LightCache::update(string $key, $value)`
参数同 `set()`,可参考。
### LightCache::get() ### LightCache::get()
获取内容。 获取内容。
@ -106,6 +116,20 @@ zm_sleep(10);
dump(LightCache::getExpire("test")); // 返回 10 dump(LightCache::getExpire("test")); // 返回 10
``` ```
### LightCache::getExpireTS()
获取存储项要过期的时间戳。
定义:`LightCache::getExpireTS(string $key)`
```php
$s = LightCache::set("test", "hello", 20); //假设这条代码执行时时间戳是 1616838482
zm_sleep(10);
dump(LightCache::getExpire("test")); // 返回 1616838502
zm_sleep(10);
dump(LightCache::getExpire("test")); // 返回 null
```
### LightCache::getMemoryUsage() ### LightCache::getMemoryUsage()
获取轻量缓存使用的总空间大小(字节) 获取轻量缓存使用的总空间大小(字节)
@ -157,25 +181,34 @@ dump(LightCache::getAll());
*/ */
``` ```
### LightCache::savePersistence() ### LightCache::addPersistence()
立刻保存所有被标记为持久化的缓存项到磁盘 添加持久化存储的键
!!! note "提示" 用法:`LightCache::addPersistence($key)`
在一般情况下框架定时执行此方法来保存在停止框架、reload 框架和 Ctrl+C 停止框架的时候,均会执行保存。 注:只需调用一次即可,无需多次重复调用,也不需要设置 expire 为 -2 了。2.4.2 起可用此方法)。
详见下方 **持久化**
### LightCache::removePersistence()
删除持久化的键。
用法:`LightCache::removePersistence($key)`
注:只需调用一次即可,无需多次重复调用,也不需要设置 expire 为非 -2 了。2.4.2 起可用此方法)。
### 持久化 ### 持久化
`set()` 的 expire 设置为 -2 即可。 使用 `LightCache::addPersistence($key)` 添加对应需要持久化的键名即可。
```php ```php
/** /**
* @CQCommand("store") * @OnStart()
*/ */
public function store() { public function onStart() {
LightCache::set("msg_time", time(), -2); LightCache::addPersistence("msg_time");
return "OK!";
} }
/** /**
* @CQCommand("getStore") * @CQCommand("getStore")
@ -187,11 +220,11 @@ public function getStore() {
<chat-box> <chat-box>
^ 我在 2021-01-05 15:21:00 发送这条消息 ^ 我在 2021-01-05 15:21:00 发送这条消息
) store ) getStore
( OK! ( 2021-01-05 15:20:00
^ 这时我用 Ctrl+C 停止框架,过一会儿再启动 ^ 这时我用 Ctrl+C 停止框架,过一会儿再启动
) getStore ) getStore
( 存储时间2021-01-05 15:21:00 ( 存储时间2021-01-05 15:20:00
</chat-box> </chat-box>
### 数据加锁 ### 数据加锁

View File

@ -43,6 +43,7 @@
| `worker_num` | Worker 工作进程数 | 运行框架的主机 CPU 核心数 | | `worker_num` | Worker 工作进程数 | 运行框架的主机 CPU 核心数 |
| `dispatch_mode` | 数据包分发策略,见 [文档](https://wiki.swoole.com/#/server/setting?id=dispatch_mode) | 2 | | `dispatch_mode` | 数据包分发策略,见 [文档](https://wiki.swoole.com/#/server/setting?id=dispatch_mode) | 2 |
| `max_coroutine` | 最大协程并发数 | 300000 | | `max_coroutine` | 最大协程并发数 | 300000 |
| `max_wait_time` | 退出进程时等待协程恢复的最长时间(秒) | 52.4.3 版本后默认值) |
| `task_worker_num` | TaskWorker 工作进程数 | 默认不开启(此参数被注释) | | `task_worker_num` | TaskWorker 工作进程数 | 默认不开启(此参数被注释) |
| `task_enable_coroutine` | TaskWorker 工作进程启用协程 | 默认不开启(此参数被注释)或 `bool` | | `task_enable_coroutine` | TaskWorker 工作进程启用协程 | 默认不开启(此参数被注释)或 `bool` |

View File

@ -1,8 +1,14 @@
# 更新日志v2 版本) # 更新日志v2 版本)
# v2.4.2 (build 402) ## v2.4.3 (build 403)
> 更新时间202.3.27 > 更新时间2021.3.29
-
## v2.4.2 (build 402)
> 更新时间2021.3.27
- 更改:`WORKING_DIR` 常量的含义 - 更改:`WORKING_DIR` 常量的含义
- 修复:未指定 `--remote-terminal` 参数时还依旧开启远程终端的 bug - 修复:未指定 `--remote-terminal` 参数时还依旧开启远程终端的 bug

View File

@ -5,6 +5,7 @@ namespace ZM\Annotation\Command;
use Doctrine\Common\Annotations\Annotation\Required; use Doctrine\Common\Annotations\Annotation\Required;
use Doctrine\Common\Annotations\Annotation\Target; use Doctrine\Common\Annotations\Annotation\Target;
use ZM\Annotation\AnnotationBase;
/** /**
* Class TerminalCommand * Class TerminalCommand
@ -12,7 +13,7 @@ use Doctrine\Common\Annotations\Annotation\Target;
* @Annotation * @Annotation
* @Target("METHOD") * @Target("METHOD")
*/ */
class TerminalCommand class TerminalCommand extends AnnotationBase
{ {
/** /**
* @var string * @var string
@ -20,6 +21,8 @@ class TerminalCommand
*/ */
public $command; public $command;
public $alias = '';
/** /**
* @var string * @var string
*/ */

View File

@ -12,7 +12,7 @@ use ZM\Annotation\AnnotationBase;
* Class SwooleHandler * Class SwooleHandler
* @package ZM\Annotation\Swoole * @package ZM\Annotation\Swoole
* @Annotation * @Annotation
* @Target("METHOD") * @Target("ALL")
*/ */
class SwooleHandler extends AnnotationBase class SwooleHandler extends AnnotationBase
{ {

View File

@ -19,8 +19,8 @@ use ZM\Command\SystemdCommand;
class ConsoleApplication extends Application class ConsoleApplication extends Application
{ {
const VERSION_ID = 402; const VERSION_ID = 403;
const VERSION = "2.4.2"; const VERSION = "2.4.3";
public function __construct(string $name = 'UNKNOWN') { public function __construct(string $name = 'UNKNOWN') {
define("ZM_VERSION_ID", self::VERSION_ID); define("ZM_VERSION_ID", self::VERSION_ID);
@ -72,13 +72,6 @@ class ConsoleApplication extends Application
if (!empty($with_default_cmd)) { if (!empty($with_default_cmd)) {
$this->setDefaultCommand($with_default_cmd); $this->setDefaultCommand($with_default_cmd);
} }
/*
$command_register = ZMConfig::get("global", "command_register_class") ?? [];
foreach ($command_register as $v) {
$obj = new $v();
if (!($obj instanceof Command)) throw new TypeError("Command register class must be extended by Symfony\\Component\\Console\\Command\\Command");
$this->add($obj);
}*/
return $this; return $this;
} }

View File

@ -1,701 +0,0 @@
<?php /** @noinspection PhpUnreachableStatementInspection */
/** @noinspection PhpComposerExtensionStubsInspection */
namespace ZM\Event;
use Closure;
use Co;
use Error;
use Exception;
use PDO;
use ReflectionException;
use Swoole\Coroutine;
use Swoole\Database\PDOConfig;
use Swoole\Database\PDOPool;
use Swoole\Event;
use Swoole\Process;
use Swoole\Timer;
use Throwable;
use ZM\Annotation\AnnotationParser;
use ZM\Annotation\Http\RequestMapping;
use ZM\Annotation\Swoole\OnCloseEvent;
use ZM\Annotation\Swoole\OnMessageEvent;
use ZM\Annotation\Swoole\OnOpenEvent;
use ZM\Annotation\Swoole\OnRequestEvent;
use ZM\Annotation\Swoole\OnStart;
use ZM\Annotation\Swoole\OnSwooleEvent;
use ZM\Annotation\Swoole\OnTask;
use ZM\Annotation\Swoole\OnTaskEvent;
use ZM\Config\ZMConfig;
use ZM\ConnectionManager\ManagerGM;
use ZM\Console\Console;
use Swoole\Http\Request;
use Swoole\Server;
use Swoole\WebSocket\Frame;
use ZM\Annotation\Swoole\SwooleHandler;
use ZM\Console\TermColor;
use ZM\Context\Context;
use ZM\Context\ContextInterface;
use ZM\DB\DB;
use ZM\Exception\DbException;
use ZM\Exception\InterruptException;
use ZM\Framework;
use ZM\Http\Response;
use ZM\Module\QQBot;
use ZM\Store\LightCache;
use ZM\Store\LightCacheInside;
use ZM\Store\MySQL\SqlPoolStorage;
use ZM\Store\Redis\ZMRedisPool;
use ZM\Utils\DataProvider;
use ZM\Utils\HttpUtil;
use ZM\Utils\ProcessManager;
use ZM\Utils\ZMUtil;
class ServerEventHandler
{
/**
* @SwooleHandler("start")
*/
public function onStart() {
//global $terminal_id;
$r = null;
/*if ($terminal_id !== null) {
ZMBuf::$terminal = $r = STDIN;
Event::add($r, function () use ($r) {
$fget = fgets($r);
if ($fget === false) {
Event::del($r);
return;
}
$var = trim($fget);
try {
Terminal::executeCommand($var, $r);
} catch (Exception $e) {
Console::error("Uncaught exception " . get_class($e) . ": " . $e->getMessage() . " at " . $e->getFile() . "(" . $e->getLine() . ")");
} catch (Error $e) {
Console::error("Uncaught error " . get_class($e) . ": " . $e->getMessage() . " at " . $e->getFile() . "(" . $e->getLine() . ")");
}
});
}*/
Process::signal(SIGINT, function () use ($r) {
if (zm_atomic("_int_is_reload")->get() === 1) {
zm_atomic("_int_is_reload")->set(0);
\server()->reload();
} else {
echo "\r";
Console::warning("Server interrupted(SIGINT) on Master.");
if ((Framework::$server->inotify ?? null) !== null)
/** @noinspection PhpUndefinedFieldInspection */ Event::del(Framework::$server->inotify);
ZMUtil::stop();
}
});
if (Framework::$argv["daemon"]) {
$daemon_data = json_encode([
"pid" => \server()->master_pid,
"stdout" => ZMConfig::get("global")["swoole"]["log_file"]
], 128 | 256);
file_put_contents(DataProvider::getWorkingDir() . "/.daemon_pid", $daemon_data);
}
if (Framework::$argv["watch"]) {
if (extension_loaded('inotify')) {
Console::warning("Enabled File watcher, do not use in production.");
/** @noinspection PhpUndefinedFieldInspection */
Framework::$server->inotify = $fd = inotify_init();
$this->addWatcher(DataProvider::getWorkingDir() . "/src", $fd);
Event::add($fd, function () use ($fd) {
$r = inotify_read($fd);
dump($r);
ZMUtil::reload();
});
} else {
Console::warning("You have not loaded \"inotify\" extension, please install first.");
}
}
}
/**
* @SwooleHandler("shutdown")
*/
public function onShutdown() {
Console::debug("正在关闭 Master 进程pid=" . posix_getpid());
}
/**
* @SwooleHandler("WorkerStop")
* @param $server
* @param $worker_id
* @throws Exception
*/
public function onWorkerStop(Server $server, $worker_id) {
if ($worker_id == (ZMConfig::get("worker_cache")["worker"] ?? 0)) {
LightCache::savePersistence();
}
Console::verbose(($server->taskworker ? "Task" : "") . "Worker #$worker_id 已停止");
}
/**
* @SwooleHandler("WorkerStart")
* @param Server $server
* @param $worker_id
* @throws Exception
*/
public function onWorkerStart(Server $server, $worker_id) {
//if (ZMBuf::atomic("stop_signal")->get() != 0) return;
Process::signal(SIGINT, function () use ($worker_id, $server) {
Timer::clearAll();
ProcessManager::resumeAllWorkerCoroutines();
Console::debug("正在关闭 " . ($server->taskworker ? "Task" : "") . "Worker 进程 " . Console::setColor("#" . \server()->worker_id, "gold") . TermColor::frontColor256(59) . ", pid=" . posix_getpid());
server()->stop($worker_id);
});
unset(Context::$context[Coroutine::getCid()]);
if ($server->taskworker === false) {
Process::signal(SIGUSR1, function() use ($worker_id){
Timer::clearAll();
ProcessManager::resumeAllWorkerCoroutines();
});
zm_atomic("_#worker_".$worker_id)->set($server->worker_pid);
try {
register_shutdown_function(function () use ($server) {
$error = error_get_last();
if ($error["type"] != 0) {
Console::error("Internal fatal error: " . $error["message"] . " at " . $error["file"] . "({$error["line"]})");
}
//DataProvider::saveBuffer();
/** @var Server $server */
if (server() === null) $server->shutdown();
else server()->shutdown();
});
Console::verbose("Worker #{$server->worker_id} 启动中");
Framework::$server = $server;
//ZMBuf::resetCache(); //清空变量缓存
//ZMBuf::set("wait_start", []); //添加队列在workerStart运行完成前先让其他协程等待执行
foreach ($server->connections as $v) {
$server->close($v);
}
// 这里执行的是只需要执行一遍的代码,比如终端监听器和键盘监听器
/*if ($server->worker_id === 0) {
global $terminal_id;
if ($terminal_id !== null)
go(function () {
while (true) {
$r = server()->process->exportSocket();
$result = $r->recv();
try {
if (!Terminal::executeCommand($result)) {
//if ($result == "stop" || $result == "reload" || $result == "r") {
//echo "Stopped.\n";
break;
}
} catch (Exception $e) {
Console::error($e->getMessage());
} catch (Error $e) {
Console::error($e->getMessage());
}
}
});
}*/
//TODO: 单独抽出来MySQL和Redis连接池
if (ZMConfig::get("global", "sql_config")["sql_host"] != "") {
if (SqlPoolStorage::$sql_pool !== null) {
SqlPoolStorage::$sql_pool->close();
SqlPoolStorage::$sql_pool = null;
}
Console::info("新建SQL连接池中");
ob_start();
phpinfo();
$str = ob_get_clean();
$str = explode("\n", $str);
foreach ($str as $k => $v) {
$v = trim($v);
if ($v == "") continue;
if (mb_strpos($v, "API Extensions") === false) continue;
if (mb_strpos($v, "pdo_mysql") === false) {
throw new DbException("未安装 mysqlnd php-mysql扩展。");
}
}
$sql = ZMConfig::get("global", "sql_config");
SqlPoolStorage::$sql_pool = new PDOPool((new PDOConfig())
->withHost($sql["sql_host"])
->withPort($sql["sql_port"])
// ->withUnixSocket('/tmp/mysql.sock')
->withDbName($sql["sql_database"])
->withCharset('utf8mb4')
->withUsername($sql["sql_username"])
->withPassword($sql["sql_password"])
->withOptions($sql["sql_options"] ?? [PDO::ATTR_STRINGIFY_FETCHES => false])
);
DB::initTableList();
}
// 开箱即用的Redis
$redis = ZMConfig::get("global", "redis_config");
if ($redis !== null && $redis["host"] != "") {
if (!extension_loaded("redis")) Console::error("Can not find redis extension.\n");
else ZMRedisPool::init($redis);
}
$this->loadAnnotations(); //加载composer资源、phar外置包、注解解析注册等
//echo json_encode(debug_backtrace(), 128|256);
EventManager::registerTimerTick(); //启动计时器
//ZMBuf::unsetCache("wait_start");
set_coroutine_params(["server" => $server, "worker_id" => $worker_id]);
$dispatcher = new EventDispatcher(OnStart::class);
$dispatcher->setRuleFunction(function ($v) {
return server()->worker_id === $v->worker_id || $v->worker_id === -1;
});
$dispatcher->dispatchEvents($server, $worker_id);
if ($dispatcher->status === EventDispatcher::STATUS_NORMAL) Console::debug("@OnStart 执行完毕");
else Console::warning("@OnStart 执行异常!");
Console::success("Worker #" . $worker_id . " 已启动");
} catch (Exception $e) {
Console::error("Worker加载出错停止服务");
Console::error($e->getMessage() . "\n" . $e->getTraceAsString());
ZMUtil::stop();
return;
} catch (Error $e) {
Console::error("PHP Error: " . $e->getMessage() . " in " . $e->getFile() . " on line " . $e->getLine());
Console::error("Maybe it caused by your own code if in your own Module directory.");
Console::log($e->getTraceAsString(), 'gray');
posix_kill($server->master_pid, SIGINT);
}
} else {
// 这里是TaskWorker初始化的内容部分
try {
Framework::$server = $server;
$this->loadAnnotations();
Console::success("TaskWorker #" . $server->worker_id . " 已启动");
} catch (Exception $e) {
Console::error("Worker加载出错停止服务");
Console::error($e->getMessage() . "\n" . $e->getTraceAsString());
ZMUtil::stop();
return;
} catch (Error $e) {
Console::error("PHP Error: " . $e->getMessage() . " in " . $e->getFile() . " on line " . $e->getLine());
Console::error("Maybe it caused by your own code if in your own Module directory.");
Console::log($e->getTraceAsString(), 'gray');
posix_kill($server->master_pid, SIGINT);
}
}
}
/**
* @SwooleHandler("message")
* @param $server
* @param Frame $frame
*/
public function onMessage($server, Frame $frame) {
Console::debug("Calling Swoole \"message\" from fd=" . $frame->fd . ": " . TermColor::ITALIC . $frame->data . TermColor::RESET);
unset(Context::$context[Coroutine::getCid()]);
$conn = ManagerGM::get($frame->fd);
set_coroutine_params(["server" => $server, "frame" => $frame, "connection" => $conn]);
$dispatcher1 = new EventDispatcher(OnMessageEvent::class);
$dispatcher1->setRuleFunction(function ($v) {
/** @noinspection PhpUnreachableStatementInspection */
return ctx()->getConnection()->getName() == $v->connect_type && eval("return " . $v->getRule() . ";");
});
$dispatcher = new EventDispatcher(OnSwooleEvent::class);
$dispatcher->setRuleFunction(function ($v) {
if ($v->getRule() == '') {
return strtolower($v->type) == 'message';
} else {
/** @noinspection PhpUnreachableStatementInspection
* @noinspection RedundantSuppression
*/
if (strtolower($v->type) == 'message' && eval("return " . $v->getRule() . ";")) return true;
else return false;
}
});
try {
//$starttime = microtime(true);
$dispatcher1->dispatchEvents($conn);
$dispatcher->dispatchEvents($conn);
//Console::success("Used ".round((microtime(true) - $starttime) * 1000, 3)." ms!");
} catch (Exception $e) {
$error_msg = $e->getMessage() . " at " . $e->getFile() . "(" . $e->getLine() . ")";
Console::error("Uncaught exception " . get_class($e) . " when calling \"message\": " . $error_msg);
Console::trace();
} catch (Error $e) {
$error_msg = $e->getMessage() . " at " . $e->getFile() . "(" . $e->getLine() . ")";
Console::error("Uncaught " . get_class($e) . " when calling \"message\": " . $error_msg);
Console::trace();
}
}
/**
* @SwooleHandler("request")
* @param $request
* @param $response
*/
public function onRequest(?Request $request, ?\Swoole\Http\Response $response) {
$response = new Response($response);
foreach (ZMConfig::get("global")["http_header"] as $k => $v) {
$response->setHeader($k, $v);
}
unset(Context::$context[Co::getCid()]);
Console::debug("Calling Swoole \"request\" event from fd=" . $request->fd);
set_coroutine_params(["request" => $request, "response" => $response]);
$dis1 = new EventDispatcher(OnRequestEvent::class);
$dis1->setRuleFunction(function ($v) {
return eval("return " . $v->getRule() . ";") ? true : false;
});
$dis = new EventDispatcher(OnSwooleEvent::class);
$dis->setRuleFunction(function ($v) {
if ($v->getRule() == '') {
return strtolower($v->type) == 'request';
} else {
/** @noinspection PhpUnreachableStatementInspection */
if (strtolower($v->type) == 'request' && eval("return " . $v->getRule() . ";")) return true;
else return false;
}
});
try {
$dis1->dispatchEvents($request, $response);
$dis->dispatchEvents($request, $response);
if ($dis->status === EventDispatcher::STATUS_NORMAL && $dis1->status === EventDispatcher::STATUS_NORMAL) {
$result = HttpUtil::parseUri($request, $response, $request->server["request_uri"], $node, $params);
if ($result === true) {
ctx()->setCache("params", $params);
$dispatcher = new EventDispatcher(RequestMapping::class);
$div = new RequestMapping();
$div->route = $node["route"];
$div->params = $params;
$div->method = $node["method"];
$div->request_method = $node["request_method"];
$div->class = $node["class"];
//Console::success("正在执行路由:".$node["method"]);
$dispatcher->dispatchEvent($div, null, $params, $request, $response);
if (is_string($dispatcher->store) && !$response->isEnd()) $response->end($dispatcher->store);
}
}
if (!$response->isEnd()) {
//Console::warning('返回了404');
HttpUtil::responseCodePage($response, 404);
}
} catch (InterruptException $e) {
// do nothing
} catch (Exception $e) {
$response->status(500);
Console::info($request->server["remote_addr"] . ":" . $request->server["remote_port"] .
" [" . $response->getStatusCode() . "] " . $request->server["request_uri"]
);
if (!$response->isEnd()) {
if (ZMConfig::get("global", "debug_mode"))
$response->end("Internal server exception: " . $e->getMessage());
else
$response->end("Internal server error.");
}
Console::error("Internal server exception (500), caused by " . get_class($e) . ": " . $e->getMessage());
Console::log($e->getTraceAsString(), "gray");
} catch (Error $e) {
$response->status(500);
Console::info($request->server["remote_addr"] . ":" . $request->server["remote_port"] .
" [" . $response->getStatusCode() . "] " . $request->server["request_uri"]
);
if (!$response->isEnd()) {
$error_msg = $e->getMessage() . " at " . $e->getFile() . "(" . $e->getLine() . ")";
if (ZMConfig::get("global", "debug_mode"))
$response->end("Internal server error: " . $error_msg);
else
$response->end("Internal server error.");
}
Console::error("Internal server error (500), caused by " . get_class($e) . ": " . $e->getMessage());
Console::log($e->getTraceAsString(), "gray");
}
}
/**
* @SwooleHandler("open")
* @param $server
* @param Request $request
*/
public function onOpen($server, Request $request) {
Console::debug("Calling Swoole \"open\" event from fd=" . $request->fd);
unset(Context::$context[Co::getCid()]);
$type = strtolower($request->header["x-client-role"] ?? $request->get["type"] ?? "");
$access_token = explode(" ", $request->header["authorization"] ?? "")[1] ?? $request->get["token"] ?? "";
$token = ZMConfig::get("global", "access_token");
if ($token instanceof Closure) {
if (!$token($access_token)) {
$server->close($request->fd);
Console::warning("Unauthorized access_token: " . $access_token);
return;
}
} elseif (is_string($token)) {
if ($access_token !== $token && $token !== "") {
$server->close($request->fd);
Console::warning("Unauthorized access_token: " . $access_token);
return;
}
}
$type_conn = ManagerGM::getTypeClassName($type);
ManagerGM::pushConnect($request->fd, $type_conn);
$conn = ManagerGM::get($request->fd);
set_coroutine_params(["server" => $server, "request" => $request, "connection" => $conn, "fd" => $request->fd]);
$conn->setOption("connect_id", strval($request->header["x-self-id"] ?? ""));
$dispatcher1 = new EventDispatcher(OnOpenEvent::class);
$dispatcher1->setRuleFunction(function ($v) {
return ctx()->getConnection()->getName() == $v->connect_type && eval("return " . $v->getRule() . ";");
});
$dispatcher = new EventDispatcher(OnSwooleEvent::class);
$dispatcher->setRuleFunction(function ($v) {
if ($v->getRule() == '') {
return strtolower($v->type) == 'open';
} else {
/** @noinspection PhpUnreachableStatementInspection */
if (strtolower($v->type) == 'open' && eval("return " . $v->getRule() . ";")) return true;
else return false;
}
});
try {
$obb_onebot = ZMConfig::get("global", "onebot") ??
ZMConfig::get("global", "modules")["onebot"] ??
["status" => true, "single_bot_mode" => false, "message_level" => 99999];
$onebot_status = $obb_onebot["status"];
if ($conn->getName() === 'qq' && $onebot_status === true) {
if ($obb_onebot["single_bot_mode"]) {
LightCacheInside::set("connect", "conn_fd", $request->fd);
}
}
$dispatcher1->dispatchEvents($conn);
$dispatcher->dispatchEvents($conn);
} catch (Exception $e) {
$error_msg = $e->getMessage() . " at " . $e->getFile() . "(" . $e->getLine() . ")";
Console::error("Uncaught exception " . get_class($e) . " when calling \"open\": " . $error_msg);
Console::trace();
} catch (Error $e) {
$error_msg = $e->getMessage() . " at " . $e->getFile() . "(" . $e->getLine() . ")";
Console::error("Uncaught " . get_class($e) . " when calling \"open\": " . $error_msg);
Console::trace();
}
}
/**
* @SwooleHandler("close")
* @param $server
* @param $fd
*/
public function onClose($server, $fd) {
unset(Context::$context[Co::getCid()]);
$conn = ManagerGM::get($fd);
if ($conn === null) return;
Console::debug("Calling Swoole \"close\" event from fd=" . $fd);
set_coroutine_params(["server" => $server, "connection" => $conn, "fd" => $fd]);
$dispatcher1 = new EventDispatcher(OnCloseEvent::class);
$dispatcher1->setRuleFunction(function ($v) {
return $v->connect_type == ctx()->getConnection()->getName() && eval("return " . $v->getRule() . ";");
});
$dispatcher = new EventDispatcher(OnSwooleEvent::class);
$dispatcher->setRuleFunction(function ($v) {
if ($v->getRule() == '') {
return strtolower($v->type) == 'close';
} else {
/** @noinspection PhpUnreachableStatementInspection */
if (strtolower($v->type) == 'close' && eval("return " . $v->getRule() . ";")) return true;
else return false;
}
});
try {
$obb_onebot = ZMConfig::get("global", "onebot") ??
ZMConfig::get("global", "modules")["onebot"] ??
["status" => true, "single_bot_mode" => false, "message_level" => 99999];
if ($conn->getName() === 'qq' && $obb_onebot["status"] === true) {
if ($obb_onebot["single_bot_mode"]) {
LightCacheInside::set("connect", "conn_fd", -1);
}
}
$dispatcher1->dispatchEvents($conn);
$dispatcher->dispatchEvents($conn);
} catch (Exception $e) {
$error_msg = $e->getMessage() . " at " . $e->getFile() . "(" . $e->getLine() . ")";
Console::error("Uncaught exception " . get_class($e) . " when calling \"close\": " . $error_msg);
Console::trace();
} catch (Error $e) {
$error_msg = $e->getMessage() . " at " . $e->getFile() . "(" . $e->getLine() . ")";
Console::error("Uncaught " . get_class($e) . " when calling \"close\": " . $error_msg);
Console::trace();
}
ManagerGM::popConnect($fd);
}
/**
* @SwooleHandler("pipeMessage")
* @param Server $server
* @param $src_worker_id
* @param $data
* @throws Exception
* @noinspection PhpUnusedParameterInspection
*/
public function onPipeMessage(Server $server, $src_worker_id, $data) {
//var_dump($data, $server->worker_id);
//unset(Context::$context[Co::getCid()]);
$data = json_decode($data, true);
ProcessManager::workerAction($src_worker_id, $data);
}
/**
* @SwooleHandler("beforeReload")
*/
public function onBeforeReload() {
for($i = 0; $i < ZM_WORKER_NUM; ++$i) {
$pid = zm_atomic("_#worker_".$i)->get();
Process::kill($pid, SIGUSR1);
}
Console::info(Console::setColor("Reloading server...", "gold"));
usleep(800 * 1000);
LightCacheInside::unset("wait_api", "wait_api");
}
/**
* @SwooleHandler("task")
* @param Server|null $server
* @param Server\Task $task
* @noinspection PhpUnusedParameterInspection
*/
public function onTask(?Server $server, Server\Task $task) {
if (isset($task->data["task"])) {
$dispatcher = new EventDispatcher(OnTask::class);
$dispatcher->setRuleFunction(function ($v) use ($task) {
/** @var OnTask $v */
return $v->task_name == $task->data["task"];
});
$dispatcher->setReturnFunction(function ($return) {
EventDispatcher::interrupt($return);
});
$params = $task->data["params"];
try {
$dispatcher->dispatchEvents(...$params);
} catch (Throwable $e) {
$finish["throw"] = $e;
}
if ($dispatcher->status === EventDispatcher::STATUS_EXCEPTION) {
$finish["result"] = null;
$finish["retcode"] = -1;
} else {
$finish["result"] = $dispatcher->store;
$finish["retcode"] = 0;
}
if (zm_atomic("server_is_stopped")->get() === 1) {
return;
}
$task->finish($finish);
} else {
try {
$dispatcher = new EventDispatcher(OnTaskEvent::class);
$dispatcher->setRuleFunction(function ($v) {
/** @var OnTaskEvent $v */
return eval("return " . $v->getRule() . ";");
});
$dispatcher->dispatchEvents();
} catch (Exception $e) {
$error_msg = $e->getMessage() . " at " . $e->getFile() . "(" . $e->getLine() . ")";
Console::error("Uncaught exception " . get_class($e) . " when calling \"task\": " . $error_msg);
Console::trace();
} catch (Error $e) {
$error_msg = $e->getMessage() . " at " . $e->getFile() . "(" . $e->getLine() . ")";
Console::error("Uncaught " . get_class($e) . " when calling \"task\": " . $error_msg);
Console::trace();
}
}
}
/**
* @throws ReflectionException
* @throws Exception
*/
private function loadAnnotations() {
//加载phar包
/*Console::debug("加载外部phar包中");
$dir = DataProvider::getWorkingDir() . "/resources/package/";
if (version_compare(SWOOLE_VERSION, "4.4.0", ">=")) Timer::clearAll();
if (is_dir($dir)) {
$list = scandir($dir);
unset($list[0], $list[1]);
foreach ($list as $v) {
if (is_dir($dir . $v)) continue;
if (pathinfo($dir . $v, 4) == "phar") {
Console::debug("加载Phar: " . $dir . $v . "");
require_once($dir . $v);
}
}
}*/
//加载各个模块的注解类,以及反射
Console::debug("检索Module中");
$parser = new AnnotationParser();
$path = DataProvider::getWorkingDir() . "/src/";
$dir = scandir($path);
unset($dir[0], $dir[1]);
$composer = json_decode(file_get_contents(DataProvider::getWorkingDir() . "/composer.json"), true);
foreach ($dir as $v) {
if (is_dir($path . "/" . $v) && isset($composer["autoload"]["psr-4"][$v . "\\"]) && !in_array($composer["autoload"]["psr-4"][$v . "\\"], $composer["extra"]["exclude_annotate"] ?? [])) {
if (\server()->worker_id == 0)
Console::verbose("Add " . $v . " to register path");
$parser->addRegisterPath(DataProvider::getWorkingDir() . "/src/" . $v . "/", $v);
}
}
$parser->registerMods();
EventManager::loadEventByParser($parser); //加载事件
//加载自定义的全局函数
Console::debug("加载自定义上下文中...");
$context_class = ZMConfig::get("global", "context_class");
if (!is_a($context_class, ContextInterface::class, true)) {
throw new Exception("Context class must implemented from ContextInterface!");
}
//加载插件
$obb_onebot = ZMConfig::get("global", "onebot") ??
ZMConfig::get("global", "modules")["onebot"] ??
["status" => true, "single_bot_mode" => false, "message_level" => 99999];
if ($obb_onebot["status"]) {
$obj = new OnSwooleEvent();
$obj->class = QQBot::class;
$obj->method = 'handleByEvent';
$obj->type = 'message';
$obj->level = $obb_onebot["message_level"] ?? 99999;
$obj->rule = 'connectIsQQ()';
EventManager::addEvent(OnSwooleEvent::class, $obj);
if ($obb_onebot["single_bot_mode"]) {
LightCacheInside::set("connect", "conn_fd", -1);
} else {
LightCacheInside::set("connect", "conn_fd", -2);
}
}
//TODO: 编写加载外部插件的方式
}
private function addWatcher($maindir, $fd) {
$dir = scandir($maindir);
unset($dir[0], $dir[1]);
foreach ($dir as $subdir) {
if (is_dir($maindir . "/" . $subdir)) {
Console::debug("添加监听目录:" . $maindir . "/" . $subdir);
inotify_add_watch($fd, $maindir . "/" . $subdir, IN_ATTRIB | IN_ISDIR);
$this->addWatcher($maindir . "/" . $subdir, $fd);
}
}
}
}

View File

@ -0,0 +1,9 @@
<?php
namespace ZM\Event;
interface SwooleEvent
{
}

View File

@ -0,0 +1,26 @@
<?php
namespace ZM\Event\SwooleEvent;
use Swoole\Process;
use ZM\Annotation\Swoole\SwooleHandler;
use ZM\Console\Console;
use ZM\Event\SwooleEvent;
/**
* Class OnBeforeReload
* @package ZM\Event\SwooleEvent
* @SwooleHandler("BeforeReload")
*/
class OnBeforeReload implements SwooleEvent
{
public function onCall() {
Console::info(Console::setColor("Reloading server...", "gold"));
for ($i = 0; $i < ZM_WORKER_NUM; ++$i) {
Process::kill(zm_atomic("_#worker_".$i)->get(), SIGUSR1);
}
usleep(800 * 1000);
}
}

View File

@ -0,0 +1,73 @@
<?php
namespace ZM\Event\SwooleEvent;
use Co;
use Error;
use Exception;
use ZM\Annotation\Swoole\OnCloseEvent;
use ZM\Annotation\Swoole\OnSwooleEvent;
use ZM\Annotation\Swoole\SwooleHandler;
use ZM\Config\ZMConfig;
use ZM\ConnectionManager\ManagerGM;
use ZM\Console\Console;
use ZM\Context\Context;
use ZM\Event\EventDispatcher;
use ZM\Event\SwooleEvent;
use ZM\Store\LightCacheInside;
/**
* Class OnClose
* @package ZM\Event\SwooleEvent
* @SwooleHandler("close")
*/
class OnClose implements SwooleEvent
{
/** @noinspection PhpUnreachableStatementInspection */
public function onCall($server, $fd) {
unset(Context::$context[Co::getCid()]);
$conn = ManagerGM::get($fd);
if ($conn === null) return;
Console::debug("Calling Swoole \"close\" event from fd=" . $fd);
set_coroutine_params(["server" => $server, "connection" => $conn, "fd" => $fd]);
$dispatcher1 = new EventDispatcher(OnCloseEvent::class);
$dispatcher1->setRuleFunction(function ($v) {
return $v->connect_type == ctx()->getConnection()->getName() && eval("return " . $v->getRule() . ";");
});
$dispatcher = new EventDispatcher(OnSwooleEvent::class);
$dispatcher->setRuleFunction(function ($v) {
if ($v->getRule() == '') {
return strtolower($v->type) == 'close';
} else {
/** @noinspection PhpUnreachableStatementInspection */
if (strtolower($v->type) == 'close' && eval("return " . $v->getRule() . ";")) return true;
else return false;
}
});
try {
$obb_onebot = ZMConfig::get("global", "onebot") ??
ZMConfig::get("global", "modules")["onebot"] ??
["status" => true, "single_bot_mode" => false, "message_level" => 99999];
if ($conn->getName() === 'qq' && $obb_onebot["status"] === true) {
if ($obb_onebot["single_bot_mode"]) {
LightCacheInside::set("connect", "conn_fd", -1);
}
}
$dispatcher1->dispatchEvents($conn);
$dispatcher->dispatchEvents($conn);
} catch (Exception $e) {
$error_msg = $e->getMessage() . " at " . $e->getFile() . "(" . $e->getLine() . ")";
Console::error("Uncaught exception " . get_class($e) . " when calling \"close\": " . $error_msg);
Console::trace();
} catch (Error $e) {
$error_msg = $e->getMessage() . " at " . $e->getFile() . "(" . $e->getLine() . ")";
Console::error("Uncaught " . get_class($e) . " when calling \"close\": " . $error_msg);
Console::trace();
}
ManagerGM::popConnect($fd);
}
}

View File

@ -0,0 +1,27 @@
<?php /** @noinspection PhpUnusedParameterInspection */
/** @noinspection PhpComposerExtensionStubsInspection */
namespace ZM\Event\SwooleEvent;
use Swoole\Server;
use ZM\Annotation\Swoole\SwooleHandler;
use ZM\Console\Console;
use ZM\Event\SwooleEvent;
/**
* Class OnManagerStart
* @package ZM\Event\SwooleEvent
* @SwooleHandler("ManagerStart")
*/
class OnManagerStart implements SwooleEvent
{
public function onCall(Server $server) {
pcntl_signal(SIGINT, function () {
Console::verbose("Interrupted in manager!");
});
Console::verbose("进程 Manager 已启动");
}
}

View File

@ -0,0 +1,68 @@
<?php
namespace ZM\Event\SwooleEvent;
use Error;
use Exception;
use Swoole\Coroutine;
use Swoole\WebSocket\Frame;
use ZM\Annotation\Swoole\OnMessageEvent;
use ZM\Annotation\Swoole\OnSwooleEvent;
use ZM\Annotation\Swoole\SwooleHandler;
use ZM\ConnectionManager\ManagerGM;
use ZM\Console\Console;
use ZM\Console\TermColor;
use ZM\Context\Context;
use ZM\Event\EventDispatcher;
use ZM\Event\SwooleEvent;
/**
* Class OnMessage
* @package ZM\Event\SwooleEvent
* @SwooleHandler("message")
*/
class OnMessage implements SwooleEvent
{
public function onCall($server, Frame $frame) {
Console::debug("Calling Swoole \"message\" from fd=" . $frame->fd . ": " . TermColor::ITALIC . $frame->data . TermColor::RESET);
unset(Context::$context[Coroutine::getCid()]);
$conn = ManagerGM::get($frame->fd);
set_coroutine_params(["server" => $server, "frame" => $frame, "connection" => $conn]);
$dispatcher1 = new EventDispatcher(OnMessageEvent::class);
$dispatcher1->setRuleFunction(function ($v) {
/** @noinspection PhpUnreachableStatementInspection */
return ctx()->getConnection()->getName() == $v->connect_type && eval("return " . $v->getRule() . ";");
});
$dispatcher = new EventDispatcher(OnSwooleEvent::class);
$dispatcher->setRuleFunction(function ($v) {
if ($v->getRule() == '') {
return strtolower($v->type) == 'message';
} else {
/** @noinspection PhpUnreachableStatementInspection
* @noinspection RedundantSuppression
*/
if (strtolower($v->type) == 'message' && eval("return " . $v->getRule() . ";")) return true;
else return false;
}
});
try {
//$starttime = microtime(true);
$dispatcher1->dispatchEvents($conn);
$dispatcher->dispatchEvents($conn);
//Console::success("Used ".round((microtime(true) - $starttime) * 1000, 3)." ms!");
} catch (Exception $e) {
$error_msg = $e->getMessage() . " at " . $e->getFile() . "(" . $e->getLine() . ")";
Console::error("Uncaught exception " . get_class($e) . " when calling \"message\": " . $error_msg);
Console::trace();
} catch (Error $e) {
$error_msg = $e->getMessage() . " at " . $e->getFile() . "(" . $e->getLine() . ")";
Console::error("Uncaught " . get_class($e) . " when calling \"message\": " . $error_msg);
Console::trace();
}
}
}

View File

@ -0,0 +1,92 @@
<?php
namespace ZM\Event\SwooleEvent;
use Closure;
use Co;
use Error;
use Exception;
use Swoole\Http\Request;
use ZM\Annotation\Swoole\OnOpenEvent;
use ZM\Annotation\Swoole\OnSwooleEvent;
use ZM\Annotation\Swoole\SwooleHandler;
use ZM\Config\ZMConfig;
use ZM\ConnectionManager\ManagerGM;
use ZM\Console\Console;
use ZM\Context\Context;
use ZM\Event\EventDispatcher;
use ZM\Event\SwooleEvent;
use ZM\Store\LightCacheInside;
/**
* Class OnOpen
* @package ZM\Event\SwooleEvent
* @SwooleHandler("open")
*/
class OnOpen implements SwooleEvent
{
/** @noinspection PhpUnreachableStatementInspection */
public function onCall($server, Request $request) {
Console::debug("Calling Swoole \"open\" event from fd=" . $request->fd);
unset(Context::$context[Co::getCid()]);
$type = strtolower($request->header["x-client-role"] ?? $request->get["type"] ?? "");
$access_token = explode(" ", $request->header["authorization"] ?? "")[1] ?? $request->get["token"] ?? "";
$token = ZMConfig::get("global", "access_token");
if ($token instanceof Closure) {
if (!$token($access_token)) {
$server->close($request->fd);
Console::warning("Unauthorized access_token: " . $access_token);
return;
}
} elseif (is_string($token)) {
if ($access_token !== $token && $token !== "") {
$server->close($request->fd);
Console::warning("Unauthorized access_token: " . $access_token);
return;
}
}
$type_conn = ManagerGM::getTypeClassName($type);
ManagerGM::pushConnect($request->fd, $type_conn);
$conn = ManagerGM::get($request->fd);
set_coroutine_params(["server" => $server, "request" => $request, "connection" => $conn, "fd" => $request->fd]);
$conn->setOption("connect_id", strval($request->header["x-self-id"] ?? ""));
$dispatcher1 = new EventDispatcher(OnOpenEvent::class);
$dispatcher1->setRuleFunction(function ($v) {
return ctx()->getConnection()->getName() == $v->connect_type && eval("return " . $v->getRule() . ";");
});
$dispatcher = new EventDispatcher(OnSwooleEvent::class);
$dispatcher->setRuleFunction(function ($v) {
if ($v->getRule() == '') {
return strtolower($v->type) == 'open';
} else {
if (strtolower($v->type) == 'open' && eval("return " . $v->getRule() . ";")) return true;
else return false;
}
});
try {
$obb_onebot = ZMConfig::get("global", "onebot") ??
ZMConfig::get("global", "modules")["onebot"] ??
["status" => true, "single_bot_mode" => false, "message_level" => 99999];
$onebot_status = $obb_onebot["status"];
if ($conn->getName() === 'qq' && $onebot_status === true) {
if ($obb_onebot["single_bot_mode"]) {
LightCacheInside::set("connect", "conn_fd", $request->fd);
}
}
$dispatcher1->dispatchEvents($conn);
$dispatcher->dispatchEvents($conn);
} catch (Exception $e) {
$error_msg = $e->getMessage() . " at " . $e->getFile() . "(" . $e->getLine() . ")";
Console::error("Uncaught exception " . get_class($e) . " when calling \"open\": " . $error_msg);
Console::trace();
} catch (Error $e) {
$error_msg = $e->getMessage() . " at " . $e->getFile() . "(" . $e->getLine() . ")";
Console::error("Uncaught " . get_class($e) . " when calling \"open\": " . $error_msg);
Console::trace();
}
}
}

View File

@ -0,0 +1,23 @@
<?php /** @noinspection PhpUnusedParameterInspection */
namespace ZM\Event\SwooleEvent;
use Swoole\Server;
use ZM\Annotation\Swoole\SwooleHandler;
use ZM\Event\SwooleEvent;
use ZM\Utils\ProcessManager;
/**
* Class OnPipeMessage
* @package ZM\Event\SwooleEvent
* @SwooleHandler("PipeMessage")
*/
class OnPipeMessage implements SwooleEvent
{
public function onCall(Server $server, $src_worker_id, $data) {
$data = json_decode($data, true);
ProcessManager::workerAction($src_worker_id, $data);
}
}

View File

@ -0,0 +1,111 @@
<?php
namespace ZM\Event\SwooleEvent;
use Co;
use Error;
use Exception;
use Swoole\Http\Request;
use ZM\Annotation\Http\RequestMapping;
use ZM\Annotation\Swoole\OnRequestEvent;
use ZM\Annotation\Swoole\OnSwooleEvent;
use ZM\Annotation\Swoole\SwooleHandler;
use ZM\Config\ZMConfig;
use ZM\Console\Console;
use ZM\Context\Context;
use ZM\Event\EventDispatcher;
use ZM\Event\SwooleEvent;
use ZM\Exception\InterruptException;
use ZM\Http\Response;
use ZM\Utils\HttpUtil;
/**
* Class OnRequest
* @package ZM\Event\SwooleEvent
* @SwooleHandler("request")
*/
class OnRequest implements SwooleEvent
{
public function onCall(?Request $request, ?\Swoole\Http\Response $response) {
$response = new Response($response);
foreach (ZMConfig::get("global")["http_header"] as $k => $v) {
$response->setHeader($k, $v);
}
unset(Context::$context[Co::getCid()]);
Console::debug("Calling Swoole \"request\" event from fd=" . $request->fd);
set_coroutine_params(["request" => $request, "response" => $response]);
$dis1 = new EventDispatcher(OnRequestEvent::class);
$dis1->setRuleFunction(function ($v) {
/** @noinspection PhpUnreachableStatementInspection */
return eval("return " . $v->getRule() . ";") ? true : false;
});
$dis = new EventDispatcher(OnSwooleEvent::class);
$dis->setRuleFunction(function ($v) {
if ($v->getRule() == '') {
return strtolower($v->type) == 'request';
} else {
/** @noinspection PhpUnreachableStatementInspection */
if (strtolower($v->type) == 'request' && eval("return " . $v->getRule() . ";")) return true;
else return false;
}
});
try {
$dis1->dispatchEvents($request, $response);
$dis->dispatchEvents($request, $response);
if ($dis->status === EventDispatcher::STATUS_NORMAL && $dis1->status === EventDispatcher::STATUS_NORMAL) {
$result = HttpUtil::parseUri($request, $response, $request->server["request_uri"], $node, $params);
if ($result === true) {
ctx()->setCache("params", $params);
$dispatcher = new EventDispatcher(RequestMapping::class);
$div = new RequestMapping();
$div->route = $node["route"];
$div->params = $params;
$div->method = $node["method"];
$div->request_method = $node["request_method"];
$div->class = $node["class"];
//Console::success("正在执行路由:".$node["method"]);
$dispatcher->dispatchEvent($div, null, $params, $request, $response);
if (is_string($dispatcher->store) && !$response->isEnd()) $response->end($dispatcher->store);
}
}
if (!$response->isEnd()) {
//Console::warning('返回了404');
HttpUtil::responseCodePage($response, 404);
}
} catch (InterruptException $e) {
// do nothing
} catch (Exception $e) {
$response->status(500);
Console::info($request->server["remote_addr"] . ":" . $request->server["remote_port"] .
" [" . $response->getStatusCode() . "] " . $request->server["request_uri"]
);
if (!$response->isEnd()) {
if (ZMConfig::get("global", "debug_mode"))
$response->end("Internal server exception: " . $e->getMessage());
else
$response->end("Internal server error.");
}
Console::error("Internal server exception (500), caused by " . get_class($e) . ": " . $e->getMessage());
Console::log($e->getTraceAsString(), "gray");
} catch (Error $e) {
$response->status(500);
Console::info($request->server["remote_addr"] . ":" . $request->server["remote_port"] .
" [" . $response->getStatusCode() . "] " . $request->server["request_uri"]
);
if (!$response->isEnd()) {
$error_msg = $e->getMessage() . " at " . $e->getFile() . "(" . $e->getLine() . ")";
if (ZMConfig::get("global", "debug_mode"))
$response->end("Internal server error: " . $error_msg);
else
$response->end("Internal server error.");
}
Console::error("Internal server error (500), caused by " . get_class($e) . ": " . $e->getMessage());
Console::log($e->getTraceAsString(), "gray");
}
}
}

View File

@ -0,0 +1,22 @@
<?php /** @noinspection PhpUnusedParameterInspection */
namespace ZM\Event\SwooleEvent;
use Swoole\Server;
use ZM\Annotation\Swoole\SwooleHandler;
use ZM\Console\Console;
use ZM\Event\SwooleEvent;
/**
* Class OnShutdown
* @package ZM\Event\SwooleEvent
* @SwooleHandler("shutdown")
*/
class OnShutdown implements SwooleEvent
{
public function onCall(Server $server) {
Console::verbose("正在关闭 Master 进程pid=" . posix_getpid());
}
}

View File

@ -0,0 +1,74 @@
<?php /** @noinspection PhpComposerExtensionStubsInspection */
namespace ZM\Event\SwooleEvent;
use Swoole\Event;
use Swoole\Process;
use Swoole\Server;
use ZM\Annotation\Swoole\SwooleHandler;
use ZM\Config\ZMConfig;
use ZM\Console\Console;
use ZM\Event\SwooleEvent;
use ZM\Framework;
use ZM\Utils\DataProvider;
use ZM\Utils\ZMUtil;
/**
* Class OnStart
* @package ZM\Event\SwooleEvent
* @SwooleHandler("start")
*/
class OnStart implements SwooleEvent
{
public function onCall(Server $server) {
$r = null;
Process::signal(SIGINT, function () use ($r, $server) {
if (zm_atomic("_int_is_reload")->get() === 1) {
zm_atomic("_int_is_reload")->set(0);
$server->reload();
} else {
echo "\r";
Console::warning("Server interrupted(SIGINT) on Master.");
if ((Framework::$server->inotify ?? null) !== null)
/** @noinspection PhpUndefinedFieldInspection */ Event::del(Framework::$server->inotify);
Process::kill($server->master_pid, SIGTERM);
}
});
if (Framework::$argv["daemon"]) {
$daemon_data = json_encode([
"pid" => $server->master_pid,
"stdout" => ZMConfig::get("global")["swoole"]["log_file"]
], 128 | 256);
file_put_contents(DataProvider::getWorkingDir() . "/.daemon_pid", $daemon_data);
}
if (Framework::$argv["watch"]) {
if (extension_loaded('inotify')) {
Console::warning("Enabled File watcher, do not use in production.");
/** @noinspection PhpUndefinedFieldInspection */
Framework::$server->inotify = $fd = inotify_init();
$this->addWatcher(DataProvider::getWorkingDir() . "/src", $fd);
Event::add($fd, function () use ($fd) {
$r = inotify_read($fd);
dump($r);
ZMUtil::reload();
});
} else {
Console::warning("You have not loaded \"inotify\" extension, please install first.");
}
}
}
private function addWatcher($maindir, $fd) {
$dir = scandir($maindir);
unset($dir[0], $dir[1]);
foreach ($dir as $subdir) {
if (is_dir($maindir . "/" . $subdir)) {
Console::debug("添加监听目录:" . $maindir . "/" . $subdir);
inotify_add_watch($fd, $maindir . "/" . $subdir, IN_ATTRIB | IN_ISDIR);
$this->addWatcher($maindir . "/" . $subdir, $fd);
}
}
}
}

View File

@ -0,0 +1,77 @@
<?php
namespace ZM\Event\SwooleEvent;
use Error;
use Exception;
use Swoole\Server;
use Swoole\Server\Task;
use Throwable;
use ZM\Annotation\Swoole\OnTaskEvent;
use ZM\Annotation\Swoole\SwooleHandler;
use ZM\Console\Console;
use ZM\Event\EventDispatcher;
use ZM\Event\SwooleEvent;
/**
* Class OnTask
* @package ZM\Event\SwooleEvent
* @SwooleHandler("task")
*/
class OnTask implements SwooleEvent
{
/**
* @noinspection PhpUnreachableStatementInspection
* @noinspection PhpUnusedParameterInspection
* @param Server|null $server
* @param Task $task
*/
public function onCall(?Server $server, Task $task) {
if (isset($task->data["task"])) {
$dispatcher = new EventDispatcher(\ZM\Annotation\Swoole\OnTask::class);
$dispatcher->setRuleFunction(function ($v) use ($task) {
/** @var \ZM\Annotation\Swoole\OnTask $v */
return $v->task_name == $task->data["task"];
});
$dispatcher->setReturnFunction(function ($return) {
EventDispatcher::interrupt($return);
});
$params = $task->data["params"];
try {
$dispatcher->dispatchEvents(...$params);
} catch (Throwable $e) {
$finish["throw"] = $e;
}
if ($dispatcher->status === EventDispatcher::STATUS_EXCEPTION) {
$finish["result"] = null;
$finish["retcode"] = -1;
} else {
$finish["result"] = $dispatcher->store;
$finish["retcode"] = 0;
}
if (zm_atomic("server_is_stopped")->get() === 1) {
return;
}
$task->finish($finish);
} else {
try {
$dispatcher = new EventDispatcher(OnTaskEvent::class);
$dispatcher->setRuleFunction(function ($v) {
/** @var OnTaskEvent $v */
return eval("return " . $v->getRule() . ";");
});
$dispatcher->dispatchEvents();
} catch (Exception $e) {
$error_msg = $e->getMessage() . " at " . $e->getFile() . "(" . $e->getLine() . ")";
Console::error("Uncaught exception " . get_class($e) . " when calling \"task\": " . $error_msg);
Console::trace();
} catch (Error $e) {
$error_msg = $e->getMessage() . " at " . $e->getFile() . "(" . $e->getLine() . ")";
Console::error("Uncaught " . get_class($e) . " when calling \"task\": " . $error_msg);
Console::trace();
}
}
}
}

View File

@ -0,0 +1,24 @@
<?php /** @noinspection PhpUnusedParameterInspection */
namespace ZM\Event\SwooleEvent;
use Swoole\Server;
use Swoole\Timer;
use ZM\Annotation\Swoole\SwooleHandler;
use ZM\Console\Console;
use ZM\Event\SwooleEvent;
/**
* Class OnWorkerExit
* @package ZM\Event\SwooleEvent
* @SwooleHandler("WorkerExit")
*/
class OnWorkerExit implements SwooleEvent
{
public function onCall(Server $server, $worker_id) {
Timer::clearAll();
Console::info("正在结束 Worker #".$worker_id.",进程内可能有事务在运行...");
}
}

View File

@ -0,0 +1,232 @@
<?php /** @noinspection PhpComposerExtensionStubsInspection */
namespace ZM\Event\SwooleEvent;
use Error;
use Exception;
use PDO;
use ReflectionException;
use Swoole\Coroutine;
use Swoole\Database\PDOConfig;
use Swoole\Database\PDOPool;
use Swoole\Process;
use Swoole\Server;
use Swoole\Timer;
use ZM\Annotation\AnnotationParser;
use ZM\Annotation\Swoole\OnStart;
use ZM\Annotation\Swoole\OnSwooleEvent;
use ZM\Annotation\Swoole\SwooleHandler;
use ZM\Config\ZMConfig;
use ZM\Console\Console;
use ZM\Context\Context;
use ZM\Context\ContextInterface;
use ZM\DB\DB;
use ZM\Event\EventDispatcher;
use ZM\Event\EventManager;
use ZM\Event\SwooleEvent;
use ZM\Exception\DbException;
use ZM\Framework;
use ZM\Module\QQBot;
use ZM\Store\LightCacheInside;
use ZM\Store\MySQL\SqlPoolStorage;
use ZM\Store\Redis\ZMRedisPool;
use ZM\Utils\DataProvider;
use ZM\Utils\ProcessManager;
use ZM\Utils\ZMUtil;
/**
* Class OnWorkerStart
* @package ZM\Event\SwooleEvent
* @SwooleHandler("WorkerStart")
*/
class OnWorkerStart implements SwooleEvent
{
public function onCall(Server $server, $worker_id) {
Process::signal(SIGINT, function () use ($worker_id, $server) {
});
unset(Context::$context[Coroutine::getCid()]);
if ($server->taskworker === false) {
Process::signal(SIGUSR1, function () use ($worker_id) {
Timer::clearAll();
ProcessManager::resumeAllWorkerCoroutines();
});
zm_atomic("_#worker_" . $worker_id)->set($server->worker_pid);
if (LightCacheInside::get("wait_api", "wait_api") !== null) {
LightCacheInside::unset("wait_api", "wait_api");
}
try {
register_shutdown_function(function () use ($server) {
$error = error_get_last();
if ($error["type"] != 0) {
Console::error("Internal fatal error: " . $error["message"] . " at " . $error["file"] . "({$error["line"]})");
}
//DataProvider::saveBuffer();
/** @var Server $server */
if (server() === null) $server->shutdown();
else server()->shutdown();
});
Console::verbose("Worker #{$server->worker_id} 启动中");
Framework::$server = $server;
//ZMBuf::resetCache(); //清空变量缓存
//ZMBuf::set("wait_start", []); //添加队列在workerStart运行完成前先让其他协程等待执行
foreach ($server->connections as $v) {
$server->close($v);
}
//TODO: 单独抽出来MySQL和Redis连接池
if (ZMConfig::get("global", "sql_config")["sql_host"] != "") {
if (SqlPoolStorage::$sql_pool !== null) {
SqlPoolStorage::$sql_pool->close();
SqlPoolStorage::$sql_pool = null;
}
Console::info("新建SQL连接池中");
ob_start();
phpinfo();
$str = ob_get_clean();
$str = explode("\n", $str);
foreach ($str as $k => $v) {
$v = trim($v);
if ($v == "") continue;
if (mb_strpos($v, "API Extensions") === false) continue;
if (mb_strpos($v, "pdo_mysql") === false) {
throw new DbException("未安装 mysqlnd php-mysql扩展。");
}
}
$sql = ZMConfig::get("global", "sql_config");
SqlPoolStorage::$sql_pool = new PDOPool((new PDOConfig())
->withHost($sql["sql_host"])
->withPort($sql["sql_port"])
// ->withUnixSocket('/tmp/mysql.sock')
->withDbName($sql["sql_database"])
->withCharset('utf8mb4')
->withUsername($sql["sql_username"])
->withPassword($sql["sql_password"])
->withOptions($sql["sql_options"] ?? [PDO::ATTR_STRINGIFY_FETCHES => false])
);
DB::initTableList();
}
// 开箱即用的Redis
$redis = ZMConfig::get("global", "redis_config");
if ($redis !== null && $redis["host"] != "") {
if (!extension_loaded("redis")) Console::error("Can not find redis extension.\n");
else ZMRedisPool::init($redis);
}
$this->loadAnnotations(); //加载composer资源、phar外置包、注解解析注册等
//echo json_encode(debug_backtrace(), 128|256);
EventManager::registerTimerTick(); //启动计时器
//ZMBuf::unsetCache("wait_start");
set_coroutine_params(["server" => $server, "worker_id" => $worker_id]);
$dispatcher = new EventDispatcher(OnStart::class);
$dispatcher->setRuleFunction(function ($v) {
return server()->worker_id === $v->worker_id || $v->worker_id === -1;
});
$dispatcher->dispatchEvents($server, $worker_id);
if ($dispatcher->status === EventDispatcher::STATUS_NORMAL) Console::debug("@OnStart 执行完毕");
else Console::warning("@OnStart 执行异常!");
Console::success("Worker #" . $worker_id . " 已启动");
} catch (Exception $e) {
Console::error("Worker加载出错停止服务");
Console::error($e->getMessage() . "\n" . $e->getTraceAsString());
Process::kill($server->master_pid, SIGTERM);
return;
} catch (Error $e) {
Console::error("PHP Error: " . $e->getMessage() . " in " . $e->getFile() . " on line " . $e->getLine());
Console::error("Maybe it caused by your own code if in your own Module directory.");
Console::log($e->getTraceAsString(), 'gray');
Process::kill($server->master_pid, SIGTERM);
}
} else {
// 这里是TaskWorker初始化的内容部分
try {
Framework::$server = $server;
$this->loadAnnotations();
Console::success("TaskWorker #" . $server->worker_id . " 已启动");
} catch (Exception $e) {
Console::error("Worker加载出错停止服务");
Console::error($e->getMessage() . "\n" . $e->getTraceAsString());
Process::kill($server->master_pid, SIGTERM);
return;
} catch (Error $e) {
Console::error("PHP Error: " . $e->getMessage() . " in " . $e->getFile() . " on line " . $e->getLine());
Console::error("Maybe it caused by your own code if in your own Module directory.");
Console::log($e->getTraceAsString(), 'gray');
Process::kill($server->master_pid, SIGTERM);
}
}
}
/**
* @throws ReflectionException
* @throws Exception
*/
private function loadAnnotations() {
//加载phar包
/*Console::debug("加载外部phar包中");
$dir = DataProvider::getWorkingDir() . "/resources/package/";
if (version_compare(SWOOLE_VERSION, "4.4.0", ">=")) Timer::clearAll();
if (is_dir($dir)) {
$list = scandir($dir);
unset($list[0], $list[1]);
foreach ($list as $v) {
if (is_dir($dir . $v)) continue;
if (pathinfo($dir . $v, 4) == "phar") {
Console::debug("加载Phar: " . $dir . $v . "");
require_once($dir . $v);
}
}
}*/
//加载各个模块的注解类,以及反射
Console::debug("检索Module中");
$parser = new AnnotationParser();
$path = DataProvider::getWorkingDir() . "/src/";
$dir = scandir($path);
unset($dir[0], $dir[1]);
$composer = json_decode(file_get_contents(DataProvider::getWorkingDir() . "/composer.json"), true);
foreach ($dir as $v) {
if (is_dir($path . "/" . $v) && isset($composer["autoload"]["psr-4"][$v . "\\"]) && !in_array($composer["autoload"]["psr-4"][$v . "\\"], $composer["extra"]["exclude_annotate"] ?? [])) {
if (\server()->worker_id == 0)
Console::verbose("Add " . $v . " to register path");
$parser->addRegisterPath(DataProvider::getWorkingDir() . "/src/" . $v . "/", $v);
}
}
$parser->registerMods();
EventManager::loadEventByParser($parser); //加载事件
//加载自定义的全局函数
Console::debug("加载自定义上下文中...");
$context_class = ZMConfig::get("global", "context_class");
if (!is_a($context_class, ContextInterface::class, true)) {
throw new Exception("Context class must implemented from ContextInterface!");
}
//加载插件
$obb_onebot = ZMConfig::get("global", "onebot") ??
ZMConfig::get("global", "modules")["onebot"] ??
["status" => true, "single_bot_mode" => false, "message_level" => 99999];
if ($obb_onebot["status"]) {
$obj = new OnSwooleEvent();
$obj->class = QQBot::class;
$obj->method = 'handleByEvent';
$obj->type = 'message';
$obj->level = $obb_onebot["message_level"] ?? 99999;
$obj->rule = 'connectIsQQ()';
EventManager::addEvent(OnSwooleEvent::class, $obj);
if ($obb_onebot["single_bot_mode"]) {
LightCacheInside::set("connect", "conn_fd", -1);
} else {
LightCacheInside::set("connect", "conn_fd", -2);
}
}
//TODO: 编写加载外部插件的方式
}
}

View File

@ -0,0 +1,27 @@
<?php
namespace ZM\Event\SwooleEvent;
use Swoole\Server;
use ZM\Annotation\Swoole\SwooleHandler;
use ZM\Config\ZMConfig;
use ZM\Console\Console;
use ZM\Event\SwooleEvent;
use ZM\Store\LightCache;
/**
* Class OnWorkerStop
* @package ZM\Event\SwooleEvent
* @SwooleHandler("WorkerStop")
*/
class OnWorkerStop implements SwooleEvent
{
public function onCall(Server $server, $worker_id) {
if ($worker_id == (ZMConfig::get("worker_cache")["worker"] ?? 0)) {
LightCache::savePersistence();
}
Console::verbose(($server->taskworker ? "Task" : "") . "Worker #$worker_id 已停止");
}
}

View File

@ -12,7 +12,6 @@ use ZM\Annotation\Swoole\OnSetup;
use ZM\Config\ZMConfig; use ZM\Config\ZMConfig;
use ZM\ConnectionManager\ManagerGM; use ZM\ConnectionManager\ManagerGM;
use ZM\Console\TermColor; use ZM\Console\TermColor;
use ZM\Event\ServerEventHandler;
use ZM\Store\LightCache; use ZM\Store\LightCache;
use ZM\Store\LightCacheInside; use ZM\Store\LightCacheInside;
use ZM\Store\Lock\SpinLock; use ZM\Store\Lock\SpinLock;
@ -97,6 +96,9 @@ class Framework
$add_port = ZMConfig::get("global", "remote_terminal")["status"] ?? false; $add_port = ZMConfig::get("global", "remote_terminal")["status"] ?? false;
$this->parseCliArgs(self::$argv, $add_port); $this->parseCliArgs(self::$argv, $add_port);
if (!isset($this->server_set["max_wait_time"])) {
$this->server_set["max_wait_time"] = 5;
}
$worker = $this->server_set["worker_num"] ?? swoole_cpu_num(); $worker = $this->server_set["worker_num"] ?? swoole_cpu_num();
define("ZM_WORKER_NUM", $worker); define("ZM_WORKER_NUM", $worker);
ZMAtomic::init(); ZMAtomic::init();
@ -286,6 +288,7 @@ class Framework
self::$loaded_files = get_included_files(); self::$loaded_files = get_included_files();
self::$server->start(); self::$server->start();
zm_atomic("server_is_stopped")->set(1); zm_atomic("server_is_stopped")->set(1);
Console::log("zhamao-framework is stopped.");
} }
/** /**
@ -294,13 +297,23 @@ class Framework
* @throws ReflectionException * @throws ReflectionException
*/ */
private function registerServerEvents() { private function registerServerEvents() {
$all_event_class = ZMConfig::get("global", "server_event_handler_class") ?? [];
if (!in_array(ServerEventHandler::class, $all_event_class)) {
$all_event_class[] = ServerEventHandler::class;
}
$event_list = []; $event_list = [];
foreach ($all_event_class as $v) {
$reader = new AnnotationReader(); $reader = new AnnotationReader();
$all = getAllClasses(FRAMEWORK_ROOT_DIR . "/src/ZM/Event/SwooleEvent/", "ZM\\Event\\SwooleEvent");
foreach ($all as $v) {
$class = new $v();
$reflection_class = new ReflectionClass($class);
$anno_class = $reader->getClassAnnotation($reflection_class, SwooleHandler::class);
if ($anno_class !== null) { // 类名形式的注解
$anno_class->class = $v;
$anno_class->method = "onCall";
$event_list[strtolower($anno_class->event)] = $anno_class;
}
}
$all_event_class = ZMConfig::get("global", "server_event_handler_class") ?? [];
foreach ($all_event_class as $v) {
$reflection_class = new ReflectionClass($v); $reflection_class = new ReflectionClass($v);
$methods = $reflection_class->getMethods(ReflectionMethod::IS_PUBLIC); $methods = $reflection_class->getMethods(ReflectionMethod::IS_PUBLIC);
foreach ($methods as $vs) { foreach ($methods as $vs) {
@ -338,75 +351,62 @@ class Framework
global $terminal_id; global $terminal_id;
$terminal_id = uuidgen(); $terminal_id = uuidgen();
foreach ($args as $x => $y) { foreach ($args as $x => $y) {
if ($y) {
switch ($x) { switch ($x) {
case 'worker-num': case 'worker-num':
if ($y) {
if (intval($y) >= 1 && intval($y) <= 1024) { if (intval($y) >= 1 && intval($y) <= 1024) {
$this->server_set["worker_num"] = intval($y); $this->server_set["worker_num"] = intval($y);
} else { } else {
Console::warning("Invalid worker num! Turn to default value (".($this->server_set["worker_num"] ?? swoole_cpu_num()).")"); Console::warning("Invalid worker num! Turn to default value (" . ($this->server_set["worker_num"] ?? swoole_cpu_num()) . ")");
}
} }
break; break;
case 'task-worker-num': case 'task-worker-num':
if ($y) {
if (intval($y) >= 1 && intval($y) <= 1024) { if (intval($y) >= 1 && intval($y) <= 1024) {
$this->server_set["task_worker_num"] = intval($y); $this->server_set["task_worker_num"] = intval($y);
$this->server_set["task_enable_coroutine"] = true; $this->server_set["task_enable_coroutine"] = true;
} else { } else {
Console::warning("Invalid worker num! Turn to default value (0)"); Console::warning("Invalid worker num! Turn to default value (0)");
} }
}
break; break;
case 'disable-coroutine': case 'disable-coroutine':
if ($y) {
$coroutine_mode = false; $coroutine_mode = false;
}
break; break;
case 'debug-mode': case 'debug-mode':
if ($y || ZMConfig::get("global", "debug_mode")) {
$coroutine_mode = false; $coroutine_mode = false;
$terminal_id = null; $terminal_id = null;
Console::warning("You are in debug mode, do not use in production!"); Console::warning("You are in debug mode, do not use in production!");
}
break; break;
case 'daemon': case 'daemon':
if ($y) {
$this->server_set["daemonize"] = 1; $this->server_set["daemonize"] = 1;
Console::$theme = "no-color"; Console::$theme = "no-color";
Console::log("已启用守护进程,输出重定向到 " . $this->server_set["log_file"]); Console::log("已启用守护进程,输出重定向到 " . $this->server_set["log_file"]);
$terminal_id = null; $terminal_id = null;
}
break; break;
case 'disable-console-input': case 'disable-console-input':
case 'no-interaction': case 'no-interaction':
if ($y) $terminal_id = null; $terminal_id = null;
break; break;
case 'log-error': case 'log-error':
if ($y) Console::setLevel(0); Console::setLevel(0);
break; break;
case 'log-warning': case 'log-warning':
if ($y) Console::setLevel(1); Console::setLevel(1);
break; break;
case 'log-info': case 'log-info':
if ($y) Console::setLevel(2); Console::setLevel(2);
break; break;
case 'log-verbose': case 'log-verbose':
case 'verbose': case 'verbose':
if ($y) Console::setLevel(3); Console::setLevel(3);
break; break;
case 'log-debug': case 'log-debug':
if ($y) Console::setLevel(4); Console::setLevel(4);
break; break;
case 'log-theme': case 'log-theme':
if ($y !== null) {
Console::$theme = $y; Console::$theme = $y;
}
break; break;
case 'remote-terminal': case 'remote-terminal':
if ($y) {
$add_port = true; $add_port = true;
}
break; break;
case 'show-php-ver': case 'show-php-ver':
default: default:
@ -415,6 +415,7 @@ class Framework
break; break;
} }
} }
}
if ($coroutine_mode) Runtime::enableCoroutine(true, SWOOLE_HOOK_ALL); if ($coroutine_mode) Runtime::enableCoroutine(true, SWOOLE_HOOK_ALL);
else Runtime::enableCoroutine(false, SWOOLE_HOOK_ALL); else Runtime::enableCoroutine(false, SWOOLE_HOOK_ALL);
} }
@ -508,4 +509,15 @@ class Framework
public static function getTtyWidth(): string { public static function getTtyWidth(): string {
return explode(" ", trim(exec("stty size")))[1]; return explode(" ", trim(exec("stty size")))[1];
} }
public static function loadFrameworkState() {
if (!file_exists(DataProvider::getDataFolder() . ".state.json")) return [];
$r = json_decode(file_get_contents(DataProvider::getDataFolder() . ".state.json"), true);
if ($r === null) $r = [];
return $r;
}
public static function saveFrameworkState($data) {
return file_put_contents(DataProvider::getDataFolder() . ".state.json", json_encode($data, 64 | 128 | 256));
}
} }

View File

@ -10,6 +10,8 @@ use ZM\Annotation\Swoole\OnSave;
use ZM\Console\Console; use ZM\Console\Console;
use ZM\Event\EventDispatcher; use ZM\Event\EventDispatcher;
use ZM\Exception\ZMException; use ZM\Exception\ZMException;
use ZM\Framework;
use ZM\Utils\ProcessManager;
class LightCache class LightCache
{ {
@ -50,6 +52,31 @@ class LightCache
} }
if ($result === false) { if ($result === false) {
self::$last_error = '系统内存不足,申请失败'; self::$last_error = '系统内存不足,申请失败';
} else {
$obj = Framework::loadFrameworkState();
foreach(($obj["expiring_light_cache"] ?? []) as $k => $v) {
$value = $v["value"];
if (is_array($value)) {
$value = json_encode($value, JSON_UNESCAPED_UNICODE);
if (strlen($value) >= self::$config["max_strlen"]) return false;
$data_type = "json";
} elseif (is_string($value)) {
$data_type = "";
} elseif (is_int($value)) {
$data_type = "int";
} elseif (is_bool($value)) {
$data_type = "bool";
$value = json_encode($value);
} else {
return false;
}
$result = self::$kv_table->set($k, [
"value" => $value,
"expire" => $v["expire"],
"data_type" => $data_type
]);
if ($result === false) return false;
}
} }
return $result; return $result;
} }
@ -78,6 +105,19 @@ class LightCache
return $r === false ? null : $r - time(); return $r === false ? null : $r - time();
} }
/**
* @param string $key
* @return mixed|null
* @throws ZMException
* @since 2.4.3
*/
public static function getExpireTS(string $key) {
if (self::$kv_table === null) throw new ZMException("not initialized LightCache");
self::checkExpire($key);
$r = self::$kv_table->get($key, "expire");
return $r === false ? null : $r;
}
/** /**
* @param string $key * @param string $key
* @param string|array|int $value * @param string|array|int $value
@ -206,6 +246,10 @@ class LightCache
* @throws Exception * @throws Exception
*/ */
public static function savePersistence() { public static function savePersistence() {
if (server()->worker_id !== MAIN_WORKER) {
ProcessManager::sendActionToWorker(MAIN_WORKER, "save_persistence", []);
return;
}
$dispatcher = new EventDispatcher(OnSave::class); $dispatcher = new EventDispatcher(OnSave::class);
$dispatcher->dispatchEvents(); $dispatcher->dispatchEvents();
@ -220,6 +264,25 @@ class LightCache
} }
file_put_contents(self::$config["persistence_path"], json_encode($r, 64 | 128 | 256)); file_put_contents(self::$config["persistence_path"], json_encode($r, 64 | 128 | 256));
} }
$obj = Framework::loadFrameworkState();
$obj["expiring_light_cache"] = [];
$del = [];
foreach (self::$kv_table as $k => $v) {
if ($v["expire"] <= time() && $v["expire"] >= 0) {
$del[] = $k;
continue;
} elseif ($v["expire"] > time()) {
$obj["expiring_light_cache"][$k] = [
"expire" => $v["expire"],
"value" => self::parseGet($v)
];
}
}
foreach ($del as $v) {
self::unset($v);
}
Framework::saveFrameworkState($obj);
Console::verbose("Saved."); Console::verbose("Saved.");
} }

View File

@ -68,7 +68,7 @@ class CoMessage
LightCacheInside::set("wait_api", "wait_api", $all); LightCacheInside::set("wait_api", "wait_api", $all);
SpinLock::unlock("wait_api"); SpinLock::unlock("wait_api");
if ($all[$last]["worker_id"] != server()->worker_id) { if ($all[$last]["worker_id"] != server()->worker_id) {
ZMUtil::sendActionToWorker($all[$last]["worker_id"], "resume_ws_message", $all[$last]); ProcessManager::sendActionToWorker($all[$last]["worker_id"], "resume_ws_message", $all[$last]);
} else { } else {
Co::resume($all[$last]["coroutine"]); Co::resume($all[$last]["coroutine"]);
} }

View File

@ -20,7 +20,7 @@ class ProcessManager
$server = server(); $server = server();
switch ($data["action"] ?? '') { switch ($data["action"] ?? '') {
case 'add_short_command': case 'add_short_command':
Console::verbose("Adding short command ".$data["data"][0]); Console::verbose("Adding short command " . $data["data"][0]);
$obj = new CQCommand(); $obj = new CQCommand();
$obj->method = quick_reply_closure($data["data"][1]); $obj->method = quick_reply_closure($data["data"][1]);
$obj->match = $data["data"][0]; $obj->match = $data["data"][0];
@ -111,11 +111,11 @@ class ProcessManager
public static function resumeAllWorkerCoroutines() { public static function resumeAllWorkerCoroutines() {
if (server()->worker_id === -1) { if (server()->worker_id === -1) {
Console::warning("Cannot call '".__FUNCTION__."' in non-worker process!"); Console::warning("Cannot call '" . __FUNCTION__ . "' in non-worker process!");
return; return;
} }
foreach ((LightCacheInside::get("wait_api", "wait_api") ?? []) as $k => $v) { foreach ((LightCacheInside::get("wait_api", "wait_api") ?? []) as $k => $v) {
if (($v["result"] ?? false) === null && isset($v["coroutine"], $v["worker_id"])) { if (isset($v["coroutine"], $v["worker_id"])) {
if (server()->worker_id == $v["worker_id"]) Co::resume($v["coroutine"]); if (server()->worker_id == $v["worker_id"]) Co::resume($v["coroutine"]);
} }
} }

View File

@ -4,17 +4,20 @@
namespace ZM\Utils; namespace ZM\Utils;
use Doctrine\Common\Annotations\AnnotationReader;
use Exception; use Exception;
use Psy\Shell; use ReflectionClass;
use Swoole\Process;
use ZM\Annotation\Command\TerminalCommand; use ZM\Annotation\Command\TerminalCommand;
use ZM\ConnectionManager\ManagerGM; use ZM\ConnectionManager\ManagerGM;
use ZM\Console\Console; use ZM\Console\Console;
use ZM\Event\EventDispatcher; use ZM\Event\EventDispatcher;
use ZM\Event\EventManager; use ZM\Event\EventManager;
use ZM\Framework;
class Terminal class Terminal
{ {
public static $default_commands = false;
/** /**
* @param string $cmd * @param string $cmd
* @return bool * @return bool
@ -23,75 +26,14 @@ class Terminal
* @throws Exception * @throws Exception
*/ */
public static function executeCommand(string $cmd) { public static function executeCommand(string $cmd) {
if (self::$default_commands === false) {
self::init();
}
$it = explodeMsg($cmd); $it = explodeMsg($cmd);
switch ($it[0] ?? '') {
case 'help':
$help[] = "exit | q:\t断开远程终端";
$help[] = "logtest:\t输出所有可以打印的log等级示例消息用于测试Console";
$help[] = "call:\t\t用于执行不需要参数的动态函数,比如 `call \Module\Example\Hello hitokoto`";
$help[] = "level:\t\t设置log等级例如 `level 0|1|2|3|4`";
$help[] = "bc:\t\teval执行代码但输入必须是将代码base64之后的如 `bc em1faW5mbygn5L2g5aW9Jyk7`";
$help[] = "stop:\t\t停止服务器";
$help[] = "reload | r:\t热重启用户编写的模块代码";
foreach((EventManager::$events[TerminalCommand::class] ?? []) as $v) {
$help[]=$v->command.":\t\t".(empty($v->description) ? "<用户自定义指令>" : $v->description);
}
echo implode("\n", $help) . PHP_EOL;
return true;
case 'logtest':
Console::log(date("[H:i:s]") . " [L] This is normal msg. (0)");
Console::error("This is error msg. (0)");
Console::warning("This is warning msg. (1)");
Console::info("This is info msg. (2)");
Console::success("This is success msg. (2)");
Console::verbose("This is verbose msg. (3)");
Console::debug("This is debug msg. (4)");
return true;
case 'call':
$class_name = $it[1];
$function_name = $it[2];
$class = new $class_name([]);
$r = $class->$function_name();
if (is_string($r)) Console::success($r);
return true;
case 'psysh':
if (Framework::$argv["disable-coroutine"]) {
(new Shell())->run();
} else
Console::error("Only \"--disable-coroutine\" mode can use psysh!!!");
return true;
case 'level':
$level = intval(is_numeric($it[1] ?? 99) ? ($it[1] ?? 99) : 99);
if ($level > 4 || $level < 0) Console::warning("Usage: 'level 0|1|2|3|4'");
else Console::setLevel($level) || Console::success("Success!!");
break;
case 'bc':
$code = base64_decode($it[1] ?? '', true);
try {
eval($code);
} catch (Exception $e) {
}
return true;
case 'echo':
Console::info($it[1]);
return true;
case 'color':
Console::log($it[2], $it[1]);
return true;
case 'stop':
ZMUtil::stop();
return false;
case 'reload':
case 'r':
ZMUtil::reload();
return false;
case '':
return true;
default:
$dispatcher = new EventDispatcher(TerminalCommand::class); $dispatcher = new EventDispatcher(TerminalCommand::class);
$dispatcher->setRuleFunction(function ($v) use ($it) { $dispatcher->setRuleFunction(function ($v) use ($it) {
/** @var TerminalCommand $v */ /** @var TerminalCommand $v */
return $v->command == $it[0]; return $v->command == $it[0] || $v->alias == $it[0];
}); });
$dispatcher->setReturnFunction(function () { $dispatcher->setReturnFunction(function () {
EventDispatcher::interrupt('none'); EventDispatcher::interrupt('none');
@ -101,7 +43,6 @@ class Terminal
Console::info("Command not found: " . $cmd); Console::info("Command not found: " . $cmd);
return true; return true;
} }
}
return false; return false;
} }
@ -119,4 +60,129 @@ class Terminal
server()->send($v->getFd(), ">>> "); server()->send($v->getFd(), ">>> ");
} }
} }
public static function init() {
Console::debug("Initializing Terminal...");
foreach ((EventManager::$events[TerminalCommand::class] ?? []) as $v) {
if ($v->command == "help") {
self::$default_commands = true;
break;
}
}
$class = new Terminal();
$reader = new AnnotationReader();
$reflection = new ReflectionClass($class);
foreach ($reflection->getMethods() as $k => $v) {
$r = $reader->getMethodAnnotation($v, TerminalCommand::class);
if ($r !== null) {
Console::debug("adding command " . $r->command);
$r->class = Terminal::class;
$r->method = $v->getName();
EventManager::addEvent(TerminalCommand::class, $r);
}
}
self::$default_commands = true;
}
/**
* @TerminalCommand(command="help",alias="h",description="显示帮助菜单")
*/
public function help() {
$help = [];
foreach ((EventManager::$events[TerminalCommand::class] ?? []) as $v) {
/** @var TerminalCommand $v */
$cmd = $v->command . ($v->alias !== "" ? (" | " . $v->alias) : "");
$painted = Console::setColor($v->command, "green") . ($v->alias !== "" ? (" | " . Console::setColor($v->alias, "green")) : "");
$help[] = $painted . ":" . str_pad("", 16 - strlen($cmd) - 1) . ($v->description === "" ? "<无描述>" : $v->description);
}
echo implode("\n", $help) . PHP_EOL;
}
/**
* @TerminalCommand(command="status",description="显示Swoole Server运行状态需要安装league/climate组件")
* @noinspection PhpFullyQualifiedNameUsageInspection
*/
public function status() {
if (!class_exists("\League\CLImate\CLImate")) {
Console::warning("你还没有安装 league/climate 组件,无法使用此功能!");
return;
}
$climate = new \League\CLImate\CLImate;
$climate->output->addDefault('buffer');
$objs = server()->stats();
$climate->columns($objs);
$obj = $climate->output->get('buffer')->get();
$climate->output->get("buffer")->clean();
echo $obj;
}
/**
* @TerminalCommand(command="logtest",description="测试log的显示等级")
*/
public function testlog() {
Console::log(date("[H:i:s]") . " [L] This is normal msg. (0)");
Console::error("This is error msg. (0)");
Console::warning("This is warning msg. (1)");
Console::info("This is info msg. (2)");
Console::success("This is success msg. (2)");
Console::verbose("This is verbose msg. (3)");
Console::debug("This is debug msg. (4)");
}
/**
* @TerminalCommand(command="call",description="用于执行不需要参数的动态函数,比如 `call \Module\Example\Hello hitokoto`")
* @param $it
*/
public function call($it) {
$class_name = $it[1];
$function_name = $it[2];
$class = new $class_name([]);
$r = $class->$function_name();
if (is_string($r)) Console::success($r);
}
/**
* @TerminalCommand(command="level",description="设置log等级例如 `level 0|1|2|3|4`")
* @param $it
*/
public function level($it) {
$level = intval(is_numeric($it[1] ?? 99) ? ($it[1] ?? 99) : 99);
if ($level > 4 || $level < 0) Console::warning("Usage: 'level 0|1|2|3|4'");
else Console::setLevel($level) || Console::success("Success!!");
}
/**
* @TerminalCommand(command="bc",description="eval执行代码但输入必须是将代码base64之后的如 `bc em1faW5mbygn5L2g5aW9Jyk7`")
* @param $it
*/
public function bc($it) {
$code = base64_decode($it[1] ?? '', true);
try {
eval($code);
} catch (Exception $e) {
}
}
/**
* @TerminalCommand(command="echo",description="输出内容,用法:`echo hello`")
* @param $it
*/
public function echoI($it) {
Console::info($it[1]);
}
/**
* @TerminalCommand(command="stop",description="停止框架")
*/
public function stop() {
Process::kill(server()->master_pid, SIGTERM);
}
/**
* @TerminalCommand(command="reload",alias="r",description="重启框架(重载用户代码)")
*/
public function reload() {
Process::kill(server()->master_pid, SIGUSR1);
}
} }

View File

@ -27,15 +27,13 @@ class ZMUtil
Process::kill(zm_atomic("_#worker_" . $i)->get(), SIGUSR1); Process::kill(zm_atomic("_#worker_" . $i)->get(), SIGUSR1);
} }
server()->shutdown(); server()->shutdown();
server()->stop();
} }
/** /**
* @throws Exception * @throws Exception
*/ */
public static function reload() { public static function reload() {
zm_atomic("_int_is_reload")->set(1); Process::kill(server()->master_pid, SIGUSR1);
system("kill -INT " . intval(server()->master_pid));
} }
public static function getModInstance($class) { public static function getModInstance($class) {
@ -47,11 +45,6 @@ class ZMUtil
} }
} }
public static function sendActionToWorker($target_id, $action, $data) {
Console::verbose($action . ": " . $data);
server()->sendMessage(json_encode(["action" => $action, "data" => $data]), $target_id);
}
/** /**
* 在工作进程中返回可以通过reload重新加载的php文件列表 * 在工作进程中返回可以通过reload重新加载的php文件列表
* @return string[]|string[][] * @return string[]|string[][]

View File

@ -7,6 +7,7 @@ define("ZM_START_TIME", microtime(true));
define("ZM_DATA", ZMConfig::get("global", "zm_data")); define("ZM_DATA", ZMConfig::get("global", "zm_data"));
define("APP_VERSION", LOAD_MODE == 1 ? (json_decode(file_get_contents(DataProvider::getWorkingDir() . "/composer.json"), true)["version"] ?? "unknown") : "unknown"); define("APP_VERSION", LOAD_MODE == 1 ? (json_decode(file_get_contents(DataProvider::getWorkingDir() . "/composer.json"), true)["version"] ?? "unknown") : "unknown");
define("CRASH_DIR", ZMConfig::get("global", "crash_dir")); define("CRASH_DIR", ZMConfig::get("global", "crash_dir"));
define("MAIN_WORKER", ZMConfig::get("global", "worker_cache")["worker"] ?? 0);
@mkdir(ZM_DATA); @mkdir(ZM_DATA);
@mkdir(CRASH_DIR); @mkdir(CRASH_DIR);
@ -19,3 +20,9 @@ define("ZM_MATCH_SECOND", 3);
define("ZM_BREAKPOINT", 'if(\ZM\Framework::$argv["debug-mode"]) extract(\Psy\debug(get_defined_vars(), isset($this) ? $this : @get_called_class()));'); define("ZM_BREAKPOINT", 'if(\ZM\Framework::$argv["debug-mode"]) extract(\Psy\debug(get_defined_vars(), isset($this) ? $this : @get_called_class()));');
define("BP", ZM_BREAKPOINT); define("BP", ZM_BREAKPOINT);
define("ZM_DEFAULT_FETCH_MODE", 4); define("ZM_DEFAULT_FETCH_MODE", 4);
define("ZM_LOG_ERROR", 0);
define("ZM_LOG_WARNING", 1);
define("ZM_LOG_INFO", 2);
define("ZM_LOG_VERBOSE", 3);
define("ZM_LOG_DEBUG", 4);

View File

@ -285,6 +285,10 @@ function zm_timer_tick($ms, callable $callable) {
} }
} }
function zm_go(callable $callable) {
call_with_catch($callable);
}
function zm_data_hash($v): string { function zm_data_hash($v): string {
return md5($v["user_id"] . "^" . $v["self_id"] . "^" . $v["message_type"] . "^" . ($v[$v["message_type"] . "_id"] ?? $v["user_id"])); return md5($v["user_id"] . "^" . $v["self_id"] . "^" . $v["message_type"] . "^" . ($v[$v["message_type"] . "_id"] ?? $v["user_id"]));
} }