Skip to content

Commit 145d856

Browse files
authored
Add pubsub.delivery_duration_ms to push subscriptions (#7205)
* add pubsub.delivery_duration_ms to push subscriptions * update push tests and names of span tags * update tests for push subscriptions * update tests: remove K_SERVICE delete * Update all tests and naming convention of inferred span * create global const for gc * remove ts error check for gc
1 parent 011ce7b commit 145d856

File tree

5 files changed

+149
-325
lines changed

5 files changed

+149
-325
lines changed

packages/datadog-plugin-google-cloud-pubsub/src/consumer.js

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,7 @@ class GoogleCloudPubsubConsumerPlugin extends ConsumerPlugin {
126126
'gcloud.project_id': subscription.pubsub.projectId,
127127
'pubsub.topic': topic,
128128
'span.kind': 'consumer',
129-
'pubsub.delivery_method': 'pull',
129+
'pubsub.subscription_type': 'pull',
130130
'pubsub.span_type': 'message_processing',
131131
'messaging.operation': 'receive',
132132
base_service: baseService,
@@ -135,9 +135,6 @@ class GoogleCloudPubsubConsumerPlugin extends ConsumerPlugin {
135135

136136
if (batchRequestTraceId && batchRequestSpanId) {
137137
meta['pubsub.batch.request_trace_id'] = batchRequestTraceId
138-
meta['pubsub.batch.request_span_id'] = batchRequestSpanId
139-
meta['_dd.pubsub_request.trace_id'] = batchRequestTraceId
140-
meta['_dd.pubsub_request.span_id'] = batchRequestSpanId
141138
// Use JSON format like producer for proper span link parsing
142139
meta['_dd.span_links'] = JSON.stringify([{
143140
trace_id: batchRequestTraceId,

packages/datadog-plugin-google-cloud-pubsub/src/pubsub-push-subscription.js

Lines changed: 32 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,8 @@ const SpanContext = require('../../dd-trace/src/opentracing/span_context')
55
const id = require('../../dd-trace/src/id')
66
const log = require('../../dd-trace/src/log')
77

8-
// WeakMap to track delivery spans by request
9-
const deliverySpans = new WeakMap()
8+
// WeakMap to track push receive spans by request
9+
const pushReceiveSpans = new WeakMap()
1010

1111
class GoogleCloudPubsubPushSubscriptionPlugin extends TracingPlugin {
1212
static get id () { return 'google-cloud-pubsub-push-subscription' }
@@ -16,10 +16,10 @@ class GoogleCloudPubsubPushSubscriptionPlugin extends TracingPlugin {
1616

1717
/**
1818
* PUSH SUBSCRIPTION: GCP sends HTTP POST requests to our service with message data in headers.
19-
* We intercept these requests to create a pubsub.delivery span that wraps the HTTP request.
19+
* We intercept these requests to create a pubsub.push.receive span that wraps the HTTP request.
2020
*
21-
* Flow: Detect push request -> Extract trace context -> Create delivery span -> Activate it
22-
* Hierarchy: pubsub.delivery (parent) -> http.request (child) -> express.middleware...
21+
* Flow: Detect push request -> Extract trace context -> Create receive span -> Activate it
22+
* Hierarchy: pubsub.push.receive (parent) -> http.request (child) -> express.middleware...
2323
*
2424
* Plugin load order (http/index.js) ensures we subscribe before HttpServerPlugin.
2525
*/
@@ -28,15 +28,15 @@ class GoogleCloudPubsubPushSubscriptionPlugin extends TracingPlugin {
2828
})
2929

3030
this.addSub('apm:http:server:request:finish', ({ req }) => {
31-
this.#finishDeliverySpan(req)
31+
this.#finishPushReceiveSpan(req)
3232
})
3333
}
3434

35-
#finishDeliverySpan (req) {
36-
const deliverySpan = deliverySpans.get(req)
37-
if (deliverySpan && !deliverySpan._duration) {
38-
deliverySpan.finish()
39-
deliverySpans.delete(req)
35+
#finishPushReceiveSpan (req) {
36+
const pushReceiveSpan = pushReceiveSpans.get(req)
37+
if (pushReceiveSpan && !pushReceiveSpan._duration) {
38+
pushReceiveSpan.finish()
39+
pushReceiveSpans.delete(req)
4040
}
4141
}
4242

@@ -48,18 +48,18 @@ class GoogleCloudPubsubPushSubscriptionPlugin extends TracingPlugin {
4848
}
4949

5050
if (req.headers['x-goog-pubsub-message-id']) {
51-
this.#createDeliverySpanAndActivate(ctx)
51+
this.#createPushReceiveSpanAndActivate(ctx)
5252
return true
5353
}
5454

5555
log.warn(
56-
'[PubSub] No x-goog-pubsub-* headers detected. pubsub.delivery spans will not be created. ' +
56+
'[PubSub] No x-goog-pubsub-* headers detected. pubsub.push.receive spans will not be created. ' +
5757
'Add --push-no-wrapper-write-metadata to your subscription.'
5858
)
5959
return false
6060
}
6161

62-
#createDeliverySpanAndActivate (ctx) {
62+
#createPushReceiveSpanAndActivate (ctx) {
6363
const { req, res } = ctx
6464
const messageData = this.#parseMessage(req)
6565
if (!messageData) {
@@ -72,24 +72,24 @@ class GoogleCloudPubsubPushSubscriptionPlugin extends TracingPlugin {
7272
originalContext?.toTraceId() === pubsubRequestContext.toTraceId()
7373

7474
/**
75-
* Create delivery span, choosing parent context:
75+
* Create receive span, choosing parent context:
7676
* - Same trace: use batch context (message is part of the batch trace)
7777
* - Different trace: use message context as parent, link to batch for observability
7878
*
7979
* this.enter() activates the span so the HTTP request span becomes its child.
8080
*/
81-
const deliverySpan = this.#createDeliverySpan(
81+
const pushReceiveSpan = this.#createPushReceiveSpan(
8282
messageData,
8383
isSameTrace ? pubsubRequestContext : originalContext,
8484
isSameTrace ? null : pubsubRequestContext
8585
)
8686

87-
if (!deliverySpan) {
87+
if (!pushReceiveSpan) {
8888
return
8989
}
9090

91-
this.enter(deliverySpan, { req, res })
92-
deliverySpans.set(req, deliverySpan)
91+
this.enter(pushReceiveSpan, { req, res })
92+
pushReceiveSpans.set(req, pushReceiveSpan)
9393
}
9494

9595
#parseMessage (req) {
@@ -117,7 +117,7 @@ class GoogleCloudPubsubPushSubscriptionPlugin extends TracingPlugin {
117117
* - _dd.pubsub_request.p.tid: upper 64 bits of trace ID (hex, optional for 128-bit traces)
118118
*
119119
* This context represents the "pubsub.request" span on the producer side.
120-
* We use it to create span links, connecting each delivery span back to the original batch.
120+
* We use it to create span links, connecting each pubsub.push.receive span back to the original batch.
121121
*/
122122
const traceIdLower = attrs['_dd.pubsub_request.trace_id']
123123
const spanId = attrs['_dd.pubsub_request.span_id']
@@ -140,7 +140,7 @@ class GoogleCloudPubsubPushSubscriptionPlugin extends TracingPlugin {
140140
})
141141
}
142142

143-
#createDeliverySpan (messageData, parentContext, linkContext) {
143+
#createPushReceiveSpan (messageData, parentContext, linkContext) {
144144
const { message, subscription, topicName, attrs } = messageData
145145
const subscriptionName = subscription?.slice(subscription.lastIndexOf('/') + 1) ?? subscription
146146
const publishStartTime = attrs['x-dd-publish-start-time']
@@ -151,17 +151,17 @@ class GoogleCloudPubsubPushSubscriptionPlugin extends TracingPlugin {
151151
const serviceOverride = this.config.service ?? `${baseService}-pubsub`
152152

153153
// Use this.startSpan() which automatically activates the span
154-
const span = this.startSpan('pubsub.delivery', {
154+
const span = this.startSpan('pubsub.push.receive', {
155155
childOf: parentContext,
156156
startTime,
157157
kind: 'consumer',
158158
service: serviceOverride,
159159
meta: {
160160
component: 'google-cloud-pubsub',
161-
'pubsub.method': 'delivery',
161+
'pubsub.method': 'receive',
162162
'pubsub.subscription': subscription,
163163
'pubsub.message_id': message.messageId,
164-
'pubsub.delivery_method': 'push',
164+
'pubsub.subscription_type': 'push',
165165
'pubsub.topic': topicName,
166166
'_dd.base_service': baseService,
167167
'_dd.serviceoverride.type': 'integration',
@@ -173,6 +173,13 @@ class GoogleCloudPubsubPushSubscriptionPlugin extends TracingPlugin {
173173
return null
174174
}
175175

176+
span._integrationName = 'google-cloud-pubsub'
177+
// Calculate delivery latency (queue time from publish to delivery)
178+
if (publishStartTime) {
179+
const deliveryDuration = Date.now() - Number(publishStartTime)
180+
span.setTag('pubsub.delivery_duration_ms', deliveryDuration)
181+
}
182+
176183
this.#addBatchMetadata(span, attrs)
177184

178185
if (linkContext) {
@@ -201,10 +208,8 @@ class GoogleCloudPubsubPushSubscriptionPlugin extends TracingPlugin {
201208
span.setTag('pubsub.batch.description', `Message ${index + 1} of ${size}`)
202209

203210
const requestTraceId = attrs['_dd.pubsub_request.trace_id']
204-
const requestSpanId = attrs['_dd.pubsub_request.span_id']
205-
if (requestTraceId && requestSpanId) {
211+
if (requestTraceId) {
206212
span.setTag('pubsub.batch.request_trace_id', requestTraceId)
207-
span.setTag('pubsub.batch.request_span_id', requestSpanId)
208213
}
209214
}
210215
}

packages/datadog-plugin-google-cloud-pubsub/test/index.spec.js

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ const { expectSomeSpan, withDefaults } = require('../../dd-trace/test/plugins/he
1414
const { computePathwayHash } = require('../../dd-trace/src/datastreams/pathway')
1515
const { DataStreamsProcessor, ENTRY_PARENT_HASH } = require('../../dd-trace/src/datastreams/processor')
1616
const { expectedSchema, rawExpectedSchema } = require('./naming')
17+
const gc = global.gc ?? (() => {})
1718

1819
// The roundtrip to the pubsub emulator takes time. Sometimes a *long* time.
1920
const TIMEOUT = 30000
@@ -34,7 +35,7 @@ describe('Plugin', () => {
3435
after(() => {
3536
delete process.env.PUBSUB_EMULATOR_HOST
3637
delete process.env.DD_DATA_STREAMS_ENABLED
37-
delete process.env.K_SERVICE
38+
// Don't delete K_SERVICE - pubsub-push-subscription.spec.js needs it
3839
})
3940

4041
afterEach(() => {
@@ -514,11 +515,11 @@ describe('Plugin', () => {
514515
})()
515516

516517
// Force garbage collection multiple times
517-
global.gc()
518+
gc()
518519
await new Promise(resolve => setTimeout(resolve, 100))
519-
global.gc()
520+
gc()
520521
await new Promise(resolve => setTimeout(resolve, 100))
521-
global.gc()
522+
gc()
522523

523524
// Wait a bit for FinalizationRegistry callback
524525
await new Promise(resolve => setTimeout(resolve, 500))
@@ -564,9 +565,9 @@ describe('Plugin', () => {
564565
subscription.close()
565566

566567
// Force GC
567-
global.gc()
568+
gc()
568569
await new Promise(resolve => setTimeout(resolve, 100))
569-
global.gc()
570+
gc()
570571

571572
const afterMemory = process.memoryUsage().heapUsed
572573
const memoryIncrease = afterMemory - initialMemory

0 commit comments

Comments
 (0)