mirror of
https://github.com/zhamao-robot/zhamao-framework.git
synced 2026-03-18 05:04:51 +08:00
fix schedule swoole deadlock
This commit is contained in:
parent
70886ecd43
commit
dfe4613eac
@ -10,12 +10,7 @@ use ZM\Annotation\Framework\Cron;
|
||||
|
||||
class Schedule
|
||||
{
|
||||
/**
|
||||
* 排程任务列表
|
||||
*
|
||||
* @var Cron[]
|
||||
*/
|
||||
private array $schedules = [];
|
||||
private int $next_run = 0;
|
||||
|
||||
/**
|
||||
* 正在执行的排程任务列表
|
||||
@ -24,24 +19,15 @@ class Schedule
|
||||
*/
|
||||
private array $executing = [];
|
||||
|
||||
private bool $available;
|
||||
|
||||
public function __construct()
|
||||
{
|
||||
$c = Adaptive::getCoroutine();
|
||||
if (!$c instanceof CoroutineInterface) {
|
||||
$this->available = $c instanceof CoroutineInterface;
|
||||
if (!$this->available) {
|
||||
logger()->error('排程任务只能在协程环境下使用');
|
||||
return;
|
||||
}
|
||||
|
||||
// 每秒检查一次,精度为一分钟,从最近的下一分钟开始
|
||||
$c->create(function () {
|
||||
/* @phpstan-ignore-next-line 协程会睡觉的,不会阻塞 */
|
||||
while (true) {
|
||||
$now = time();
|
||||
$this->run();
|
||||
$sleep_time = 60 - ($now % 60);
|
||||
Adaptive::sleep($sleep_time);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
@ -51,37 +37,34 @@ class Schedule
|
||||
*/
|
||||
public function addSchedule(Cron $cron): void
|
||||
{
|
||||
$this->schedules[] = $cron;
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取到期的排程任务
|
||||
*
|
||||
* @return Cron[]
|
||||
*/
|
||||
public function due(): array
|
||||
{
|
||||
return array_filter($this->schedules, fn (Cron $cron) => $cron->expression->isDue());
|
||||
}
|
||||
|
||||
/**
|
||||
* 运行排程任务
|
||||
*/
|
||||
public function run(): void
|
||||
{
|
||||
// 同时运行到期的排程任务
|
||||
foreach ($this->due() as $cron) {
|
||||
// 检查是否重叠执行
|
||||
if ($cron->no_overlap && in_array($cron, $this->executing, true)) {
|
||||
continue;
|
||||
}
|
||||
$this->executing[] = $cron;
|
||||
// 新建一个协程运行排程任务,避免阻塞
|
||||
Adaptive::getCoroutine()->create(function () use ($cron) {
|
||||
$callable = $cron->class === '' ? $cron->method : [$cron->class, $cron->method];
|
||||
container()->call($callable);
|
||||
$this->executing = array_diff($this->executing, [$cron]);
|
||||
});
|
||||
if (!$this->available) {
|
||||
return;
|
||||
}
|
||||
$next_run = $cron->expression->getNextRunDate()->getTimestamp();
|
||||
// 防止在同一分钟内重复执行
|
||||
if ($next_run < $this->next_run) {
|
||||
$next_run = $this->next_run;
|
||||
}
|
||||
$this->next_run = $cron->expression->getNextRunDate()->getTimestamp();
|
||||
$wait_ms = max(0, ($next_run - time()) * 1000);
|
||||
Timer::after($wait_ms, function () use ($cron) {
|
||||
$this->dispatch($cron);
|
||||
$this->addSchedule($cron);
|
||||
});
|
||||
}
|
||||
|
||||
public function dispatch(Cron $cron): void
|
||||
{
|
||||
// 检查是否重叠执行
|
||||
if ($cron->no_overlap && in_array($cron, $this->executing, true)) {
|
||||
return;
|
||||
}
|
||||
$this->executing[] = $cron;
|
||||
// 新建一个协程运行排程任务,避免阻塞
|
||||
Adaptive::getCoroutine()->create(function () use ($cron) {
|
||||
$callable = $cron->class === '' ? $cron->method : [$cron->class, $cron->method];
|
||||
container()->call($callable);
|
||||
$this->executing = array_diff($this->executing, [$cron]);
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user