Skip to content

Commit 450253c

Browse files
authored
fix(otel): use job timestamps for process span attributes (#3832)
1 parent 17e7210 commit 450253c

File tree

5 files changed

+395
-5
lines changed

5 files changed

+395
-5
lines changed

docs/gitbook/SUMMARY.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@
6363
- [Create Custom Events](guide/events/create-custom-events.md)
6464
- [Telemetry](guide/telemetry/README.md)
6565
- [Getting started](guide/telemetry/getting-started.md)
66+
- [Traces](guide/telemetry/traces.md)
6667
- [Metrics](guide/telemetry/metrics.md)
6768
- [Running Jaeger](guide/telemetry/running-jaeger.md)
6869
- [Running a simple example](guide/telemetry/running-a-simple-example.md)
Lines changed: 266 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,266 @@
1+
# Traces
2+
3+
BullMQ provides comprehensive distributed tracing support through OpenTelemetry. Traces allow you to track the flow of jobs through your system, identify bottlenecks, and debug issues across distributed services.
4+
5+
## Enabling Traces
6+
7+
To enable tracing, pass a telemetry instance when creating Queue, Worker, or FlowProducer:
8+
9+
```typescript
10+
import { Queue, Worker } from 'bullmq';
11+
import { BullMQOtel } from 'bullmq-otel';
12+
13+
const telemetry = new BullMQOtel({
14+
tracerName: 'my-app',
15+
version: '1.0.0',
16+
});
17+
18+
const queue = new Queue('myQueue', {
19+
connection: {
20+
host: '127.0.0.1',
21+
port: 6379,
22+
},
23+
telemetry,
24+
});
25+
26+
const worker = new Worker(
27+
'myQueue',
28+
async job => {
29+
return 'some value';
30+
},
31+
{
32+
connection: {
33+
host: '127.0.0.1',
34+
port: 6379,
35+
},
36+
telemetry,
37+
},
38+
);
39+
```
40+
41+
## Span Kinds
42+
43+
BullMQ uses different span kinds to categorize operations:
44+
45+
| Span Kind | Description |
46+
| ---------- | ------------------------------------------------------------------- |
47+
| `PRODUCER` | Operations that add jobs to a queue (producing work) |
48+
| `CONSUMER` | Operations that process jobs from a queue (consuming work) |
49+
| `INTERNAL` | Internal operations like pausing, resuming, or managing queue state |
50+
51+
## Available Traces
52+
53+
BullMQ automatically creates spans for the following operations:
54+
55+
### Queue Class
56+
57+
| Operation | Span Name | Span Kind | Description |
58+
| ------------------------ | ------------------------------------ | --------- | ------------------------------------ |
59+
| `add` | `{queueName}.add` | PRODUCER | Adding a single job to the queue |
60+
| `addBulk` | `{queueName}.addBulk` | PRODUCER | Adding multiple jobs to the queue |
61+
| `pause` | `{queueName}.pause` | INTERNAL | Pausing the queue |
62+
| `resume` | `{queueName}.resume` | INTERNAL | Resuming the queue |
63+
| `close` | `{queueName}.close` | INTERNAL | Closing the queue connection |
64+
| `rateLimit` | `{queueName}.rateLimit` | INTERNAL | Setting rate limit on the queue |
65+
| `removeRepeatable` | `{queueName}.removeRepeatable` | INTERNAL | Removing a repeatable job by options |
66+
| `removeRepeatableByKey` | `{queueName}.removeRepeatableByKey` | INTERNAL | Removing a repeatable job by key |
67+
| `removeDebounceKey` | `{queueName}.removeDebounceKey` | INTERNAL | Removing a debounce key |
68+
| `removeDeduplicationKey` | `{queueName}.removeDeduplicationKey` | INTERNAL | Removing a deduplication key |
69+
| `remove` | `{queueName}.remove` | INTERNAL | Removing a job from the queue |
70+
| `updateJobProgress` | `{queueName}.updateJobProgress` | INTERNAL | Updating job progress |
71+
| `drain` | `{queueName}.drain` | INTERNAL | Draining the queue |
72+
| `clean` | `{queueName}.clean` | INTERNAL | Cleaning jobs from the queue |
73+
| `obliterate` | `{queueName}.obliterate` | INTERNAL | Obliterating the queue (all data) |
74+
| `retryJobs` | `{queueName}.retryJobs` | PRODUCER | Retrying failed jobs |
75+
| `promoteJobs` | `{queueName}.promoteJobs` | INTERNAL | Promoting delayed jobs |
76+
| `trimEvents` | `{queueName}.trimEvents` | INTERNAL | Trimming events from the queue |
77+
78+
### Worker Class
79+
80+
| Operation | Span Name | Span Kind | Description |
81+
| ------------------------ | ------------------------------------ | --------- | -------------------------------------- |
82+
| `getNextJob` | `{queueName}.getNextJob` | INTERNAL | Fetching the next job to process |
83+
| `rateLimit` | `{queueName}.rateLimit` | INTERNAL | Worker rate limiting |
84+
| `processJob` | `{queueName}.{jobName}` | CONSUMER | Processing a job (main processor span) |
85+
| `pause` | `{queueName}.pause` | INTERNAL | Pausing the worker |
86+
| `resume` | `{queueName}.resume` | INTERNAL | Resuming the worker |
87+
| `close` | `{queueName}.close` | INTERNAL | Closing the worker |
88+
| `startStalledCheckTimer` | `{queueName}.startStalledCheckTimer` | INTERNAL | Starting stalled job check timer |
89+
| `moveStalledJobsToWait` | `{queueName}.moveStalledJobsToWait` | INTERNAL | Moving stalled jobs back to waiting |
90+
| `extendLocks` | `{queueName}.extendLocks` | INTERNAL | Extending locks on active jobs |
91+
92+
### Job Class
93+
94+
| Operation | Span Name | Span Kind | Description |
95+
| ----------------- | ---------------------- | --------- | ------------------------------------------------ |
96+
| `moveToCompleted` | `{queueName}.complete` | INTERNAL | Completing a job successfully |
97+
| `moveToFailed` | `{queueName}.{state}` | INTERNAL | Job failure handling (state: fail, delay, retry) |
98+
99+
### JobScheduler Class
100+
101+
| Operation | Span Name | Span Kind | Description |
102+
| --------- | -------------------------------- | --------- | ------------------------- |
103+
| `add` | `{queueName}.upsertJobScheduler` | PRODUCER | Upserting a job scheduler |
104+
105+
### FlowProducer Class
106+
107+
| Operation | Span Name | Span Kind | Description |
108+
| --------- | --------------------- | --------- | ---------------------------------- |
109+
| `add` | `{queueName}.addFlow` | PRODUCER | Adding a flow (tree of jobs) |
110+
| `addBulk` | `addBulkFlows` | PRODUCER | Adding multiple flows |
111+
| `addNode` | `{queueName}.addNode` | PRODUCER | Adding a node in a flow (internal) |
112+
113+
## Trace Attributes
114+
115+
Traces include various attributes for filtering and debugging:
116+
117+
### Common Attributes
118+
119+
| Attribute | Key | Description |
120+
| --------------- | ------------------------ | --------------------------------- |
121+
| Queue Name | `bullmq.queue.name` | Name of the queue |
122+
| Queue Operation | `bullmq.queue.operation` | Type of operation being performed |
123+
124+
### Job Attributes
125+
126+
| Attribute | Key | Description |
127+
| ----------------------- | --------------------------------------- | ---------------------------------------------- |
128+
| Job Name | `bullmq.job.name` | Name of the job |
129+
| Job ID | `bullmq.job.id` | Unique identifier of the job |
130+
| Job Key | `bullmq.job.key` | Redis key of the job |
131+
| Job IDs | `bullmq.job.ids` | Multiple job IDs (bulk ops) |
132+
| Job Options | `bullmq.job.options` | Serialized job options |
133+
| Job Progress | `bullmq.job.progress` | Current job progress value |
134+
| Job Type | `bullmq.job.type` | Type/state of the job |
135+
| Job Attempts Made | `bullmq.job.attempts.made` | Number of attempts made |
136+
| Job Result | `bullmq.job.result` | Result returned by the job |
137+
| Job Failed Reason | `bullmq.job.failed.reason` | Reason for job failure |
138+
| Job Attempt Finished | `bullmq.job.attempt_finished_timestamp` | When the processing attempt ended |
139+
| Job Finished Timestamp | `bullmq.job.finished.timestamp` | When the processing attempt ended (deprecated) |
140+
| Job Processed Timestamp | `bullmq.job.processed.timestamp` | When the job was processed |
141+
| Deduplication Key | `bullmq.job.deduplication.key` | Deduplication key if set |
142+
143+
### Bulk Operation Attributes
144+
145+
| Attribute | Key | Description |
146+
| ---------- | ----------------------- | -------------------------------- |
147+
| Bulk Count | `bullmq.job.bulk.count` | Number of jobs in bulk operation |
148+
| Bulk Names | `bullmq.job.bulk.names` | Comma-separated job names |
149+
150+
### Worker Attributes
151+
152+
| Attribute | Key | Description |
153+
| -------------------- | ------------------------------------ | ------------------------------- |
154+
| Worker Name | `bullmq.worker.name` | Name of the worker |
155+
| Worker ID | `bullmq.worker.id` | Unique identifier of the worker |
156+
| Worker Options | `bullmq.worker.options` | Serialized worker options |
157+
| Worker Rate Limit | `bullmq.worker.rate.limit` | Rate limit duration |
158+
| Do Not Wait Active | `bullmq.worker.do.not.wait.active` | Whether to wait for active jobs |
159+
| Force Close | `bullmq.worker.force.close` | Whether closing is forced |
160+
| Stalled Jobs | `bullmq.worker.stalled.jobs` | Number of stalled jobs detected |
161+
| Failed Jobs | `bullmq.worker.failed.jobs` | Number of failed stalled jobs |
162+
| Jobs to Extend Locks | `bullmq.worker.jobs.to.extend.locks` | Jobs needing lock extension |
163+
164+
### Queue Operation Attributes
165+
166+
| Attribute | Key | Description |
167+
| ---------------- | ------------------------------- | --------------------------- |
168+
| Drain Delay | `bullmq.queue.drain.delay` | Whether to delay drain |
169+
| Grace Period | `bullmq.queue.grace` | Grace period for clean op |
170+
| Clean Limit | `bullmq.queue.clean.limit` | Maximum jobs to clean |
171+
| Rate Limit | `bullmq.queue.rate.limit` | Rate limit settings |
172+
| Queue Options | `bullmq.queue.options` | Serialized queue options |
173+
| Event Max Length | `bullmq.queue.event.max.length` | Maximum event stream length |
174+
175+
### Flow Attributes
176+
177+
| Attribute | Key | Description |
178+
| --------- | ------------------ | ---------------- |
179+
| Flow Name | `bullmq.flow.name` | Name of the flow |
180+
181+
### Scheduler Attributes
182+
183+
| Attribute | Key | Description |
184+
| ---------------- | ------------------------- | ----------------------- |
185+
| Job Scheduler ID | `bullmq.job.scheduler.id` | ID of the job scheduler |
186+
187+
## Context Propagation
188+
189+
BullMQ automatically propagates trace context when jobs are added and processed. This allows you to track jobs across services:
190+
191+
1. **Producer side**: When adding a job, the trace context is captured and stored with the job data
192+
2. **Consumer side**: When processing a job, the trace context is extracted and used to continue the trace
193+
194+
### Controlling Context Propagation
195+
196+
You can control context propagation per job using the `telemetry` job option:
197+
198+
```typescript
199+
// Include trace context (default behavior)
200+
await queue.add('job', data);
201+
202+
// Explicitly include context
203+
await queue.add('job', data, {
204+
telemetry: {
205+
omitContext: false,
206+
},
207+
});
208+
209+
// Omit trace context (start fresh trace when processing)
210+
await queue.add('job', data, {
211+
telemetry: {
212+
omitContext: true,
213+
},
214+
});
215+
216+
// Provide custom metadata
217+
await queue.add('job', data, {
218+
telemetry: {
219+
metadata: customContextData,
220+
},
221+
});
222+
```
223+
224+
## Exporting Traces
225+
226+
To export traces to an observability backend, configure an OpenTelemetry trace exporter:
227+
228+
```typescript
229+
import { NodeTracerProvider } from '@opentelemetry/sdk-trace-node';
230+
import { SimpleSpanProcessor } from '@opentelemetry/sdk-trace-base';
231+
import { OTLPTraceExporter } from '@opentelemetry/exporter-trace-otlp-http';
232+
import { trace } from '@opentelemetry/api';
233+
234+
// Configure the trace exporter
235+
const traceExporter = new OTLPTraceExporter({
236+
url: 'http://localhost:4318/v1/traces',
237+
});
238+
239+
const provider = new NodeTracerProvider();
240+
provider.addSpanProcessor(new SimpleSpanProcessor(traceExporter));
241+
provider.register();
242+
243+
// Now BullMQOtel will automatically use the registered provider
244+
```
245+
246+
## Example Trace Visualization
247+
248+
When properly configured, you can see traces in your observability platform showing the complete lifecycle of jobs:
249+
250+
```
251+
├─ myQueue.add (PRODUCER)
252+
│ └─ myQueue.myJob (CONSUMER)
253+
│ └─ myQueue.complete (INTERNAL)
254+
```
255+
256+
For flows with parent-child relationships:
257+
258+
```
259+
├─ myQueue.addFlow (PRODUCER)
260+
│ ├─ childQueue.addNode (PRODUCER)
261+
│ │ └─ childQueue.childJob (CONSUMER)
262+
│ │ └─ childQueue.complete (INTERNAL)
263+
│ └─ parentQueue.addNode (PRODUCER)
264+
│ └─ parentQueue.parentJob (CONSUMER)
265+
│ └─ parentQueue.complete (INTERNAL)
266+
```

src/classes/worker.ts

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -962,12 +962,10 @@ will never work with more accuracy than 1ms. */
962962

963963
this.emit('active', job, 'waiting');
964964

965-
const processedOn = Date.now();
966-
967965
const abortController = this.lockManager.trackJob(
968966
job.id,
969967
token,
970-
processedOn,
968+
job.processedOn,
971969
this.processorAcceptsSignal,
972970
);
973971

@@ -1040,10 +1038,13 @@ will never work with more accuracy than 1ms. */
10401038
return failed;
10411039
} finally {
10421040
this.lockManager.untrackJob(job.id);
1041+
const now = Date.now();
10431042

10441043
span?.setAttributes({
1045-
[TelemetryAttributes.JobFinishedTimestamp]: Date.now(),
1046-
[TelemetryAttributes.JobProcessedTimestamp]: processedOn,
1044+
[TelemetryAttributes.JobFinishedTimestamp]: now,
1045+
[TelemetryAttributes.JobAttemptFinishedTimestamp]:
1046+
job.finishedOn || now,
1047+
[TelemetryAttributes.JobProcessedTimestamp]: job.processedOn,
10471048
});
10481049
}
10491050
},

src/enums/telemetry-attributes.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,11 @@ export enum TelemetryAttributes {
2727
WorkerStalledJobs = 'bullmq.worker.stalled.jobs',
2828
WorkerFailedJobs = 'bullmq.worker.failed.jobs',
2929
WorkerJobsToExtendLocks = 'bullmq.worker.jobs.to.extend.locks',
30+
/**
31+
* @deprecated Use JobAttemptFinishedTimestamp instead. Will be removed in a future version.
32+
*/
3033
JobFinishedTimestamp = 'bullmq.job.finished.timestamp',
34+
JobAttemptFinishedTimestamp = 'bullmq.job.attempt_finished_timestamp',
3135
JobProcessedTimestamp = 'bullmq.job.processed.timestamp',
3236
JobResult = 'bullmq.job.result',
3337
JobFailedReason = 'bullmq.job.failed.reason',

0 commit comments

Comments
 (0)