Add Cron annotation event

This commit is contained in:
crazywhalecc 2022-03-26 14:51:57 +08:00
parent 971f03ae0f
commit bf7920cc15
5 changed files with 201 additions and 1 deletions

View File

@ -38,6 +38,7 @@
"ext-json": "*",
"ext-posix": "*",
"doctrine/dbal": "^2.13.1",
"dragonmantank/cron-expression": "^3.3",
"jelix/version": "^2.0",
"koriym/attributes": "^1.0",
"psy/psysh": "^0.11.2",

View File

@ -76,4 +76,6 @@
| E00072 | 上下文无法找到 | 检查上下文环境,如是否处于协程环境中 |
| E00073 | 在类中找不到方法 | 检查调用对象是否存在对应的方法method或检查是否插入了对应的macro宏方法 |
| E00074 | 参数非法 | 检查调用的参数是否正常(此处可能有多处问题,请看具体调用栈炸掉的地方) |
| E00075 | Cron表达式非法 | 检查 @Cron 中的表达式是否书写格式正确 |
| E00076 | Cron检查间隔非法 | 检查 @Cron 中的检查间隔毫秒是否在1000-60000之间 |
| E99999 | 未知错误 | |

View File

@ -0,0 +1,85 @@
<?php
declare(strict_types=1);
namespace ZM\Annotation\Cron;
use Attribute;
use Doctrine\Common\Annotations\Annotation\NamedArgumentConstructor;
use Doctrine\Common\Annotations\Annotation\Required;
use Doctrine\Common\Annotations\Annotation\Target;
use ZM\Annotation\AnnotationBase;
/**
* @Annotation
* @NamedArgumentConstructor()
* @Target("METHOD")
*/
#[Attribute(Attribute::TARGET_METHOD | Attribute::IS_REPEATABLE)]
class Cron extends AnnotationBase
{
/**
* @var string
* @Required()
*/
public $expression;
/**
* @var int
*/
public $worker_id = 0;
/**
* @var int
*/
public $check_delay_time = 20000;
/**
* @var int
*/
public $max_iteration_count = 1000;
/**
* @var int Cron执行状态
*/
private $status = 0;
private $record_next_time = 0;
public function __construct(string $expression, int $worker_id = 0, int $check_delay_time = 20000, int $max_iteration_count = 1000)
{
$this->expression = $expression;
$this->worker_id = $worker_id;
$this->check_delay_time = $check_delay_time;
$this->max_iteration_count = $max_iteration_count;
}
public function getStatus(): int
{
return $this->status;
}
/**
* @internal
*/
public function setStatus(int $status): void
{
$this->status = $status;
}
/**
* @internal
*/
public function getRecordNextTime(): int
{
return $this->record_next_time;
}
/**
* @internal
*/
public function setRecordNextTime(int $record_next_time): void
{
$this->record_next_time = $record_next_time;
}
}

View File

