Skip to content

Commit 94567a5

Browse files
committed
feat: add support for job scheduling via db queues
1 parent fd74cb5 commit 94567a5

File tree

5 files changed

+357
-3
lines changed

5 files changed

+357
-3
lines changed

composer.json

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,5 +37,8 @@
3737
},
3838
"scripts": {
3939
"format": "vendor/bin/php-cs-fixer fix --config=.php_cs.dist.php --allow-risky=yes"
40-
}
40+
},
41+
"require": {
42+
"dragonmantank/cron-expression": "^3.4"
43+
}
4144
}

src/Job.php

Lines changed: 145 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,11 @@ abstract class Job implements Dispatchable
2727
*/
2828
protected string $connection = 'default';
2929

30+
/**
31+
* Schedule to run the job (cron or interval)
32+
*/
33+
protected $schedule = null;
34+
3035
/**
3136
* Number of seconds to wait before processing a job
3237
*/
@@ -217,4 +222,144 @@ public function removeFromQueue()
217222
{
218223
$this->queue->pop($this->job['id']);
219224
}
225+
226+
public function schedule()
227+
{
228+
return null;
229+
}
230+
231+
public function cron($expression)
232+
{
233+
$this->schedule = $expression;
234+
return $expression;
235+
}
236+
237+
/**
238+
* Add a recurring interval to the job schedule
239+
* @param string{minute|hour|day|week|month|year} $interval
240+
* @throws \Exception
241+
* @return static
242+
*/
243+
public function every(string $interval)
244+
{
245+
if (!in_array($interval, ['minute', 'hour', 'day', 'week', 'month', 'year'])) {
246+
throw new \Exception("Invalid interval: {$interval}");
247+
}
248+
249+
if ($interval === 'minute') {
250+
$interval = '* * * * *';
251+
} elseif ($interval === 'hour') {
252+
$interval = '0 * * * *';
253+
} elseif ($interval === 'day') {
254+
$interval = '0 0 * * *';
255+
} elseif ($interval === 'week') {
256+
$interval = '0 0 * * 0';
257+
} elseif ($interval === 'month') {
258+
$interval = '0 0 1 * *';
259+
} elseif ($interval === 'year') {
260+
$interval = '0 0 1 1 *';
261+
}
262+
263+
$this->schedule = $interval;
264+
265+
return $this;
266+
}
267+
268+
/**
269+
* Add a day to the job schedule (only if interval is set)
270+
* @param string{monday|tuesday|wednesday|thursday|friday|saturday|sunday} $day
271+
* @throws \Exception
272+
* @return static
273+
*/
274+
public function on(string $day)
275+
{
276+
if (!in_array(strtolower($day), ['sunday', 'monday', 'tuesday', 'wednesday', 'thursday', 'friday', 'saturday'])) {
277+
throw new \Exception("Invalid day: {$day}");
278+
}
279+
280+
if ($this->schedule === null) {
281+
throw new \Exception("You must set an interval before setting a day. E.g. ->every('week')->on('monday')");
282+
}
283+
284+
if (strtolower($day) === 'sunday') {
285+
$dayNumber = 0;
286+
} elseif (strtolower($day) === 'monday') {
287+
$dayNumber = 1;
288+
} elseif (strtolower($day) === 'tuesday') {
289+
$dayNumber = 2;
290+
} elseif (strtolower($day) === 'wednesday') {
291+
$dayNumber = 3;
292+
} elseif (strtolower($day) === 'thursday') {
293+
$dayNumber = 4;
294+
} elseif (strtolower($day) === 'friday') {
295+
$dayNumber = 5;
296+
} elseif (strtolower($day) === 'saturday') {
297+
$dayNumber = 6;
298+
}
299+
300+
$parts = explode(' ', $this->schedule);
301+
302+
if (count($parts) !== 5) {
303+
throw new \Exception("Invalid schedule format: {$this->schedule}");
304+
}
305+
306+
$parts[4] = $dayNumber;
307+
308+
$this->schedule = implode(' ', $parts);
309+
310+
return $this;
311+
}
312+
313+
/**
314+
* Add a recurring interval in minutes to the job schedule
315+
* @param int $minutes
316+
* @throws \Exception
317+
* @return static
318+
*/
319+
public function inMinutes(int $minutes)
320+
{
321+
if ($minutes < 1 || $minutes > 59) {
322+
throw new \Exception("Invalid minutes: {$minutes}");
323+
}
324+
325+
$parts = explode(' ', $this->schedule);
326+
327+
if (count($parts) !== 5) {
328+
throw new \Exception("Invalid schedule format: {$this->schedule}");
329+
}
330+
331+
$parts[0] = "*/{$minutes}";
332+
333+
$this->schedule = implode(' ', $parts);
334+
335+
return $this;
336+
}
337+
338+
public function at(string $time)
339+
{
340+
if (!preg_match('/^(2[0-3]|[01]?[0-9]):([0-5]?[0-9])$/', $time, $matches)) {
341+
throw new \Exception("Invalid time format: {$time}. Expected format is HH:MM in 24-hour format.");
342+
}
343+
344+
$hour = (int) $matches[1];
345+
$minute = (int) $matches[2];
346+
347+
$parts = explode(' ', $this->schedule);
348+
349+
if (count($parts) !== 5) {
350+
throw new \Exception("Invalid schedule format: {$this->schedule}");
351+
}
352+
353+
$parts[0] = (string) $minute;
354+
$parts[1] = (string) $hour;
355+
356+
$this->schedule = implode(' ', $parts);
357+
358+
return $this;
359+
}
360+
361+
public function getSchedule()
362+
{
363+
return $this->schedule;
364+
}
220365
}

