Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .github/component-label-map.yml
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,11 @@ pkg:instrumentation-ioredis:
- plugins/node/opentelemetry-instrumentation-ioredis/**
- packages/opentelemetry-test-utils/**
- packages/opentelemetry-redis-common/**
pkg:instrumentation-kafkajs:
- changed-files:
- any-glob-to-any-file:
- plugins/node/instrumentation-kafkajs/**
- packages/opentelemetry-test-utils/**
pkg:instrumentation-knex:
- changed-files:
- any-glob-to-any-file:
Expand Down
2 changes: 1 addition & 1 deletion plugins/node/instrumentation-kafkajs/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ This package uses `@opentelemetry/semantic-conventions` version `1.30+`, which i
| --------------------- | ------------------------------------- | ------------------------------------------------------------ |
| Consumer | `messaging.process.duration` | Duration of processing operation. [1] |
| Consumer | `messaging.client.consumed.messages` | Number of messages that were delivered to the application. |
| Consumer and Producer | `messaging.client.operation.duration` | Number of messages that were delivered to the application. |
| Consumer and Producer | `messaging.client.operation.duration` | Number of messages that were delivered to the application. (Only emitted for [email protected] and later.) |
| Producer | `messaging.client.sent.messages` | Number of messages producer attempted to send to the broker. |

**[1] `messaging.process.duration`:** In the context of `eachBatch`, this metric will be emitted once for each message but the value reflects the duration of the entire batch.
Expand Down
11 changes: 7 additions & 4 deletions plugins/node/instrumentation-kafkajs/src/instrumentation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -251,10 +251,13 @@ export class KafkaJsInstrumentation extends InstrumentationBase<KafkaJsInstrumen
private _setKafkaEventListeners(kafkaObj: KafkaEventEmitter) {
if (kafkaObj[EVENT_LISTENERS_SET]) return;

kafkaObj.on(
kafkaObj.events.REQUEST,
this._recordClientDurationMetric.bind(this)
);
// The REQUEST Consumer event was added in [email protected].
if (kafkaObj.events?.REQUEST) {
kafkaObj.on(
kafkaObj.events.REQUEST,
this._recordClientDurationMetric.bind(this)
);
}

kafkaObj[EVENT_LISTENERS_SET] = true;
}
Expand Down
20 changes: 5 additions & 15 deletions plugins/node/instrumentation-kafkajs/test/kafkajs.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -220,9 +220,7 @@ describe('instrumentation-kafkajs', () => {
);
instrumentation.disable();
instrumentation.enable();
producer = kafka.producer({
createPartitioner: kafkajs.Partitioners.LegacyPartitioner,
});
producer = kafka.producer();
}
beforeEach(() => {
initializeProducer();
Expand Down Expand Up @@ -479,9 +477,7 @@ describe('instrumentation-kafkajs', () => {
});
instrumentation.disable();
instrumentation.enable();
producer = kafka.producer({
createPartitioner: kafkajs.Partitioners.LegacyPartitioner,
});
producer = kafka.producer();
});

it('error in send create failed span', async () => {
Expand Down Expand Up @@ -634,9 +630,7 @@ describe('instrumentation-kafkajs', () => {
instrumentation.disable();
instrumentation.setConfig(config);
instrumentation.enable();
producer = kafka.producer({
createPartitioner: kafkajs.Partitioners.LegacyPartitioner,
});
producer = kafka.producer();
});

it('producer hook add span attribute with value from message', async () => {
Expand Down Expand Up @@ -671,9 +665,7 @@ describe('instrumentation-kafkajs', () => {
instrumentation.disable();
instrumentation.setConfig(config);
instrumentation.enable();
producer = kafka.producer({
createPartitioner: kafkajs.Partitioners.LegacyPartitioner,
});
producer = kafka.producer();
});

it('producer hook add span attribute with value from message', async () => {
Expand Down Expand Up @@ -1227,9 +1219,7 @@ describe('instrumentation-kafkajs', () => {
storeRunConfig();
instrumentation.disable();
instrumentation.enable();
producer = kafka.producer({
createPartitioner: kafkajs.Partitioners.LegacyPartitioner,
});
producer = kafka.producer();
consumer = kafka.consumer({ groupId: 'testing-group-id' });
});

Expand Down