Skip to content

Commit 351c43a

Browse files
committed
feat: allow queue listeners to be unique
1 parent 3026dc7 commit 351c43a

File tree

5 files changed

+390
-9
lines changed

5 files changed

+390
-9
lines changed

src/Illuminate/Bus/UniqueLock.php

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,8 @@ public function acquire($job)
3636
: ($job->uniqueFor ?? 0);
3737

3838
$cache = method_exists($job, 'uniqueVia')
39-
? $job->uniqueVia()
40-
: $this->cache;
39+
? ($job->uniqueVia() ?? $this->cache)
40+
: ($job->uniqueVia ?? $this->cache);
4141

4242
return (bool) $cache->lock($this->getKey($job), $uniqueFor)->get();
4343
}
@@ -51,8 +51,8 @@ public function acquire($job)
5151
public function release($job)
5252
{
5353
$cache = method_exists($job, 'uniqueVia')
54-
? $job->uniqueVia()
55-
: $this->cache;
54+
? ($job->uniqueVia() ?? $this->cache)
55+
: ($job->uniqueVia ?? $this->cache);
5656

5757
$cache->lock($this->getKey($job))->forceRelease();
5858
}

src/Illuminate/Events/CallQueuedListener.php

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
use Illuminate\Bus\Queueable;
66
use Illuminate\Container\Container;
7+
use Illuminate\Contracts\Cache\Repository as Cache;
78
use Illuminate\Contracts\Queue\Job;
89
use Illuminate\Contracts\Queue\ShouldQueue;
910
use Illuminate\Queue\InteractsWithQueue;
@@ -82,6 +83,31 @@ class CallQueuedListener implements ShouldQueue
8283
*/
8384
public $shouldBeEncrypted = false;
8485

