Skip to content

Commit 99ad970

Browse files
committed
feat: add support for leaf redis
1 parent dfe5558 commit 99ad970

File tree

2 files changed

+46
-17
lines changed

2 files changed

+46
-17
lines changed

src/Queue/Adapters/Redis.php

Lines changed: 19 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,8 @@ class Redis implements Adapter
1616

1717
protected array $config = [];
1818

19-
public function __construct($config = [])
19+
public function __construct()
2020
{
21-
$this->config = $config;
2221
$this->redis = new \Leaf\Redis();
2322
}
2423

@@ -27,11 +26,19 @@ public function __construct($config = [])
2726
*/
2827
public function connect($connection)
2928
{
30-
$this->redis->init($connection);
29+
$this->config['table'] = $connection['table'] ?? 'leaf_php_jobs';
30+
31+
if (redis()->ping()) {
32+
$this->redis = redis();
33+
} else {
34+
$this->redis->connect(MvcConfig('redis'));
35+
}
3136

3237
if (!$this->redis->get($this->config['table'])) {
3338
$this->redis->set($this->config['table'], json_encode([]));
3439
}
40+
41+
return $this;
3542
}
3643

3744
/**
@@ -84,6 +91,7 @@ public function setJobStatus($id, $status)
8491
foreach ($jobs as $key => $job) {
8592
if ($job['id'] === $id) {
8693
$jobs[$key]['status'] = $status;
94+
8795
$this->redis->set($this->config['table'], json_encode($jobs));
8896

8997
return true;
@@ -116,28 +124,25 @@ public function getJobs()
116124
*/
117125
public function getNextJob()
118126
{
119-
$jobs = $this->redis->get($this->config['table']) ?? [];
120-
$jobs = json_decode($jobs, true);
121-
122-
$job = array_values(array_filter($jobs, function ($job) {
123-
return $job['status'] === 'pending';
124-
}))[0] ?? null;
127+
foreach ($this->getJobs() as $job) {
128+
if ($job['status'] === 'pending') {
129+
return $job;
130+
}
131+
}
125132

126-
return $job;
133+
return null;
127134
}
128135

129136
/**
130137
* @inheritDoc
131138
*/
132139
public function retryFailedJob($id, $retryCount)
133140
{
134-
$jobs = $this->redis->get($this->config['table']) ?? [];
135-
$jobs = json_decode($jobs, true);
136-
137-
foreach ($jobs as $key => $job) {
141+
foreach ($this->getJobs() as $key => $job) {
138142
if ($job['id'] === $id) {
139143
$jobs[$key]['status'] = 'pending';
140144
$jobs[$key]['retry_count'] = (int) $retryCount + 1;
145+
141146
$this->redis->set($this->config['table'], json_encode($jobs));
142147

143148
return true;

src/Queue/Commands/QueueWorkCommand.php

Lines changed: 27 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,18 +6,42 @@
66

77
class QueueWorkCommand extends Command
88
{
9+
protected $queueConfig;
10+
911
protected static $defaultName = 'queue:work';
1012
public $description = 'Start your queue worker';
1113
public $help = 'Start your queue worker';
1214

15+
protected function config()
16+
{
17+
$this->queueConfig = MvcConfig('queue');
18+
$this->setOption('queue', 'queue', 'optional', 'The queue you want to run', $this->queueConfig['default']);
19+
}
20+
1321
protected function handle()
1422
{
15-
$this->writeln('Queue worker started for queue \'default\'...');
23+
$queue = $this->option('queue');
24+
25+
$this->writeln("Queue worker started for queue '$queue'...");
26+
27+
if ($this->queueConfig['connections'][$queue]['driver'] === 'database') {
28+
$this->writeln('> Using database connection for queue...');
29+
30+
if (!file_exists(DatabasePath("{$this->queueConfig['connections'][$queue]['table']}.yml"))) {
31+
$this->writeln("> Queue table not found. Creating queue table...");
1632

17-
$queueConfig = MvcConfig('queue');
33+
\Leaf\FS\File::copy(__DIR__ . '/stubs/schema.yml', DatabasePath("{$this->queueConfig['connections'][$queue]['table']}.yml"));
34+
\Aloe\Core::run(
35+
"php leaf db:migrate {$this->queueConfig['connections'][$queue]['table']}",
36+
$this->output
37+
);
38+
}
39+
} else {
40+
$this->writeln('> Using redis connection for queue...');
41+
}
1842

1943
(new \Leaf\Worker())
20-
->queue($queueConfig['connections'][$queueConfig['default']])
44+
->queue($this->queueConfig['connections'][$queue])
2145
->run();
2246

2347
return 0;

0 commit comments

Comments
 (0)