Skip to content

Commit 58ff034

Browse files
authored
Add attemptDequeue span to marqs (#2433)
* Add attemptDequeue span to marqs More telemetry to figure out why dequeuing is slow * More spans * use "receive" for all operation names
1 parent 84750f6 commit 58ff034

File tree

1 file changed

+118
-40
lines changed

1 file changed

+118
-40
lines changed

apps/webapp/app/v3/marqs/index.server.ts

Lines changed: 118 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -589,48 +589,126 @@ export class MarQS {
589589
for (const messageQueue of env.queues) {
590590
attemptedQueues++;
591591

592-
try {
593-
const messageData = await this.#callDequeueMessage({
594-
messageQueue,
595-
parentQueue,
596-
});
597-
598-
if (!messageData) {
599-
continue; // Try next queue if no message was dequeued
592+
const result = await this.#trace(
593+
"attemptDequeue",
594+
async (innerSpan) => {
595+
try {
596+
innerSpan.setAttributes({
597+
[SemanticAttributes.QUEUE]: messageQueue,
598+
[SemanticAttributes.PARENT_QUEUE]: parentQueue,
599+
});
600+
601+
const messageData = await this.#trace(
602+
"callDequeueMessage",
603+
async (dequeueSpan) => {
604+
dequeueSpan.setAttributes({
605+
[SemanticAttributes.QUEUE]: messageQueue,
606+
[SemanticAttributes.PARENT_QUEUE]: parentQueue,
607+
});
608+
609+
return await this.#callDequeueMessage({
610+
messageQueue,
611+
parentQueue,
612+
});
613+
},
614+
{
615+
kind: SpanKind.CONSUMER,
616+
attributes: {
617+
[SEMATTRS_MESSAGING_OPERATION]: "receive",
618+
[SEMATTRS_MESSAGING_SYSTEM]: "marqs",
619+
},
620+
}
621+
);
622+
623+
if (!messageData) {
624+
return null; // Try next queue if no message was dequeued
625+
}
626+
627+
const message = await this.readMessage(messageData.messageId);
628+
629+
if (message) {
630+
const attributes = {
631+
[SEMATTRS_MESSAGE_ID]: message.messageId,
632+
[SemanticAttributes.QUEUE]: message.queue,
633+
[SemanticAttributes.MESSAGE_ID]: message.messageId,
634+
[SemanticAttributes.CONCURRENCY_KEY]: message.concurrencyKey,
635+
[SemanticAttributes.PARENT_QUEUE]: message.parentQueue,
636+
attempted_queues: attemptedQueues, // How many queues we tried before success
637+
attempted_envs: attemptedEnvs, // How many environments we tried before success
638+
message_timestamp: message.timestamp,
639+
message_age: this.#calculateMessageAge(message),
640+
message_priority: message.priority,
641+
message_enqueue_method: message.enqueueMethod,
642+
message_available_at: message.availableAt,
643+
...flattenAttributes(message.data, "message.data"),
644+
};
645+
646+
span.setAttributes(attributes);
647+
innerSpan.setAttributes(attributes);
648+
649+
await this.#trace(
650+
"messageDequeued",
651+
async (subscriberSpan) => {
652+
subscriberSpan.setAttributes({
653+
[SemanticAttributes.MESSAGE_ID]: message.messageId,
654+
[SemanticAttributes.QUEUE]: message.queue,
655+
[SemanticAttributes.PARENT_QUEUE]: message.parentQueue,
656+
});
657+
658+
return await this.options.subscriber?.messageDequeued(message);
659+
},
660+
{
661+
kind: SpanKind.INTERNAL,
662+
attributes: {
663+
[SEMATTRS_MESSAGING_OPERATION]: "receive",
664+
[SEMATTRS_MESSAGING_SYSTEM]: "marqs",
665+
},
666+
}
667+
);
668+
669+
await this.#trace(
670+
"startHeartbeat",
671+
async (heartbeatSpan) => {
672+
heartbeatSpan.setAttributes({
673+
[SemanticAttributes.MESSAGE_ID]: messageData.messageId,
674+
visibility_timeout_ms: this.visibilityTimeoutInMs,
675+
});
676+
677+
return await this.options.visibilityTimeoutStrategy.startHeartbeat(
678+
messageData.messageId,
679+
this.visibilityTimeoutInMs
680+
);
681+
},
682+
{
683+
kind: SpanKind.INTERNAL,
684+
attributes: {
685+
[SEMATTRS_MESSAGING_OPERATION]: "receive",
686+
[SEMATTRS_MESSAGING_SYSTEM]: "marqs",
687+
},
688+
}
689+
);
690+
691+
return message;
692+
}
693+
} catch (error) {
694+
// Log error but continue trying other queues
695+
logger.warn(`[${this.name}] Failed to dequeue from queue ${messageQueue}`, {
696+
error,
697+
});
698+
return null;
699+
}
700+
},
701+
{
702+
kind: SpanKind.CONSUMER,
703+
attributes: {
704+
[SEMATTRS_MESSAGING_OPERATION]: "receive",
705+
[SEMATTRS_MESSAGING_SYSTEM]: "marqs",
706+
},
600707
}
708+
);
601709

602-
const message = await this.readMessage(messageData.messageId);
603-
604-
if (message) {
605-
span.setAttributes({
606-
[SEMATTRS_MESSAGE_ID]: message.messageId,
607-
[SemanticAttributes.QUEUE]: message.queue,
608-
[SemanticAttributes.MESSAGE_ID]: message.messageId,
609-
[SemanticAttributes.CONCURRENCY_KEY]: message.concurrencyKey,
610-
[SemanticAttributes.PARENT_QUEUE]: message.parentQueue,
611-
attempted_queues: attemptedQueues, // How many queues we tried before success
612-
attempted_envs: attemptedEnvs, // How many environments we tried before success
613-
message_timestamp: message.timestamp,
614-
message_age: this.#calculateMessageAge(message),
615-
message_priority: message.priority,
616-
message_enqueue_method: message.enqueueMethod,
617-
message_available_at: message.availableAt,
618-
...flattenAttributes(message.data, "message.data"),
619-
});
620-
621-
await this.options.subscriber?.messageDequeued(message);
622-
623-
await this.options.visibilityTimeoutStrategy.startHeartbeat(
624-
messageData.messageId,
625-
this.visibilityTimeoutInMs
626-
);
627-
628-
return message;
629-
}
630-
} catch (error) {
631-
// Log error but continue trying other queues
632-
logger.warn(`[${this.name}] Failed to dequeue from queue ${messageQueue}`, { error });
633-
continue;
710+
if (result) {
711+
return result;
634712
}
635713
}
636714
}

0 commit comments

Comments
 (0)