Skip to content
This repository was archived by the owner on Oct 31, 2024. It is now read-only.

Commit 9b6e0c0

Browse files
author
Amir Blum
authored
fix(kafkajs): make changes to conform to MessagingSystems spec (#48)
1 parent 5866c65 commit 9b6e0c0

File tree

5 files changed

+74
-40
lines changed

5 files changed

+74
-40
lines changed

packages/plugin-kafkajs/package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
"dependencies": {
4343
"@opentelemetry/api": "^0.13.0",
4444
"@opentelemetry/core": "^0.13.0",
45+
"@opentelemetry/semantic-conventions": "^0.14.0",
4546
"shimmer": "^1.2.1"
4647
},
4748
"jest": {

packages/plugin-kafkajs/src/enums.ts

Lines changed: 0 additions & 6 deletions
This file was deleted.

packages/plugin-kafkajs/src/kafkajs.ts

Lines changed: 38 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,7 @@
11
import { BasePlugin } from '@opentelemetry/core';
2-
import {
3-
SpanKind,
4-
Span,
5-
StatusCode,
6-
Context,
7-
propagation,
8-
Link,
9-
defaultTextMapGetter,
10-
getActiveSpan,
11-
} from '@opentelemetry/api';
2+
import { SpanKind, Span, StatusCode, Context, propagation, Link, getActiveSpan } from '@opentelemetry/api';
123
import { ROOT_CONTEXT } from '@opentelemetry/context-base';
4+
import { MessagingAttribute, MessagingOperationName } from '@opentelemetry/semantic-conventions';
135
import * as shimmer from 'shimmer';
146
import * as kafkaJs from 'kafkajs';
157
import {
@@ -22,12 +14,11 @@ import {
2214
EachMessagePayload,
2315
KafkaMessage,
2416
EachBatchPayload,
25-
IHeaders,
2617
Consumer,
2718
} from 'kafkajs';
2819
import { KafkaJsPluginConfig } from './types';
29-
import { AttributeNames } from './enums';
3020
import { VERSION } from './version';
21+
import { bufferTextMapGetter } from './propagtor';
3122

3223
export class KafkaJsPlugin extends BasePlugin<typeof kafkaJs> {
3324
protected _config!: KafkaJsPluginConfig;
@@ -88,10 +79,15 @@ export class KafkaJsPlugin extends BasePlugin<typeof kafkaJs> {
8879
return function (payload: EachMessagePayload): Promise<void> {
8980
const propagatedContext: Context = propagation.extract(
9081
payload.message.headers,
91-
defaultTextMapGetter,
82+
bufferTextMapGetter,
9283
ROOT_CONTEXT
9384
);
94-
const span = thisPlugin._startConsumerSpan(payload.topic, payload.message, propagatedContext);
85+
const span = thisPlugin._startConsumerSpan(
86+
payload.topic,
87+
payload.message,
88+
MessagingOperationName.PROCESS,
89+
propagatedContext
90+
);
9591

9692
const eachMessagePromise = thisPlugin._tracer.withSpan(span, () => {
9793
return original.apply(this, arguments);
@@ -104,12 +100,17 @@ export class KafkaJsPlugin extends BasePlugin<typeof kafkaJs> {
104100
const thisPlugin = this;
105101
return function (payload: EachBatchPayload): Promise<void> {
106102
// https://github.com/open-telemetry/opentelemetry-specification/blob/master/specification/trace/semantic_conventions/messaging.md#topic-with-multiple-consumers
107-
const receivingSpan = thisPlugin._startConsumerSpan(payload.batch.topic, undefined, ROOT_CONTEXT);
103+
const receivingSpan = thisPlugin._startConsumerSpan(
104+
payload.batch.topic,
105+
undefined,
106+
MessagingOperationName.RECEIVE,
107+
ROOT_CONTEXT
108+
);
108109
return thisPlugin._tracer.withSpan(receivingSpan, () => {
109110
const spans = payload.batch.messages.map((message: KafkaMessage) => {
110111
const propagatedContext: Context = propagation.extract(
111112
message.headers,
112-
defaultTextMapGetter,
113+
bufferTextMapGetter,
113114
ROOT_CONTEXT
114115
);
115116
const spanContext = getActiveSpan(propagatedContext)?.context();
@@ -119,7 +120,13 @@ export class KafkaJsPlugin extends BasePlugin<typeof kafkaJs> {
119120
context: spanContext,
120121
};
121122
}
122-
return thisPlugin._startConsumerSpan(payload.batch.topic, message, undefined, origSpanLink);
123+
return thisPlugin._startConsumerSpan(
124+
payload.batch.topic,
125+
message,
126+
MessagingOperationName.PROCESS,
127+
undefined,
128+
origSpanLink
129+
);
123130
});
124131
const batchMessagePromise: Promise<void> = original.apply(this, arguments);
125132
spans.unshift(receivingSpan);
@@ -173,15 +180,22 @@ export class KafkaJsPlugin extends BasePlugin<typeof kafkaJs> {
173180
});
174181
}
175182

176-
private _startConsumerSpan(topic: string, message: KafkaMessage, context: Context, link?: Link) {
183+
private _startConsumerSpan(
184+
topic: string,
185+
message: KafkaMessage,
186+
operation: string,
187+
context: Context,
188+
link?: Link
189+
) {
177190
const span = this._tracer.startSpan(
178191
topic,
179192
{
180193
kind: SpanKind.CONSUMER,
181194
attributes: {
182-
[AttributeNames.MESSAGING_SYSTEM]: 'kafka',
183-
[AttributeNames.MESSAGING_DESTINATION]: topic,
184-
[AttributeNames.MESSAGING_DESTINATIONKIND]: 'topic',
195+
[MessagingAttribute.MESSAGING_SYSTEM]: 'kafka',
196+
[MessagingAttribute.MESSAGING_DESTINATION]: topic,
197+
[MessagingAttribute.MESSAGING_DESTINATION_KIND]: 'topic',
198+
[MessagingAttribute.MESSAGING_OPERATION]: operation,
185199
},
186200
links: link ? [link] : [],
187201
},
@@ -199,9 +213,9 @@ export class KafkaJsPlugin extends BasePlugin<typeof kafkaJs> {
199213
const span = this._tracer.startSpan(topic, {
200214
kind: SpanKind.PRODUCER,
201215
attributes: {
202-
[AttributeNames.MESSAGING_SYSTEM]: 'kafka',
203-
[AttributeNames.MESSAGING_DESTINATION]: topic,
204-
[AttributeNames.MESSAGING_DESTINATIONKIND]: 'topic',
216+
[MessagingAttribute.MESSAGING_SYSTEM]: 'kafka',
217+
[MessagingAttribute.MESSAGING_DESTINATION]: topic,
218+
[MessagingAttribute.MESSAGING_DESTINATION_KIND]: 'topic',
205219
},
206220
});
207221

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
import { TextMapGetter } from "@opentelemetry/api";
2+
3+
/*
4+
same as open telemetry's `defaultTextMapGetter`,
5+
but also handle case where header is buffer,
6+
adding toString() to make sure string is returned
7+
*/
8+
export const bufferTextMapGetter: TextMapGetter = {
9+
get(carrier, key) {
10+
return carrier?.[key]?.toString();
11+
},
12+
13+
keys(carrier) {
14+
return carrier ? Object.keys(carrier) : [];
15+
},
16+
};
17+

packages/plugin-kafkajs/test/kafkajs.spec.ts

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import { NodeTracerProvider } from '@opentelemetry/node';
55
import { AsyncHooksContextManager } from '@opentelemetry/context-async-hooks';
66
import { ContextManager } from '@opentelemetry/context-base';
77
import { context, propagation, SpanKind, StatusCode, Span } from '@opentelemetry/api';
8+
import { MessagingAttribute } from '@opentelemetry/semantic-conventions';
89
import * as kafkajs from 'kafkajs';
910
import {
1011
Kafka,
@@ -19,7 +20,6 @@ import {
1920
EachMessagePayload,
2021
KafkaMessage,
2122
} from 'kafkajs';
22-
import { AttributeNames } from '../src/enums';
2323
import { DummyPropagation } from './DummyPropagation';
2424

2525
describe('plugin-kafkajs', () => {
@@ -128,9 +128,9 @@ describe('plugin-kafkajs', () => {
128128
expect(span.kind).toStrictEqual(SpanKind.PRODUCER);
129129
expect(span.name).toStrictEqual('topic-name-1');
130130
expect(span.status.code).toStrictEqual(StatusCode.UNSET);
131-
expect(span.attributes[AttributeNames.MESSAGING_SYSTEM]).toStrictEqual('kafka');
132-
expect(span.attributes[AttributeNames.MESSAGING_DESTINATIONKIND]).toStrictEqual('topic');
133-
expect(span.attributes[AttributeNames.MESSAGING_DESTINATION]).toStrictEqual('topic-name-1');
131+
expect(span.attributes[MessagingAttribute.MESSAGING_SYSTEM]).toStrictEqual('kafka');
132+
expect(span.attributes[MessagingAttribute.MESSAGING_DESTINATION_KIND]).toStrictEqual('topic');
133+
expect(span.attributes[MessagingAttribute.MESSAGING_DESTINATION]).toStrictEqual('topic-name-1');
134134

135135
expect(messagesSent.length).toBe(1);
136136
expectKafkaHeadersToMatchSpanContext(messagesSent[0], span);
@@ -406,9 +406,10 @@ describe('plugin-kafkajs', () => {
406406
expect(span.parentSpanId).toBeUndefined();
407407
expect(span.kind).toStrictEqual(SpanKind.CONSUMER);
408408
expect(span.status.code).toStrictEqual(StatusCode.UNSET);
409-
expect(span.attributes[AttributeNames.MESSAGING_SYSTEM]).toStrictEqual('kafka');
410-
expect(span.attributes[AttributeNames.MESSAGING_DESTINATIONKIND]).toStrictEqual('topic');
411-
expect(span.attributes[AttributeNames.MESSAGING_DESTINATION]).toStrictEqual('topic-name-1');
409+
expect(span.attributes[MessagingAttribute.MESSAGING_SYSTEM]).toStrictEqual('kafka');
410+
expect(span.attributes[MessagingAttribute.MESSAGING_DESTINATION_KIND]).toStrictEqual('topic');
411+
expect(span.attributes[MessagingAttribute.MESSAGING_DESTINATION]).toStrictEqual('topic-name-1');
412+
expect(span.attributes[MessagingAttribute.MESSAGING_OPERATION]).toStrictEqual('process');
412413
});
413414
});
414415

@@ -569,14 +570,21 @@ describe('plugin-kafkajs', () => {
569570
spans.forEach((span) => {
570571
expect(span.name).toStrictEqual('topic-name-1');
571572
expect(span.status.code).toStrictEqual(StatusCode.UNSET);
572-
expect(span.attributes[AttributeNames.MESSAGING_SYSTEM]).toStrictEqual('kafka');
573-
expect(span.attributes[AttributeNames.MESSAGING_DESTINATIONKIND]).toStrictEqual('topic');
574-
expect(span.attributes[AttributeNames.MESSAGING_DESTINATION]).toStrictEqual('topic-name-1');
573+
expect(span.attributes[MessagingAttribute.MESSAGING_SYSTEM]).toStrictEqual('kafka');
574+
expect(span.attributes[MessagingAttribute.MESSAGING_DESTINATION_KIND]).toStrictEqual('topic');
575+
expect(span.attributes[MessagingAttribute.MESSAGING_DESTINATION]).toStrictEqual('topic-name-1');
575576
});
577+
576578
const [recvSpan, msg1Span, msg2Span] = spans;
579+
577580
expect(recvSpan.parentSpanId).toBeUndefined();
581+
expect(recvSpan.attributes[MessagingAttribute.MESSAGING_OPERATION]).toStrictEqual('receive');
582+
578583
expect(msg1Span.parentSpanId).toStrictEqual(recvSpan.spanContext.spanId);
584+
expect(msg1Span.attributes[MessagingAttribute.MESSAGING_OPERATION]).toStrictEqual('process');
585+
579586
expect(msg2Span.parentSpanId).toStrictEqual(recvSpan.spanContext.spanId);
587+
expect(msg2Span.attributes[MessagingAttribute.MESSAGING_OPERATION]).toStrictEqual('process');
580588
});
581589
});
582590
});

0 commit comments

Comments
 (0)