Skip to content

Commit a274bbe

Browse files
committed
fix: queue handling
1 parent 9226328 commit a274bbe

File tree

1 file changed

+185
-100
lines changed

1 file changed

+185
-100
lines changed

agent/fw_laravel_queue.c

Lines changed: 185 additions & 100 deletions
Original file line numberDiff line numberDiff line change
@@ -233,143 +233,125 @@ static char* nr_laravel_queue_job_txn_name(zval* job TSRMLS_DC) {
233233
}
234234

235235
/*
236-
* Handle: Illuminate\Queue\SyncQueue::executeJob
237-
* /**
238-
* Execute a given job synchronously.
239-
*
240-
* @param string $job
241-
* @param mixed $data
242-
* @param string|null $queue
243-
* @return int
244-
*
245-
* @throws \Throwable
246-
protected function executeJob($job, $data = '', $queue = null)
236+
* Handle:
237+
* Illuminate\\Queue\\Worker::raiseBeforeJobEvent
238+
* Raise the before queue job event.
239+
*
240+
* @param string $connectionName
241+
* @param \Illuminate\Contracts\Queue\Job $job
242+
* @return void
243+
* protected function raiseBeforeJobEvent($connectionName, $job)
244+
*
245+
* Illuminate\\Queue\\SyncQueue::raiseBeforeJobEvent
246+
* Raise the before queue job event.
247+
*
248+
* @param \Illuminate\Contracts\Queue\Job $job
249+
* @return void
250+
* protected function raiseBeforeJobEvent(Job $job)
251+
*
252+
* The reason these functions are used for txn naming:
253+
* 1. It has been a consistent API called directly before the Job across Laravel
254+
* going back to at leaset 6
255+
* 2. It allows us to have a reusable function callback for both sync/async.
256+
* 3. Sync jobs don't use the job that is passed in, they create a brand new job
257+
* based off of the passed in job that then then attempt to run.
247258
*/
248-
NR_PHP_WRAPPER(nr_laravel_queue_syncqueue_executeJob_before) {
259+
NR_PHP_WRAPPER(nr_laravel_queue_worker_raiseBeforeJobEvent_before) {
249260
zval* job = NULL;
250261
nr_segment_t* segment = NULL;
262+
char* txn_name = NULL;
263+
size_t argc = 0;
264+
int job_param_num = 2; /* Most likely case of async job. */
251265

252266
NR_UNUSED_SPECIALFN;
253267

254268
NR_PHP_WRAPPER_REQUIRE_FRAMEWORK(NR_FW_LARAVEL);
255269

256-
/*
257-
* End the current txn in preparation for the Job txn.
270+
/* For raiseBeforeJobEvent:
271+
* For Async jobs, Job is the second parameter.
272+
* For Sync jobs, Job is the first and only parameter.
258273
*/
259-
nr_php_txn_end(1, 0);
260-
261-
/*
262-
* Laravel 7+ passes Job as the first parameter.
263-
*/
264-
char* txn_name = NULL;
265-
266-
job = nr_php_arg_get(1, NR_EXECUTE_ORIG_ARGS);
274+
argc = nr_php_get_user_func_arg_count(NR_EXECUTE_ORIG_ARGS);
275+
if (1 == argc) {
276+
/* This is a sync job*/
277+
job_param_num = 1;
278+
}
267279

268-
/* txn_name needs to be freed by the caller. */
280+
job = nr_php_arg_get(job_param_num, NR_EXECUTE_ORIG_ARGS);
269281
txn_name = nr_laravel_queue_job_txn_name(job);
270282

271-
/*
272-
* Begin the transaction we'll actually record.
273-
*/
274-
275-
if (NR_SUCCESS == nr_php_txn_begin(NULL, NULL)) {
276-
nr_txn_set_as_background_job(NRPRG(txn), "Laravel job");
283+
if (NULL == txn_name) {
284+
txn_name = nr_strdup("unknown");
285+
}
277286

278-
segment = nr_txn_get_current_segment(NRPRG(txn), NULL);
279-
/*
280-
* Transfer the old wraprec to the newly created segment inside the brand
281-
* new txn so that the agent will be able to call any _after or _clean
282-
* callbacks.
283-
*/
284-
if (NULL != segment) {
285-
segment->wraprec = wraprec;
286-
}
287+
segment = NRPRG(txn)->segment_root;
288+
if (NULL != segment) {
289+
nr_segment_set_name(segment, txn_name);
290+
}
287291

288-
if (NULL == txn_name) {
289-
txn_name = nr_strdup("unknown");
290-
}
292+
nr_laravel_queue_set_cat_txn(job TSRMLS_CC);
291293

292-
nr_laravel_queue_set_cat_txn(job TSRMLS_CC);
294+
nr_txn_set_path("Laravel", NRPRG(txn), txn_name, NR_PATH_TYPE_CUSTOM,
295+
NR_OK_TO_OVERWRITE);
293296

294-
nr_txn_set_path("Laravel", NRPRG(txn), txn_name, NR_PATH_TYPE_CUSTOM,
295-
NR_OK_TO_OVERWRITE);
296-
}
297-
nr_php_arg_release(&job);
298297
nr_free(txn_name);
298+
nr_php_arg_release(&job);
299299
NR_PHP_WRAPPER_CALL;
300300
}
301301
NR_PHP_WRAPPER_END
302302

303303
/*
304-
* Handle:
304+
* Handles:
305305
* Illuminate\\Queue\\Worker::process
306-
* @param string $connectionName
307-
* @param \Illuminate\Contracts\Queue\Job $job
308-
* @param \Illuminate\Queue\WorkerOptions $options
309-
* @return void
306+
* Illuminate\\Queue\\SyncQueue::executeJob (laravel 11+)
307+
* Illuminate\\Queue\\SyncQueue::push (before laravel 11)
310308
*
311-
* @throws \Throwable
309+
* Ends/discards the unneeded worker txn
310+
* Starts the job txn
312311
*/
313-
NR_PHP_WRAPPER(nr_laravel_queue_worker_process_before) {
314-
zval* job = NULL;
312+
313+
NR_PHP_WRAPPER(nr_laravel_queue_worker_before) {
315314
nr_segment_t* segment = NULL;
316315

317316
NR_UNUSED_SPECIALFN;
318317

319318
NR_PHP_WRAPPER_REQUIRE_FRAMEWORK(NR_FW_LARAVEL);
320319

321320
/*
322-
* End the current txn to prepare for the Job txn.
321+
* End and discard the current txn to prepare for the Job txn.
323322
*/
324323
nr_php_txn_end(1, 0 TSRMLS_CC);
325-
/*
326-
* Job is the second parameter.
327-
*/
328-
char* txn_name = NULL;
329-
330-
job = nr_php_arg_get(2, NR_EXECUTE_ORIG_ARGS);
331-
txn_name = nr_laravel_queue_job_txn_name(job);
332324

333325
/*
334326
* Begin the transaction we'll actually record.
335327
*/
336328

337329
if (NR_SUCCESS == nr_php_txn_begin(NULL, NULL)) {
338330
nr_txn_set_as_background_job(NRPRG(txn), "Laravel job");
331+
339332
segment = nr_txn_get_current_segment(NRPRG(txn), NULL);
340-
/*
341-
* Transfer the old wraprec to the newly created segment inside the brand
342-
* new txn so that the agent will be able to call any _after or _clean
343-
* callbacks.
344-
*/
345333
if (NULL != segment) {
346334
segment->wraprec = wraprec;
347335
}
348-
if (NULL == txn_name) {
349-
txn_name = nr_strdup("unknown");
350-
}
351-
352-
nr_laravel_queue_set_cat_txn(job TSRMLS_CC);
353-
354-
nr_txn_set_path("Laravel", NRPRG(txn), txn_name, NR_PATH_TYPE_CUSTOM,
355-
NR_OK_TO_OVERWRITE);
356336
}
357-
358-
nr_free(txn_name);
359-
nr_php_arg_release(&job);
360337
NR_PHP_WRAPPER_CALL;
361338
}
362339
NR_PHP_WRAPPER_END
363340

364341
/*
365342
* Handles:
366343
* Illuminate\\Queue\\Worker::process
367-
* Illuminate\\Queue\\SyncQueue::executeJob
344+
* Illuminate\\Queue\\SyncQueue::executeJob (laravel 11+)
345+
* Illuminate\\Queue\\SyncQueue::push (before laravel 11)
346+
*
347+
* Closes the job txn
348+
* Records any exceptions as needed
349+
* Restarts the txn instrumentation
368350
*/
369351

370352
NR_PHP_WRAPPER(nr_laravel_queue_worker_after) {
371353
NR_UNUSED_SPECIALFN;
372-
(void)wraprec;
354+
nr_php_execute_metadata_t metadata;
373355
NR_PHP_WRAPPER_REQUIRE_FRAMEWORK(NR_FW_LARAVEL);
374356

375357
/*
@@ -378,6 +360,53 @@ NR_PHP_WRAPPER(nr_laravel_queue_worker_after) {
378360
* transaction either when Illuminate\\Queue\\Worker::process or
379361
* Illuminate\\Queue\\SyncQueue::executeJob is called again
380362
*/
363+
364+
if (NULL != auto_segment->error) {
365+
/* An exception occurred we need to record it on the txn if it isn't already
366+
* there. */
367+
if (NULL == NRPRG(txn)->error) {
368+
/* Since we are ending this txn within the wrapper, and we know it has an
369+
error, apply it to the txn; otherwise, the
370+
way it is handled in Laravel means the exception is caught by an internal
371+
job exceptions handler and then thrown again AFTER we've already ended the
372+
txn for the job
373+
*
374+
*/
375+
zval exception;
376+
ZVAL_OBJ(&exception, EG(exception));
377+
nr_status_t st;
378+
st = nr_php_error_record_exception(
379+
NRPRG(txn), &exception, 50, false /* add to segment */,
380+
NULL /* use default prefix */, &NRPRG(exception_filters));
381+
382+
if (NR_FAILURE == st) {
383+
nrl_verbosedebug(NRL_FRAMEWORK, "%s: unable to record exception",
384+
__func__);
385+
}
386+
}
387+
}
388+
389+
nr_php_txn_end(0, 0 TSRMLS_CC);
390+
nr_php_txn_begin(NULL, NULL TSRMLS_CC);
391+
}
392+
NR_PHP_WRAPPER_END
393+
394+
/*
395+
* Handles:
396+
* Illuminate\\Queue\\Worker::process
397+
* Illuminate\\Queue\\SyncQueue::executeJob
398+
*/
399+
400+
NR_PHP_WRAPPER(nr_laravel_queue_worker_stop_after) {
401+
NR_UNUSED_SPECIALFN;
402+
(void)wraprec;
403+
NR_PHP_WRAPPER_REQUIRE_FRAMEWORK(NR_FW_LARAVEL);
404+
/*
405+
* Anytime the worker is considering whether to stop or not (or actually
406+
* stopping), we can ignore everything that's happened previously (sleeping
407+
* etc) that is non job processing related and avoid a useless txn.
408+
*/
409+
381410
nr_php_txn_end(0, 0 TSRMLS_CC);
382411
nr_php_txn_begin(NULL, NULL TSRMLS_CC);
383412
}
@@ -676,12 +705,11 @@ NR_PHP_WRAPPER(nr_laravel_queue_worker_process) {
676705
nr_php_arg_release(&job);
677706

678707
/*
679-
* End the real transaction and then start a new transaction so our
708+
* End the useless transaction and then start a new transaction so our
680709
* instrumentation continues to fire, knowing that we'll ignore that
681-
* transaction either when Worker::process() is called again or when
682-
* WorkCommand::handle() exits.
710+
* transaction if the worker continues when Worker::process() is called again.
683711
*/
684-
nr_php_txn_end(0, 0 TSRMLS_CC);
712+
nr_php_txn_end(1, 0 TSRMLS_CC);
685713
nr_php_txn_begin(NULL, NULL TSRMLS_CC);
686714
}
687715
NR_PHP_WRAPPER_END
@@ -817,9 +845,9 @@ NR_PHP_WRAPPER(nr_laravel_queue_queue_createpayload) {
817845
}
818846

819847
/*
820-
* The payload should be a JSON string: in essence, we want to decode it, add
821-
* our attributes, and then re-encode it. Unfortunately, the payload will
822-
* include NULL bytes for closures, and this causes nro to choke badly
848+
* The payload should be a JSON string: in essence, we want to decode it,
849+
* add our attributes, and then re-encode it. Unfortunately, the payload
850+
* will include NULL bytes for closures, and this causes nro to choke badly
823851
* because it can't handle NULLs in strings, so we'll call back into PHP's
824852
* own JSON functions.
825853
*/
@@ -876,35 +904,92 @@ void nr_laravel_queue_enable(TSRMLS_D) {
876904
&& !defined OVERWRITE_ZEND_EXECUTE_DATA
877905

878906
/*
879-
* Here's the problem: we want to record individual transactions for each job
880-
* that is executed, but don't want to record a transaction for the actual
881-
* queue:work command, since it spends most of its time sleeping. The naive
882-
* approach would be to end the transaction immediately and instrument
907+
* Here's the problem: we want to record individual transactions for each
908+
* job that is executed, but don't want to record a transaction for the
909+
* actual queue:work command, since it spends most of its time sleeping. The
910+
* naive approach would be to end the transaction immediately and instrument
883911
* Worker::process(). The issue with that is that instrumentation hooks
884912
* aren't executed if we're not actually in a transaction.
885913
*
886914
* So instead, what we'll do is to keep recording, but ensure that we ignore
887915
* the transaction before and after
888916
* Illuminate\\Queue\\Worker::process
889-
* Illuminate\\Queue\\SyncQueue::executeJob
890-
* This ensures that we instrument the entirety of the job (including any
891-
* handle/failed functions)
917+
* Illuminate\\Queue\\SyncQueue::executeJob/push
918+
* and we'll use raiseBeforeJobEvent to ensure we have the most up to date Job
919+
* info. This ensures that we instrument the entirety of the job (including
920+
* any handle/failed functions and also exceptions).
921+
*
922+
*
923+
* Why so many?
924+
* 1. The main reason is because of the trickiness when starting/stopping txns
925+
* within an OAPI wrapped func. OAPI handling assumes a segment is created
926+
* with w/associated wraprecs in func_begin. However, when we end/discard the
927+
* txn, we discard all those previous segments, so any functions that had
928+
* wrapped callbacks will all go away after a txn end. We have only one
929+
* opportunity to preserve any wraprecs and that is when we stop/start in a
930+
* wrapped func that has both a before and an after/clean. Because we are
931+
* starting/ending txns within the wrapper, it requires much more handling for
932+
* OAPI compatibility. Namely, you would need to transfer the old wraprec
933+
* from the old segment to the newly created segment inside the brand new txn
934+
* so that the agent will be able to call any _after or _clean callbacks AND
935+
* you'd have to modify one of the checks in end_func to account for the fact
936+
* that a root segment is okay to encounter if it has a wraprec.
937+
*
938+
* 2. Sync doesn't have all the resolved queue info at the beginning (or end)
939+
* or executeJob/push. It creates a temporary job that it uses internally.
940+
*
941+
* Why not just use the raiseAfterEventJob listener to end the txn?
942+
* It's not called all the time. For instance, it is not called in the case
943+
* of exceptions.
892944
*/
893945

894946
/*
895-
* Wrap:
896-
* Illuminate\\Queue\\Worker::process
897-
* Illuminate\\Queue\\SyncQueue::executeJob
947+
* The before callbacks will handle:
948+
* 1) ending the unneeded worker txn
949+
* 2) starting
950+
* the after/clean callbacks will handle:
951+
* 1) ending the job txn
952+
* 2) handling any exception
953+
* 3) restarting the txn instrumentation going
898954
*/
955+
/*Laravel 11+*/
899956
nr_php_wrap_user_function_before_after_clean(
900957
NR_PSTR("Illuminate\\Queue\\SyncQueue::executeJob"),
901-
nr_laravel_queue_syncqueue_executeJob_before,
902-
nr_laravel_queue_worker_after, nr_laravel_queue_worker_after);
958+
nr_laravel_queue_worker_before, nr_laravel_queue_worker_after,
959+
nr_laravel_queue_worker_after);
960+
/* Laravel below 11*/
961+
nr_php_wrap_user_function_before_after_clean(
962+
NR_PSTR("Illuminate\\Queue\\SyncQueue::push"),
963+
nr_laravel_queue_worker_before, nr_laravel_queue_worker_after,
964+
nr_laravel_queue_worker_after);
903965
nr_php_wrap_user_function_before_after_clean(
904966
NR_PSTR("Illuminate\\Queue\\Worker::process"),
905-
nr_laravel_queue_worker_process_before, nr_laravel_queue_worker_after,
967+
nr_laravel_queue_worker_before, nr_laravel_queue_worker_after,
906968
nr_laravel_queue_worker_after);
907969

970+
/* These wrappers will handle naming the job txn*/
971+
nr_php_wrap_user_function_before_after_clean(
972+
NR_PSTR("Illuminate\\Queue\\Worker::raiseBeforeJobEvent"),
973+
nr_laravel_queue_worker_raiseBeforeJobEvent_before, NULL, NULL);
974+
nr_php_wrap_user_function_before_after_clean(
975+
NR_PSTR("Illuminate\\Queue\\SyncQueue::raiseBeforeJobEvent"),
976+
nr_laravel_queue_worker_raiseBeforeJobEvent_before, NULL, NULL);
977+
978+
/*
979+
* Additional cleanup so we don't help prevent a useless excess txn after the
980+
* worker ends. Anytime the worker is questioning whether it should stop or is
981+
* going to stop, we can end the txn.
982+
*/
983+
nr_php_wrap_user_function_before_after_clean(
984+
NR_PSTR("Illuminate\\Queue\\Worker::stopIfNecessary"), NULL,
985+
nr_laravel_queue_worker_stop_after, NULL);
986+
nr_php_wrap_user_function_before_after_clean(
987+
NR_PSTR("Illuminate\\Queue\\Worker::stopWorkerIfLostConnection"), NULL,
988+
nr_laravel_queue_worker_stop_after, NULL);
989+
nr_php_wrap_user_function_before_after_clean(
990+
NR_PSTR("Illuminate\\Queue\\Worker::stop"), NULL,
991+
nr_laravel_queue_worker_stop_after, NULL);
992+
908993
#else
909994

910995
/*

0 commit comments

Comments
 (0)