Skip to content

Commit 2487bfd

Browse files
committed
Add Queue.Job.* lifecycle events with optional Sentry bridge.
Add generic events for queue job lifecycle: - Queue.Job.created: when job is added to queue - Queue.Job.started: when job processing starts - Queue.Job.completed: when job completes successfully - Queue.Job.failed: when job fails Add CakeSentryEventBridge listener that bridges these events to CakeSentry.Queue.* events for lordsimal/cakephp-sentry integration. Users can enable it with: EventManager::instance()->on(new CakeSentryEventBridge())
1 parent e447805 commit 2487bfd

File tree

7 files changed

+548
-3
lines changed

7 files changed

+548
-3
lines changed

docs/sections/misc.md

Lines changed: 103 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,9 +51,69 @@ This includes also failed ones if not filtered further using `where()` condition
5151

5252
## Events
5353
The Queue plugin dispatches events to allow you to hook into the queue processing lifecycle.
54+
These events are useful for monitoring, logging, and integrating with external services.
55+
56+
### Queue.Job.created
57+
Fired when a new job is added to the queue (producer side).
58+
59+
```php
60+
use Cake\Event\EventInterface;
61+
use Cake\Event\EventManager;
62+
63+
EventManager::instance()->on('Queue.Job.created', function (EventInterface $event) {
64+
$job = $event->getData('job');
65+
// Track job creation for monitoring
66+
});
67+
```
68+
69+
Event data:
70+
- `job`: The `QueuedJob` entity that was created
71+
72+
### Queue.Job.started
73+
Fired when a worker begins processing a job (consumer side).
74+
75+
```php
76+
EventManager::instance()->on('Queue.Job.started', function (EventInterface $event) {
77+
$job = $event->getData('job');
78+
// Start tracing/monitoring span
79+
});
80+
```
81+
82+
Event data:
83+
- `job`: The `QueuedJob` entity being processed
84+
85+
### Queue.Job.completed
86+
Fired when a job finishes successfully.
87+
88+
```php
89+
EventManager::instance()->on('Queue.Job.completed', function (EventInterface $event) {
90+
$job = $event->getData('job');
91+
// Mark trace as successful
92+
});
93+
```
94+
95+
Event data:
96+
- `job`: The `QueuedJob` entity that completed
97+
98+
### Queue.Job.failed
99+
Fired when a job fails (on every failure attempt).
100+
101+
```php
102+
EventManager::instance()->on('Queue.Job.failed', function (EventInterface $event) {
103+
$job = $event->getData('job');
104+
$failureMessage = $event->getData('failureMessage');
105+
$exception = $event->getData('exception');
106+
// Mark trace as failed, log error
107+
});
108+
```
109+
110+
Event data:
111+
- `job`: The `QueuedJob` entity that failed
112+
- `failureMessage`: The error message from the failure
113+
- `exception`: The exception object (if available)
54114

55115
### Queue.Job.maxAttemptsExhausted
56-
This event is triggered when a job has failed and exhausted all of its configured retry attempts.
116+
Fired when a job has failed and exhausted all of its configured retry attempts.
57117

58118
```php
59119
use Cake\Event\EventInterface;
@@ -81,10 +141,51 @@ EventManager::instance()->on('Queue.Job.maxAttemptsExhausted', function (EventIn
81141
});
82142
```
83143

84-
The event data contains:
144+
Event data:
85145
- `job`: The `QueuedJob` entity that failed
86146
- `failureMessage`: The error message from the last failure
87147

