Skip to content

Commit 294be10

Browse files
committed
update queue, worker, job reliance
1 parent a7d0801 commit 294be10

File tree

4 files changed

+91
-167
lines changed

4 files changed

+91
-167
lines changed

src/Job.php

Lines changed: 55 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,9 @@
22

33
namespace Leaf;
44

5-
class Job
5+
use Leaf\Queue\Dispatchable;
6+
7+
abstract class Job implements Dispatchable
68
{
79
/**
810
* Job config
@@ -14,20 +16,46 @@ class Job
1416
*/
1517
protected $job = [];
1618

19+
/**
20+
* Data to pass to the job
21+
*/
22+
protected $data = [];
23+
1724
/**
1825
* Queue instance
1926
* @var \Leaf\Queue
2027
*/
2128
protected $queue = null;
2229

30+
/**
31+
* Configured queue connection
32+
*/
33+
protected string $connection = 'default';
34+
35+
/**
36+
* Return configured connection
37+
*/
38+
public function connection()
39+
{
40+
return $this->connection;
41+
}
42+
2343
/**
2444
* Load a job for running
2545
*/
26-
public function __construct($job, $config, $queue)
46+
public function fromQueue($job, $config, $queue)
2747
{
2848
$this->job = $job;
29-
$this->config = $config;
3049
$this->queue = $queue;
50+
51+
if (isset($config['data'])) {
52+
$this->data = $config['data'];
53+
unset($config['data']);
54+
}
55+
56+
$this->config = $config;
57+
58+
return $this;
3159
}
3260

3361
/**
@@ -73,15 +101,6 @@ public function hasReachedRetryLimit()
73101
return $this->job['retry_count'] >= $this->config['tries'];
74102
}
75103

76-
/**
77-
* Handle job retry limit
78-
*/
79-
public function handleRetryLimit()
80-
{
81-
echo "Job {$this->job['id']} has reached retry limit\n";
82-
$this->queue->pop($this->job['id']);
83-
}
84-
85104
/**
86105
* Set job status
87106
*/
@@ -96,17 +115,19 @@ public function setStatus($status)
96115
public function retry()
97116
{
98117
sleep($this->config['delayBeforeRetry'] ?? 0);
99-
$this->queue->retryFailedJob($this->job['id'], $this->config['retry_count']);
118+
$this->queue->retryFailedJob($this->job['id'], $this->job['retry_count']);
100119
}
101120

102121
/**
103122
* Release the job back into the queue after (n) seconds.
104123
*
105-
* @param int $delay
124+
* @param int $delay
106125
* @return void
107126
*/
108127
public function release($delay = 0)
109128
{
129+
sleep($delay);
130+
110131
$this->queue->push([
111132
'class' => $this->job['class'],
112133
'config' => json_encode($this->config),
@@ -116,46 +137,39 @@ public function release($delay = 0)
116137
}
117138

118139
/**
119-
* Create a new job
140+
* Pass data to the job
120141
*/
121-
public static function create(callable $job)
142+
public function with($data)
122143
{
123-
//
144+
$this->data = $data;
145+
return $this;
124146
}
125147

126-
public function handle()
148+
public function stack()
127149
{
128-
//
150+
return;
129151
}
130152

131-
public static function dispatch($config = [], $queue = 'default')
153+
public function getConfig()
132154
{
133-
$queue = new Queue();
134-
$queueConfig = MvcConfig('queue') ?? [];
135-
136-
if (empty($queueConfig)) {
137-
if (!file_exists($configFile = \Aloe\Command\Config::rootpath('queue.config.php'))) {
138-
throw new \Exception('Queue config not found');
139-
}
140-
141-
$queueConfig = require $configFile;
142-
}
143-
144-
$queue->config($queueConfig);
145-
$queue->connect();
146-
147-
return $queue->push([
148-
'class' => get_called_class(),
149-
'config' => json_encode(array_merge($queue->config()['workerConfig'] ?? [], $config)),
150-
'status' => 'pending',
151-
'retry_count' => 0,
152-
]);
155+
return [
156+
'delay' => 0,
157+
'delayBeforeRetry' => 0,
158+
'expire' => 60,
159+
'force' => false,
160+
'memory' => 128,
161+
'quitOnEmpty' => false,
162+
'sleep' => 3,
163+
'timeout' => 60,
164+
'tries' => 3,
165+
'data' => $this->data,
166+
];
153167
}
154168

155169
public function trigger()
156170
{
157171
$this->queue->setJobStatus($this->job['id'], 'processing');
158-
$this->handle();
172+
$this->handle(...$this->data);
159173
}
160174

161175
public function removeFromQueue()

src/Queue.php

Lines changed: 6 additions & 85 deletions
Original file line numberDiff line numberDiff line change
@@ -15,57 +15,19 @@ class Queue
1515
/**
1616
* The queue config
1717
*/
18-
protected $config = [
19-
'archive' => false,
20-
'adapter' => 'db',
21-
'connection' => [],
22-
'table' => 'leafphp_queue_main',
23-
'workers' => 1,
24-
'workerConfig' => [
25-
'delay' => 0,
26-
'delayBeforeRetry' => 0,
27-
'expire' => 60,
28-
'force' => false,
29-
'memory' => 128,
30-
'quitOnEmpty' => false,
31-
'sleep' => 3,
32-
'timeout' => 60,
33-
'tries' => 3,
34-
],
35-
];
36-
37-
/**
38-
* Config for the queue and worker
39-
* @param array $config The config for the queue and worker
40-
*/
41-
public function config(?array $config = null)
42-
{
43-
if (!$config) {
44-
return $this->config;
45-
}
46-
47-
$this->config = array_merge(
48-
$this->config,
49-
\Leaf\Config::get('queue') ?? [],
50-
$config ?? [],
51-
);
52-
53-
return $this->config;
54-
}
18+
protected $connection = [];
5519

5620
/**
5721
* Connect to queue adapter
22+
* @param array $connection The connection to use
5823
* @return \Leaf\Queue
5924
*/
60-
public function connect(): Queue
25+
public function connect($connection): Queue
6126
{
62-
$config = $this->config(\Leaf\Config::get('queue') ?? []);
63-
64-
$adapter = ucfirst($config['adapter']);
27+
$adapter = ucfirst($connection['driver']);
6528
$adapter = "\\Leaf\\Queue\\Adapters\\$adapter";
6629

67-
$this->adapter = new $adapter($config);
68-
$this->adapter->connect($config['connections'][$config['default'] ?? 'redis']);
30+
$this->adapter = (new $adapter())->connect($connection);
6931

7032
return $this;
7133
}
@@ -84,10 +46,6 @@ public function getAdapter()
8446
*/
8547
public function push(array $job)
8648
{
87-
if (!$this->adapter) {
88-
$this->connect();
89-
}
90-
9149
$this->adapter->pushJobToQueue($job);
9250
}
9351

@@ -97,10 +55,6 @@ public function push(array $job)
9755
*/
9856
public function pop($id)
9957
{
100-
if (!$this->adapter) {
101-
$this->connect();
102-
}
103-
10458
$this->adapter->popJobFromQueue($id);
10559
}
10660

@@ -112,10 +66,6 @@ public function pop($id)
11266
*/
11367
public function setJobStatus($id, $status)
11468
{
115-
if (!$this->adapter) {
116-
$this->connect();
117-
}
118-
11969
$this->adapter->setJobStatus($id, $status);
12070
}
12171

@@ -125,10 +75,6 @@ public function setJobStatus($id, $status)
12575
*/
12676
public function getNextJob()
12777
{
128-
if (!$this->adapter) {
129-
$this->connect();
130-
}
131-
13278
return $this->adapter->getNextJob();
13379
}
13480

@@ -138,10 +84,6 @@ public function getNextJob()
13884
*/
13985
public function markJobAsFailed($id)
14086
{
141-
if (!$this->adapter) {
142-
$this->connect();
143-
}
144-
14587
$this->adapter->markJobAsFailed($id);
14688
}
14789

@@ -152,10 +94,6 @@ public function markJobAsFailed($id)
15294
*/
15395
public function retryFailedJob($id, $retryCount = 0)
15496
{
155-
if (!$this->adapter) {
156-
$this->connect();
157-
}
158-
15997
$this->adapter->retryFailedJob($id, $retryCount);
16098
}
16199

@@ -167,24 +105,7 @@ public static function commands()
167105
return [
168106
\Leaf\Queue\Commands\DeleteJobCommand::class,
169107
\Leaf\Queue\Commands\GenerateJobCommand::class,
170-
\Leaf\Queue\Commands\QueueConfigCommand::class,
171-
\Leaf\Queue\Commands\QueueInstallCommand::class,
172-
\Leaf\Queue\Commands\QueuePauseCommand::class,
173-
\Leaf\Queue\Commands\QueueRunCommand::class,
108+
\Leaf\Queue\Commands\QueueWorkCommand::class,
174109
];
175110
}
176-
177-
/**
178-
* Initialize a worker to work the queue
179-
*/
180-
public function run()
181-
{
182-
for ($i = 0; $i < $this->config['workers']; $i++) {
183-
$worker = new Worker();
184-
$worker
185-
->config($this->config['workerConfig'])
186-
->queue($this)
187-
->run();
188-
}
189-
}
190111
}

src/Queue/Adapters/Adapter.php

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,6 @@
99
*/
1010
interface Adapter
1111
{
12-
public function __construct($config = []);
13-
1412
/**
1513
* Connect to queue storage
1614
* @param array $connection Credentials for the queue storage

0 commit comments

Comments
 (0)