Skip to content

Commit ea54b8a

Browse files
committed
More spans
1 parent f5eb541 commit ea54b8a

File tree

1 file changed

+60
-8
lines changed

1 file changed

+60
-8
lines changed

apps/webapp/app/v3/marqs/index.server.ts

Lines changed: 60 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -598,10 +598,27 @@ export class MarQS {
598598
[SemanticAttributes.PARENT_QUEUE]: parentQueue,
599599
});
600600

601-
const messageData = await this.#callDequeueMessage({
602-
messageQueue,
603-
parentQueue,
604-
});
601+
const messageData = await this.#trace(
602+
"callDequeueMessage",
603+
async (dequeueSpan) => {
604+
dequeueSpan.setAttributes({
605+
[SemanticAttributes.QUEUE]: messageQueue,
606+
[SemanticAttributes.PARENT_QUEUE]: parentQueue,
607+
});
608+
609+
return await this.#callDequeueMessage({
610+
messageQueue,
611+
parentQueue,
612+
});
613+
},
614+
{
615+
kind: SpanKind.CONSUMER,
616+
attributes: {
617+
[SEMATTRS_MESSAGING_OPERATION]: "dequeue",
618+
[SEMATTRS_MESSAGING_SYSTEM]: "marqs",
619+
},
620+
}
621+
);
605622

606623
if (!messageData) {
607624
return null; // Try next queue if no message was dequeued
@@ -629,11 +646,46 @@ export class MarQS {
629646
span.setAttributes(attributes);
630647
innerSpan.setAttributes(attributes);
631648

632-
await this.options.subscriber?.messageDequeued(message);
649+
await this.#trace(
650+
"messageDequeued",
651+
async (subscriberSpan) => {
652+
subscriberSpan.setAttributes({
653+
[SemanticAttributes.MESSAGE_ID]: message.messageId,
654+
[SemanticAttributes.QUEUE]: message.queue,
655+
[SemanticAttributes.PARENT_QUEUE]: message.parentQueue,
656+
});
657+
658+
return await this.options.subscriber?.messageDequeued(message);
659+
},
660+
{
661+
kind: SpanKind.INTERNAL,
662+
attributes: {
663+
[SEMATTRS_MESSAGING_OPERATION]: "notify",
664+
[SEMATTRS_MESSAGING_SYSTEM]: "marqs",
665+
},
666+
}
667+
);
633668

634-
await this.options.visibilityTimeoutStrategy.startHeartbeat(
635-
messageData.messageId,
636-
this.visibilityTimeoutInMs
669+
await this.#trace(
670+
"startHeartbeat",
671+
async (heartbeatSpan) => {
672+
heartbeatSpan.setAttributes({
673+
[SemanticAttributes.MESSAGE_ID]: messageData.messageId,
674+
visibility_timeout_ms: this.visibilityTimeoutInMs,
675+
});
676+
677+
return await this.options.visibilityTimeoutStrategy.startHeartbeat(
678+
messageData.messageId,
679+
this.visibilityTimeoutInMs
680+
);
681+
},
682+
{
683+
kind: SpanKind.INTERNAL,
684+
attributes: {
685+
[SEMATTRS_MESSAGING_OPERATION]: "heartbeat",
686+
[SEMATTRS_MESSAGING_SYSTEM]: "marqs",
687+
},
688+
}
637689
);
638690

639691
return message;

0 commit comments

Comments
 (0)