@@ -589,48 +589,74 @@ export class MarQS {
589589 for ( const messageQueue of env . queues ) {
590590 attemptedQueues ++ ;
591591
592- try {
593- const messageData = await this . #callDequeueMessage( {
594- messageQueue,
595- parentQueue,
596- } ) ;
597-
598- if ( ! messageData ) {
599- continue ; // Try next queue if no message was dequeued
592+ const result = await this . #trace(
593+ "attemptDequeue" ,
594+ async ( innerSpan ) => {
595+ try {
596+ innerSpan . setAttributes ( {
597+ [ SemanticAttributes . QUEUE ] : messageQueue ,
598+ [ SemanticAttributes . PARENT_QUEUE ] : parentQueue ,
599+ } ) ;
600+
601+ const messageData = await this . #callDequeueMessage( {
602+ messageQueue,
603+ parentQueue,
604+ } ) ;
605+
606+ if ( ! messageData ) {
607+ return null ; // Try next queue if no message was dequeued
608+ }
609+
610+ const message = await this . readMessage ( messageData . messageId ) ;
611+
612+ if ( message ) {
613+ const attributes = {
614+ [ SEMATTRS_MESSAGE_ID ] : message . messageId ,
615+ [ SemanticAttributes . QUEUE ] : message . queue ,
616+ [ SemanticAttributes . MESSAGE_ID ] : message . messageId ,
617+ [ SemanticAttributes . CONCURRENCY_KEY ] : message . concurrencyKey ,
618+ [ SemanticAttributes . PARENT_QUEUE ] : message . parentQueue ,
619+ attempted_queues : attemptedQueues , // How many queues we tried before success
620+ attempted_envs : attemptedEnvs , // How many environments we tried before success
621+ message_timestamp : message . timestamp ,
622+ message_age : this . #calculateMessageAge( message ) ,
623+ message_priority : message . priority ,
624+ message_enqueue_method : message . enqueueMethod ,
625+ message_available_at : message . availableAt ,
626+ ...flattenAttributes ( message . data , "message.data" ) ,
627+ } ;
628+
629+ span . setAttributes ( attributes ) ;
630+ innerSpan . setAttributes ( attributes ) ;
631+
632+ await this . options . subscriber ?. messageDequeued ( message ) ;
633+
634+ await this . options . visibilityTimeoutStrategy . startHeartbeat (
635+ messageData . messageId ,
636+ this . visibilityTimeoutInMs
637+ ) ;
638+
639+ return message ;
640+ }
641+ } catch ( error ) {
642+ // Log error but continue trying other queues
643+ logger . warn ( `[${ this . name } ] Failed to dequeue from queue ${ messageQueue } ` , {
644+ error,
645+ } ) ;
646+ return null ;
647+ }
648+ } ,
649+ {
650+ kind : SpanKind . CONSUMER ,
651+ attributes : {
652+ [ SEMATTRS_MESSAGING_OPERATION ] : "dequeue" ,
653+ [ SEMATTRS_MESSAGING_SYSTEM ] : "marqs" ,
654+ } ,
600655 }
656+ ) ;
601657
602- const message = await this . readMessage ( messageData . messageId ) ;
603-
604- if ( message ) {
605- span . setAttributes ( {
606- [ SEMATTRS_MESSAGE_ID ] : message . messageId ,
607- [ SemanticAttributes . QUEUE ] : message . queue ,
608- [ SemanticAttributes . MESSAGE_ID ] : message . messageId ,
609- [ SemanticAttributes . CONCURRENCY_KEY ] : message . concurrencyKey ,
610- [ SemanticAttributes . PARENT_QUEUE ] : message . parentQueue ,
611- attempted_queues : attemptedQueues , // How many queues we tried before success
612- attempted_envs : attemptedEnvs , // How many environments we tried before success
613- message_timestamp : message . timestamp ,
614- message_age : this . #calculateMessageAge( message ) ,
615- message_priority : message . priority ,
616- message_enqueue_method : message . enqueueMethod ,
617- message_available_at : message . availableAt ,
618- ...flattenAttributes ( message . data , "message.data" ) ,
619- } ) ;
620-
621- await this . options . subscriber ?. messageDequeued ( message ) ;
622-
623- await this . options . visibilityTimeoutStrategy . startHeartbeat (
624- messageData . messageId ,
625- this . visibilityTimeoutInMs
626- ) ;
627-
628- return message ;
629- }
630- } catch ( error ) {
631- // Log error but continue trying other queues
632- logger . warn ( `[${ this . name } ] Failed to dequeue from queue ${ messageQueue } ` , { error } ) ;
633- continue ;
658+ if ( result ) {
659+ return result ;
634660 }
635661 }
636662 }
0 commit comments