@@ -1451,92 +1451,107 @@ export class RunQueue {
14511451 shard : number ;
14521452 maxCount : number ;
14531453 } ) : Promise < DequeuedMessage [ ] > {
1454- const queueConcurrencyLimitKey = this . keys . queueConcurrencyLimitKeyFromQueue ( messageQueue ) ;
1455- const queueCurrentConcurrencyKey = this . keys . queueCurrentConcurrencyKeyFromQueue ( messageQueue ) ;
1456- const envConcurrencyLimitKey = this . keys . envConcurrencyLimitKeyFromQueue ( messageQueue ) ;
1457- const envConcurrencyLimitBurstFactorKey =
1458- this . keys . envConcurrencyLimitBurstFactorKeyFromQueue ( messageQueue ) ;
1459- const envCurrentConcurrencyKey = this . keys . envCurrentConcurrencyKeyFromQueue ( messageQueue ) ;
1460- const messageKeyPrefix = this . keys . messageKeyPrefixFromQueue ( messageQueue ) ;
1461- const envQueueKey = this . keys . envQueueKeyFromQueue ( messageQueue ) ;
1462- const masterQueueKey = this . keys . masterQueueKeyForShard ( shard ) ;
1463-
1464- this . logger . debug ( "#callDequeueMessagesFromQueue" , {
1465- messageQueue,
1466- queueConcurrencyLimitKey,
1467- envConcurrencyLimitKey,
1468- envConcurrencyLimitBurstFactorKey,
1469- queueCurrentConcurrencyKey,
1470- envCurrentConcurrencyKey,
1471- messageKeyPrefix,
1472- envQueueKey,
1473- masterQueueKey,
1474- shard,
1475- maxCount,
1476- } ) ;
1454+ return this . #trace( "callDequeueMessagesFromQueue" , async ( span ) => {
1455+ span . setAttributes ( {
1456+ messageQueue,
1457+ shard,
1458+ maxCount,
1459+ } ) ;
14771460
1478- const result = await this . redis . dequeueMessagesFromQueue (
1479- //keys
1480- messageQueue ,
1481- queueConcurrencyLimitKey ,
1482- envConcurrencyLimitKey ,
1483- envConcurrencyLimitBurstFactorKey ,
1484- queueCurrentConcurrencyKey ,
1485- envCurrentConcurrencyKey ,
1486- messageKeyPrefix ,
1487- envQueueKey ,
1488- masterQueueKey ,
1489- //args
1490- messageQueue ,
1491- String ( Date . now ( ) ) ,
1492- String ( this . options . defaultEnvConcurrency ) ,
1493- String ( this . options . defaultEnvConcurrencyBurstFactor ?? 1 ) ,
1494- this . options . redis . keyPrefix ?? "" ,
1495- String ( maxCount )
1496- ) ;
1461+ const queueConcurrencyLimitKey = this . keys . queueConcurrencyLimitKeyFromQueue ( messageQueue ) ;
1462+ const queueCurrentConcurrencyKey =
1463+ this . keys . queueCurrentConcurrencyKeyFromQueue ( messageQueue ) ;
1464+ const envConcurrencyLimitKey = this . keys . envConcurrencyLimitKeyFromQueue ( messageQueue ) ;
1465+ const envConcurrencyLimitBurstFactorKey =
1466+ this . keys . envConcurrencyLimitBurstFactorKeyFromQueue ( messageQueue ) ;
1467+ const envCurrentConcurrencyKey = this . keys . envCurrentConcurrencyKeyFromQueue ( messageQueue ) ;
1468+ const messageKeyPrefix = this . keys . messageKeyPrefixFromQueue ( messageQueue ) ;
1469+ const envQueueKey = this . keys . envQueueKeyFromQueue ( messageQueue ) ;
1470+ const masterQueueKey = this . keys . masterQueueKeyForShard ( shard ) ;
14971471
1498- if ( ! result ) {
1499- return [ ] ;
1500- }
1472+ this . logger . debug ( "#callDequeueMessagesFromQueue" , {
1473+ messageQueue,
1474+ queueConcurrencyLimitKey,
1475+ envConcurrencyLimitKey,
1476+ envConcurrencyLimitBurstFactorKey,
1477+ queueCurrentConcurrencyKey,
1478+ envCurrentConcurrencyKey,
1479+ messageKeyPrefix,
1480+ envQueueKey,
1481+ masterQueueKey,
1482+ shard,
1483+ maxCount,
1484+ } ) ;
15011485
1502- this . logger . debug ( "dequeueMessagesFromQueue raw result" , {
1503- result,
1504- service : this . name ,
1505- } ) ;
1486+ const result = await this . redis . dequeueMessagesFromQueue (
1487+ //keys
1488+ messageQueue ,
1489+ queueConcurrencyLimitKey ,
1490+ envConcurrencyLimitKey ,
1491+ envConcurrencyLimitBurstFactorKey ,
1492+ queueCurrentConcurrencyKey ,
1493+ envCurrentConcurrencyKey ,
1494+ messageKeyPrefix ,
1495+ envQueueKey ,
1496+ masterQueueKey ,
1497+ //args
1498+ messageQueue ,
1499+ String ( Date . now ( ) ) ,
1500+ String ( this . options . defaultEnvConcurrency ) ,
1501+ String ( this . options . defaultEnvConcurrencyBurstFactor ?? 1 ) ,
1502+ this . options . redis . keyPrefix ?? "" ,
1503+ String ( maxCount )
1504+ ) ;
15061505
1507- const messages = [ ] ;
1508- for ( let i = 0 ; i < result . length ; i += 3 ) {
1509- const messageId = result [ i ] ;
1510- const messageScore = result [ i + 1 ] ;
1511- const rawMessage = result [ i + 2 ] ;
1506+ if ( ! result ) {
1507+ span . setAttribute ( "message_count" , 0 ) ;
15121508
1513- //read message
1514- const parsedMessage = OutputPayload . safeParse ( JSON . parse ( rawMessage ) ) ;
1515- if ( ! parsedMessage . success ) {
1516- this . logger . error ( `[${ this . name } ] Failed to parse message` , {
1509+ return [ ] ;
1510+ }
1511+
1512+ this . logger . debug ( "dequeueMessagesFromQueue raw result" , {
1513+ result,
1514+ service : this . name ,
1515+ } ) ;
1516+
1517+ const messages = [ ] ;
1518+ for ( let i = 0 ; i < result . length ; i += 3 ) {
1519+ const messageId = result [ i ] ;
1520+ const messageScore = result [ i + 1 ] ;
1521+ const rawMessage = result [ i + 2 ] ;
1522+
1523+ //read message
1524+ const parsedMessage = OutputPayload . safeParse ( JSON . parse ( rawMessage ) ) ;
1525+ if ( ! parsedMessage . success ) {
1526+ this . logger . error ( `[${ this . name } ] Failed to parse message` , {
1527+ messageId,
1528+ error : parsedMessage . error ,
1529+ service : this . name ,
1530+ } ) ;
1531+
1532+ continue ;
1533+ }
1534+
1535+ const message = parsedMessage . data ;
1536+
1537+ messages . push ( {
15171538 messageId,
1518- error : parsedMessage . error ,
1519- service : this . name ,
1539+ messageScore ,
1540+ message ,
15201541 } ) ;
1521-
1522- continue ;
15231542 }
15241543
1525- const message = parsedMessage . data ;
1526-
1527- messages . push ( {
1528- messageId,
1529- messageScore,
1530- message,
1544+ this . logger . debug ( "dequeueMessagesFromQueue parsed result" , {
1545+ messages,
1546+ service : this . name ,
15311547 } ) ;
1532- }
15331548
1534- this . logger . debug ( "dequeueMessagesFromQueue parsed result" , {
1535- messages,
1536- service : this . name ,
1537- } ) ;
1549+ const filteredMessages = messages . filter ( Boolean ) as DequeuedMessage [ ] ;
1550+
1551+ span . setAttribute ( "message_count" , filteredMessages . length ) ;
15381552
1539- return messages . filter ( Boolean ) as DequeuedMessage [ ] ;
1553+ return filteredMessages ;
1554+ } ) ;
15401555 }
15411556
15421557 async #callDequeueMessageFromWorkerQueue( {
@@ -1569,7 +1584,16 @@ export class RunQueue {
15691584
15701585 this . abortController . signal . addEventListener ( "abort" , cleanup ) ;
15711586
1572- const result = await blockingClient . blpop ( workerQueueKey , blockingPopTimeoutSeconds ) ;
1587+ const result = await this . #trace( "popMessageFromWorkerQueue" , async ( span ) => {
1588+ span . setAttributes ( {
1589+ workerQueue,
1590+ workerQueueKey,
1591+ blockingPopTimeoutSeconds,
1592+ blocking : true ,
1593+ } ) ;
1594+
1595+ return await blockingClient . blpop ( workerQueueKey , blockingPopTimeoutSeconds ) ;
1596+ } ) ;
15731597
15741598 this . abortController . signal . removeEventListener ( "abort" , cleanup ) ;
15751599
@@ -1607,7 +1631,15 @@ export class RunQueue {
16071631
16081632 const [ , messageKey ] = result ;
16091633
1610- const workerQueueLength = await this . redis . llen ( workerQueueKey ) ;
1634+ const workerQueueLength = await this . #trace( "getWorkerQueueLength" , async ( span ) => {
1635+ span . setAttributes ( {
1636+ workerQueue,
1637+ workerQueueKey,
1638+ } ) ;
1639+
1640+ return await this . redis . llen ( workerQueueKey ) ;
1641+ } ) ;
1642+
16111643 const message = await this . #dequeueMessageFromKey( messageKey ) ;
16121644
16131645 if ( ! message ) {
@@ -1626,7 +1658,15 @@ export class RunQueue {
16261658 workerQueueKey,
16271659 } ) ;
16281660
1629- const result = await this . redis . dequeueMessageFromWorkerQueueNonBlocking ( workerQueueKey ) ;
1661+ const result = await this . #trace( "popMessageFromWorkerQueue" , async ( span ) => {
1662+ span . setAttributes ( {
1663+ workerQueue,
1664+ workerQueueKey,
1665+ blocking : false ,
1666+ } ) ;
1667+
1668+ return await this . redis . dequeueMessageFromWorkerQueueNonBlocking ( workerQueueKey ) ;
1669+ } ) ;
16301670
16311671 if ( ! result ) {
16321672 return ;
@@ -2070,27 +2110,42 @@ export class RunQueue {
20702110 }
20712111
20722112 async #dequeueMessageFromKey( messageKey : string ) {
2073- const rawMessage = await this . redis . dequeueMessageFromKey (
2074- messageKey ,
2075- this . options . redis . keyPrefix ?? ""
2076- ) ;
2113+ return this . #trace ( " dequeueMessageFromKey" , async ( span ) => {
2114+ span . setAttributes ( {
2115+ messageKey ,
2116+ } ) ;
20772117
2078- if ( ! rawMessage ) {
2079- return ;
2080- }
2118+ const rawMessage = await this . redis . dequeueMessageFromKey (
2119+ messageKey ,
2120+ this . options . redis . keyPrefix ?? ""
2121+ ) ;
20812122
2082- const [ error , message ] = parseRawMessage ( rawMessage ) ;
2123+ if ( ! rawMessage ) {
2124+ span . setAttribute ( "result" , "NO_MESSAGE" ) ;
20832125
2084- if ( error ) {
2085- this . logger . error ( `[${ this . name } ] Failed to parse message` , {
2086- messageKey,
2087- error,
2088- service : this . name ,
2089- message : message ?? rawMessage ,
2090- } ) ;
2091- }
2126+ return ;
2127+ }
20922128
2093- return message ;
2129+ const [ error , message ] = parseRawMessage ( rawMessage ) ;
2130+
2131+ if ( error ) {
2132+ this . logger . error ( `[${ this . name } ] Failed to parse message` , {
2133+ messageKey,
2134+ error,
2135+ service : this . name ,
2136+ message : message ?? rawMessage ,
2137+ } ) ;
2138+ }
2139+
2140+ if ( message ) {
2141+ span . setAttribute ( "result" , "SUCCESS" ) ;
2142+ span . setAttribute ( "messageId" , message . runId ) ;
2143+ } else {
2144+ span . setAttribute ( "result" , "NO_MESSAGE" ) ;
2145+ }
2146+
2147+ return message ;
2148+ } ) ;
20942149 }
20952150
20962151 #registerCommands( ) {
0 commit comments