86+
/**
87+
* Indicates if the listener should be unique.
88+
*/
89+
public bool $shouldBeUnique = false;
90+
91+
/**
92+
* Indicates if the listener should be unique until processing begins.
93+
*/
94+
public bool $shouldBeUniqueUntilProcessing = false;
95+
96+
/**
97+
* The unique ID of the listener.
98+
*/
99+
public mixed $uniqueId = null;
100+
101+
/**
102+
* The number of seconds the unique lock should be maintained.
103+
*/
104+
public ?int $uniqueFor = null;
105+
106+
/**
107+
* The cache store used to manage unique locks.
108+
*/
109+
public ?Cache $uniqueVia = null;
110+
85111
/**
86112
* Create a new job instance.
87113
*
@@ -172,6 +198,46 @@ public function displayName()
172198
return $this->class;
173199
}
174200

201+
/**
202+
* Determine if the listener should be unique.
203+
*/
204+
public function shouldBeUnique(): bool
205+
{
206+
return $this->shouldBeUnique;
207+
}
208+
209+
/**
210+
* Determine if the listener should be unique until processing begins.
211+
*/
212+
public function shouldBeUniqueUntilProcessing(): bool
213+
{
214+
return $this->shouldBeUniqueUntilProcessing;
215+
}
216+
217+
/**
218+
* Get the unique ID for the listener.
219+
*/
220+
public function uniqueId(): mixed
221+
{
222+
return $this->uniqueId;
223+
}
224+
225+
/**
226+
* Get the number of seconds the unique lock should be maintained.
227+
*/
228+
public function uniqueFor(): ?int
229+
{
230+
return $this->uniqueFor;
231+
}
232+
233+
/**
234+
* Get the cache store used to manage unique locks.
235+
*/
236+
public function uniqueVia(): ?Cache
237+
{
238+
return $this->uniqueVia;
239+
}
240+
175241
/**
176242
* Prepare the instance for cloning.
177243
*

src/Illuminate/Events/Dispatcher.php

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,18 @@
44

55
use Closure;
66
use Exception;
7+
use Illuminate\Bus\UniqueLock;
78
use Illuminate\Container\Container;
89
use Illuminate\Contracts\Broadcasting\Factory as BroadcastFactory;
910
use Illuminate\Contracts\Broadcasting\ShouldBroadcast;
11+
use Illuminate\Contracts\Cache\Repository as Cache;
1012
use Illuminate\Contracts\Container\Container as ContainerContract;
1113
use Illuminate\Contracts\Events\Dispatcher as DispatcherContract;
1214
use Illuminate\Contracts\Events\ShouldDispatchAfterCommit;
1315
use Illuminate\Contracts\Events\ShouldHandleEventsAfterCommit;
1416
use Illuminate\Contracts\Queue\ShouldBeEncrypted;
17+
use Illuminate\Contracts\Queue\ShouldBeUnique;
18+
use Illuminate\Contracts\Queue\ShouldBeUniqueUntilProcessing;
1519
use Illuminate\Contracts\Queue\ShouldQueue;
1620
use Illuminate\Contracts\Queue\ShouldQueueAfterCommit;
1721
use Illuminate\Support\Arr;
@@ -649,6 +653,11 @@ protected function queueHandler($class, $method, $arguments)
649653
{
650654
[$listener, $job] = $this->createListenerAndJob($class, $method, $arguments);
651655

656+
if ($job->shouldBeUnique &&
657+
! (new UniqueLock($this->container->make(Cache::class)))->acquire($job)) {
658+
return;
659+
}
660+
652661
$connection = $this->resolveQueue()->connection(method_exists($listener, 'viaConnection')
653662
? (isset($arguments[0]) ? $listener->viaConnection($arguments[0]) : $listener->viaConnection())
654663
: $listener->connection ?? null);
@@ -720,6 +729,21 @@ protected function propagateListenerOptions($listener, $job)
720729
method_exists($listener, 'middleware') ? $listener->middleware(...$data) : [],
721730
$listener->middleware ?? []
722731
));
732+
733+
$job->shouldBeUnique = $listener instanceof ShouldBeUnique;
734+
$job->shouldBeUniqueUntilProcessing = $listener instanceof ShouldBeUniqueUntilProcessing;
735+
736+
if ($job->shouldBeUnique) {
737+
$job->uniqueId = method_exists($listener, 'uniqueId')
738+
? $listener->uniqueId(...$data)
739+
: ($listener->uniqueId ?? null);
740+
$job->uniqueFor = method_exists($listener, 'uniqueFor')
741+
? $listener->uniqueFor(...$data)
742+
: ($listener->uniqueFor ?? 0);
743+
$job->uniqueVia = method_exists($listener, 'uniqueVia')
744+
? $listener->uniqueVia(...$data)
745+
: null;
746+
}
723747
});
724748
}
725749

src/Illuminate/Queue/CallQueuedHandler.php

Lines changed: 24 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
use Illuminate\Contracts\Queue\ShouldBeUnique;
1515
use Illuminate\Contracts\Queue\ShouldBeUniqueUntilProcessing;
1616
use Illuminate\Database\Eloquent\ModelNotFoundException;
17+
use Illuminate\Events\CallQueuedListener;
1718
use Illuminate\Log\Context\Repository as ContextRepository;
1819
use Illuminate\Pipeline\Pipeline;
1920
use Illuminate\Queue\Attributes\DeleteWhenMissingModels;
@@ -67,7 +68,7 @@ public function call(Job $job, array $data)
6768

6869
$this->dispatchThroughMiddleware($job, $command);
6970

70-
if (! $job->isReleased() && ! $command instanceof ShouldBeUniqueUntilProcessing) {
71+
if (! $job->isReleased() && ! $this->commandShouldBeUniqueUntilProcessing($command)) {
7172
$this->ensureUniqueJobLockIsReleased($command);
7273
}
7374

@@ -120,12 +121,12 @@ protected function dispatchThroughMiddleware(Job $job, $command)
120121
return (new Pipeline($this->container))->send($command)
121122
->through(array_merge(method_exists($command, 'middleware') ? $command->middleware() : [], $command->middleware ?? []))
122123
->finally(function ($command) use (&$lockReleased) {
123-
if (! $lockReleased && $command instanceof ShouldBeUniqueUntilProcessing && ! $command->job->isReleased()) {
124+
if (! $lockReleased && $this->commandShouldBeUniqueUntilProcessing($command) && ! $command->job->isReleased()) {
124125
$this->ensureUniqueJobLockIsReleased($command);
125126
}
126127
})
127128
->then(function ($command) use ($job, &$lockReleased) {
128-
if ($command instanceof ShouldBeUniqueUntilProcessing) {
129+
if ($this->commandShouldBeUniqueUntilProcessing($command)) {
129130
$this->ensureUniqueJobLockIsReleased($command);
130131

131132
$lockReleased = true;
@@ -212,11 +213,29 @@ protected function ensureSuccessfulBatchJobIsRecorded($command)
212213
*/
213214
protected function ensureUniqueJobLockIsReleased($command)
214215
{
215-
if ($command instanceof ShouldBeUnique) {
216+
if ($this->commandShouldBeUnique($command)) {
216217
(new UniqueLock($this->container->make(Cache::class)))->release($command);
217218
}
218219
}
219220

221+
/**
222+
* Determine if the given command should be unique.
223+
*/
224+
protected function commandShouldBeUnique(mixed $command): bool
225+
{
226+
return $command instanceof ShouldBeUnique ||
227+
($command instanceof CallQueuedListener && $command->shouldBeUnique());
228+
}
229+
230+
/**
231+
* Determine if the given command should be unique until processing begins.
232+
*/
233+
protected function commandShouldBeUniqueUntilProcessing(mixed $command): bool
234+
{
235+
return $command instanceof ShouldBeUniqueUntilProcessing ||
236+
($command instanceof CallQueuedListener && $command->shouldBeUniqueUntilProcessing());
237+
}
238+
220239
/**
221240
* Handle a model not found exception.
222241
*
@@ -294,7 +313,7 @@ public function failed(array $data, $e, string $uuid, ?Job $job = null)
294313
$command = $this->setJobInstanceIfNecessary($job, $command);
295314
}
296315

297-
if (! $command instanceof ShouldBeUniqueUntilProcessing) {
316+
if (! $this->commandShouldBeUniqueUntilProcessing($command)) {
298317
$this->ensureUniqueJobLockIsReleased($command);
299318
}
300319

0 commit comments

Comments
 (0)