Skip to content

Commit 612963e

Browse files
feat(instrumentation-amqp): add stable consume span name
1 parent 6249c9b commit 612963e

File tree

3 files changed

+229
-13
lines changed

3 files changed

+229
-13
lines changed

packages/instrumentation-amqplib/src/amqplib.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ import {
5858
getConnectionAttributesFromServer,
5959
getConnectionAttributesFromUrl,
6060
getConsumeAttributes,
61+
getConsumeSpanName,
6162
getPublishAttributes,
6263
getPublishSpanName,
6364
InstrumentationConsumeChannel,
@@ -436,7 +437,7 @@ export class AmqplibInstrumentation extends InstrumentationBase<AmqplibInstrumen
436437
}
437438
}
438439
const span = self.tracer.startSpan(
439-
`${queue} process`,
440+
getConsumeSpanName(queue, msg, self._semconvStability),
440441
{
441442
kind: SpanKind.CONSUMER,
442443
attributes: {

packages/instrumentation-amqplib/src/utils.ts

Lines changed: 21 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -263,6 +263,21 @@ const getPublishDestinationName = (
263263
const normalizeExchange = (exchangeName: string) =>
264264
exchangeName !== '' ? exchangeName : '<default>';
265265

266+
export const getConsumeSpanName = (
267+
queue: string,
268+
msg: amqp.ConsumeMessage,
269+
semconvStability: SemconvStability
270+
): string => {
271+
if (semconvStability & SemconvStability.STABLE) {
272+
return `consume ${getConsumeDestinationName(
273+
msg.fields?.exchange,
274+
msg.fields?.routingKey,
275+
queue
276+
)}`;
277+
}
278+
return `${queue} process`;
279+
};
280+
266281
export const getConsumeAttributes = (
267282
queue: string,
268283
msg: amqp.ConsumeMessage,
@@ -306,18 +321,12 @@ const getConsumeDestinationName = (
306321
routingKey: string,
307322
queue: string
308323
): string => {
309-
if (exchange && routingKey && queue) {
310-
return routingKey === queue
311-
? `${exchange}:${routingKey}`
312-
: `${exchange}:${routingKey}:${queue}`;
313-
}
314-
if (exchange && routingKey) return `${exchange}:${routingKey}`;
315-
if (exchange && queue) return `${exchange}:${queue}`;
316-
if (routingKey && queue) return `${routingKey}:${queue}`;
317-
if (exchange) return exchange;
318-
if (routingKey) return routingKey;
319-
if (queue) return queue;
320-
return 'amq.default';
324+
const parts: string[] = [];
325+
if (exchange && !parts.includes(exchange)) parts.push(exchange);
326+
if (routingKey && !parts.includes(routingKey)) parts.push(routingKey);
327+
if (queue && !parts.includes(queue)) parts.push(queue);
328+
329+
return parts.length ? parts.join(':') : 'amq.default';
321330
};
322331

323332
export const markConfirmChannelTracing = (context: Context) => {

packages/instrumentation-amqplib/test/utils.test.ts

Lines changed: 206 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import {
1919
getConnectionAttributesFromServer,
2020
getConnectionAttributesFromUrl,
2121
getConsumeAttributes,
22+
getConsumeSpanName,
2223
getPublishAttributes,
2324
getPublishSpanName,
2425
} from '../src/utils';
@@ -803,6 +804,211 @@ describe('utils', () => {
803804
});
804805
});
805806

807+
describe('getConsumeSpanName', () => {
808+
describe('Old attributes', () => {
809+
it('should return "<queue> process" with a basic queue name', () => {
810+
expect(
811+
getConsumeSpanName(
812+
'test-queue',
813+
{} as amqp.ConsumeMessage,
814+
SemconvStability.OLD
815+
)
816+
).toBe('test-queue process');
817+
});
818+
819+
it('should handle empty queue', () => {
820+
expect(
821+
getConsumeSpanName(
822+
'',
823+
{} as amqp.ConsumeMessage,
824+
SemconvStability.OLD
825+
)
826+
).toBe(' process');
827+
});
828+
829+
it('should allow special characters in queue name', () => {
830+
expect(
831+
getConsumeSpanName(
832+
'queue.with-special_chars',
833+
{} as amqp.ConsumeMessage,
834+
SemconvStability.OLD
835+
)
836+
).toBe('queue.with-special_chars process');
837+
});
838+
839+
it('should handle long queue name', () => {
840+
expect(
841+
getConsumeSpanName(
842+
'very-long-queue-name-with-many-characters',
843+
{} as amqp.ConsumeMessage,
844+
SemconvStability.OLD
845+
)
846+
).toBe('very-long-queue-name-with-many-characters process');
847+
});
848+
849+
it('should allow underscores in queue name', () => {
850+
expect(
851+
getConsumeSpanName(
852+
'my_queue_name',
853+
{} as amqp.ConsumeMessage,
854+
SemconvStability.OLD
855+
)
856+
).toBe('my_queue_name process');
857+
});
858+
});
859+
860+
describe('Stable attributes', () => {
861+
it('should return "consume <queue>" with a queue', () => {
862+
const msg = {
863+
fields: { exchange: '', routingKey: '' },
864+
} as amqp.ConsumeMessage;
865+
expect(
866+
getConsumeSpanName('my-special-queue', msg, SemconvStability.STABLE)
867+
).toBe('consume my-special-queue');
868+
});
869+
870+
it('should use "amq.default" when queue is empty and fields empty', () => {
871+
const msg = {
872+
fields: { exchange: '', routingKey: '' },
873+
} as amqp.ConsumeMessage;
874+
expect(getConsumeSpanName('', msg, SemconvStability.STABLE)).toBe(
875+
'consume amq.default'
876+
);
877+
});
878+
879+
it('should return "consume exchange:routingKey" when queue and routingKey equals and exchange is present', () => {
880+
const msg = {
881+
fields: { exchange: 'a', routingKey: 'b' },
882+
} as amqp.ConsumeMessage;
883+
expect(getConsumeSpanName('b', msg, SemconvStability.STABLE)).toBe(
884+
'consume a:b'
885+
);
886+
});
887+
888+
it('should return "consume queue" when queue==routingKey and no exchange', () => {
889+
const msg = {
890+
fields: { exchange: '', routingKey: 'c' },
891+
} as amqp.ConsumeMessage;
892+
expect(getConsumeSpanName('c', msg, SemconvStability.STABLE)).toBe(
893+
'consume c'
894+
);
895+
});
896+
897+
it('should return "consume exchange" when exchange==queue==routingKey', () => {
898+
const msg = {
899+
fields: { exchange: 'x', routingKey: 'x' },
900+
} as amqp.ConsumeMessage;
901+
expect(getConsumeSpanName('x', msg, SemconvStability.STABLE)).toBe(
902+
'consume x'
903+
);
904+
});
905+
906+
it('should return "consume exchange:routingKey:queue" when all are different', () => {
907+
const msg = {
908+
fields: { exchange: 'test-exchange', routingKey: 'routing.key' },
909+
} as amqp.ConsumeMessage;
910+
expect(
911+
getConsumeSpanName('different-queue', msg, SemconvStability.STABLE)
912+
).toBe('consume test-exchange:routing.key:different-queue');
913+
});
914+
915+
it('should return "consume exchange:routingKey" when queue is empty', () => {
916+
const msg = {
917+
fields: { exchange: 'ex', routingKey: 'rk' },
918+
} as amqp.ConsumeMessage;
919+
expect(getConsumeSpanName('', msg, SemconvStability.STABLE)).toBe(
920+
'consume ex:rk'
921+
);
922+
});
923+
924+
it('should return "consume exchange:queue" when routingKey is empty', () => {
925+
const msg = {
926+
fields: { exchange: 'ex', routingKey: '' },
927+
} as amqp.ConsumeMessage;
928+
expect(getConsumeSpanName('q', msg, SemconvStability.STABLE)).toBe(
929+
'consume ex:q'
930+
);
931+
});
932+
933+
it('should return "consume routingKey:queue" when exchange is empty', () => {
934+
const msg = {
935+
fields: { exchange: '', routingKey: 'r' },
936+
} as amqp.ConsumeMessage;
937+
expect(getConsumeSpanName('q', msg, SemconvStability.STABLE)).toBe(
938+
'consume r:q'
939+
);
940+
});
941+
942+
it('should allow special characters in queue name', () => {
943+
const msg = {
944+
fields: { exchange: '', routingKey: '' },
945+
} as amqp.ConsumeMessage;
946+
expect(
947+
getConsumeSpanName(
948+
'queue.with-special_chars',
949+
msg,
950+
SemconvStability.STABLE
951+
)
952+
).toBe('consume queue.with-special_chars');
953+
});
954+
955+
it('should handle long queue names', () => {
956+
const msg = {
957+
fields: { exchange: '', routingKey: '' },
958+
} as amqp.ConsumeMessage;
959+
expect(
960+
getConsumeSpanName(
961+
'very-long-queue-name-with-many-characters',
962+
msg,
963+
SemconvStability.STABLE
964+
)
965+
).toBe('consume very-long-queue-name-with-many-characters');
966+
});
967+
968+
it('should handle numeric queue name', () => {
969+
expect(
970+
getConsumeSpanName(
971+
'123456',
972+
{} as amqp.ConsumeMessage,
973+
SemconvStability.STABLE
974+
)
975+
).toBe('consume 123456');
976+
});
977+
978+
it('should handle dotted queue name', () => {
979+
expect(
980+
getConsumeSpanName(
981+
'app.service-worker.queue',
982+
{} as amqp.ConsumeMessage,
983+
SemconvStability.STABLE
984+
)
985+
).toBe('consume app.service-worker.queue');
986+
});
987+
988+
it('should handle queue name with unicode', () => {
989+
expect(
990+
getConsumeSpanName(
991+
'queue-émojis-🚀',
992+
{} as amqp.ConsumeMessage,
993+
SemconvStability.STABLE
994+
)
995+
).toBe('consume queue-émojis-🚀');
996+
});
997+
});
998+
999+
describe('Both old and stable attributes', () => {
1000+
it('should prioritize stable format over old format', () => {
1001+
const spanName = getConsumeSpanName(
1002+
'my-queue',
1003+
{} as amqp.ConsumeMessage,
1004+
SemconvStability.OLD | SemconvStability.STABLE
1005+
);
1006+
expect(spanName).toBe('consume my-queue');
1007+
expect(spanName).not.toBe('my-queue process');
1008+
});
1009+
});
1010+
});
1011+
8061012
describe('getConsumeAttributes', () => {
8071013
describe('Old attributes', () => {
8081014
it('should return minimal attributes', () => {

0 commit comments

Comments
 (0)