Skip to content
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ class GoogleCloudPubsubConsumerPlugin extends ConsumerPlugin {
'gcloud.project_id': subscription.pubsub.projectId,
'pubsub.topic': topic,
'span.kind': 'consumer',
'pubsub.delivery_method': 'pull',
'pubsub.subscription_type': 'pull',
'pubsub.span_type': 'message_processing',
'messaging.operation': 'receive',
base_service: baseService,
Expand All @@ -135,9 +135,6 @@ class GoogleCloudPubsubConsumerPlugin extends ConsumerPlugin {

if (batchRequestTraceId && batchRequestSpanId) {
meta['pubsub.batch.request_trace_id'] = batchRequestTraceId
meta['pubsub.batch.request_span_id'] = batchRequestSpanId
meta['_dd.pubsub_request.trace_id'] = batchRequestTraceId
meta['_dd.pubsub_request.span_id'] = batchRequestSpanId
// Use JSON format like producer for proper span link parsing
meta['_dd.span_links'] = JSON.stringify([{
trace_id: batchRequestTraceId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ const SpanContext = require('../../dd-trace/src/opentracing/span_context')
const id = require('../../dd-trace/src/id')
const log = require('../../dd-trace/src/log')

// WeakMap to track delivery spans by request
const deliverySpans = new WeakMap()
// WeakMap to track push receive spans by request
const pushReceiveSpans = new WeakMap()

class GoogleCloudPubsubPushSubscriptionPlugin extends TracingPlugin {
static get id () { return 'google-cloud-pubsub-push-subscription' }
Expand All @@ -16,10 +16,10 @@ class GoogleCloudPubsubPushSubscriptionPlugin extends TracingPlugin {

/**
* PUSH SUBSCRIPTION: GCP sends HTTP POST requests to our service with message data in headers.
* We intercept these requests to create a pubsub.delivery span that wraps the HTTP request.
* We intercept these requests to create a pubsub.push.receive span that wraps the HTTP request.
*
* Flow: Detect push request -> Extract trace context -> Create delivery span -> Activate it
* Hierarchy: pubsub.delivery (parent) -> http.request (child) -> express.middleware...
* Flow: Detect push request -> Extract trace context -> Create receive span -> Activate it
* Hierarchy: pubsub.push.receive (parent) -> http.request (child) -> express.middleware...
*
* Plugin load order (http/index.js) ensures we subscribe before HttpServerPlugin.
*/
Expand All @@ -28,15 +28,15 @@ class GoogleCloudPubsubPushSubscriptionPlugin extends TracingPlugin {
})

this.addSub('apm:http:server:request:finish', ({ req }) => {
this.#finishDeliverySpan(req)
this.#finishPushReceiveSpan(req)
})
}

#finishDeliverySpan (req) {
const deliverySpan = deliverySpans.get(req)
if (deliverySpan && !deliverySpan._duration) {
deliverySpan.finish()
deliverySpans.delete(req)
#finishPushReceiveSpan (req) {
const pushReceiveSpan = pushReceiveSpans.get(req)
if (pushReceiveSpan && !pushReceiveSpan._duration) {
pushReceiveSpan.finish()
pushReceiveSpans.delete(req)
}
}

Expand All @@ -48,18 +48,18 @@ class GoogleCloudPubsubPushSubscriptionPlugin extends TracingPlugin {
}

if (req.headers['x-goog-pubsub-message-id']) {
this.#createDeliverySpanAndActivate(ctx)
this.#createPushReceiveSpanAndActivate(ctx)
return true
}

log.warn(
'[PubSub] No x-goog-pubsub-* headers detected. pubsub.delivery spans will not be created. ' +
'[PubSub] No x-goog-pubsub-* headers detected. pubsub.push.receive spans will not be created. ' +
'Add --push-no-wrapper-write-metadata to your subscription.'
)
return false
}

#createDeliverySpanAndActivate (ctx) {
#createPushReceiveSpanAndActivate (ctx) {
const { req, res } = ctx
const messageData = this.#parseMessage(req)
if (!messageData) {
Expand All @@ -72,24 +72,24 @@ class GoogleCloudPubsubPushSubscriptionPlugin extends TracingPlugin {
originalContext?.toTraceId() === pubsubRequestContext.toTraceId()

/**
* Create delivery span, choosing parent context:
* Create receive span, choosing parent context:
* - Same trace: use batch context (message is part of the batch trace)
* - Different trace: use message context as parent, link to batch for observability
*
* this.enter() activates the span so the HTTP request span becomes its child.
*/
const deliverySpan = this.#createDeliverySpan(
const pushReceiveSpan = this.#createPushReceiveSpan(
messageData,
isSameTrace ? pubsubRequestContext : originalContext,
isSameTrace ? null : pubsubRequestContext
)

if (!deliverySpan) {
if (!pushReceiveSpan) {
return
}

this.enter(deliverySpan, { req, res })
deliverySpans.set(req, deliverySpan)
this.enter(pushReceiveSpan, { req, res })
pushReceiveSpans.set(req, pushReceiveSpan)
}

#parseMessage (req) {
Expand Down Expand Up @@ -117,7 +117,7 @@ class GoogleCloudPubsubPushSubscriptionPlugin extends TracingPlugin {
* - _dd.pubsub_request.p.tid: upper 64 bits of trace ID (hex, optional for 128-bit traces)
*
* This context represents the "pubsub.request" span on the producer side.
* We use it to create span links, connecting each delivery span back to the original batch.
* We use it to create span links, connecting each pubsub.push.receive span back to the original batch.
*/
const traceIdLower = attrs['_dd.pubsub_request.trace_id']
const spanId = attrs['_dd.pubsub_request.span_id']
Expand All @@ -140,7 +140,7 @@ class GoogleCloudPubsubPushSubscriptionPlugin extends TracingPlugin {
})
}

#createDeliverySpan (messageData, parentContext, linkContext) {
#createPushReceiveSpan (messageData, parentContext, linkContext) {
const { message, subscription, topicName, attrs } = messageData
const subscriptionName = subscription?.slice(subscription.lastIndexOf('/') + 1) ?? subscription
const publishStartTime = attrs['x-dd-publish-start-time']
Expand All @@ -151,17 +151,17 @@ class GoogleCloudPubsubPushSubscriptionPlugin extends TracingPlugin {
const serviceOverride = this.config.service ?? `${baseService}-pubsub`

// Use this.startSpan() which automatically activates the span
const span = this.startSpan('pubsub.delivery', {
const span = this.startSpan('pubsub.push.receive', {
childOf: parentContext,
startTime,
kind: 'consumer',
service: serviceOverride,
meta: {
component: 'google-cloud-pubsub',
'pubsub.method': 'delivery',
'pubsub.method': 'receive',
'pubsub.subscription': subscription,
'pubsub.message_id': message.messageId,
'pubsub.delivery_method': 'push',
'pubsub.subscription_type': 'push',
'pubsub.topic': topicName,
'_dd.base_service': baseService,
'_dd.serviceoverride.type': 'integration',
Expand All @@ -173,6 +173,13 @@ class GoogleCloudPubsubPushSubscriptionPlugin extends TracingPlugin {
return null
}

span._integrationName = 'google-cloud-pubsub'
// Calculate delivery latency (queue time from publish to delivery)
if (publishStartTime) {
const deliveryDuration = Date.now() - Number(publishStartTime)
span.setTag('pubsub.delivery_duration_ms', deliveryDuration)
}

this.#addBatchMetadata(span, attrs)

if (linkContext) {
Expand Down Expand Up @@ -201,10 +208,8 @@ class GoogleCloudPubsubPushSubscriptionPlugin extends TracingPlugin {
span.setTag('pubsub.batch.description', `Message ${index + 1} of ${size}`)

const requestTraceId = attrs['_dd.pubsub_request.trace_id']
const requestSpanId = attrs['_dd.pubsub_request.span_id']
if (requestTraceId && requestSpanId) {
if (requestTraceId) {
span.setTag('pubsub.batch.request_trace_id', requestTraceId)
span.setTag('pubsub.batch.request_span_id', requestSpanId)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ describe('Plugin', () => {
after(() => {
delete process.env.PUBSUB_EMULATOR_HOST
delete process.env.DD_DATA_STREAMS_ENABLED
delete process.env.K_SERVICE
// Don't delete K_SERVICE - pubsub-push-subscription.spec.js needs it
})

afterEach(() => {
Expand Down
Loading