src/Queue.php

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,14 @@ public function retryFailedJob($id, $retryCount = 0)
9797
$this->adapter->retryFailedJob($id, $retryCount);
9898
}
9999

100+
/**
101+
* Disconnect from the queue
102+
*/
103+
public function disconnect()
104+
{
105+
$this->adapter->disconnect();
106+
}
107+
100108
/**
101109
* Return queue commands
102110
*/

src/Queue/Scheduler.php

Lines changed: 173 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,173 @@
1+
<?php
2+
3+
namespace Leaf\Queue;
4+
5+
/**
6+
* Leaf Queue Scheduler (DB Only)
7+
* ---
8+
* Scheduler checks the queue schedules table and pops them unto the main queue when due
9+
*/
10+
class Scheduler
11+
{
12+
protected bool $enabled = false;
13+
14+
/**
15+
* @var \Leaf\Db|\Leaf\Redis
16+
*/
17+
protected $adapter = null;
18+
19+
protected array $connection = [];
20+
21+
protected array $schedule = [];
22+
23+
/**
24+
* Check if the scheduler is enabled
25+
* @return bool
26+
*/
27+
public function isEnabled(): bool
28+
{
29+
return $this->enabled;
30+
}
31+
32+
/**
33+
* Connect to queue driver ONLY if there are jobs that need scheduling
34+
* @param array $connection The connection config
35+
* @return $this
36+
*/
37+
public function connect($connection)
38+
{
39+
if ($connection['driver'] !== 'database') {
40+
return $this;
41+
}
42+
43+
foreach (glob(AppPaths('jobs') . '/*.php') as $file) {
44+
require $file;
45+
46+
/**@var \Leaf\Job */
47+
$job = new ("App\\Jobs\\" . pathinfo($file, PATHINFO_FILENAME))();
48+
49+
if ($schedule = $job->schedule()) {
50+
$this->schedule[] = [
51+
'class' => get_class($job),
52+
'schedule' => is_string($schedule) ? $schedule : $schedule->getSchedule(),
53+
'next_run' => $this->getNextRunTime(is_string($schedule) ? $schedule : $schedule->getSchedule()),
54+
];
55+
56+
$this->enabled = true;
57+
}
58+
}
59+
60+
if ($this->enabled) {
61+
$appDbConfig = MvcConfig('database');
62+
$dbConnection = $appDbConfig['connections'][$connection['connection']] ?? $appDbConfig['connections'][$appDbConfig['default']];
63+
64+
$this->adapter = new \Leaf\Db();
65+
$this->adapter->connect([
66+
'dbtype' => $dbConnection['driver'] ?? 'mysql',
67+
'charset' => $dbConnection['charset'] ?? null,
68+
'port' => $dbConnection['port'] ?? null,
69+
'unixSocket' => $dbConnection['unixSocket'] ?? null,
70+
'host' => $dbConnection['host'] ?? '127.0.0.1',
71+
'username' => $dbConnection['username'] ?? 'root',
72+
'password' => $dbConnection['password'] ?? '',
73+
'dbname' => $dbConnection['database'] ?? '',
74+
]);
75+
76+
$this->enabled = true;
77+
$this->connection = $connection;
78+
$this->connection['table'] = $connection['schedule.table'] ?? 'leaf_php_schedules';
79+
80+
$this->writeScheduleToAdapter();
81+
}
82+
83+
return $this;
84+
}
85+
86+
/**
87+
* Write schedules to the adapter
88+
* @return void
89+
*/
90+
protected function writeScheduleToAdapter()
91+
{
92+
foreach ($this->schedule as $schedule) {
93+
$existing = $this->adapter->select($this->connection['table'])
94+
->where('class', $schedule['class'])
95+
->first();
96+
97+
if ($existing) {
98+
$this->adapter->update($this->connection['table'])
99+
->params([
100+
'schedule' => $schedule['schedule'],
101+
])
102+
->where('class', $schedule['class'])
103+
->execute();
104+
} else {
105+
$this->adapter->insert($this->connection['table'])
106+
->params($schedule)
107+
->execute();
108+
}
109+
}
110+
}
111+
112+
/**
113+
* Push due schedules to the main queue
114+
* @return void
115+
*/
116+
public function writeDueSchedulesToQueue()
117+
{
118+
if (!$this->enabled) {
119+
return;
120+
}
121+
122+
$dueSchedules = $this->adapter->select($this->connection['table'])
123+
->where('next_run', '<=', date('Y-m-d H:i:s'))
124+
->get();
125+
126+
foreach ($dueSchedules as $schedule) {
127+
dispatch($schedule['class']);
128+
129+
$this->adapter->update($this->connection['table'])
130+
->params([
131+
'next_run' => $this->getNextRunTime($schedule['schedule']),
132+
'last_run' => date('Y-m-d H:i:s'),
133+
'run_count' => $schedule['run_count'] + 1,
134+
])
135+
->where('class', $schedule['class'])
136+
->execute();
137+
}
138+
}
139+
140+
/**
141+
* Get the next run time for a cron expression
142+
* @param string $cronExpression The cron expression
143+
* @return string The next run time in Y-m-d H:i:s format
144+
*/
145+
protected function getNextRunTime(string $cronExpression): string
146+
{
147+
$cron = new \Cron\CronExpression($cronExpression);
148+
return $cron->getNextRunDate()->format('Y-m-d H:i:s');
149+
}
150+
151+
/**
152+
* Check if a schedule is due
153+
* @param string $cronExpression The cron expression
154+
* @return bool True if due, false otherwise
155+
*/
156+
protected function isDue(string $cronExpression): bool
157+
{
158+
$cron = new \Cron\CronExpression($cronExpression);
159+
return $cron->isDue();
160+
}
161+
162+
/**
163+
* Disconnect from adapter and shut down scheduler
164+
*/
165+
public function disconnect()
166+
{
167+
if ($this->adapter) {
168+
$this->adapter->close();
169+
}
170+
171+
$this->enabled = false;
172+
}
173+
}

0 commit comments

Comments
 (0)