148+
### Sentry Integration
149+
150+
The plugin includes a `CakeSentryEventBridge` listener that bridges Queue events to
151+
[lordsimal/cakephp-sentry](https://github.com/LordSimal/cakephp-sentry) for queue monitoring.
152+
153+
To enable Sentry integration, register the bridge in your `Application::bootstrap()`:
154+
155+
```php
156+
use Cake\Event\EventManager;
157+
use Queue\Event\CakeSentryEventBridge;
158+
159+
// Enable Sentry queue monitoring
160+
EventManager::instance()->on(new CakeSentryEventBridge());
161+
```
162+
163+
This automatically dispatches the `CakeSentry.Queue.*` events that the Sentry plugin listens to:
164+
- `CakeSentry.Queue.enqueue` - when a job is created
165+
- `CakeSentry.Queue.beforeExecute` - when a job starts processing
166+
- `CakeSentry.Queue.afterExecute` - when a job completes or fails
167+
168+
#### Trace Propagation
169+
170+
For distributed tracing to work across producer and consumer, add Sentry trace headers
171+
to job data when creating jobs:
172+
173+
```php
174+
$data = [
175+
'your_data' => 'here',
176+
];
177+
178+
// Add Sentry trace headers if Sentry SDK is available
179+
if (class_exists(\Sentry\SentrySdk::class)) {
180+
$data['_sentry_trace'] = \Sentry\getTraceparent();
181+
$data['_sentry_baggage'] = \Sentry\getBaggage();
182+
}
183+
184+
$queuedJobsTable->createJob('MyTask', $data);
185+
```
186+
187+
The bridge will automatically extract these headers and pass them to the Sentry plugin.
188+
88189
## Notes
89190

90191
`<TaskName>` is the complete class name without the Task suffix (e.g. Example or PluginName.Example).
Lines changed: 146 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,146 @@
1+
<?php
2+
declare(strict_types=1);
3+
4+
namespace Queue\Event;
5+
6+
use Cake\Event\Event;
7+
use Cake\Event\EventInterface;
8+
use Cake\Event\EventListenerInterface;
9+
use Cake\Event\EventManager;
10+
use Queue\Model\Entity\QueuedJob;
11+
12+
/**
13+
* Event listener that bridges Queue.Job.* events to CakeSentry.Queue.* events.
14+
*
15+
* This enables integration with lordsimal/cakephp-sentry for queue monitoring
16+
* without the Queue plugin depending on the Sentry SDK.
17+
*
18+
* Usage:
19+
* ```php
20+
* // In Application::bootstrap() or a plugin bootstrap
21+
* EventManager::instance()->on(new \Queue\Event\CakeSentryEventBridge());
22+
* ```
23+
*
24+
* @see https://github.com/LordSimal/cakephp-sentry
25+
*/
26+
class CakeSentryEventBridge implements EventListenerInterface {
27+
28+
/**
29+
* @var float|null Start time for execution time calculation
30+
*/
31+
protected ?float $startTime = null;
32+
33+
/**
34+
* @inheritDoc
35+
*/
36+
public function implementedEvents(): array {
37+
return [
38+
'Queue.Job.created' => 'handleCreated',
39+
'Queue.Job.started' => 'handleStarted',
40+
'Queue.Job.completed' => 'handleCompleted',
41+
'Queue.Job.failed' => 'handleFailed',
42+
];
43+
}
44+
45+
/**
46+
* Handle job created event - dispatches CakeSentry.Queue.enqueue
47+
*
48+
* @param \Cake\Event\EventInterface $event
49+
*
50+
* @return void
51+
*/
52+
public function handleCreated(EventInterface $event): void {
53+
/** @var \Queue\Model\Entity\QueuedJob $job */
54+
$job = $event->getData('job');
55+
56+
$sentryEvent = new Event('CakeSentry.Queue.enqueue', $this, [
57+
'class' => $job->job_task,
58+
'id' => (string)$job->id,
59+
'queue' => $job->job_task,
60+
'data' => $job->data ?? [],
61+
]);
62+
EventManager::instance()->dispatch($sentryEvent);
63+
}
64+
65+
/**
66+
* Handle job started event - dispatches CakeSentry.Queue.beforeExecute
67+
*
68+
* @param \Cake\Event\EventInterface $event
69+
*
70+
* @return void
71+
*/
72+
public function handleStarted(EventInterface $event): void {
73+
$this->startTime = microtime(true);
74+
75+
/** @var \Queue\Model\Entity\QueuedJob $job */
76+
$job = $event->getData('job');
77+
$jobData = is_array($job->data) ? $job->data : [];
78+
79+
$sentryEvent = new Event('CakeSentry.Queue.beforeExecute', $this, [
80+
'class' => $job->job_task,
81+
'sentry_trace' => $jobData['_sentry_trace'] ?? '',
82+
'sentry_baggage' => $jobData['_sentry_baggage'] ?? '',
83+
]);
84+
EventManager::instance()->dispatch($sentryEvent);
85+
}
86+
87+
/**
88+
* Handle job completed event - dispatches CakeSentry.Queue.afterExecute
89+
*
90+
* @param \Cake\Event\EventInterface $event
91+
*
92+
* @return void
93+
*/
94+
public function handleCompleted(EventInterface $event): void {
95+
/** @var \Queue\Model\Entity\QueuedJob $job */
96+
$job = $event->getData('job');
97+
98+
$sentryEvent = new Event('CakeSentry.Queue.afterExecute', $this, $this->buildAfterExecuteData($job));
99+
EventManager::instance()->dispatch($sentryEvent);
100+
}
101+
102+
/**
103+
* Handle job failed event - dispatches CakeSentry.Queue.afterExecute with exception
104+
*
105+
* @param \Cake\Event\EventInterface $event
106+
*
107+
* @return void
108+
*/
109+
public function handleFailed(EventInterface $event): void {
110+
/** @var \Queue\Model\Entity\QueuedJob $job */
111+
$job = $event->getData('job');
112+
$exception = $event->getData('exception');
113+
114+
$data = $this->buildAfterExecuteData($job);
115+
if ($exception !== null) {
116+
$data['exception'] = $exception;
117+
}
118+
119+
$sentryEvent = new Event('CakeSentry.Queue.afterExecute', $this, $data);
120+
EventManager::instance()->dispatch($sentryEvent);
121+
}
122+
123+
/**
124+
* Build common data for afterExecute event.
125+
*
126+
* @param \Queue\Model\Entity\QueuedJob $job
127+
*
128+
* @return array<string, mixed>
129+
*/
130+
protected function buildAfterExecuteData(QueuedJob $job): array {
131+
$executionTime = 0;
132+
if ($this->startTime !== null) {
133+
$executionTime = (int)((microtime(true) - $this->startTime) * 1000);
134+
$this->startTime = null;
135+
}
136+
137+
return [
138+
'id' => (string)$job->id,
139+
'queue' => $job->job_task,
140+
'data' => $job->data ?? [],
141+
'execution_time' => $executionTime,
142+
'retry_count' => $job->attempts,
143+
];
144+
}
145+
146+
}

src/Model/Table/QueuedJobsTable.php

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,9 @@
66
use ArrayObject;
77
use Cake\Core\Configure;
88
use Cake\Core\Plugin;
9+
use Cake\Event\Event;
910
use Cake\Event\EventInterface;
11+
use Cake\Event\EventManager;
1012
use Cake\I18n\DateTime;
1113
use Cake\ORM\Query\SelectQuery;
1214
use Cake\ORM\Table;
@@ -216,8 +218,14 @@ public function createJob(string $jobTask, array|object|null $data = null, array
216218
}
217219

218220
$queuedJob = $this->newEntity($queuedJob);
221+
$queuedJob = $this->saveOrFail($queuedJob);
219222

220-
return $this->saveOrFail($queuedJob);
223+
$event = new Event('Queue.Job.created', $this, [
224+
'job' => $queuedJob,
225+
]);
226+
EventManager::instance()->dispatch($event);
227+
228+
return $queuedJob;
221229
}
222230

