Skip to content

Commit 0525a88

Browse files
[10.x] Dispatch events based on a DB transaction result (#48705)
* wip * Refactor * Add EventFake support * remove strict types * Make styleCI happy * Fix test * Add missing test for EventFake * Add test to handle nested transactions * fix typo * Make styleci happy * formatting, inject manager resolver * formatting * formatting * formatting * formatting * more thorough solution * Add additional test for nested transactions * Add additional nested transaction test --------- Co-authored-by: Taylor Otwell <[email protected]>
1 parent c55667a commit 0525a88

File tree

12 files changed

+383
-9
lines changed

12 files changed

+383
-9
lines changed
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
<?php
2+
3+
namespace Illuminate\Contracts\Events;
4+
5+
interface ShouldDispatchAfterCommit
6+
{
7+
//
8+
}
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
<?php
2+
3+
namespace Illuminate\Contracts\Events;
4+
5+
interface ShouldHandleEventsAfterCommit
6+
{
7+
//
8+
}
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
<?php
2+
3+
namespace Illuminate\Contracts\Queue;
4+
5+
interface ShouldQueueAfterCommit extends ShouldQueue
6+
{
7+
//
8+
}
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
<?php
2+
3+
namespace Illuminate\Database\Eloquent;
4+
5+
trait BroadcastsEventsAfterCommit
6+
{
7+
use BroadcastsEvents;
8+
9+
/**
10+
* Determine if the model event broadcast queued job should be dispatched after all transactions are committed.
11+
*
12+
* @return bool
13+
*/
14+
public function broadcastAfterCommit()
15+
{
16+
return true;
17+
}
18+
}

src/Illuminate/Events/Dispatcher.php

Lines changed: 73 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,11 @@
99
use Illuminate\Contracts\Broadcasting\ShouldBroadcast;
1010
use Illuminate\Contracts\Container\Container as ContainerContract;
1111
use Illuminate\Contracts\Events\Dispatcher as DispatcherContract;
12+
use Illuminate\Contracts\Events\ShouldDispatchAfterCommit;
13+
use Illuminate\Contracts\Events\ShouldHandleEventsAfterCommit;
1214
use Illuminate\Contracts\Queue\ShouldBeEncrypted;
1315
use Illuminate\Contracts\Queue\ShouldQueue;
16+
use Illuminate\Contracts\Queue\ShouldQueueAfterCommit;
1417
use Illuminate\Support\Arr;
1518
use Illuminate\Support\Str;
1619
use Illuminate\Support\Traits\Macroable;
@@ -56,6 +59,13 @@ class Dispatcher implements DispatcherContract
5659
*/
5760
protected $queueResolver;
5861

62+
/**
63+
* The database transaction manager resolver instance.
64+
*
65+
* @var callable
66+
*/
67+
protected $transactionManagerResolver;
68+
5969
/**
6070
* Create a new event dispatcher instance.
6171
*
@@ -235,10 +245,37 @@ public function dispatch($event, $payload = [], $halt = false)
235245
// When the given "event" is actually an object we will assume it is an event
236246
// object and use the class as the event name and this event itself as the
237247
// payload to the handler, which makes object based events quite simple.
238-
[$event, $payload] = $this->parseEventAndPayload(
239-
$event, $payload
240-
);
248+
[$isEventObject, $event, $payload] = [
249+
is_object($event),
250+
...$this->parseEventAndPayload($event, $payload)
251+
];
252+
253+
// If the event is not intended to be dispatched unless the current database
254+
// transaction is successful, we'll register a callback which will handle
255+
// dispatching this event on the next successful DB transaction commit.
256+
if ($isEventObject &&
257+
$payload[0] instanceof ShouldDispatchAfterCommit &&
258+
! is_null($transactions = $this->resolveTransactionManager())) {
259+
$transactions->addCallback(
260+
fn () => $this->invokeListeners($event, $payload, $halt)
261+
);
241262

263+
return null;
264+
}
265+
266+
return $this->invokeListeners($event, $payload, $halt);
267+
}
268+
269+
/**
270+
* Broadcast an event and call its listeners.
271+
*
272+
* @param string|object $event
273+
* @param mixed $payload
274+
* @param bool $halt
275+
* @return array|null
276+
*/
277+
protected function invokeListeners($event, $payload, $halt = false)
278+
{
242279
if ($this->shouldBroadcast($payload)) {
243280
$this->broadcastEvent($payload[0]);
244281
}
@@ -525,7 +562,9 @@ protected function createQueuedHandlerCallable($class, $method)
525562
*/
526563
protected function handlerShouldBeDispatchedAfterDatabaseTransactions($listener)
527564
{
528-
return ($listener->afterCommit ?? null) && $this->container->bound('db.transactions');
565+
return (($listener->afterCommit ?? null) ||
566+
$listener instanceof ShouldHandleEventsAfterCommit) &&
567+
$this->resolveTransactionManager();
529568
}
530569

531570
/**
@@ -540,7 +579,7 @@ protected function createCallbackForListenerRunningAfterCommits($listener, $meth
540579
return function () use ($method, $listener) {
541580
$payload = func_get_args();
542581

543-
$this->container->make('db.transactions')->addCallback(
582+
$this->resolveTransactionManager()->addCallback(
544583
function () use ($listener, $method, $payload) {
545584
$listener->$method(...$payload);
546585
}
@@ -624,7 +663,12 @@ protected function propagateListenerOptions($listener, $job)
624663
return tap($job, function ($job) use ($listener) {
625664
$data = array_values($job->data);
626665

627-
$job->afterCommit = property_exists($listener, 'afterCommit') ? $listener->afterCommit : null;
666+
if ($listener instanceof ShouldQueueAfterCommit) {
667+
$job->afterCommit = true;
668+
} else {
669+
$job->afterCommit = property_exists($listener, 'afterCommit') ? $listener->afterCommit : null;
670+
}
671+
628672
$job->backoff = method_exists($listener, 'backoff') ? $listener->backoff(...$data) : ($listener->backoff ?? null);
629673
$job->maxExceptions = $listener->maxExceptions ?? null;
630674
$job->retryUntil = method_exists($listener, 'retryUntil') ? $listener->retryUntil(...$data) : null;
@@ -697,6 +741,29 @@ public function setQueueResolver(callable $resolver)
697741
return $this;
698742
}
699743

744+
/**
745+
* Get the database transaction manager implementation from the resolver.
746+
*
747+
* @return \Illuminate\Database\DatabaseTransactionsManager|null
748+
*/
749+
protected function resolveTransactionManager()
750+
{
751+
return call_user_func($this->transactionManagerResolver);
752+
}
753+
754+
/**
755+
* Set the database transaction manager resolver implementation.
756+
*
757+
* @param callable $resolver
758+
* @return $this
759+
*/
760+
public function setTransactionManagerResolver(callable $resolver)
761+
{
762+
$this->transactionManagerResolver = $resolver;
763+
764+
return $this;
765+
}
766+
700767
/**
701768
* Gets the raw, unprepared listeners.
702769
*

src/Illuminate/Events/EventServiceProvider.php

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,10 @@ public function register()
1717
$this->app->singleton('events', function ($app) {
1818
return (new Dispatcher($app))->setQueueResolver(function () use ($app) {
1919
return $app->make(QueueFactoryContract::class);
20+
})->setTransactionManagerResolver(function () use ($app) {
21+
return $app->bound('db.transactions')
22+
? $app->make('db.transactions')
23+
: null;
2024
});
2125
});
2226
}

src/Illuminate/Mail/SendQueuedMailable.php

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
use Illuminate\Contracts\Mail\Factory as MailFactory;
77
use Illuminate\Contracts\Mail\Mailable as MailableContract;
88
use Illuminate\Contracts\Queue\ShouldBeEncrypted;
9+
use Illuminate\Contracts\Queue\ShouldQueueAfterCommit;
910
use Illuminate\Queue\InteractsWithQueue;
1011

1112
class SendQueuedMailable
@@ -57,7 +58,12 @@ public function __construct(MailableContract $mailable)
5758
{
5859
$this->mailable = $mailable;
5960

60-
$this->afterCommit = property_exists($mailable, 'afterCommit') ? $mailable->afterCommit : null;
61+
if ($mailable instanceof ShouldQueueAfterCommit) {
62+
$this->afterCommit = true;
63+
} else {
64+
$this->afterCommit = property_exists($mailable, 'afterCommit') ? $mailable->afterCommit : null;
65+
}
66+
6167
$this->connection = property_exists($mailable, 'connection') ? $mailable->connection : null;
6268
$this->maxExceptions = property_exists($mailable, 'maxExceptions') ? $mailable->maxExceptions : null;
6369
$this->queue = property_exists($mailable, 'queue') ? $mailable->queue : null;

src/Illuminate/Notifications/SendQueuedNotifications.php

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
use Illuminate\Bus\Queueable;
66
use Illuminate\Contracts\Queue\ShouldBeEncrypted;
77
use Illuminate\Contracts\Queue\ShouldQueue;
8+
use Illuminate\Contracts\Queue\ShouldQueueAfterCommit;
89
use Illuminate\Database\Eloquent\Collection as EloquentCollection;
910
use Illuminate\Database\Eloquent\Model;
1011
use Illuminate\Queue\InteractsWithQueue;
@@ -80,7 +81,13 @@ public function __construct($notifiables, $notification, array $channels = null)
8081
$this->tries = property_exists($notification, 'tries') ? $notification->tries : null;
8182
$this->timeout = property_exists($notification, 'timeout') ? $notification->timeout : null;
8283
$this->maxExceptions = property_exists($notification, 'maxExceptions') ? $notification->maxExceptions : null;
83-
$this->afterCommit = property_exists($notification, 'afterCommit') ? $notification->afterCommit : null;
84+
85+
if ($notification instanceof ShouldQueueAfterCommit) {
86+
$this->afterCommit = true;
87+
} else {
88+
$this->afterCommit = property_exists($notification, 'afterCommit') ? $notification->afterCommit : null;
89+
}
90+
8491
$this->shouldBeEncrypted = $notification instanceof ShouldBeEncrypted;
8592
}
8693

src/Illuminate/Queue/Queue.php

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
use Illuminate\Container\Container;
88
use Illuminate\Contracts\Encryption\Encrypter;
99
use Illuminate\Contracts\Queue\ShouldBeEncrypted;
10+
use Illuminate\Contracts\Queue\ShouldQueueAfterCommit;
1011
use Illuminate\Queue\Events\JobQueued;
1112
use Illuminate\Support\Arr;
1213
use Illuminate\Support\InteractsWithTime;
@@ -325,6 +326,10 @@ function () use ($payload, $queue, $delay, $callback, $job) {
325326
*/
326327
protected function shouldDispatchAfterCommit($job)
327328
{
329+
if (is_object($job) && $job instanceof ShouldQueueAfterCommit) {
330+
return true;
331+
}
332+
328333
if (! $job instanceof Closure && is_object($job) && isset($job->afterCommit)) {
329334
return $job->afterCommit;
330335
}

src/Illuminate/Support/Testing/Fakes/EventFake.php

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,9 @@
33
namespace Illuminate\Support\Testing\Fakes;
44

55
use Closure;
6+
use Illuminate\Container\Container;
67
use Illuminate\Contracts\Events\Dispatcher;
8+
use Illuminate\Contracts\Events\ShouldDispatchAfterCommit;
79
use Illuminate\Support\Arr;
810
use Illuminate\Support\Str;
911
use Illuminate\Support\Traits\ForwardsCalls;
@@ -297,7 +299,7 @@ public function dispatch($event, $payload = [], $halt = false)
297299
$name = is_object($event) ? get_class($event) : (string) $event;
298300

299301
if ($this->shouldFakeEvent($name, $payload)) {
300-
$this->events[$name][] = func_get_args();
302+
$this->fakeEvent($event, $name, func_get_args());
301303
} else {
302304
return $this->dispatcher->dispatch($event, $payload, $halt);
303305
}
@@ -329,6 +331,24 @@ protected function shouldFakeEvent($eventName, $payload)
329331
->isNotEmpty();
330332
}
331333

334+
/**
335+
* Push the event onto the fake events array immediately or after the next database transaction.
336+
*
337+
* @param string|object $event
338+
* @param string $name
339+
* @param array $arguments
340+
* @return void
341+
*/
342+
protected function fakeEvent($event, $name, $arguments)
343+
{
344+
if ($event instanceof ShouldDispatchAfterCommit && Container::getInstance()->bound('db.transactions')) {
345+
return Container::getInstance()->make('db.transactions')
346+
->addCallback(fn () => $this->events[$name][] = $arguments);
347+
}
348+
349+
$this->events[$name][] = $arguments;
350+
}
351+
332352
/**
333353
* Determine whether an event should be dispatched or not.
334354
*

0 commit comments

Comments
 (0)