Compare commits

...

6 Commits
2.2.1 ... 2.2.4

Author SHA1 Message Date
jerry
0f9767aa16 update docs 2021-02-07 11:48:55 +08:00
jerry
0c9f246690 update to 2.2.4 version
update docs
fix broken ssh caused cpu overloading
fix WorkerCache bug when no global config
add global function zm_atomic
2021-02-07 11:46:42 +08:00
jerry
517d258d61 update to 2.2.3 version, I am tired
fix access_token not working
fix waitMessage() not working in v2.2.2
2021-01-30 00:06:42 +08:00
jerry
61e3818563 update to 2.2.2 version finally
clean redundant code
fix API reply in @OnTick for multi-process
fix loop error reporting
2021-01-29 23:34:34 +08:00
jerry
776ec98a3e fix waitMessage timeout bug 2021-01-29 22:32:29 +08:00
jerry
f3e844bb0a update to 2.2.2 version
fix QQBot error
clean code
2021-01-29 22:27:10 +08:00
34 changed files with 452 additions and 162 deletions

View File

@@ -3,7 +3,7 @@
"description": "High performance QQ robot and web server development framework",
"minimum-stability": "stable",
"license": "Apache-2.0",
"version": "2.2.1",
"version": "2.2.4",
"extra": {
"exclude_annotate": [
"src/ZM"

View File

@@ -0,0 +1,70 @@
# 框架多进程
首先对于多进程概念,对于传统 PHP 程序员可能比较陌生,唯一接触到的地方可能就是 php-fpm 等一些方式处理时间长的请求时开进程去执行。关于多进程,我觉得廖雪峰的 Python 多进程这段讲的不错:
> Unix/Linux 操作系统提供了一个`fork()`系统调用,它非常特殊。普通的函数调用,调用一次,返回一次,但是`fork()`调用一次,返回两次,因为操作系统自动把当前进程(称为父进程)复制了一份(称为子进程),然后,分别在父进程和子进程内返回。
这里面的重点在于,多进程的创建,是父进程的复制,然后两个进程接下来运行的代码和存的内容就分道扬镳了。
PHP 也是如此,框架的多进程又是怎么一回事呢?为什么要采用多进程呢?
## 作用
使用过框架的你一定知道,框架是以命令行方式运行 PHP 的,而命令行方式运行 PHP就代表要常驻内存就像 Python、Node.js 一样。而默认情况下,比如 Python 的 Flask 为单线程单进程模式,也就是说同时只能处理一个 Web 请求。但大部分情况下,比如 Node.js提供的都是异步 I/O这也就是说明它在 Web 处理请求上,可同时承接的 I/O 密集型请求会更多一些,这样在对一般的 Web 应用中 I/O 密集型场景非常有用,而且往往只需要单进程也可以承载上万的并发请求。
在炸毛框架中,因为框架基于 Swoole 构建,所以天然支持协程,而协程就是针对 I/O 操作进行一个调度,类似异步的 Node.js所以针对项目中存在太多的 SQL 语句执行、文件读写的话,炸毛框架直接上手,无需做任何修改,也可以达到很好的性能。
**但是**CPU 密集型的应用怎么办呢?假设我的 Web 应用有大量的排序、md5 运算怎么办呢?这样的阻塞,假设是一个超级大的 for 循环或者是要执行很长时间的 while 循环CPU 一直在被占用。多进程就是针对 CPU 密集型的应用说 yes 的一个方案。
![Untitled Diagram (1)](../assets/img/single-process.png)
我们假设现在有 3 个请求同时访问,也就是说上面的流程需要执行 3 遍。而如果我们只有一个进程的话,最后一个请求需要等待的时间为 `2*3+5*3=21` 秒,非常耗时。
而如果有两个进程处理 3 个请求,则最后一个完成的请求就缩短了,`2+5+2+5=14` 秒。
![Untitled Diagram (2)](../assets/img/Untitled Diagram (2).png)
所以如果要充分利用你的服务器或者个人电脑的多核 CPU 资源,就要设置多个进程来处理。一个进程只能在一个 CPU 上运行,而设置了多进程后,就可以让多核 CPU 充分运行多个进程,所以我们给框架设置多进程的推荐数值为等同于 CPU 的核心数。
## 为什么不是多线程
因为众所周知PHP 对线程的支持比较不好,而 ZTS 版本的 PHP 又会影响传统的 Web 端 PHP 的性能,再加上 Linux 对线程的切换效率和多进程切换的效率差不多,多线程容易造成数据读写不安全等问题,故 Swoole 使用的是多进程模型。
## 框架进程模型
![Untitled Diagram (3)](../assets/img/Untitled Diagram (3).png)
上图中,横向的时间片可以理解为并行执行,这些操作在多个 CPU 内可能同时在执行。
## 进程间隔离
众所周知,进程是程序在操作系统中的一个边界,和自己有关的一切变量、内容和代码都在自己的进程内,不同进程之间如果不使用管道等方式,是不可以互相访问的。而加上开始描述的,创建子进程是一个复制自身的过程,所以也就会有如下图的情况:
![Untitled Diagram (4)](../assets/img/Untitled Diagram (4).png)
我们以静态类为例,设置一个进程中的全局变量。这里就会出现,同一个静态变量在多个进程中完全不同的值的结果。此后,我们将会在 Worker 进程中执行用户的代码,如果设置 Worker 数量仅为 1 的话,那么就简单许多了,你还是可以使用全局变量或静态类来存储你想要的内容而不用担心这种多个进程变量隔离的情况(因为用户的 Web 请求处理的代码只会在一个 Worker 进程中执行)。如果像上图一样设置了多个 Worker则用户过来的比如 HTTP 请求就有可能出现在不同的 Worker 进程中,给全局变量设值就一定会造成不同步的问题。这时我们就不可以使用全局变量做数据同步(注意,我说的是数据同步)。
## 跨进程同步
跨进程同步方案中,框架给出了很多种解决方案。
- MySQL 数据库
- Redis
- LightCache 轻量缓存(共享内存)
- WorkerCache 大缓存
- ZMAtomic 跨进程原子计数器
下面的表格我将列出下方的特点和各自的优缺点:
| 类型 | 用途 | 优点 | 缺点 |
| ----------- | --------------------------------------------------- | ------------------------------------------------- | ------------------------------------------------------------ |
| MySQL | 大型的传统的关系式数据都可以用数据库,你懂的 | 就是数据库的优点 | 和数据库不在同一台服务器的话网络延迟会较大,数据获取效率不高 |
| Redis | 传统的 key-value 数据库 | 数据无同步等问题,性能高 | 有网络通信延迟 |
| LightCache | 框架封装的跨进程的 key-value 存储模型 | 性能强悍,无 I/O 和网络通信 | 需要提前分配最大内存大小,最大单个值长度大小,不灵活 |
| WorkerCache | 框架封装的基于进程的 key-value 存储模型,类似 Redis | 无需提前分配最大内存大小,受限于 PHP memory_limit | 见 WorkerCache 的说明 |
!!! note "WorkerCache 的说明"
对于 WorkerCache 来说其实是比较特殊的进程间通信。具体来说就是WorkerCache 的原理就是将变量指定的存到一个进程中,如果是本进程读写的话直接相当于改一下全局变量,如果是其他进程读写的话,则依靠进程间通信。
所以缺点也显而易见,如果使用过程中不是命中了 WorkerCache 存储所在的进程的话,则一直会使用进程间通信,影响一定的效率。

224
docs/assets/face_id.html Normal file

File diff suppressed because one or more lines are too long

Binary file not shown.

After

Width:  |  Height:  |  Size: 8.9 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 109 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 30 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 10 KiB

View File

@@ -119,7 +119,7 @@ $str = CQ::removeCQ("[CQ:at,qq=all]这是带表情的全体消息[CQ:face,id=8]"
定义:`CQ::face($id)`
参数:`$id` 为 QQ 表情对应的 ID 号,一些常见的表情 ID 对应的表情样式见 [炸毛框架 1.x 版本文档](https://docs-v1.zhamao.xin/face_list.html)。
参数:`$id` 为 QQ 表情对应的 ID 号,一些常见的表情 ID 对应的表情样式见 [QQ 对应表情ID表](/assets/face_id.html)。
```php
/**

View File

@@ -292,7 +292,7 @@ class Hello {
* @CQCommand("set_store")
*/
public function setStorage() {
$arg1 = ctx()->getFullArg("请输入要设置的内容名称");
$arg1 = ctx()->getNextArg("请输入要设置的内容名称");
$arg2 = ctx()->getFullArg("请输入要设置的内容");
WorkerCache::set($arg1, $arg2);
return "成功!";

View File

@@ -246,7 +246,7 @@
```php
<?php
namespace Module\Example;
use ZM\Annotation\Swoole\OnSwooleEvent;
use ZM\Annotation\Swoole\OnOpenEvent;
use ZM\ConnectionManager\ConnectionObject;
use ZM\Console\Console;
class Hello {

View File

@@ -42,7 +42,7 @@ function setCookie(name, value) {
var Days = 30;
var exp = new Date();
exp.setTime(exp.getTime() + Days * 24 * 60 * 60 * 1000);
document.cookie = name + "=" + escape(value) + ";expires=" + exp.toGMTString();
document.cookie = name + "=" + escape(value) + ";expires=" + exp.toGMTString() + ";path=/";
}
s_theme=getCookie("_theme");

View File

@@ -1,5 +1,29 @@
# 更新日志v2 版本)
## v2.2.4
> 更新事件2021.2.7
- 修复:终端交互导致的 ssh 断掉后 CPU 占用过高的问题
- 修复WorkerCache 在缺少配置文件下工作异常的问题
- 新增:全局函数:`zm_atomic()`
## v2.2.3
> 更新时间2021.1.30
- 修复waitMessage() 在 v2.2.2 版本中不可用的 bug
- 修复access_token 无效的问题
## v2.2.2
> 更新时间2021.1.29
- 修复:模块文件错误时避免循环报错
- 优化:代码结构
- 修复:在不同进程时调用机器人 API 无法返回且报错的 bug
- **修复机器人无法连接的问题2.1.6 ~ 2.2.1 受影响)**
## v2.2.1
> 更新时间2021.1.29

View File

@@ -34,7 +34,7 @@ extra:
version:
method: mike
copyright: 'Copyright &copy; 2019 - 2020 CrazyBot Team&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<span class="tx-switch">
copyright: 'Copyright &copy; 2019 - 2021 CrazyBot Team&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<span class="tx-switch">
<button data-md-color-scheme="default"><code>默认模式</code></button>
<button data-md-color-scheme="slate"><code>暗黑模式</code></button>
</span>

View File

@@ -1,31 +0,0 @@
<?php
namespace Custom\Command;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;
class CustomCommand extends Command
{
// the name of the command (the part after "bin/console")
protected static $defaultName = 'custom';
protected function configure() {
$this->setDescription("custom description | 自定义命令的描述字段");
$this->addOption("failure", null, null, "以错误码为1返回结果");
// ...
}
protected function execute(InputInterface $input, OutputInterface $output) {
if ($input->getOption("failure")) {
$output->writeln("<error>Hello error! I am wrong message.</error>");
return Command::FAILURE;
} else {
$output->writeln("<comment>Hello world! I am successful message.</comment>");
return Command::SUCCESS;
}
}
}

View File

@@ -1,4 +1,4 @@
<?php #plain
<?php /** @noinspection PhpFullyQualifiedNameUsageInspection */ #plain
//这里写你的全局函数
function pgo(callable $func, $name = "default") {

View File

@@ -3,17 +3,16 @@
namespace ZM\API;
use Co;
use ZM\ConnectionManager\ConnectionObject;
use ZM\Console\Console;
use ZM\Store\LightCacheInside;
use ZM\Store\Lock\SpinLock;
use ZM\Store\ZMAtomic;
use ZM\Utils\CoMessage;
trait CQAPI
{
/**
* @param ConnectionObject $connection
* @param $connection
* @param $reply
* @param |null $function
* @return bool|array
@@ -35,21 +34,20 @@ trait CQAPI
$r[$api_id] = [
"data" => $reply,
"time" => microtime(true),
"self_id" => $connection->getOption("connect_id")
"self_id" => $connection->getOption("connect_id"),
"echo" => $api_id
];
if ($function === true) $r[$api_id]["coroutine"] = Co::getuid();
LightCacheInside::set("wait_api", "wait_api", $r);
SpinLock::unlock("wait_api");
if (server()->push($connection->getFd(), json_encode($reply))) {
if ($function === true) {
Co::suspend();
return CoMessage::yieldByWS($r[$api_id], ["echo"], 60);
} else {
SpinLock::lock("wait_api");
$r = LightCacheInside::get("wait_api", "wait_api");
$data = $r[$api_id];
unset($r[$api_id]);
LightCacheInside::set("wait_api", "wait_api", $r);
SpinLock::unlock("wait_api");
return isset($data['result']) ? $data['result'] : null;
}
return true;
} else {

View File

@@ -228,9 +228,9 @@ class ZMRobot
/**
* 群组单人禁言
* @link https://github.com/howmanybots/onebot/blob/master/v11/specs/api/public.md#set_group_ban-%E7%BE%A4%E7%BB%84%E5%8D%95%E4%BA%BA%E7%A6%81%E8%A8%80
* @param int $group_id
* @param int $user_id
* @param int $duration
* @param $group_id
* @param $user_id
* @param $duration
* @return array|bool|null
*/
public function setGroupBan($group_id, $user_id, $duration = 1800) {

View File

@@ -9,11 +9,10 @@ use ZM\Console\Console;
use ReflectionClass;
use ReflectionException;
use ReflectionMethod;
use ZM\Annotation\Http\{HandleAfter, HandleBefore, Controller, HandleException, Middleware, MiddlewareClass, RequestMapping};
use ZM\Annotation\Http\{HandleAfter, HandleBefore, HandleException, Middleware, MiddlewareClass, RequestMapping};
use ZM\Annotation\Interfaces\Level;
use ZM\Annotation\Module\Closed;
use ZM\Http\RouteManager;
use ZM\Utils\DataProvider;
class AnnotationParser
{
@@ -91,6 +90,7 @@ class AnnotationParser
if ($vs instanceof ErgodicAnnotation) {
foreach (($this->annotation_map[$v]["methods"] ?? []) as $method) {
$copy = clone $vs;
/** @noinspection PhpUndefinedFieldInspection */
$copy->method = $method->getName();
$this->annotation_map[$v]["methods_annotations"][$method->getName()][] = $copy;
}

View File

@@ -3,7 +3,6 @@
namespace ZM\Annotation\CQ;
use Doctrine\Common\Annotations\Annotation\Required;
use Doctrine\Common\Annotations\Annotation\Target;
use ZM\Annotation\AnnotationBase;
use ZM\Annotation\Interfaces\Level;

View File

@@ -4,7 +4,6 @@
namespace ZM\Annotation\Http;
use Doctrine\Common\Annotations\Annotation\Required;
use Doctrine\Common\Annotations\Annotation\Target;
use ZM\Annotation\AnnotationBase;

View File

@@ -5,7 +5,6 @@ namespace ZM\Annotation\Swoole;
use Doctrine\Common\Annotations\Annotation\Target;
use ZM\Annotation\Interfaces\Rule;
/**
* @Annotation

View File

@@ -6,7 +6,6 @@ namespace ZM\Command;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;
use ZM\Utils\DataProvider;
class DaemonStatusCommand extends DaemonCommand
{

View File

@@ -145,7 +145,7 @@ class Context implements ContextInterface
if ($prompt != "") $this->reply($prompt);
try {
$r = CoMessage::yieldByWS($this->getData(), ["user_id", "self_id", "message_type", onebot_target_id_name($this->getMessageType())]);
$r = CoMessage::yieldByWS($this->getData(), ["user_id", "self_id", "message_type", onebot_target_id_name($this->getMessageType())], $timeout);
} catch (Exception $e) {
$r = false;
}

View File

@@ -31,7 +31,6 @@ class DB
/**
* @param $table_name
* @param bool $enable_cache
* @return Table
* @throws DbException
*/
@@ -95,6 +94,7 @@ class DB
$ps = $conn->prepare($line);
if ($ps === false) {
SqlPoolStorage::$sql_pool->put(null);
/** @noinspection PhpUndefinedFieldInspection */
throw new DbException("SQL语句查询错误" . $line . ",错误信息:" . $conn->error);
} else {
if (!($ps instanceof PDOStatement) && !($ps instanceof PDOStatementProxy)) {

View File

@@ -47,7 +47,7 @@ class Table
return new DeleteBody($this);
}
public function statement($line){
public function statement(){
$this->cache = [];
//TODO: 无返回的statement语句
}

View File

@@ -87,7 +87,6 @@ class EventDispatcher
/**
* @param mixed ...$params
* @throws Exception
* @throws InterruptException
*/
public function dispatchEvents(...$params) {
try {
@@ -104,10 +103,7 @@ class EventDispatcher
} catch (InterruptException $e) {
$this->store = $e->return_var;
$this->status = self::STATUS_INTERRUPTED;
} catch (Exception $e) {
$this->status = self::STATUS_EXCEPTION;
throw $e;
} catch (Error $e) {
} catch (Exception | Error $e) {
$this->status = self::STATUS_EXCEPTION;
throw $e;
}

View File

@@ -62,7 +62,12 @@ class ServerEventHandler
if ($terminal_id !== null) {
ZMBuf::$terminal = $r = STDIN;
Event::add($r, function () use ($r) {
$var = trim(fgets($r));
$fget = fgets($r);
if ($fget === false) {
Event::del($r);
return;
}
$var = trim($fget);
try {
Terminal::executeCommand($var, $r);
} catch (Exception $e) {
@@ -74,17 +79,17 @@ class ServerEventHandler
}
Process::signal(SIGINT, function () use ($r) {
echo "\r";
Console::warning("Server interrupted by keyboard on Master.");
Console::warning("Server interrupted(SIGINT) on Master.");
if ((Framework::$server->inotify ?? null) !== null)
/** @noinspection PhpUndefinedFieldInspection */ Event::del(Framework::$server->inotify);
ZMUtil::stop();
});
if(Framework::$argv["daemon"]) {
if (Framework::$argv["daemon"]) {
$daemon_data = json_encode([
"pid" => \server()->master_pid,
"stdout" => ZMConfig::get("global")["swoole"]["log_file"]
],128|256);
file_put_contents(DataProvider::getWorkingDir()."/.daemon_pid", $daemon_data);
], 128 | 256);
file_put_contents(DataProvider::getWorkingDir() . "/.daemon_pid", $daemon_data);
}
if (Framework::$argv["watch"]) {
if (extension_loaded('inotify')) {
@@ -238,7 +243,7 @@ class ServerEventHandler
Console::error("PHP Error: " . $e->getMessage() . " in " . $e->getFile() . " on line " . $e->getLine());
Console::error("Maybe it caused by your own code if in your own Module directory.");
Console::log($e->getTraceAsString(), 'gray');
ZMUtil::stop();
posix_kill($server->master_pid, SIGINT);
}
} else {
// 这里是TaskWorker初始化的内容部分
@@ -255,7 +260,7 @@ class ServerEventHandler
Console::error("PHP Error: " . $e->getMessage() . " in " . $e->getFile() . " on line " . $e->getLine());
Console::error("Maybe it caused by your own code if in your own Module directory.");
Console::log($e->getTraceAsString(), 'gray');
ZMUtil::stop();
posix_kill($server->master_pid, SIGINT);
}
}
}
@@ -314,7 +319,7 @@ class ServerEventHandler
*/
public function onRequest(?Request $request, ?\Swoole\Http\Response $response) {
$response = new Response($response);
foreach(ZMConfig::get("global")["http_header"] as $k => $v) {
foreach (ZMConfig::get("global")["http_header"] as $k => $v) {
$response->setHeader($k, $v);
}
unset(Context::$context[Co::getCid()]);
@@ -401,6 +406,14 @@ class ServerEventHandler
Console::debug("Calling Swoole \"open\" event from fd=" . $request->fd);
unset(Context::$context[Co::getCid()]);
$type = strtolower($request->header["x-client-role"] ?? $request->get["type"] ?? "");
$access_token = explode(" ", $request->header["authorization"] ?? $request->get["token"] ?? "")[1] ?? "";
if (($a = ZMConfig::get("global", "access_token")) != "") {
if ($access_token !== $a) {
$server->close($request->fd);
Console::warning("Unauthorized access_token: " . $access_token);
return;
}
}
$type_conn = ManagerGM::getTypeClassName($type);
ManagerGM::pushConnect($request->fd, $type_conn);
$conn = ManagerGM::get($request->fd);
@@ -491,16 +504,20 @@ class ServerEventHandler
/**
* @SwooleHandler("pipeMessage")
* @param $server
* @param Server $server
* @param $src_worker_id
* @param $data
* @throws InterruptException
* @throws Exception
*/
public function onPipeMessage(Server $server, $src_worker_id, $data) {
//var_dump($data, $server->worker_id);
//unset(Context::$context[Co::getCid()]);
$data = json_decode($data, true);
switch ($data["action"] ?? '') {
case "resume_ws_message":
$obj = $data["data"];
Co::resume($obj["coroutine"]);
break;
case "getWorkerCache":
$r = WorkerCache::get($data["key"]);
$action = ["action" => "returnWorkerCache", "cid" => $data["cid"], "value" => $r];
@@ -557,10 +574,11 @@ class ServerEventHandler
* @param Server|null $server
* @param Server\Task $task
* @return mixed
* @noinspection PhpUnusedParameterInspection
*/
public function onTask(?Server $server, Server\Task $task) {
$data = $task->data;
switch($data["action"]) {
switch ($data["action"]) {
case "runMethod":
$c = $data["class"];
$ss = new $c();

View File

@@ -4,8 +4,6 @@
namespace ZM\Http;
use ZM\Console\Console;
class Response
{

View File

@@ -3,7 +3,7 @@
namespace ZM\Module;
use Swoole\Coroutine;
use Exception;
use ZM\Annotation\CQ\CQAPIResponse;
use ZM\Annotation\CQ\CQBefore;
use ZM\Annotation\CQ\CQCommand;
@@ -14,8 +14,6 @@ use ZM\Annotation\CQ\CQRequest;
use ZM\Event\EventDispatcher;
use ZM\Exception\InterruptException;
use ZM\Exception\WaitTimeoutException;
use ZM\Store\LightCacheInside;
use ZM\Store\Lock\SpinLock;
use ZM\Utils\CoMessage;
/**
@@ -26,27 +24,27 @@ class QQBot
{
/**
* @throws InterruptException
* @throws Exception
*/
public function handle() {
try {
$data = json_decode(context()->getFrame()->data, true);
set_coroutine_params(["data" => $data]);
if (isset($data["post_type"])) {
//echo TermColor::ITALIC.json_encode($data, 128|256).TermColor::RESET.PHP_EOL;
set_coroutine_params(["data" => $data]);
ctx()->setCache("level", 0);
//Console::debug("Calling CQ Event from fd=" . ctx()->getConnection()->getFd());
if ($data["post_type"] != "meta_event") {
$r = $this->dispatchBeforeEvents($data); // before在这里执行元事件不执行before为减少不必要的调试日志
if ($r->store === "block") EventDispatcher::interrupt();
}
if (CoMessage::resumeByWS()) {
EventDispatcher::interrupt();
}
//Console::warning("最上数据包:".json_encode($data));
$this->dispatchEvents($data);
} else {
$this->dispatchAPIResponse($data);
}
if (isset($data["echo"]) || isset($data["post_type"])) {
if (CoMessage::resumeByWS()) EventDispatcher::interrupt();
}
if (isset($data["post_type"])) $this->dispatchEvents($data);
else $this->dispatchAPIResponse($data);
} /** @noinspection PhpRedundantCatchClauseInspection */ catch (WaitTimeoutException $e) {
$e->module->finalReply($e->getMessage());
}
@@ -55,7 +53,7 @@ class QQBot
/**
* @param $data
* @return EventDispatcher
* @throws InterruptException
* @throws Exception
*/
public function dispatchBeforeEvents($data) {
$before = new EventDispatcher(CQBefore::class);
@@ -177,45 +175,16 @@ class QQBot
}
}
/**
* @param $req
* @throws Exception
*/
private function dispatchAPIResponse($req) {
$status = $req["status"];
$retcode = $req["retcode"];
$data = $req["data"];
if (isset($req["echo"]) && is_numeric($req["echo"])) {
$r = LightCacheInside::get("wait_api", "wait_api");
if (isset($r[$req["echo"]])) {
$origin = $r[$req["echo"]];
$self_id = $origin["self_id"];
$response = [
"status" => $status,
"retcode" => $retcode,
"data" => $data,
"self_id" => $self_id,
"echo" => $req["echo"]
];
set_coroutine_params(["cq_response" => $response]);
$dispatcher = new EventDispatcher(CQAPIResponse::class);
$dispatcher->setRuleFunction(function (CQAPIResponse $response) {
return $response->retcode == ctx()->getCQResponse()["retcode"];
});
$dispatcher->dispatchEvents($response);
$origin_ctx = ctx()->copy();
set_coroutine_params($origin_ctx);
if (($origin["coroutine"] ?? false) !== false) {
SpinLock::lock("wait_api");
$r = LightCacheInside::get("wait_api", "wait_api");
$r[$req["echo"]]["result"] = $response;
LightCacheInside::set("wait_api", "wait_api", $r);
SpinLock::unlock("wait_api");
Coroutine::resume($origin['coroutine']);
}
SpinLock::lock("wait_api");
$r = LightCacheInside::get("wait_api", "wait_api");
unset($r[$req["echo"]]);
LightCacheInside::set("wait_api", "wait_api", $r);
SpinLock::unlock("wait_api");
}
}
set_coroutine_params(["cq_response" => $req]);
$dispatcher = new EventDispatcher(CQAPIResponse::class);
$dispatcher->setRuleFunction(function (CQAPIResponse $response) {
return $response->retcode == ctx()->getCQResponse()["retcode"];
});
$dispatcher->dispatchEvents($req);
}
}

View File

@@ -23,9 +23,10 @@ class LightCacheInside
$result = self::$kv_table["wait_api"]->create() && self::$kv_table["connect"]->create();
if ($result === false) {
self::$last_error = '系统内存不足,申请失败';
return $result;
return false;
} else {
return true;
}
return $result;
}
/**

View File

@@ -29,72 +29,58 @@ class WorkerCache
}
public static function set($key, $value, $async = false) {
$config = self::$config ?? ZMConfig::get("global", "worker_cache");
$config = self::$config ?? ZMConfig::get("global", "worker_cache") ?? ["worker" => 0];
if ($config["worker"] === server()->worker_id) {
self::$store[$key] = $value;
return true;
} else {
$action = ["action" => $async ? "asyncSetWorkerCache" : "setWorkerCache", "key" => $key, "value" => $value, "cid" => zm_cid()];
$ss = server()->sendMessage(json_encode($action, JSON_UNESCAPED_UNICODE), $config["worker"]);
if(!$ss) return false;
if ($async) return true;
zm_yield();
$p = self::$transfer[zm_cid()] ?? null;
unset(self::$transfer[zm_cid()]);
return $p;
return self::processRemote($action, $async, $config);
}
}
private static function processRemote($action, $async, $config) {
$ss = server()->sendMessage(json_encode($action, JSON_UNESCAPED_UNICODE), $config["worker"]);
if(!$ss) return false;
if ($async) return true;
zm_yield();
$p = self::$transfer[zm_cid()] ?? null;
unset(self::$transfer[zm_cid()]);
return $p;
}
public static function unset($key, $async = false) {
$config = self::$config ?? ZMConfig::get("global", "worker_cache");
$config = self::$config ?? ZMConfig::get("global", "worker_cache") ?? ["worker" => 0];
if ($config["worker"] === server()->worker_id) {
unset(self::$store[$key]);
return true;
} else {
$action = ["action" => $async ? "asyncUnsetWorkerCache" : "unsetWorkerCache", "key" => $key, "cid" => zm_cid()];
$ss = server()->sendMessage(json_encode($action, JSON_UNESCAPED_UNICODE), $config["worker"]);
if(!$ss) return false;
if ($async) return true;
zm_yield();
$p = self::$transfer[zm_cid()] ?? null;
unset(self::$transfer[zm_cid()]);
return $p;
return self::processRemote($action, $async, $config);
}
}
public static function add($key, int $value, $async = false) {
$config = self::$config ?? ZMConfig::get("global", "worker_cache");
$config = self::$config ?? ZMConfig::get("global", "worker_cache") ?? ["worker" => 0];
if ($config["worker"] === server()->worker_id) {
if(!isset(self::$store[$key])) self::$store[$key] = 0;
self::$store[$key] += $value;
return true;
} else {
$action = ["action" => $async ? "asyncAddWorkerCache" : "addWorkerCache", "key" => $key, "value" => $value, "cid" => zm_cid()];
$ss = server()->sendMessage(json_encode($action, JSON_UNESCAPED_UNICODE), $config["worker"]);
// if(!$ss) return false;
if ($async) return true;
zm_yield();
$p = self::$transfer[zm_cid()] ?? null;
unset(self::$transfer[zm_cid()]);
return $p;
return self::processRemote($action, $async, $config);
}
}
public static function sub($key, int $value, $async = false) {
$config = self::$config ?? ZMConfig::get("global", "worker_cache");
$config = self::$config ?? ZMConfig::get("global", "worker_cache") ?? ["worker" => 0];
if ($config["worker"] === server()->worker_id) {
if(!isset(self::$store[$key])) self::$store[$key] = 0;
self::$store[$key] -= $value;
return true;
} else {
$action = ["action" => $async ? "asyncSubWorkerCache" : "subWorkerCache", "key" => $key, "value" => $value, "cid" => zm_cid()];
$ss = server()->sendMessage(json_encode($action, JSON_UNESCAPED_UNICODE), $config["worker"]);
// if(!$ss) return false;
if ($async) return true;
zm_yield();
$p = self::$transfer[zm_cid()] ?? null;
unset(self::$transfer[zm_cid()]);
return $p;
return self::processRemote($action, $async, $config);
}
}
}

View File

@@ -16,7 +16,7 @@ class CoMessage
* @param array $hang
* @param array $compare
* @param int $timeout
* @return bool
* @return mixed
* @throws Exception
*/
public static function yieldByWS(array $hang, array $compare, $timeout = 600) {
@@ -48,4 +48,35 @@ class CoMessage
if ($result === null) return false;
return $result;
}
public static function resumeByWS() {
$dat = ctx()->getData();
$last = null;
SpinLock::lock("wait_api");
$all = LightCacheInside::get("wait_api", "wait_api") ?? [];
foreach ($all as $k => $v) {
if(!isset($v["compare"])) continue;
foreach ($v["compare"] as $vs) {
if (!isset($v[$vs], $dat[$vs])) continue 2;
if ($v[$vs] != $dat[$vs]) {
continue 2;
}
}
$last = $k;
}
if($last !== null) {
$all[$last]["result"] = $dat;
LightCacheInside::set("wait_api", "wait_api", $all);
SpinLock::unlock("wait_api");
if ($all[$last]["worker_id"] != server()->worker_id) {
ZMUtil::sendActionToWorker($all[$last]["worker_id"], "resume_ws_message", $all[$last]);
} else {
Co::resume($all[$last]["coroutine"]);
}
return true;
} else {
SpinLock::unlock("wait_api");
return false;
}
}
}

View File

@@ -51,4 +51,8 @@ class ZMUtil
return ZMBuf::$instance[$class];
}
}
public static function sendActionToWorker($target_id, $action, $data) {
server()->sendMessage(json_encode(["action" => $action, "data" => $data]), $target_id);
}
}

View File

@@ -24,6 +24,7 @@ function phar_classloader($p) {
return;
}
try {
/** @noinspection PhpIncludeInspection */
require_once $filepath;
} catch (Exception $e) {
echo "Error when finding class: " . $p . PHP_EOL;
@@ -75,7 +76,7 @@ function unicode_decode($str) {
/**
* 获取模块文件夹下的每个类文件的类名称
* @param $dir
* @param string $indoor_name
* @param $indoor_name
* @return array
*/
function getAllClasses($dir, $indoor_name) {
@@ -95,6 +96,7 @@ function getAllClasses($dir, $indoor_name) {
continue 2;
}
}
if ($v == "global_function.php") continue;
$class_name = $indoor_name . "\\" . mb_substr($v, 0, -4);
$classes [] = $class_name;
}
@@ -308,3 +310,7 @@ function getAllFdByConnectType(string $type = 'default'): array {
}
return $fds;
}
function zm_atomic($name) {
return \ZM\Store\ZMAtomic::get($name);
}