Skip to content

Commit 33adb1d

Browse files
authored
Merge pull request #20 from techmahedy/techmahedy-1.x
[Job Chaining] Added support for multiple sequential jobs
2 parents 3280774 + 866a4f1 commit 33adb1d

File tree

10 files changed

+1144
-35
lines changed

10 files changed

+1144
-35
lines changed

composer.json

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,5 +42,7 @@
4242
"sort-packages": true
4343
},
4444
"minimum-stability": "dev",
45-
"require": {}
45+
"require": {
46+
"opis/closure": "^4.4"
47+
}
4648
}

src/Drain.php

Lines changed: 232 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,232 @@
1+
<?php
2+
3+
namespace Doppar\Queue;
4+
5+
use Doppar\Queue\Contracts\JobInterface;
6+
use function Opis\Closure\serialize as serialize_closure;
7+
use function Opis\Closure\unserialize as unserialize_closure;
8+
9+
class Drain
10+
{
11+
/**
12+
* The jobs in the chain.
13+
*
14+
* @var array<JobInterface>
15+
*/
16+
protected array $jobs = [];
17+
18+
/**
19+
* The queue name for the chain.
20+
*
21+
* @var string
22+
*/
23+
protected string $queueName = 'default';
24+
25+
/**
26+
* The delay before starting the chain.
27+
*
28+
* @var int
29+
*/
30+
protected int $delay = 0;
31+
32+
/**
33+
* Callback to run when all jobs complete.
34+
*
35+
* @var callable|null
36+
*/
37+
protected $onComplete = null;
38+
39+
/**
40+
* Callback to run when any job fails.
41+
*
42+
* @var callable|null
43+
*/
44+
protected $onFailure = null;
45+
46+
/**
47+
* Chain identifier.
48+
*
49+
* @var string
50+
*/
51+
protected string $chainId;
52+
53+
/**
54+
* Create a new job chain.
55+
*
56+
* @param array<JobInterface> $jobs
57+
*/
58+
public function __construct(array $jobs = [])
59+
{
60+
$this->jobs = array_values($jobs);
61+
$this->chainId = $this->generateChainId();
62+
}
63+
64+
/**
65+
* Create a new job chain.
66+
*
67+
* @param array<JobInterface> $jobs
68+
* @return static
69+
*/
70+
public static function conduct(array $jobs): static
71+
{
72+
return new static($jobs);
73+
}
74+
75+
/**
76+
* Add a job to the chain.
77+
*
78+
* @param JobInterface $job
79+
* @return $this
80+
*/
81+
public function add(JobInterface $job): self
82+
{
83+
$this->jobs[] = $job;
84+
85+
return $this;
86+
}
87+
88+
/**
89+
* Set the queue for all jobs in the chain.
90+
*
91+
* @param string $queue
92+
* @return $this
93+
*/
94+
public function onQueue(string $queue): self
95+
{
96+
$this->queueName = $queue;
97+
98+
return $this;
99+
}
100+
101+
/**
102+
* Set the delay before starting the chain.
103+
*
104+
* @param int $seconds
105+
* @return $this
106+
*/
107+
public function delayFor(int $seconds): self
108+
{
109+
$this->delay = $seconds;
110+
111+
return $this;
112+
}
113+
114+
/**
115+
* Set callback to run when all jobs complete.
116+
*
117+
* @param callable $callback
118+
* @return $this
119+
*/
120+
public function then(callable $callback): self
121+
{
122+
if ($callback instanceof \Closure) {
123+
$callback = serialize_closure($callback);
124+
}
125+
126+
$this->onComplete = $callback;
127+
128+
return $this;
129+
}
130+
131+
/**
132+
* Set callback to run when any job fails.
133+
*
134+
* @param callable $callback
135+
* @return $this
136+
*/
137+
public function catch(callable $callback): self
138+
{
139+
if ($callback instanceof \Closure) {
140+
$callback = serialize_closure($callback);
141+
}
142+
143+
$this->onFailure = $callback;
144+
145+
return $this;
146+
}
147+
148+
/**
149+
* Dispatch the job chain.
150+
*
151+
* Only the FIRST job is pushed to queue.
152+
* Each job will push the next one after successful completion.
153+
*
154+
* @return string
155+
*/
156+
public function dispatch(): string
157+
{
158+
if (empty($this->jobs)) {
159+
throw new \InvalidArgumentException('Cannot dispatch an empty job chain');
160+
}
161+
162+
// Prepare the first job with chain context
163+
$firstJob = $this->jobs[0];
164+
165+
// Attach chain metadata to first job
166+
$firstJob->chainId = $this->chainId;
167+
$firstJob->chainJobs = $this->jobs;
168+
$firstJob->chainIndex = 0;
169+
$firstJob->chainOnComplete = $this->onComplete;
170+
$firstJob->chainOnFailure = $this->onFailure;
171+
172+
// Set queue and delay
173+
$firstJob->onQueue($this->queueName);
174+
$firstJob->delayFor($this->delay);
175+
176+
// Push ONLY the first job to queue
177+
$firstJob->forceQueue();
178+
179+
return $this->chainId;
180+
}
181+
182+
/**
183+
* Execute the chain synchronously.
184+
*
185+
* @return void
186+
*/
187+
public function dispatchSync(): void
188+
{
189+
foreach ($this->jobs as $index => $job) {
190+
try {
191+
$job->handle();
192+
} catch (\Throwable $e) {
193+
if ($this->onFailure) {
194+
$callback = is_string($this->onFailure)
195+
? unserialize_closure($this->onFailure)
196+
: $this->onFailure;
197+
198+
$callback($job, $e, $index);
199+
}
200+
throw $e;
201+
}
202+
}
203+
204+
if ($this->onComplete) {
205+
$callback = is_string($this->onComplete)
206+
? unserialize_closure($this->onComplete)
207+
: $this->onComplete;
208+
209+
$callback();
210+
}
211+
}
212+
213+
/**
214+
* Get the jobs in the chain.
215+
*
216+
* @return array<JobInterface>
217+
*/
218+
public function getJobs(): array
219+
{
220+
return $this->jobs;
221+
}
222+
223+
/**
224+
* Generate a unique chain ID.
225+
*
226+
* @return string
227+
*/
228+
protected function generateChainId(): string
229+
{
230+
return 'chain_' . uniqid('', true);
231+
}
232+
}

0 commit comments

Comments
 (0)