223231
/**

src/Queue/Processor.php

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -218,6 +218,12 @@ protected function runJob(QueuedJob $queuedJob, string $pid): void {
218218
$this->log('job ' . $queuedJob->job_task . ', id ' . $queuedJob->id, $pid, false);
219219
$taskName = $queuedJob->job_task;
220220

221+
// Dispatch started event
222+
$event = new Event('Queue.Job.started', $this, [
223+
'job' => $queuedJob,
224+
]);
225+
EventManager::instance()->dispatch($event);
226+
221227
$return = $failureMessage = null;
222228
try {
223229
$this->time = time();
@@ -247,6 +253,14 @@ protected function runJob(QueuedJob $queuedJob, string $pid): void {
247253
$this->log('job ' . $queuedJob->job_task . ', id ' . $queuedJob->id . ' failed and ' . $failedStatus, $pid);
248254
$this->io->out('Job did not finish, ' . $failedStatus . ' after try ' . $queuedJob->attempts . '.');
249255

256+
// Dispatch failed event
257+
$event = new Event('Queue.Job.failed', $this, [
258+
'job' => $queuedJob,
259+
'failureMessage' => $failureMessage,
260+
'exception' => $e ?? null,
261+
]);
262+
EventManager::instance()->dispatch($event);
263+
250264
// Dispatch event when job has exhausted all retries
251265
if ($failedStatus === 'aborted') {
252266
$event = new Event('Queue.Job.maxAttemptsExhausted', $this, [
@@ -260,6 +274,13 @@ protected function runJob(QueuedJob $queuedJob, string $pid): void {
260274
}
261275

262276
$this->QueuedJobs->markJobDone($queuedJob);
277+
278+
// Dispatch completed event
279+
$event = new Event('Queue.Job.completed', $this, [
280+
'job' => $queuedJob,
281+
]);
282+
EventManager::instance()->dispatch($event);
283+
263284
$this->io->out('Job Finished.');
264285
$this->currentJob = null;
265286
}

0 commit comments

Comments
 (0)