Skip to content
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

55 changes: 43 additions & 12 deletions plugins/node/instrumentation-kafkajs/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ npm install --save @opentelemetry/instrumentation-kafkajs

```js
const { NodeTracerProvider } = require('@opentelemetry/sdk-trace-node');
const { KafkaJsInstrumentation } = require('@opentelemetry/instrumentation-kafkajs');
const {
KafkaJsInstrumentation,
} = require('@opentelemetry/instrumentation-kafkajs');
const { registerInstrumentations } = require('@opentelemetry/instrumentation');

const provider = new NodeTracerProvider();
Expand All @@ -42,22 +44,51 @@ registerInstrumentations({

You can set the following:

| Options | Type | Description |
| ---------------------------- | -------------------------------------- | -------------------------------------------------------------------------------------------------------------------- |
| `producerHook` | `KafkaProducerCustomAttributeFunction` | Function called before a producer message is sent. Allows for adding custom attributes to the span. |
| `consumerHook` | `KafkaConsumerCustomAttributeFunction` | Function called before a consumer message is processed. Allows for adding custom attributes to the span. |
| Options | Type | Description |
| -------------- | -------------------------------------- | -------------------------------------------------------------------------------------------------------- |
| `producerHook` | `KafkaProducerCustomAttributeFunction` | Function called before a producer message is sent. Allows for adding custom attributes to the span. |
| `consumerHook` | `KafkaConsumerCustomAttributeFunction` | Function called before a consumer message is processed. Allows for adding custom attributes to the span. |

## Semantic Conventions

This package uses `@opentelemetry/semantic-conventions` version `1.24+`, which implements Semantic Convention [Version 1.7.0](https://github.com/open-telemetry/opentelemetry-specification/blob/v1.7.0/semantic_conventions/README.md)
This package uses `@opentelemetry/semantic-conventions` version `1.30+`, which implements Semantic Convention [Version 1.30.0](https://github.com/open-telemetry/semantic-conventions/blob/v1.30.0/docs/README.md)

Attributes collected:
### Spans Emitted

| Attribute | Short Description |
| -----------------------------| ----------------------------------------------------- |
| `messaging.system` | An identifier for the messaging system being used. |
| `messaging.destination` | The message destination name. |
| `messaging.operation` | A string identifying the kind of messaging operation. |
| KafkaJS Object | Action | Span Kind | Span Name | Operation Type / Name |
| -------------- | -------------------------- | --------- | -------------------------- | --------------------- |
| Consumer | `eachBatch` | Client | `poll <topic-name>` | `receive` / `poll` |
| Consumer | `eachBatch`, `eachMessage` | Consumer | `process <topic-name>` [1] | `process` / `process` |
| Producer | `send` | Producer | `send <topic-name>` | `send` / `send` |

**[1] `process <topic-name>`:** In the context of `eachBatch`, this span will be emitted for each message in the batch but the timing (start, end, duration) will reflect the timing of the batch.

### Metrics Emitted

| KafkaJS Object | Metric Name | Short Description |
| --------------------- | ------------------------------------- | ------------------------------------------------------------ |
| Consumer | `messaging.process.duration` | Duration of processing operation. [1] |
| Consumer | `messaging.client.consumed.messages` | Number of messages that were delivered to the application. |
| Consumer and Producer | `messaging.client.operation.duration` | Number of messages that were delivered to the application. |
| Producer | `messaging.client.sent.messages` | Number of messages producer attempted to send to the broker. |

**[1] `messaging.process.duration`:** In the context of `eachBatch`, this metric will be emitted once for each message but the value reflects the duration of the entire batch.

### Attributes Collected

These attributes are added to both spans and metrics, where possible.

| Attribute | Short Description |
| ------------------------------------ | ------------------------------------------------------------------------------------------------------------------------------------------------------------------ |
| `messaging.system` | An identifier for the messaging system being used (i.e. `"kafka"`). |
| `messaging.destination.name` | The message destination name. |
| `messaging.operation.type` | A string identifying the type of messaging operation. |
| `messaging.operation.name` | The system-specific name of the messaging operation. |
| `messaging.operation.name` | The system-specific name of the messaging operation. |
| `messaging.kafka.message.key` | A stringified value representing the key of the Kafka message (if present). |
| `messaging.kafka.message.tombstone` | A boolean that is true if the message is a tombstone. |
| `messaging.kafka.offset` | The offset of a record in the corresponding Kafka partition. |
| `messaging.destination.partition.id` | The identifier of the partition messages are sent to or received from, unique within the `messaging.destination.name`. **Note:** only available on producer spans. |

## Useful links

Expand Down
4 changes: 2 additions & 2 deletions plugins/node/instrumentation-kafkajs/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
"types": "build/src/index.d.ts",
"repository": "open-telemetry/opentelemetry-js-contrib",
"scripts": {
"test": "mocha --require @opentelemetry/contrib-test-utils 'test/**/*.test.ts'",
"test": "nyc mocha --require @opentelemetry/contrib-test-utils 'test/**/*.test.ts'",
"tdd": "npm run test -- --watch-extensions ts --watch",
"clean": "rimraf build/*",
"lint": "eslint . --ext .ts",
Expand Down Expand Up @@ -56,7 +56,7 @@
},
"dependencies": {
"@opentelemetry/instrumentation": "^0.57.2",
"@opentelemetry/semantic-conventions": "^1.27.0"
"@opentelemetry/semantic-conventions": "^1.30.0"
},
"homepage": "https://github.com/open-telemetry/opentelemetry-js-contrib/tree/main/plugins/node/instrumentation-kafkajs#readme"
}
Loading