Skip to content

Commit ef8c3f4

Browse files
committed
feat(instrumentation-kafkajs): update semantic conventions
Signed-off-by: Brian Phillips <[email protected]>
1 parent b520d04 commit ef8c3f4

File tree

6 files changed

+430
-93
lines changed

6 files changed

+430
-93
lines changed

package-lock.json

Lines changed: 3 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

plugins/node/instrumentation-kafkajs/README.md

Lines changed: 19 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,9 @@ npm install --save @opentelemetry/instrumentation-kafkajs
2323

2424
```js
2525
const { NodeTracerProvider } = require('@opentelemetry/sdk-trace-node');
26-
const { KafkaJsInstrumentation } = require('@opentelemetry/instrumentation-kafkajs');
26+
const {
27+
KafkaJsInstrumentation,
28+
} = require('@opentelemetry/instrumentation-kafkajs');
2729
const { registerInstrumentations } = require('@opentelemetry/instrumentation');
2830

2931
const provider = new NodeTracerProvider();
@@ -42,22 +44,28 @@ registerInstrumentations({
4244

4345
You can set the following:
4446

45-
| Options | Type | Description |
46-
| ---------------------------- | -------------------------------------- | -------------------------------------------------------------------------------------------------------------------- |
47-
| `producerHook` | `KafkaProducerCustomAttributeFunction` | Function called before a producer message is sent. Allows for adding custom attributes to the span. |
48-
| `consumerHook` | `KafkaConsumerCustomAttributeFunction` | Function called before a consumer message is processed. Allows for adding custom attributes to the span. |
47+
| Options | Type | Description |
48+
| -------------- | -------------------------------------- | -------------------------------------------------------------------------------------------------------- |
49+
| `producerHook` | `KafkaProducerCustomAttributeFunction` | Function called before a producer message is sent. Allows for adding custom attributes to the span. |
50+
| `consumerHook` | `KafkaConsumerCustomAttributeFunction` | Function called before a consumer message is processed. Allows for adding custom attributes to the span. |
4951

5052
## Semantic Conventions
5153

52-
This package uses `@opentelemetry/semantic-conventions` version `1.24+`, which implements Semantic Convention [Version 1.7.0](https://github.com/open-telemetry/opentelemetry-specification/blob/v1.7.0/semantic_conventions/README.md)
54+
This package uses `@opentelemetry/semantic-conventions` version `1.30+`, which implements Semantic Convention [Version 1.30.0](https://github.com/open-telemetry/semantic-conventions/blob/v1.30.0/docs/README.md)
5355

5456
Attributes collected:
5557

56-
| Attribute | Short Description |
57-
| -----------------------------| ----------------------------------------------------- |
58-
| `messaging.system` | An identifier for the messaging system being used. |
59-
| `messaging.destination` | The message destination name. |
60-
| `messaging.operation` | A string identifying the kind of messaging operation. |
58+
| Attribute | Short Description |
59+
| ------------------------------------ | ------------------------------------------------------------------------------------------------------------------------------------------------------------------ |
60+
| `messaging.system` | An identifier for the messaging system being used (i.e. `"kafka"`). |
61+
| `messaging.destination.name` | The message destination name. |
62+
| `messaging.operation.type` | A string identifying the type of messaging operation. |
63+
| `messaging.operation.name` | The system-specific name of the messaging operation. |
64+
| `messaging.operation.name` | The system-specific name of the messaging operation. |
65+
| `messaging.kafka.message.key` | A stringified value representing the key of the Kafka message (if present). |
66+
| `messaging.kafka.message.tombstone` | A boolean that is true if the message is a tombstone. |
67+
| `messaging.kafka.offset` | The offset of a record in the corresponding Kafka partition. |
68+
| `messaging.destination.partition.id` | The identifier of the partition messages are sent to or received from, unique within the `messaging.destination.name`. **Note:** only available on producer spans. |
6169

6270
## Useful links
6371

plugins/node/instrumentation-kafkajs/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@
5656
},
5757
"dependencies": {
5858
"@opentelemetry/instrumentation": "^0.57.2",
59-
"@opentelemetry/semantic-conventions": "^1.27.0"
59+
"@opentelemetry/semantic-conventions": "^1.30.0"
6060
},
6161
"homepage": "https://github.com/open-telemetry/opentelemetry-js-contrib/tree/main/plugins/node/instrumentation-kafkajs#readme"
6262
}

plugins/node/instrumentation-kafkajs/src/instrumentation.ts

Lines changed: 111 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -24,14 +24,24 @@ import {
2424
trace,
2525
context,
2626
ROOT_CONTEXT,
27+
Attributes,
2728
} from '@opentelemetry/api';
29+
import { ATTR_ERROR_TYPE } from '@opentelemetry/semantic-conventions';
2830
import {
29-
MESSAGINGOPERATIONVALUES_PROCESS,
30-
MESSAGINGOPERATIONVALUES_RECEIVE,
31-
SEMATTRS_MESSAGING_SYSTEM,
32-
SEMATTRS_MESSAGING_DESTINATION,
33-
SEMATTRS_MESSAGING_OPERATION,
34-
} from '@opentelemetry/semantic-conventions';
31+
ATTR_MESSAGING_BATCH_MESSAGE_COUNT,
32+
ATTR_MESSAGING_DESTINATION_NAME,
33+
ATTR_MESSAGING_DESTINATION_PARTITION_ID,
34+
ATTR_MESSAGING_KAFKA_MESSAGE_TOMBSTONE,
35+
ATTR_MESSAGING_KAFKA_OFFSET,
36+
ATTR_MESSAGING_KAFKA_MESSAGE_KEY,
37+
ATTR_MESSAGING_OPERATION_NAME,
38+
ATTR_MESSAGING_OPERATION_TYPE,
39+
ATTR_MESSAGING_SYSTEM,
40+
MESSAGING_OPERATION_TYPE_VALUE_PROCESS,
41+
MESSAGING_OPERATION_TYPE_VALUE_RECEIVE,
42+
MESSAGING_OPERATION_TYPE_VALUE_SEND,
43+
MESSAGING_SYSTEM_VALUE_KAFKA,
44+
} from './semconv';
3545
import type * as kafkaJs from 'kafkajs';
3646
import type {
3747
EachBatchHandler,
@@ -54,6 +64,15 @@ import {
5464
isWrapped,
5565
} from '@opentelemetry/instrumentation';
5666

67+
interface ConsumerSpanOptions {
68+
topic: string;
69+
message: KafkaMessage | undefined;
70+
operationType: string;
71+
attributes?: Attributes;
72+
context?: Context | undefined;
73+
link?: Link;
74+
}
75+
5776
export class KafkaJsInstrumentation extends InstrumentationBase<KafkaJsInstrumentationConfig> {
5877
constructor(config: KafkaJsInstrumentationConfig = {}) {
5978
super(PACKAGE_NAME, PACKAGE_VERSION, config);
@@ -194,12 +213,17 @@ export class KafkaJsInstrumentation extends InstrumentationBase<KafkaJsInstrumen
194213
payload.message.headers,
195214
bufferTextMapGetter
196215
);
197-
const span = instrumentation._startConsumerSpan(
198-
payload.topic,
199-
payload.message,
200-
MESSAGINGOPERATIONVALUES_PROCESS,
201-
propagatedContext
202-
);
216+
const span = instrumentation._startConsumerSpan({
217+
topic: payload.topic,
218+
message: payload.message,
219+
operationType: MESSAGING_OPERATION_TYPE_VALUE_PROCESS,
220+
context: propagatedContext,
221+
attributes: {
222+
[ATTR_MESSAGING_DESTINATION_PARTITION_ID]: String(
223+
payload.partition
224+
),
225+
},
226+
});
203227

204228
const eachMessagePromise = context.with(
205229
trace.setSpan(propagatedContext, span),
@@ -221,12 +245,18 @@ export class KafkaJsInstrumentation extends InstrumentationBase<KafkaJsInstrumen
221245
): Promise<void> {
222246
const payload = args[0];
223247
// https://github.com/open-telemetry/opentelemetry-specification/blob/master/specification/trace/semantic_conventions/messaging.md#topic-with-multiple-consumers
224-
const receivingSpan = instrumentation._startConsumerSpan(
225-
payload.batch.topic,
226-
undefined,
227-
MESSAGINGOPERATIONVALUES_RECEIVE,
228-
ROOT_CONTEXT
229-
);
248+
const receivingSpan = instrumentation._startConsumerSpan({
249+
topic: payload.batch.topic,
250+
message: undefined,
251+
operationType: MESSAGING_OPERATION_TYPE_VALUE_RECEIVE,
252+
context: ROOT_CONTEXT,
253+
attributes: {
254+
[ATTR_MESSAGING_BATCH_MESSAGE_COUNT]: payload.batch.messages.length,
255+
[ATTR_MESSAGING_DESTINATION_PARTITION_ID]: String(
256+
payload.batch.partition
257+
),
258+
},
259+
});
230260
return context.with(
231261
trace.setSpan(context.active(), receivingSpan),
232262
() => {
@@ -246,13 +276,17 @@ export class KafkaJsInstrumentation extends InstrumentationBase<KafkaJsInstrumen
246276
context: spanContext,
247277
};
248278
}
249-
return instrumentation._startConsumerSpan(
250-
payload.batch.topic,
279+
return instrumentation._startConsumerSpan({
280+
topic: payload.batch.topic,
251281
message,
252-
MESSAGINGOPERATIONVALUES_PROCESS,
253-
undefined,
254-
origSpanLink
255-
);
282+
operationType: MESSAGING_OPERATION_TYPE_VALUE_PROCESS,
283+
link: origSpanLink,
284+
attributes: {
285+
[ATTR_MESSAGING_DESTINATION_PARTITION_ID]: String(
286+
payload.batch.partition
287+
),
288+
},
289+
});
256290
}
257291
);
258292
const batchMessagePromise: Promise<void> = original!.apply(
@@ -324,19 +358,24 @@ export class KafkaJsInstrumentation extends InstrumentationBase<KafkaJsInstrumen
324358
return Promise.resolve(sendPromise)
325359
.catch(reason => {
326360
let errorMessage: string;
327-
if (typeof reason === 'string') errorMessage = reason;
328-
else if (
361+
let errorType: string | undefined;
362+
if (typeof reason === 'string') {
363+
errorMessage = reason;
364+
} else if (
329365
typeof reason === 'object' &&
330366
Object.prototype.hasOwnProperty.call(reason, 'message')
331-
)
367+
) {
332368
errorMessage = reason.message;
369+
errorType = reason.constructor.name;
370+
}
333371

334-
spans.forEach(span =>
372+
spans.forEach(span => {
373+
if (errorType) span.setAttribute(ATTR_ERROR_TYPE, errorType);
335374
span.setStatus({
336375
code: SpanStatusCode.ERROR,
337376
message: errorMessage,
338-
})
339-
);
377+
});
378+
});
340379

341380
throw reason;
342381
})
@@ -345,21 +384,38 @@ export class KafkaJsInstrumentation extends InstrumentationBase<KafkaJsInstrumen
345384
});
346385
}
347386

348-
private _startConsumerSpan(
349-
topic: string,
350-
message: KafkaMessage | undefined,
351-
operation: string,
352-
context: Context | undefined,
353-
link?: Link
354-
) {
387+
private _startConsumerSpan({
388+
topic,
389+
message,
390+
operationType,
391+
context,
392+
link,
393+
attributes = {},
394+
}: ConsumerSpanOptions) {
395+
const operationName =
396+
operationType === MESSAGING_OPERATION_TYPE_VALUE_RECEIVE
397+
? 'poll' // for batch processing spans
398+
: operationType; // for individual message processing spans
399+
355400
const span = this.tracer.startSpan(
356-
topic,
401+
`${operationName} ${topic}`,
357402
{
358-
kind: SpanKind.CONSUMER,
403+
kind:
404+
operationType === MESSAGING_OPERATION_TYPE_VALUE_RECEIVE
405+
? SpanKind.CLIENT
406+
: SpanKind.CONSUMER,
359407
attributes: {
360-
[SEMATTRS_MESSAGING_SYSTEM]: 'kafka',
361-
[SEMATTRS_MESSAGING_DESTINATION]: topic,
362-
[SEMATTRS_MESSAGING_OPERATION]: operation,
408+
...attributes,
409+
[ATTR_MESSAGING_SYSTEM]: MESSAGING_SYSTEM_VALUE_KAFKA,
410+
[ATTR_MESSAGING_DESTINATION_NAME]: topic,
411+
[ATTR_MESSAGING_OPERATION_TYPE]: operationType,
412+
[ATTR_MESSAGING_OPERATION_NAME]: operationName,
413+
[ATTR_MESSAGING_KAFKA_MESSAGE_KEY]: message?.key
414+
? String(message.key)
415+
: undefined,
416+
[ATTR_MESSAGING_KAFKA_MESSAGE_TOMBSTONE]:
417+
message?.key && message?.value === null ? true : undefined,
418+
[ATTR_MESSAGING_KAFKA_OFFSET]: message?.offset,
363419
},
364420
links: link ? [link] : [],
365421
},
@@ -381,11 +437,21 @@ export class KafkaJsInstrumentation extends InstrumentationBase<KafkaJsInstrumen
381437
}
382438

383439
private _startProducerSpan(topic: string, message: Message) {
384-
const span = this.tracer.startSpan(topic, {
440+
const span = this.tracer.startSpan(`send ${topic}`, {
385441
kind: SpanKind.PRODUCER,
386442
attributes: {
387-
[SEMATTRS_MESSAGING_SYSTEM]: 'kafka',
388-
[SEMATTRS_MESSAGING_DESTINATION]: topic,
443+
[ATTR_MESSAGING_SYSTEM]: MESSAGING_SYSTEM_VALUE_KAFKA,
444+
[ATTR_MESSAGING_DESTINATION_NAME]: topic,
445+
[ATTR_MESSAGING_KAFKA_MESSAGE_KEY]: message.key
446+
? String(message.key)
447+
: undefined,
448+
[ATTR_MESSAGING_KAFKA_MESSAGE_TOMBSTONE]:
449+
message.key && message.value === null ? true : undefined,
450+
[ATTR_MESSAGING_DESTINATION_PARTITION_ID]: message.partition
451+
? String(message.partition)
452+
: undefined,
453+
[ATTR_MESSAGING_OPERATION_NAME]: 'send',
454+
[ATTR_MESSAGING_OPERATION_TYPE]: MESSAGING_OPERATION_TYPE_VALUE_SEND,
389455
},
390456
});
391457

0 commit comments

Comments
 (0)