diff --git a/src/ZM/Schedule/Schedule.php b/src/ZM/Schedule/Schedule.php index 3ad006eb..586efd12 100644 --- a/src/ZM/Schedule/Schedule.php +++ b/src/ZM/Schedule/Schedule.php @@ -10,12 +10,7 @@ use ZM\Annotation\Framework\Cron; class Schedule { - /** - * 排程任务列表 - * - * @var Cron[] - */ - private array $schedules = []; + private int $next_run = 0; /** * 正在执行的排程任务列表 @@ -24,24 +19,15 @@ class Schedule */ private array $executing = []; + private bool $available; + public function __construct() { $c = Adaptive::getCoroutine(); - if (!$c instanceof CoroutineInterface) { + $this->available = $c instanceof CoroutineInterface; + if (!$this->available) { logger()->error('排程任务只能在协程环境下使用'); - return; } - - // 每秒检查一次,精度为一分钟,从最近的下一分钟开始 - $c->create(function () { - /* @phpstan-ignore-next-line 协程会睡觉的,不会阻塞 */ - while (true) { - $now = time(); - $this->run(); - $sleep_time = 60 - ($now % 60); - Adaptive::sleep($sleep_time); - } - }); } /** @@ -51,37 +37,34 @@ class Schedule */ public function addSchedule(Cron $cron): void { - $this->schedules[] = $cron; - } - - /** - * 获取到期的排程任务 - * - * @return Cron[] - */ - public function due(): array - { - return array_filter($this->schedules, fn (Cron $cron) => $cron->expression->isDue()); - } - - /** - * 运行排程任务 - */ - public function run(): void - { - // 同时运行到期的排程任务 - foreach ($this->due() as $cron) { - // 检查是否重叠执行 - if ($cron->no_overlap && in_array($cron, $this->executing, true)) { - continue; - } - $this->executing[] = $cron; - // 新建一个协程运行排程任务,避免阻塞 - Adaptive::getCoroutine()->create(function () use ($cron) { - $callable = $cron->class === '' ? $cron->method : [$cron->class, $cron->method]; - container()->call($callable); - $this->executing = array_diff($this->executing, [$cron]); - }); + if (!$this->available) { + return; } + $next_run = $cron->expression->getNextRunDate()->getTimestamp(); + // 防止在同一分钟内重复执行 + if ($next_run < $this->next_run) { + $next_run = $this->next_run; + } + $this->next_run = $cron->expression->getNextRunDate()->getTimestamp(); + $wait_ms = max(0, ($next_run - time()) * 1000); + Timer::after($wait_ms, function () use ($cron) { + $this->dispatch($cron); + $this->addSchedule($cron); + }); + } + + public function dispatch(Cron $cron): void + { + // 检查是否重叠执行 + if ($cron->no_overlap && in_array($cron, $this->executing, true)) { + return; + } + $this->executing[] = $cron; + // 新建一个协程运行排程任务,避免阻塞 + Adaptive::getCoroutine()->create(function () use ($cron) { + $callable = $cron->class === '' ? $cron->method : [$cron->class, $cron->method]; + container()->call($callable); + $this->executing = array_diff($this->executing, [$cron]); + }); } }