@ -35,6 +35,7 @@ use ZM\Store\LightCacheInside;
use ZM\Store\MySQL\SqlPoolStorage;
use ZM\Store\Redis\ZMRedisPool;
use ZM\Utils\DataProvider;
use ZM\Utils\Manager\CronManager;
use ZM\Utils\Manager\ModuleManager;
use ZM\Utils\SignalListener;
@ -94,8 +95,9 @@ class OnWorkerStart implements SwooleEvent
}
$this->loadAnnotations(); // 加载composer资源、phar外置包、注解解析注册等
CronManager::initCronTasks(); // 初始化定时任务
EventManager::registerTimerTick(); // 启动计时器
set_coroutine_params(['server' => $server, 'worker_id' => $worker_id]);
$dispatcher = new EventDispatcher(OnStart::class);
$dispatcher->setRuleFunction(function ($v) {

View File

@ -0,0 +1,110 @@
<?php
declare(strict_types=1);
namespace ZM\Utils\Manager;
use Cron\CronExpression;
use Doctrine\Common\Annotations\AnnotationException;
use Error;
use Exception;
use InvalidArgumentException;
use Swoole\Timer;
use ZM\Annotation\Cron\Cron;
use ZM\Console\Console;
use ZM\Event\EventDispatcher;
use ZM\Event\EventManager;
use ZM\Exception\InterruptException;
use ZM\Store\ZMAtomic;
class CronManager
{
/**
* 初始化 Cron 注解
* 必须在 WorkerStart 事件中调用
*
* @throws Exception
* @internal
*/
public static function initCronTasks()
{
$dispatcher = new EventDispatcher(Cron::class);
$all = EventManager::$events[Cron::class] ?? [];
foreach ($all as $v) {
/** @var Cron $v */
if (server()->worker_id !== $v->worker_id && $v->worker_id != -1) {
return;
}
try {
if (strpos($v->expression, '\\') !== 0) {
$v->expression = str_replace('\\', '/', $v->expression);
}
$cron = new CronExpression($v->expression);
$cron->setMaxIterationCount($v->max_iteration_count);
$plain_class = $v->class;
Console::debug("Cron task checker starting {$plain_class}:{$v->method}, next run at {$cron->getNextRunDate()->format('Y-m-d H:i:s')}");
if ($v->check_delay_time > 60000 || $v->check_delay_time < 1000) {
Console::warning(zm_internal_errcode('E00076') . 'Delay time must be between 1000 and 60000, reset to 20000');
$v->check_delay_time = 20000;
}
} catch (InvalidArgumentException $e) {
Console::error(zm_internal_errcode('E00075') . 'Invalid cron expression or arguments, please check it!');
throw $e;
}
Timer::tick($v->check_delay_time, static function () use ($v, $dispatcher, $cron) {
set_coroutine_params([]);
if (ZMAtomic::get('stop_signal')->get() != 0) {
Timer::clearAll();
return;
}
try {
Console::debug('Cron: ' . ($cron->isDue() ? 'true' : 'false') . ', last: ' . $cron->getPreviousRunDate()->format('Y-m-d H:i:s') . ', next: ' . $cron->getNextRunDate()->format('Y-m-d H:i:s'));
if ($cron->isDue()) {
if ($v->getStatus() === 0) {
self::startExecute($v, $dispatcher, $cron);
} elseif ($v->getStatus() === 2) {
if ($v->getRecordNextTime() !== $cron->getNextRunDate()->getTimestamp()) {
self::startExecute($v, $dispatcher, $cron);
}
}
} else {
if ($v->getStatus() === 2 && $v->getRecordNextTime()) {
$v->setStatus(0);
}
}
} catch (Exception $e) {
Console::error(zm_internal_errcode('E00034') . 'Uncaught error from Cron: ' . $e->getMessage() . ' at ' . $e->getFile() . "({$e->getLine()})");
} catch (Error $e) {
Console::error(zm_internal_errcode('E00034') . 'Uncaught fatal error from Cron: ' . $e->getMessage());
echo Console::setColor($e->getTraceAsString(), 'gray');
Console::error('Please check your code!');
}
});
}
}
/**
* @throws InterruptException
* @throws AnnotationException
* @throws Exception
*/
private static function startExecute(Cron $v, EventDispatcher $dispatcher, CronExpression $cron)
{
Console::verbose("Cron task {$v->class}:{$v->method} is due, running at " . date('Y-m-d H:i:s') . ($v->getRecordNextTime() === 0 ? '' : (', offset ' . (time() - $v->getRecordNextTime()) . 's')));
$v->setStatus(1);
$starttime = microtime(true);
$pre_next_time = $cron->getNextRunDate()->getTimestamp();
$dispatcher->dispatchEvent($v, null, $cron);
Console::verbose("Cron task {$v->class}:{$v->method} is done, using " . round(microtime(true) - $starttime, 3) . 's');
if ($pre_next_time !== $cron->getNextRunDate()->getTimestamp()) { // 这一步用于判断运行的Cron是否已经覆盖到下一个运行区间
if (time() + round($v->check_delay_time / 1000) >= $pre_next_time) { // 假设检测到下一个周期运行时间已经要超过了预计的时间,则警告运行超时
Console::warning(zm_internal_errcode('E00077') . 'Cron task ' . $v->class . ':' . $v->method . ' is timeout');
}
} else {
Console::verbose('Next run at ' . date('Y-m-d H:i:s', $cron->getNextRunDate()->getTimestamp()));
}
$v->setRecordNextTime($pre_next_time);
$v->setStatus(2);
}
}