Skip to content

Commit 5ec7e88

Browse files
committed
Add CakeSentry.Queue.* events for Sentry integration.
Dispatch events compatible with lordsimal/cakephp-sentry: - CakeSentry.Queue.enqueue: when job is added to queue - CakeSentry.Queue.beforeExecute: when job processing starts - CakeSentry.Queue.afterExecute: when job completes or fails This enables queue monitoring via Sentry without requiring the queue plugin to depend on Sentry SDK directly.
1 parent e447805 commit 5ec7e88

File tree

5 files changed

+220
-1
lines changed

5 files changed

+220
-1
lines changed

docs/sections/misc.md

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,62 @@ The event data contains:
8585
- `job`: The `QueuedJob` entity that failed
8686
- `failureMessage`: The error message from the last failure
8787

88+
### Sentry Integration (CakeSentry Events)
89+
90+
The Queue plugin dispatches events compatible with [lordsimal/cakephp-sentry](https://github.com/LordSimal/cakephp-sentry) for queue monitoring and tracing.
91+
These events allow automatic integration with [Sentry's queue monitoring](https://docs.sentry.io/platforms/php/tracing/instrumentation/queues-module/) feature.
92+
93+
The following events are dispatched:
94+
95+
#### CakeSentry.Queue.enqueue
96+
Fired when a job is added to the queue (producer side).
97+
98+
Event data:
99+
- `class`: The job task name
100+
- `id`: The job ID
101+
- `queue`: The queue/task name
102+
- `data`: The job payload
103+
104+
#### CakeSentry.Queue.beforeExecute
105+
Fired when a worker begins processing a job (consumer side).
106+
107+
Event data:
108+
- `class`: The job task name
109+
- `sentry_trace`: Sentry trace header from job data (if present)
110+
- `sentry_baggage`: Sentry baggage header from job data (if present)
111+
112+
#### CakeSentry.Queue.afterExecute
113+
Fired when a job finishes (success or failure).
114+
115+
Event data:
116+
- `id`: The job ID
117+
- `queue`: The queue/task name
118+
- `data`: The job payload
119+
- `execution_time`: Execution time in milliseconds
120+
- `retry_count`: Number of attempts
121+
- `exception`: The exception object (only on failure)
122+
123+
#### Trace Propagation
124+
125+
For distributed tracing to work across producer and consumer, you can add Sentry trace headers to job data when creating jobs:
126+
127+
```php
128+
// When creating a job with trace propagation
129+
$data = [
130+
'your_data' => 'here',
131+
];
132+
133+
// Add Sentry trace headers if Sentry SDK is available
134+
if (class_exists(\Sentry\SentrySdk::class)) {
135+
$data['_sentry_trace'] = \Sentry\getTraceparent();
136+
$data['_sentry_baggage'] = \Sentry\getBaggage();
137+
}
138+
139+
$queuedJobsTable->createJob('MyTask', $data);
140+
```
141+
142+
The Queue plugin will automatically pass these headers to the `CakeSentry.Queue.beforeExecute` event for trace continuation.
143+
88144
## Notes
89145

90146
`<TaskName>` is the complete class name without the Task suffix (e.g. Example or PluginName.Example).

src/Model/Table/QueuedJobsTable.php

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,9 @@
66
use ArrayObject;
77
use Cake\Core\Configure;
88
use Cake\Core\Plugin;
9+
use Cake\Event\Event;
910
use Cake\Event\EventInterface;
11+
use Cake\Event\EventManager;
1012
use Cake\I18n\DateTime;
1113
use Cake\ORM\Query\SelectQuery;
1214
use Cake\ORM\Table;
@@ -216,8 +218,18 @@ public function createJob(string $jobTask, array|object|null $data = null, array
216218
}
217219

218220
$queuedJob = $this->newEntity($queuedJob);
221+
$queuedJob = $this->saveOrFail($queuedJob);
222+
223+
// Dispatch CakeSentry event for queue tracing integration
224+
$event = new Event('CakeSentry.Queue.enqueue', $this, [
225+
'class' => $queuedJob->job_task,
226+
'id' => (string)$queuedJob->id,
227+
'queue' => $queuedJob->job_task,
228+
'data' => $queuedJob->data ?? [],
229+
]);
230+
EventManager::instance()->dispatch($event);
219231

220-
return $this->saveOrFail($queuedJob);
232+
return $queuedJob;
221233
}
222234

223235
/**

src/Queue/Processor.php

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -218,6 +218,16 @@ protected function runJob(QueuedJob $queuedJob, string $pid): void {
218218
$this->log('job ' . $queuedJob->job_task . ', id ' . $queuedJob->id, $pid, false);
219219
$taskName = $queuedJob->job_task;
220220

221+
// Dispatch CakeSentry beforeExecute event for queue tracing
222+
$jobData = is_array($queuedJob->data) ? $queuedJob->data : [];
223+
$event = new Event('CakeSentry.Queue.beforeExecute', $this, [
224+
'class' => $queuedJob->job_task,
225+
'sentry_trace' => $jobData['_sentry_trace'] ?? '',
226+
'sentry_baggage' => $jobData['_sentry_baggage'] ?? '',
227+
]);
228+
EventManager::instance()->dispatch($event);
229+
230+
$startTime = microtime(true);
221231
$return = $failureMessage = null;
222232
try {
223233
$this->time = time();
@@ -241,12 +251,25 @@ protected function runJob(QueuedJob $queuedJob, string $pid): void {
241251
$this->logError($taskName . ' (job ' . $queuedJob->id . ')' . "\n" . $failureMessage, $pid);
242252
}
243253

254+
$executionTime = (int)((microtime(true) - $startTime) * 1000);
255+
244256
if ($return === false) {
245257
$this->QueuedJobs->markJobFailed($queuedJob, $failureMessage);
246258
$failedStatus = $this->QueuedJobs->getFailedStatus($queuedJob, $this->getTaskConf());
247259
$this->log('job ' . $queuedJob->job_task . ', id ' . $queuedJob->id . ' failed and ' . $failedStatus, $pid);
248260
$this->io->out('Job did not finish, ' . $failedStatus . ' after try ' . $queuedJob->attempts . '.');
249261

262+
// Dispatch CakeSentry afterExecute event for failure
263+
$event = new Event('CakeSentry.Queue.afterExecute', $this, [
264+
'id' => (string)$queuedJob->id,
265+
'queue' => $queuedJob->job_task,
266+
'data' => $jobData,
267+
'execution_time' => $executionTime,
268+
'retry_count' => $queuedJob->attempts,
269+
'exception' => $e ?? null,
270+
]);
271+
EventManager::instance()->dispatch($event);
272+
250273
// Dispatch event when job has exhausted all retries
251274
if ($failedStatus === 'aborted') {
252275
$event = new Event('Queue.Job.maxAttemptsExhausted', $this, [
@@ -260,6 +283,17 @@ protected function runJob(QueuedJob $queuedJob, string $pid): void {
260283
}
261284

262285
$this->QueuedJobs->markJobDone($queuedJob);
286+
287+
// Dispatch CakeSentry afterExecute event for success
288+
$event = new Event('CakeSentry.Queue.afterExecute', $this, [
289+
'id' => (string)$queuedJob->id,
290+
'queue' => $queuedJob->job_task,
291+
'data' => $jobData,
292+
'execution_time' => $executionTime,
293+
'retry_count' => $queuedJob->attempts,
294+
]);
295+
EventManager::instance()->dispatch($event);
296+
263297
$this->io->out('Job Finished.');
264298
$this->currentJob = null;
265299
}

tests/TestCase/Model/Table/QueuedJobsTableTest.php

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@
1111

1212
use Cake\Core\Configure;
1313
use Cake\Datasource\ConnectionManager;
14+
use Cake\Event\EventList;
15+
use Cake\Event\EventManager;
1416
use Cake\I18n\DateTime;
1517
use Cake\ORM\TableRegistry;
1618
use Cake\TestSuite\TestCase;
@@ -752,6 +754,20 @@ public function testGetStats() {
752754
$this->assertWithinRange(7200, (int)$queuedJob->fetchdelay, 1);
753755
}
754756

757+
/**
758+
* Test that CakeSentry.Queue.enqueue event is fired when a job is created.
759+
*
760+
* @return void
761+
*/
762+
public function testEnqueueEventFired(): void {
763+
$eventList = new EventList();
764+
EventManager::instance()->setEventList($eventList);
765+
766+
$this->QueuedJobs->createJob('Queue.Example', ['test' => 'data']);
767+
768+
$this->assertEventFired('CakeSentry.Queue.enqueue');
769+
}
770+
755771
/**
756772
* Helper method for skipping tests that need a real connection.
757773
*

tests/TestCase/Queue/ProcessorTest.php

Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
use Queue\Model\Entity\QueuedJob;
1616
use Queue\Model\Table\QueuedJobsTable;
1717
use Queue\Queue\Processor;
18+
use Queue\Queue\Task\ExampleTask;
1819
use Queue\Queue\Task\RetryExampleTask;
1920
use ReflectionClass;
2021
use RuntimeException;
@@ -287,6 +288,106 @@ public function testWorkerTimeoutHandlingIntegration() {
287288
}
288289
}
289290

291+
/**
292+
* Test that CakeSentry.Queue.beforeExecute event is fired when job starts.
293+
*
294+
* @return void
295+
*/
296+
public function testCakeSentryBeforeExecuteEvent(): void {
297+
$eventList = new EventList();
298+
EventManager::instance()->setEventList($eventList);
299+
300+
// Create a job
301+
$QueuedJobs = $this->getTableLocator()->get('Queue.QueuedJobs');
302+
$job = $QueuedJobs->createJob('Queue.Example', ['test' => 'data'], ['priority' => 1]);
303+
304+
// Create processor with mock task
305+
$out = new ConsoleOutput();
306+
$err = new ConsoleOutput();
307+
$processor = $this->getMockBuilder(Processor::class)
308+
->setConstructorArgs([new Io(new ConsoleIo($out, $err)), new NullLogger()])
309+
->onlyMethods(['loadTask'])
310+
->getMock();
311+
312+
$mockTask = $this->getMockBuilder(ExampleTask::class)
313+
->setConstructorArgs([new Io(new ConsoleIo($out, $err)), new NullLogger()])
314+
->onlyMethods(['run'])
315+
->getMock();
316+
317+
$processor->method('loadTask')->willReturn($mockTask);
318+
319+
$this->invokeMethod($processor, 'runJob', [$job, 'test-pid']);
320+
321+
$this->assertEventFired('CakeSentry.Queue.beforeExecute');
322+
}
323+
324+
/**
325+
* Test that CakeSentry.Queue.afterExecute event is fired when job completes successfully.
326+
*
327+
* @return void
328+
*/
329+
public function testCakeSentryAfterExecuteSuccessEvent(): void {
330+
$eventList = new EventList();
331+
EventManager::instance()->setEventList($eventList);
332+
333+
// Create a job
334+
$QueuedJobs = $this->getTableLocator()->get('Queue.QueuedJobs');
335+
$job = $QueuedJobs->createJob('Queue.Example', ['test' => 'data'], ['priority' => 1]);
336+
337+
// Create processor with mock task
338+
$out = new ConsoleOutput();
339+
$err = new ConsoleOutput();
340+
$processor = $this->getMockBuilder(Processor::class)
341+
->setConstructorArgs([new Io(new ConsoleIo($out, $err)), new NullLogger()])
342+
->onlyMethods(['loadTask'])
343+
->getMock();
344+
345+
$mockTask = $this->getMockBuilder(ExampleTask::class)
346+
->setConstructorArgs([new Io(new ConsoleIo($out, $err)), new NullLogger()])
347+
->onlyMethods(['run'])
348+
->getMock();
349+
350+
$processor->method('loadTask')->willReturn($mockTask);
351+
352+
$this->invokeMethod($processor, 'runJob', [$job, 'test-pid']);
353+
354+
$this->assertEventFired('CakeSentry.Queue.afterExecute');
355+
}
356+
357+
/**
358+
* Test that CakeSentry.Queue.afterExecute event is fired with exception on failure.
359+
*
360+
* @return void
361+
*/
362+
public function testCakeSentryAfterExecuteFailureEvent(): void {
363+
$eventList = new EventList();
364+
EventManager::instance()->setEventList($eventList);
365+
366+
// Create a job
367+
$QueuedJobs = $this->getTableLocator()->get('Queue.QueuedJobs');
368+
$job = $QueuedJobs->createJob('Queue.RetryExample', ['test' => 'data'], ['priority' => 1]);
369+
370+
// Create processor with mock task that fails
371+
$out = new ConsoleOutput();
372+
$err = new ConsoleOutput();
373+
$processor = $this->getMockBuilder(Processor::class)
374+
->setConstructorArgs([new Io(new ConsoleIo($out, $err)), new NullLogger()])
375+
->onlyMethods(['loadTask'])
376+
->getMock();
377+
378+
$mockTask = $this->getMockBuilder(RetryExampleTask::class)
379+
->setConstructorArgs([new Io(new ConsoleIo($out, $err)), new NullLogger()])
380+
->onlyMethods(['run'])
381+
->getMock();
382+
$mockTask->method('run')->willThrowException(new RuntimeException('Task failed'));
383+
384+
$processor->method('loadTask')->willReturn($mockTask);
385+
386+
$this->invokeMethod($processor, 'runJob', [$job, 'test-pid']);
387+
388+
$this->assertEventFired('CakeSentry.Queue.afterExecute');
389+
}
390+
290391
/**
291392
* Test setPhpTimeout with new config names
292393
*

0 commit comments

Comments
 (0)