From de0b597ccc5ae2da8075492d32313331e6369301 Mon Sep 17 00:00:00 2001 From: nina9753 Date: Tue, 5 Aug 2025 18:38:34 -0400 Subject: [PATCH 01/10] push subs gcp Please enter the commit message for your changes. Lines starting --- .../src/http/server.js | 216 ++++++++++++++++++ 1 file changed, 216 insertions(+) diff --git a/packages/datadog-instrumentations/src/http/server.js b/packages/datadog-instrumentations/src/http/server.js index 0624c886787..0aabfaedb4a 100644 --- a/packages/datadog-instrumentations/src/http/server.js +++ b/packages/datadog-instrumentations/src/http/server.js @@ -19,6 +19,207 @@ const requestFinishedSet = new WeakSet() const httpNames = ['http', 'node:http'] const httpsNames = ['https', 'node:https'] +function handlePubSubOrCloudEvent(req, res, emit, server, originalArgs) { + const isCloudEvent = req.headers['content-type']?.includes('application/cloudevents+json') || + req.headers['ce-specversion'] + const eventType = isCloudEvent ? 'Cloud Event' : 'PubSub push' + console.log(`[DD-TRACE] HTTP server handling ${eventType} request (framework-agnostic)`) + + // Get tracer from global reference (avoids circular dependencies) + const tracer = global._ddtrace + if (!tracer) { + console.warn('[DD-TRACE] Tracer not available, skipping PubSub handling') + return emit.apply(server, originalArgs) + } + + // Collect raw body for PubSub message parsing with error handling + const chunks = [] + const MAX_BODY_SIZE = 10 * 1024 * 1024 // 10MB limit for large Pub/Sub payloads + let bodySize = 0 + + const cleanup = () => { + req.removeAllListeners('data') + req.removeAllListeners('end') + req.removeAllListeners('error') + } + + const handleError = (error) => { + console.warn(`[DD-TRACE] Error processing ${eventType}:`, error.message) + cleanup() + emit.apply(server, originalArgs) + } + + req.on('error', handleError) + + req.on('data', chunk => { + bodySize += chunk.length + if (bodySize > MAX_BODY_SIZE) { + handleError(new Error(`Request body too large: ${bodySize} bytes (limit: ${MAX_BODY_SIZE})`)) + return + } + chunks.push(chunk) + }) + + req.on('end', () => { + try { + // Efficiently combine chunks for large payloads + const body = Buffer.concat(chunks).toString('utf8') + + // Log large payload handling for monitoring + if (bodySize > 10 * 1024 * 1024) { // Log if > 10MB + console.log(`[DD-TRACE] Processing large ${eventType} payload: ${(bodySize / 1024 / 1024).toFixed(1)}MB`) + } + + const json = JSON.parse(body) + req.body = json // Set parsed body for framework use + req._pubsubBodyParsed = true // Flag to skip body-parser + + // Extract message and attributes based on format + let message, subscription, attrs + + if (isCloudEvent) { + if (req.headers['ce-specversion']) { + // Binary Content Mode - message in body, trace context in headers + message = json + attrs = { ...message?.attributes } + subscription = req.headers['ce-subscription'] || 'cloud-event-subscription' + + // Merge trace context from headers + const ceTraceParent = req.headers['ce-traceparent'] || req.headers['traceparent'] + const ceTraceState = req.headers['ce-tracestate'] || req.headers['tracestate'] + if (ceTraceParent) attrs.traceparent = ceTraceParent + if (ceTraceState) attrs.tracestate = ceTraceState + } else { + // Structured Content Mode - message in data field + message = json.data?.message || json + subscription = json.data?.subscription || json.subscription || 'cloud-event-subscription' + attrs = { ...message?.attributes } + + // Add Cloud Events context + if (json.source) attrs['ce-source'] = json.source + if (json.type) attrs['ce-type'] = json.type + } + } else { + // Traditional PubSub push format + message = json.message + subscription = json.subscription + attrs = message?.attributes || {} + } + + if (!attrs || typeof attrs !== 'object' || Object.keys(attrs).length === 0) { + console.warn('[DD-TRACE] No valid message attributes found') + cleanup() + return emit.apply(server, originalArgs) + } + + console.log(`[DD-TRACE] Creating span with distributed trace context (${eventType})`) + + // Extract trace context from PubSub message attributes (optimized) + const carrier = {} + const traceHeaders = ['traceparent', 'tracestate', 'x-datadog-trace-id', 'x-datadog-parent-id', 'x-datadog-sampling-priority', 'x-datadog-tags'] + for (const header of traceHeaders) { + if (attrs[header]) { + carrier[header] = attrs[header] + } + } + + // Extract parent span context (key for distributed tracing!) + const parent = tracer.extract('text_map', carrier) + + // Extract project ID and topic from subscription path if not in attributes + let projectId = attrs['gcloud.project_id'] + let topicName = attrs['pubsub.topic'] + + if (!projectId && subscription) { + // Extract from subscription path: projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME + const match = subscription.match(/projects\/([^\/]+)\/subscriptions/) + if (match) projectId = match[1] + } + + if (!topicName) { + topicName = 'push-subscription-topic' + } + + console.log(`[DD-TRACE] Using project_id: ${projectId}, topic: ${topicName}`) + + // Create PubSub consumer span with error handling + let span + try { + span = tracer.startSpan('google-cloud-pubsub.receive', { + childOf: parent, // ✅ This creates the distributed trace link! + tags: { + 'component': 'google-cloud-pubsub', + 'span.kind': 'consumer', + 'span.type': 'worker', + 'gcloud.project_id': projectId || 'unknown', + 'pubsub.topic': topicName || 'unknown', + 'pubsub.subscription': subscription, + 'pubsub.message_id': message?.messageId, + 'pubsub.delivery_method': isCloudEvent ? 'cloud-event' : 'push', + 'pubsub.ack': 1 // Push subscriptions auto-ack + } + }) + + console.log('[DD-TRACE] Created PubSub span with type: worker, parent trace:', parent ? parent.toTraceId() : 'none') + } catch (spanError) { + console.warn('[DD-TRACE] Failed to create PubSub span:', spanError.message) + cleanup() + return emit.apply(server, originalArgs) + } + + // Attach span to request for application code + req._datadog = { span } + req._eventType = eventType + + // Activate span scope and continue with error handling + const scope = tracer.scope() + try { + scope.activate(span, () => { + // Finish span when response completes (with error handling) + const finishSpan = () => { + try { + console.log('[DD-TRACE] Finishing PubSub span') + if (span && !span.finished) { + span.finish() + } + } catch (finishError) { + console.warn('[DD-TRACE] Error finishing span:', finishError.message) + } + cleanup() + } + + // Handle both success and error cases + res.on('finish', finishSpan) + res.on('close', finishSpan) + res.on('error', (resError) => { + console.warn('[DD-TRACE] Response error:', resError.message) + if (span && !span.finished) { + span.setTag('error', true) + span.setTag('error.message', resError.message) + } + finishSpan() + }) + + // Continue with normal request processing + emit.apply(server, originalArgs) + }) + } catch (activationError) { + console.warn('[DD-TRACE] Error activating span scope:', activationError.message) + if (span && !span.finished) { + span.finish() + } + cleanup() + emit.apply(server, originalArgs) + } + + } catch (e) { + console.warn('[DD-TRACE] Failed to parse PubSub push body:', e.message) + cleanup() + emit.apply(server, originalArgs) + } + }) +} + addHook({ name: httpNames }, http => { shimmer.wrap(http.ServerResponse.prototype, 'emit', wrapResponseEmit) shimmer.wrap(http.Server.prototype, 'emit', wrapEmit) @@ -54,6 +255,7 @@ function wrapResponseEmit (emit) { return emit.apply(this, arguments) } } + function wrapEmit (emit) { return function (eventName, req, res) { if (!startServerCh.hasSubscribers) { @@ -61,6 +263,20 @@ function wrapEmit (emit) { } if (eventName === 'request') { + // Handle PubSub push AND Cloud Events at HTTP server level - works with ANY framework + const isPubSubOrCloudEvent = req.method === 'POST' && ( + // Traditional PubSub push + (req.headers['content-type']?.includes('application/json') && + req.headers['user-agent']?.includes('APIs-Google')) || + // Cloud Events + req.headers['content-type']?.includes('application/cloudevents+json') || + req.headers['ce-specversion'] // Binary Content Mode + ) + + if (isPubSubOrCloudEvent) { + return handlePubSubOrCloudEvent(req, res, emit, this, arguments) + } + res.req = req const abortController = new AbortController() From 40836586bcf537869eaed74a2963b8aadd359def Mon Sep 17 00:00:00 2001 From: nina9753 Date: Tue, 5 Aug 2025 18:44:57 -0400 Subject: [PATCH 02/10] Create Synthetic Span for GCP Push Subscriptions --- packages/datadog-instrumentations/src/body-parser.js | 10 ++++++++++ .../datadog-plugin-google-cloud-pubsub/src/producer.js | 5 +++++ 2 files changed, 15 insertions(+) diff --git a/packages/datadog-instrumentations/src/body-parser.js b/packages/datadog-instrumentations/src/body-parser.js index ab51accb44b..29d49495b03 100644 --- a/packages/datadog-instrumentations/src/body-parser.js +++ b/packages/datadog-instrumentations/src/body-parser.js @@ -26,6 +26,11 @@ addHook({ versions: ['>=1.4.0 <1.20.0'] }, read => { return shimmer.wrapFunction(read, read => function (req, res, next) { + // Skip body parsing if PubSub/Cloud Event middleware already parsed it + if (req._pubsubBodyParsed) { + console.log('[DD-TRACE] Skipping body-parser for PubSub/Cloud Event request - already parsed') + return next() + } const nextResource = new AsyncResource('bound-anonymous-fn') arguments[2] = nextResource.bind(publishRequestBodyAndNext(req, res, next)) return read.apply(this, arguments) @@ -38,6 +43,11 @@ addHook({ versions: ['>=1.20.0'] }, read => { return shimmer.wrapFunction(read, read => function (req, res, next) { + // Skip body parsing if PubSub/Cloud Event middleware already parsed it + if (req._pubsubBodyParsed) { + console.log('[DD-TRACE] Skipping body-parser for PubSub/Cloud Event request - already parsed') + return next() + } arguments[2] = publishRequestBodyAndNext(req, res, next) return read.apply(this, arguments) }) diff --git a/packages/datadog-plugin-google-cloud-pubsub/src/producer.js b/packages/datadog-plugin-google-cloud-pubsub/src/producer.js index 94644e4e063..346fe3c4b50 100644 --- a/packages/datadog-plugin-google-cloud-pubsub/src/producer.js +++ b/packages/datadog-plugin-google-cloud-pubsub/src/producer.js @@ -28,6 +28,11 @@ class GoogleCloudPubsubProducerPlugin extends ProducerPlugin { msg.attributes = {} } this.tracer.inject(span, 'text_map', msg.attributes) + + // Also inject project_id and topic for consumer correlation + msg.attributes['gcloud.project_id'] = projectId + msg.attributes['pubsub.topic'] = topic + if (this.config.dsmEnabled) { const payloadSize = getHeadersSize(msg) const dataStreamsContext = this.tracer From 49be7c289dfe9f86b98b2874b05a1b1521e9437e Mon Sep 17 00:00:00 2001 From: nina9753 Date: Wed, 6 Aug 2025 10:36:06 -0400 Subject: [PATCH 03/10] update with yarn lint --- .../src/body-parser.js | 2 - .../src/http/server.js | 65 ++++++------------- 2 files changed, 21 insertions(+), 46 deletions(-) diff --git a/packages/datadog-instrumentations/src/body-parser.js b/packages/datadog-instrumentations/src/body-parser.js index 29d49495b03..2317dd43042 100644 --- a/packages/datadog-instrumentations/src/body-parser.js +++ b/packages/datadog-instrumentations/src/body-parser.js @@ -28,7 +28,6 @@ addHook({ return shimmer.wrapFunction(read, read => function (req, res, next) { // Skip body parsing if PubSub/Cloud Event middleware already parsed it if (req._pubsubBodyParsed) { - console.log('[DD-TRACE] Skipping body-parser for PubSub/Cloud Event request - already parsed') return next() } const nextResource = new AsyncResource('bound-anonymous-fn') @@ -45,7 +44,6 @@ addHook({ return shimmer.wrapFunction(read, read => function (req, res, next) { // Skip body parsing if PubSub/Cloud Event middleware already parsed it if (req._pubsubBodyParsed) { - console.log('[DD-TRACE] Skipping body-parser for PubSub/Cloud Event request - already parsed') return next() } arguments[2] = publishRequestBodyAndNext(req, res, next) diff --git a/packages/datadog-instrumentations/src/http/server.js b/packages/datadog-instrumentations/src/http/server.js index 0aabfaedb4a..8a32d465d47 100644 --- a/packages/datadog-instrumentations/src/http/server.js +++ b/packages/datadog-instrumentations/src/http/server.js @@ -19,16 +19,14 @@ const requestFinishedSet = new WeakSet() const httpNames = ['http', 'node:http'] const httpsNames = ['https', 'node:https'] -function handlePubSubOrCloudEvent(req, res, emit, server, originalArgs) { +function handlePubSubOrCloudEvent (req, res, emit, server, originalArgs) { const isCloudEvent = req.headers['content-type']?.includes('application/cloudevents+json') || req.headers['ce-specversion'] const eventType = isCloudEvent ? 'Cloud Event' : 'PubSub push' - console.log(`[DD-TRACE] HTTP server handling ${eventType} request (framework-agnostic)`) // Get tracer from global reference (avoids circular dependencies) const tracer = global._ddtrace if (!tracer) { - console.warn('[DD-TRACE] Tracer not available, skipping PubSub handling') return emit.apply(server, originalArgs) } @@ -43,8 +41,8 @@ function handlePubSubOrCloudEvent(req, res, emit, server, originalArgs) { req.removeAllListeners('error') } + // eslint-disable-next-line n/handle-callback-err const handleError = (error) => { - console.warn(`[DD-TRACE] Error processing ${eventType}:`, error.message) cleanup() emit.apply(server, originalArgs) } @@ -64,12 +62,6 @@ function handlePubSubOrCloudEvent(req, res, emit, server, originalArgs) { try { // Efficiently combine chunks for large payloads const body = Buffer.concat(chunks).toString('utf8') - - // Log large payload handling for monitoring - if (bodySize > 10 * 1024 * 1024) { // Log if > 10MB - console.log(`[DD-TRACE] Processing large ${eventType} payload: ${(bodySize / 1024 / 1024).toFixed(1)}MB`) - } - const json = JSON.parse(body) req.body = json // Set parsed body for framework use req._pubsubBodyParsed = true // Flag to skip body-parser @@ -85,8 +77,8 @@ function handlePubSubOrCloudEvent(req, res, emit, server, originalArgs) { subscription = req.headers['ce-subscription'] || 'cloud-event-subscription' // Merge trace context from headers - const ceTraceParent = req.headers['ce-traceparent'] || req.headers['traceparent'] - const ceTraceState = req.headers['ce-tracestate'] || req.headers['tracestate'] + const ceTraceParent = req.headers['ce-traceparent'] || req.headers.traceparent + const ceTraceState = req.headers['ce-tracestate'] || req.headers.tracestate if (ceTraceParent) attrs.traceparent = ceTraceParent if (ceTraceState) attrs.tracestate = ceTraceState } else { @@ -107,16 +99,15 @@ function handlePubSubOrCloudEvent(req, res, emit, server, originalArgs) { } if (!attrs || typeof attrs !== 'object' || Object.keys(attrs).length === 0) { - console.warn('[DD-TRACE] No valid message attributes found') cleanup() return emit.apply(server, originalArgs) } - console.log(`[DD-TRACE] Creating span with distributed trace context (${eventType})`) - // Extract trace context from PubSub message attributes (optimized) const carrier = {} - const traceHeaders = ['traceparent', 'tracestate', 'x-datadog-trace-id', 'x-datadog-parent-id', 'x-datadog-sampling-priority', 'x-datadog-tags'] + const traceHeaders = ['traceparent', 'tracestate', + 'x-datadog-trace-id', 'x-datadog-parent-id', + 'x-datadog-sampling-priority', 'x-datadog-tags'] for (const header of traceHeaders) { if (attrs[header]) { carrier[header] = attrs[header] @@ -132,7 +123,7 @@ function handlePubSubOrCloudEvent(req, res, emit, server, originalArgs) { if (!projectId && subscription) { // Extract from subscription path: projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME - const match = subscription.match(/projects\/([^\/]+)\/subscriptions/) + const match = subscription.match(/projects\/([^\\/]+)\/subscriptions/) if (match) projectId = match[1] } @@ -140,15 +131,13 @@ function handlePubSubOrCloudEvent(req, res, emit, server, originalArgs) { topicName = 'push-subscription-topic' } - console.log(`[DD-TRACE] Using project_id: ${projectId}, topic: ${topicName}`) - // Create PubSub consumer span with error handling let span try { span = tracer.startSpan('google-cloud-pubsub.receive', { - childOf: parent, // ✅ This creates the distributed trace link! + childOf: parent, tags: { - 'component': 'google-cloud-pubsub', + component: 'google-cloud-pubsub', 'span.kind': 'consumer', 'span.type': 'worker', 'gcloud.project_id': projectId || 'unknown', @@ -159,10 +148,7 @@ function handlePubSubOrCloudEvent(req, res, emit, server, originalArgs) { 'pubsub.ack': 1 // Push subscriptions auto-ack } }) - - console.log('[DD-TRACE] Created PubSub span with type: worker, parent trace:', parent ? parent.toTraceId() : 'none') - } catch (spanError) { - console.warn('[DD-TRACE] Failed to create PubSub span:', spanError.message) + } catch { cleanup() return emit.apply(server, originalArgs) } @@ -173,26 +159,20 @@ function handlePubSubOrCloudEvent(req, res, emit, server, originalArgs) { // Activate span scope and continue with error handling const scope = tracer.scope() + const finishSpan = () => { + try { + if (span && !span.finished) { + span.finish() + } + } catch {} + cleanup() + } try { scope.activate(span, () => { // Finish span when response completes (with error handling) - const finishSpan = () => { - try { - console.log('[DD-TRACE] Finishing PubSub span') - if (span && !span.finished) { - span.finish() - } - } catch (finishError) { - console.warn('[DD-TRACE] Error finishing span:', finishError.message) - } - cleanup() - } - - // Handle both success and error cases res.on('finish', finishSpan) res.on('close', finishSpan) res.on('error', (resError) => { - console.warn('[DD-TRACE] Response error:', resError.message) if (span && !span.finished) { span.setTag('error', true) span.setTag('error.message', resError.message) @@ -203,17 +183,14 @@ function handlePubSubOrCloudEvent(req, res, emit, server, originalArgs) { // Continue with normal request processing emit.apply(server, originalArgs) }) - } catch (activationError) { - console.warn('[DD-TRACE] Error activating span scope:', activationError.message) + } catch { if (span && !span.finished) { span.finish() } cleanup() emit.apply(server, originalArgs) } - - } catch (e) { - console.warn('[DD-TRACE] Failed to parse PubSub push body:', e.message) + } catch { cleanup() emit.apply(server, originalArgs) } From c51d76f69850e107c714e35d9d99f603bd9bbb57 Mon Sep 17 00:00:00 2001 From: nina9753 Date: Wed, 6 Aug 2025 13:28:18 -0400 Subject: [PATCH 04/10] update with tests and helper function --- .../src/http/server.js | 187 ++++++++++++------ .../test/http.spec.js | 177 +++++++++++++++++ 2 files changed, 303 insertions(+), 61 deletions(-) diff --git a/packages/datadog-instrumentations/src/http/server.js b/packages/datadog-instrumentations/src/http/server.js index 8a32d465d47..40dad004748 100644 --- a/packages/datadog-instrumentations/src/http/server.js +++ b/packages/datadog-instrumentations/src/http/server.js @@ -19,17 +19,112 @@ const requestFinishedSet = new WeakSet() const httpNames = ['http', 'node:http'] const httpsNames = ['https', 'node:https'] -function handlePubSubOrCloudEvent (req, res, emit, server, originalArgs) { - const isCloudEvent = req.headers['content-type']?.includes('application/cloudevents+json') || - req.headers['ce-specversion'] - const eventType = isCloudEvent ? 'Cloud Event' : 'PubSub push' +function parseCloudEventMessage (json, req) { + // Eventarc only uses Binary Content Mode with ce-specversion header + // Payload structure: {"message": {...}, "subscription": "..."} + const message = json.message || json + const attrs = { ...message?.attributes } + const subscription = json.subscription || req.headers['ce-subscription'] || 'cloud-event-subscription' + + // For Eventarc: prioritize message attributes (original trace) over transport headers + // Only use CE headers if message attributes don't have trace context + if (!attrs.traceparent) { + const ceTraceParent = req.headers['ce-traceparent'] || req.headers.traceparent + if (ceTraceParent) attrs.traceparent = ceTraceParent + } + if (!attrs.tracestate) { + const ceTraceState = req.headers['ce-tracestate'] || req.headers.tracestate + if (ceTraceState) attrs.tracestate = ceTraceState + } + + // Add Cloud Event context from headers to attributes for span tags + if (req.headers['ce-source']) attrs['ce-source'] = req.headers['ce-source'] + if (req.headers['ce-type']) attrs['ce-type'] = req.headers['ce-type'] + return { message, subscription, attrs } +} + +function parsePubSubMessage (json) { + // Traditional PubSub push format + const message = json.message + const subscription = json.subscription + const attrs = message?.attributes || {} + return { message, subscription, attrs } +} + +function createCloudEventSpan (tracer, parent, topicName, projectId, subscription, message, attrs, req) { + const spanTags = { + component: 'google-cloud-pubsub', + 'span.kind': 'consumer', + 'gcloud.project_id': projectId || 'unknown', + 'pubsub.topic': topicName || 'unknown', + 'pubsub.subscription': subscription, + 'pubsub.message_id': message?.messageId, + 'pubsub.delivery_method': 'eventarc', + 'eventarc.trigger': 'pubsub', + } + + // Add Cloud Event specific tags + if (attrs['ce-source']) spanTags['eventarc.source'] = attrs['ce-source'] + if (attrs['ce-type']) spanTags['eventarc.type'] = attrs['ce-type'] + if (req.headers['ce-id']) spanTags['eventarc.id'] = req.headers['ce-id'] + if (req.headers['ce-specversion']) spanTags['eventarc.specversion'] = req.headers['ce-specversion'] + if (req.headers['ce-time']) spanTags['eventarc.time'] = req.headers['ce-time'] + + return tracer.startSpan('google-cloud-pubsub.receive', { + childOf: parent, + resource: topicName, + type: 'worker', + tags: spanTags, + metrics: { + 'pubsub.ack': 1 + } + }) +} + +function createPubSubSpan (tracer, parent, topicName, projectId, subscription, message, attrs) { + const spanTags = { + component: 'google-cloud-pubsub', + 'span.kind': 'consumer', + 'gcloud.project_id': projectId || 'unknown', + 'pubsub.topic': topicName || 'unknown', + 'pubsub.subscription': subscription, + 'pubsub.message_id': message?.messageId, + 'pubsub.delivery_method': 'push' + } + return tracer.startSpan('google-cloud-pubsub.receive', { + childOf: parent, + resource: topicName, + type: 'worker', + tags: spanTags, + metrics: { + 'pubsub.ack': 1 + } + }) +} +function handleCloudEvent (req, res, emit, server, originalArgs) { // Get tracer from global reference (avoids circular dependencies) const tracer = global._ddtrace if (!tracer) { return emit.apply(server, originalArgs) } + return processEventRequest(req, res, emit, server, originalArgs, tracer, true) +} + +function handlePubSubPush (req, res, emit, server, originalArgs) { + // Get tracer from global reference (avoids circular dependencies) + const tracer = global._ddtrace + if (!tracer) { + return emit.apply(server, originalArgs) + } + + return processEventRequest(req, res, emit, server, originalArgs, tracer, false) +} + +function processEventRequest (req, res, emit, server, originalArgs, tracer, isCloudEvent) { + const eventType = isCloudEvent ? 'Cloud Event' : 'PubSub push' + // Collect raw body for PubSub message parsing with error handling const chunks = [] const MAX_BODY_SIZE = 10 * 1024 * 1024 // 10MB limit for large Pub/Sub payloads @@ -66,38 +161,18 @@ function handlePubSubOrCloudEvent (req, res, emit, server, originalArgs) { req.body = json // Set parsed body for framework use req._pubsubBodyParsed = true // Flag to skip body-parser - // Extract message and attributes based on format - let message, subscription, attrs - - if (isCloudEvent) { - if (req.headers['ce-specversion']) { - // Binary Content Mode - message in body, trace context in headers - message = json - attrs = { ...message?.attributes } - subscription = req.headers['ce-subscription'] || 'cloud-event-subscription' - - // Merge trace context from headers - const ceTraceParent = req.headers['ce-traceparent'] || req.headers.traceparent - const ceTraceState = req.headers['ce-tracestate'] || req.headers.tracestate - if (ceTraceParent) attrs.traceparent = ceTraceParent - if (ceTraceState) attrs.tracestate = ceTraceState - } else { - // Structured Content Mode - message in data field - message = json.data?.message || json - subscription = json.data?.subscription || json.subscription || 'cloud-event-subscription' - attrs = { ...message?.attributes } - - // Add Cloud Events context - if (json.source) attrs['ce-source'] = json.source - if (json.type) attrs['ce-type'] = json.type - } - } else { - // Traditional PubSub push format - message = json.message - subscription = json.subscription - attrs = message?.attributes || {} + // Parse message based on event type + const parsedEvent = isCloudEvent + ? parseCloudEventMessage(json, req) + : parsePubSubMessage(json) + + if (!parsedEvent) { + cleanup() + return emit.apply(server, originalArgs) } + const { message, subscription, attrs } = parsedEvent + if (!attrs || typeof attrs !== 'object' || Object.keys(attrs).length === 0) { cleanup() return emit.apply(server, originalArgs) @@ -113,7 +188,6 @@ function handlePubSubOrCloudEvent (req, res, emit, server, originalArgs) { carrier[header] = attrs[header] } } - // Extract parent span context (key for distributed tracing!) const parent = tracer.extract('text_map', carrier) @@ -131,23 +205,12 @@ function handlePubSubOrCloudEvent (req, res, emit, server, originalArgs) { topicName = 'push-subscription-topic' } - // Create PubSub consumer span with error handling + // Create span based on event type let span try { - span = tracer.startSpan('google-cloud-pubsub.receive', { - childOf: parent, - tags: { - component: 'google-cloud-pubsub', - 'span.kind': 'consumer', - 'span.type': 'worker', - 'gcloud.project_id': projectId || 'unknown', - 'pubsub.topic': topicName || 'unknown', - 'pubsub.subscription': subscription, - 'pubsub.message_id': message?.messageId, - 'pubsub.delivery_method': isCloudEvent ? 'cloud-event' : 'push', - 'pubsub.ack': 1 // Push subscriptions auto-ack - } - }) + span = isCloudEvent + ? createCloudEventSpan(tracer, parent, topicName, projectId, subscription, message, attrs, req) + : createPubSubSpan(tracer, parent, topicName, projectId, subscription, message, attrs) } catch { cleanup() return emit.apply(server, originalArgs) @@ -241,17 +304,19 @@ function wrapEmit (emit) { if (eventName === 'request') { // Handle PubSub push AND Cloud Events at HTTP server level - works with ANY framework - const isPubSubOrCloudEvent = req.method === 'POST' && ( - // Traditional PubSub push - (req.headers['content-type']?.includes('application/json') && - req.headers['user-agent']?.includes('APIs-Google')) || - // Cloud Events - req.headers['content-type']?.includes('application/cloudevents+json') || - req.headers['ce-specversion'] // Binary Content Mode - ) - - if (isPubSubOrCloudEvent) { - return handlePubSubOrCloudEvent(req, res, emit, this, arguments) + if (req.method === 'POST') { + // Cloud Events detection (Eventarc uses Binary Content Mode with ce-specversion header) + const isCloudEvent = req.headers['ce-specversion'] + + // Traditional PubSub push detection + const isPubSubPush = req.headers['content-type']?.includes('application/json') && + req.headers['user-agent']?.includes('APIs-Google') + + if (isCloudEvent) { + return handleCloudEvent(req, res, emit, this, arguments) + } else if (isPubSubPush) { + return handlePubSubPush(req, res, emit, this, arguments) + } } res.req = req diff --git a/packages/datadog-instrumentations/test/http.spec.js b/packages/datadog-instrumentations/test/http.spec.js index ec4e989876f..2b20f6ba508 100644 --- a/packages/datadog-instrumentations/test/http.spec.js +++ b/packages/datadog-instrumentations/test/http.spec.js @@ -182,3 +182,180 @@ describe('client', () => { }) }) }) + +describe('server', () => { + let http, server, port + let startServerCh, startServerSpy + + before(async () => { + await agent.load('http') + }) + + after(() => { + return agent.close() + }) + + beforeEach(() => { + http = require('http') + startServerCh = dc.channel('apm:http:server:request:start') + startServerSpy = sinon.stub() + startServerCh.subscribe(startServerSpy) + + // Mock global tracer for server-side handling + global._ddtrace = require('../../dd-trace') + }) + + afterEach((done) => { + startServerCh.unsubscribe(startServerSpy) + if (server) { + server.close(done) + } else { + done() + } + }) + + describe('PubSub detection integration', () => { + beforeEach((done) => { + server = http.createServer((req, res) => { + res.writeHead(200) + res.end('OK') + }) + server.listen(0, () => { + port = server.address().port + done() + }) + }) + + it('should publish to startServerCh for PubSub requests', (done) => { + const pubsubPayload = JSON.stringify({ + message: { + data: Buffer.from('test').toString('base64'), + messageId: 'test-id', + attributes: { 'pubsub.topic': 'test-topic' } + }, + subscription: 'projects/test/subscriptions/test' + }) + + const req = http.request({ + hostname: 'localhost', + port: port, + path: '/', + method: 'POST', + headers: { + 'Content-Type': 'application/json', + 'User-Agent': 'APIs-Google; (+https://developers.google.com/webmasters/APIs-Google.html)', + 'Content-Length': Buffer.byteLength(pubsubPayload) + } + }, (res) => { + res.on('data', () => {}) + res.on('end', () => { + // Verify the channel was called + setTimeout(() => { + expect(startServerSpy).to.have.been.called + done() + }, 50) + }) + }) + + req.write(pubsubPayload) + req.end() + }) + + it('should publish to startServerCh for Eventarc Cloud Events', (done) => { + const eventarcPayload = JSON.stringify({ + message: { + data: Buffer.from('test').toString('base64'), + messageId: 'test-eventarc-id', + attributes: { + 'pubsub.topic': 'test-topic', + traceparent: '00-abc123-def456-01' + } + }, + subscription: 'projects/test/subscriptions/eventarc-sub' + }) + + const req = http.request({ + hostname: 'localhost', + port: port, + path: '/', + method: 'POST', + headers: { + 'Content-Type': 'application/json', + 'ce-specversion': '1.0', + 'ce-type': 'google.cloud.pubsub.topic.v1.messagePublished', + 'ce-source': '//pubsub.googleapis.com/projects/test/topics/test-topic', + 'ce-id': 'test-eventarc-id', + 'Content-Length': Buffer.byteLength(eventarcPayload) + } + }, (res) => { + res.on('data', () => {}) + res.on('end', () => { + setTimeout(() => { + expect(startServerSpy).to.have.been.called + done() + }, 50) + }) + }) + + req.write(eventarcPayload) + req.end() + }) + + it('should publish to startServerCh for regular HTTP requests', (done) => { + const req = http.request({ + hostname: 'localhost', + port: port, + path: '/', + method: 'GET' + }, (res) => { + res.on('data', () => {}) + res.on('end', () => { + setTimeout(() => { + expect(startServerSpy).to.have.been.called + done() + }, 50) + }) + }) + + req.end() + }) + }) + + describe('error handling for server', () => { + beforeEach((done) => { + server = http.createServer((req, res) => { + res.writeHead(200) + res.end('OK') + }) + server.listen(0, () => { + port = server.address().port + done() + }) + }) + + it('should handle request errors gracefully', (done) => { + const req = http.request({ + hostname: 'localhost', + port: port, + path: '/', + method: 'POST', + headers: { + 'Content-Type': 'application/json', + 'User-Agent': 'APIs-Google; (+https://developers.google.com/webmasters/APIs-Google.html)' + } + }, (res) => { + res.on('data', () => {}) + res.on('end', done) + }) + + // Simulate request error + req.on('error', () => { + // Error should be handled gracefully + done() + }) + + req.write('invalid') + req.destroy(new Error('Simulated error')) + }) + }) +}) From 996839ecd63f67b65e205451d57709fcddab2624 Mon Sep 17 00:00:00 2001 From: nina9753 Date: Wed, 6 Aug 2025 14:19:46 -0400 Subject: [PATCH 05/10] add push sub specfic tests --- .../test/http-server-pubsub.spec.js | 477 ++++++++++++++++++ .../test/http.spec.js | 8 +- 2 files changed, 481 insertions(+), 4 deletions(-) create mode 100644 packages/datadog-instrumentations/test/http-server-pubsub.spec.js diff --git a/packages/datadog-instrumentations/test/http-server-pubsub.spec.js b/packages/datadog-instrumentations/test/http-server-pubsub.spec.js new file mode 100644 index 00000000000..4605b54abc4 --- /dev/null +++ b/packages/datadog-instrumentations/test/http-server-pubsub.spec.js @@ -0,0 +1,477 @@ +'use strict' + +const { expect } = require('chai') +const sinon = require('sinon') +const dc = require('dc-polyfill') +const http = require('http') + +describe('HTTP Server PubSub Instrumentation Tests', () => { + let tracer, startServerCh, startServerSpy + + before(() => { + // Mock tracer for unit tests + tracer = { + startSpan: sinon.stub(), + extract: sinon.stub(), + scope: sinon.stub().returns({ + activate: sinon.stub().callsArg(1) + }) + } + }) + + beforeEach(() => { + startServerCh = dc.channel('apm:http:server:request:start') + startServerSpy = sinon.spy() + if (startServerCh) { + startServerCh.subscribe(startServerSpy) + } + + global._ddtrace = tracer + sinon.stub(console, 'log') + sinon.stub(console, 'warn') + }) + + afterEach(() => { + if (startServerCh && startServerSpy) { + startServerCh.unsubscribe(startServerSpy) + } + sinon.restore() + delete global._ddtrace + }) + + describe('Request Detection Logic', () => { + it('should detect traditional PubSub push requests', () => { + const req = { + method: 'POST', + headers: { + 'content-type': 'application/json', + 'user-agent': 'APIs-Google; (+https://developers.google.com/webmasters/APIs-Google.html)' + } + } + + // Test the actual detection logic used in server.js + const isCloudEvent = req.headers['ce-specversion'] + const isPubSubPush = req.headers['content-type']?.includes('application/json') && + req.headers['user-agent']?.includes('APIs-Google') + + expect(isCloudEvent).to.be.undefined + expect(isPubSubPush).to.be.true + }) + + it('should detect Eventarc Cloud Events', () => { + const req = { + method: 'POST', + headers: { + 'ce-specversion': '1.0', + 'ce-type': 'google.cloud.pubsub.topic.v1.messagePublished', + 'ce-source': '//pubsub.googleapis.com/projects/test/topics/test-topic', + 'content-type': 'application/json' + } + } + + const isCloudEvent = req.headers['ce-specversion'] + const isPubSubPush = req.headers['content-type']?.includes('application/json') && + req.headers['user-agent']?.includes('APIs-Google') + + expect(isCloudEvent).to.equal('1.0') + expect(isPubSubPush).to.be.false // Cloud Events take precedence + }) + + it('should not detect regular HTTP requests', () => { + const req = { + method: 'POST', + headers: { + 'content-type': 'application/json', + 'user-agent': 'Mozilla/5.0' + } + } + + const isCloudEvent = req.headers['ce-specversion'] + const isPubSubPush = req.headers['content-type']?.includes('application/json') && + req.headers['user-agent']?.includes('APIs-Google') + + expect(isCloudEvent).to.be.undefined + expect(isPubSubPush).to.be.false + }) + }) + + describe('Message Parsing Logic', () => { + it('should parse traditional PubSub message format', () => { + const json = { + message: { + data: Buffer.from('test message').toString('base64'), + messageId: 'test-message-id', + attributes: { + 'pubsub.topic': 'test-topic', + traceparent: '00-12345-67890-01' + } + }, + subscription: 'projects/test-project/subscriptions/test-sub' + } + + // Test parsePubSubMessage logic + const message = json.message + const subscription = json.subscription + const attrs = message?.attributes || {} + + expect(message.messageId).to.equal('test-message-id') + expect(subscription).to.equal('projects/test-project/subscriptions/test-sub') + expect(attrs['pubsub.topic']).to.equal('test-topic') + expect(attrs.traceparent).to.equal('00-12345-67890-01') + }) + + it('should parse Eventarc Cloud Events format', () => { + const json = { + message: { + data: Buffer.from('test message').toString('base64'), + messageId: 'test-message-id', + attributes: { + traceparent: '00-12345-67890-01', + 'pubsub.topic': 'test-topic' + } + }, + subscription: 'projects/test-project/subscriptions/eventarc-sub' + } + + const req = { + headers: { + 'ce-specversion': '1.0', + 'ce-source': '//pubsub.googleapis.com/projects/test-project/topics/test-topic', + 'ce-type': 'google.cloud.pubsub.topic.v1.messagePublished', + 'ce-id': 'test-message-id' + } + } + + // Test parseCloudEventMessage logic + const message = json.message || json + const attrs = { ...message?.attributes } + const subscription = json.subscription || req.headers['ce-subscription'] || 'cloud-event-subscription' + + // Add Cloud Event context from headers + if (req.headers['ce-source']) attrs['ce-source'] = req.headers['ce-source'] + if (req.headers['ce-type']) attrs['ce-type'] = req.headers['ce-type'] + + expect(message.messageId).to.equal('test-message-id') + expect(subscription).to.equal('projects/test-project/subscriptions/eventarc-sub') + expect(attrs.traceparent).to.equal('00-12345-67890-01') + expect(attrs['ce-source']).to.equal('//pubsub.googleapis.com/projects/test-project/topics/test-topic') + expect(attrs['ce-type']).to.equal('google.cloud.pubsub.topic.v1.messagePublished') + }) + }) + + describe('Trace Context Extraction', () => { + it('should extract trace headers efficiently', () => { + const attrs = { + traceparent: '00-12345-67890-01', + tracestate: 'dd=s:1', + 'x-datadog-trace-id': '123456789', + 'x-datadog-parent-id': '987654321', + 'pubsub.topic': 'test-topic', + 'custom-attr': 'value' + } + + // Optimized extraction logic + const carrier = {} + const traceHeaders = ['traceparent', 'tracestate', + 'x-datadog-trace-id', 'x-datadog-parent-id', + 'x-datadog-sampling-priority', 'x-datadog-tags'] + for (const header of traceHeaders) { + if (attrs[header]) { + carrier[header] = attrs[header] + } + } + + expect(carrier).to.deep.equal({ + traceparent: '00-12345-67890-01', + tracestate: 'dd=s:1', + 'x-datadog-trace-id': '123456789', + 'x-datadog-parent-id': '987654321' + }) + + // Should not include non-trace headers + expect(carrier['pubsub.topic']).to.be.undefined + expect(carrier['custom-attr']).to.be.undefined + }) + }) + + describe('Project ID Extraction', () => { + it('should extract project ID from subscription path', () => { + const subscription = 'projects/my-gcp-project/subscriptions/my-subscription' + + const match = subscription.match(/projects\/([^\\/]+)\/subscriptions/) + const projectId = match ? match[1] : null + + expect(projectId).to.equal('my-gcp-project') + }) + + it('should handle invalid subscription paths', () => { + const subscription = 'invalid-subscription-format' + + const match = subscription.match(/projects\/([^\\/]+)\/subscriptions/) + const projectId = match ? match[1] : null + + expect(projectId).to.be.null + }) + }) + + describe('Span Tag Creation', () => { + it('should create proper span tags for PubSub', () => { + const message = { messageId: 'test-msg-123' } + const subscription = 'projects/test-project/subscriptions/test-sub' + const projectId = 'test-project' + const topicName = 'test-topic' + + // Test createPubSubSpan tag logic + const spanTags = { + component: 'google-cloud-pubsub', + 'span.kind': 'consumer', + 'gcloud.project_id': projectId || 'unknown', + 'pubsub.topic': topicName || 'unknown', + 'pubsub.subscription': subscription, + 'pubsub.message_id': message?.messageId, + 'pubsub.delivery_method': 'push' + } + + expect(spanTags.component).to.equal('google-cloud-pubsub') + expect(spanTags['span.kind']).to.equal('consumer') + expect(spanTags['pubsub.delivery_method']).to.equal('push') + expect(spanTags['pubsub.message_id']).to.equal('test-msg-123') + }) + + it('should create proper span tags for Cloud Events', () => { + const message = { messageId: 'test-msg-123' } + const subscription = 'projects/test-project/subscriptions/eventarc-sub' + const projectId = 'test-project' + const topicName = 'test-topic' + const attrs = { + 'ce-source': '//pubsub.googleapis.com/projects/test/topics/test-topic', + 'ce-type': 'google.cloud.pubsub.topic.v1.messagePublished' + } + const req = { + headers: { + 'ce-id': 'test-msg-123', + 'ce-specversion': '1.0', + 'ce-time': '2023-01-01T00:00:00Z' + } + } + + // Test createCloudEventSpan tag logic + const spanTags = { + component: 'google-cloud-pubsub', + 'span.kind': 'consumer', + 'gcloud.project_id': projectId || 'unknown', + 'pubsub.topic': topicName || 'unknown', + 'pubsub.subscription': subscription, + 'pubsub.message_id': message?.messageId, + 'pubsub.delivery_method': 'eventarc' + } + + // Add Cloud Event specific tags + if (attrs['ce-source']) spanTags['cloudevents.source'] = attrs['ce-source'] + if (attrs['ce-type']) spanTags['cloudevents.type'] = attrs['ce-type'] + if (req.headers['ce-id']) spanTags['cloudevents.id'] = req.headers['ce-id'] + if (req.headers['ce-specversion']) spanTags['cloudevents.specversion'] = req.headers['ce-specversion'] + if (req.headers['ce-time']) spanTags['cloudevents.time'] = req.headers['ce-time'] + spanTags['eventarc.trigger'] = 'pubsub' + + expect(spanTags['pubsub.delivery_method']).to.equal('eventarc') + expect(spanTags['cloudevents.source']).to.equal('//pubsub.googleapis.com/projects/test/topics/test-topic') + expect(spanTags['cloudevents.type']).to.equal('google.cloud.pubsub.topic.v1.messagePublished') + expect(spanTags['cloudevents.id']).to.equal('test-msg-123') + expect(spanTags['cloudevents.specversion']).to.equal('1.0') + expect(spanTags['eventarc.trigger']).to.equal('pubsub') + }) + }) + + describe('HTTP Server Integration Tests', () => { + let server, port + + beforeEach((done) => { + server = http.createServer((req, res) => { + res.writeHead(200) + res.end('OK') + }) + server.listen(0, () => { + port = server.address().port + done() + }) + }) + + afterEach((done) => { + if (server) { + server.close(done) + } else { + done() + } + }) + + it('should handle PubSub push requests', (done) => { + const pubsubPayload = JSON.stringify({ + message: { + data: Buffer.from('test message').toString('base64'), + messageId: 'test-message-id', + attributes: { + traceparent: '00-12345678901234567890123456789012-1234567890123456-01', + 'pubsub.topic': 'test-topic' + } + }, + subscription: 'projects/test-project/subscriptions/test-sub' + }) + + const req = http.request({ + hostname: 'localhost', + port, + path: '/', + method: 'POST', + headers: { + 'Content-Type': 'application/json', + 'User-Agent': 'APIs-Google; (+https://developers.google.com/webmasters/APIs-Google.html)', + 'Content-Length': Buffer.byteLength(pubsubPayload) + } + }, (res) => { + expect(res.statusCode).to.equal(200) + res.on('data', () => {}) + res.on('end', () => { + // Verify channel was called for PubSub detection + setTimeout(() => { + expect(startServerSpy).to.have.been.called + done() + }, 50) + }) + }) + + req.on('error', done) + req.write(pubsubPayload) + req.end() + }) + + it('should handle Eventarc Cloud Events', (done) => { + const eventarcPayload = JSON.stringify({ + message: { + data: Buffer.from('test message').toString('base64'), + messageId: 'test-eventarc-id', + attributes: { + traceparent: '00-abc123-def456-01', + 'pubsub.topic': 'test-topic' + } + }, + subscription: 'projects/test-project/subscriptions/eventarc-sub' + }) + + const req = http.request({ + hostname: 'localhost', + port, + path: '/', + method: 'POST', + headers: { + 'Content-Type': 'application/json', + 'ce-specversion': '1.0', + 'ce-type': 'google.cloud.pubsub.topic.v1.messagePublished', + 'ce-source': '//pubsub.googleapis.com/projects/test/topics/test-topic', + 'ce-id': 'test-eventarc-id', + 'Content-Length': Buffer.byteLength(eventarcPayload) + } + }, (res) => { + expect(res.statusCode).to.equal(200) + res.on('data', () => {}) + res.on('end', () => { + setTimeout(() => { + expect(startServerSpy).to.have.been.called + done() + }, 50) + }) + }) + + req.on('error', done) + req.write(eventarcPayload) + req.end() + }) + + it('should handle regular HTTP requests normally', (done) => { + const req = http.request({ + hostname: 'localhost', + port, + path: '/', + method: 'GET' + }, (res) => { + expect(res.statusCode).to.equal(200) + res.on('data', () => {}) + res.on('end', () => { + setTimeout(() => { + expect(startServerSpy).to.have.been.called + done() + }, 50) + }) + }) + + req.on('error', done) + req.end() + }) + + it('should handle invalid JSON gracefully', (done) => { + const req = http.request({ + hostname: 'localhost', + port, + path: '/', + method: 'POST', + headers: { + 'Content-Type': 'application/json', + 'User-Agent': 'APIs-Google; (+https://developers.google.com/webmasters/APIs-Google.html)', + 'Content-Length': Buffer.byteLength('invalid json') + } + }, (res) => { + expect(res.statusCode).to.equal(200) + res.on('data', () => {}) + res.on('end', done) + }) + + req.on('error', done) + req.write('invalid json') + req.end() + }) + + it('should handle large payloads within limit', (done) => { + const largeMessage = 'x'.repeat(1024 * 1024) // 1MB message + const largePayload = JSON.stringify({ + message: { + data: Buffer.from(largeMessage).toString('base64'), + messageId: 'large-message-id', + attributes: { + 'pubsub.topic': 'test-topic' + } + }, + subscription: 'projects/test-project/subscriptions/test-sub' + }) + + const req = http.request({ + hostname: 'localhost', + port, + path: '/', + method: 'POST', + headers: { + 'Content-Type': 'application/json', + 'User-Agent': 'APIs-Google; (+https://developers.google.com/webmasters/APIs-Google.html)', + 'Content-Length': Buffer.byteLength(largePayload) + } + }, (res) => { + expect(res.statusCode).to.equal(200) + res.on('data', () => {}) + res.on('end', done) + }) + + req.on('error', done) + req.write(largePayload) + req.end() + }) + }) + + describe('Utility Functions', () => { + it('should validate body size limits', () => { + const MAX_BODY_SIZE = 10 * 1024 * 1024 // 10MB + + expect(MAX_BODY_SIZE).to.equal(10485760) + expect(1024 * 1024).to.be.lessThan(MAX_BODY_SIZE) // 1MB < 10MB + expect(MAX_BODY_SIZE + 1).to.be.greaterThan(MAX_BODY_SIZE) + }) + }) +}) diff --git a/packages/datadog-instrumentations/test/http.spec.js b/packages/datadog-instrumentations/test/http.spec.js index 2b20f6ba508..c226df8a143 100644 --- a/packages/datadog-instrumentations/test/http.spec.js +++ b/packages/datadog-instrumentations/test/http.spec.js @@ -238,7 +238,7 @@ describe('server', () => { const req = http.request({ hostname: 'localhost', - port: port, + port, path: '/', method: 'POST', headers: { @@ -276,7 +276,7 @@ describe('server', () => { const req = http.request({ hostname: 'localhost', - port: port, + port, path: '/', method: 'POST', headers: { @@ -304,7 +304,7 @@ describe('server', () => { it('should publish to startServerCh for regular HTTP requests', (done) => { const req = http.request({ hostname: 'localhost', - port: port, + port, path: '/', method: 'GET' }, (res) => { @@ -336,7 +336,7 @@ describe('server', () => { it('should handle request errors gracefully', (done) => { const req = http.request({ hostname: 'localhost', - port: port, + port, path: '/', method: 'POST', headers: { From b53886c073c32b44673204968e3dc6ff563ad319 Mon Sep 17 00:00:00 2001 From: nina9753 Date: Thu, 7 Aug 2025 18:42:21 -0400 Subject: [PATCH 06/10] move pubsub push logic to its own file --- .../src/gcp-pubsub-push.js | 281 ++++++++++++++++ .../src/http/server.js | 307 +++--------------- .../test/http-server-pubsub.spec.js | 177 ++++++++-- .../test/http.spec.js | 69 +++- 4 files changed, 534 insertions(+), 300 deletions(-) create mode 100644 packages/datadog-instrumentations/src/gcp-pubsub-push.js diff --git a/packages/datadog-instrumentations/src/gcp-pubsub-push.js b/packages/datadog-instrumentations/src/gcp-pubsub-push.js new file mode 100644 index 00000000000..668328a554d --- /dev/null +++ b/packages/datadog-instrumentations/src/gcp-pubsub-push.js @@ -0,0 +1,281 @@ +'use strict' + +// Detection functions +function isPubSubRequest (req) { + return req.method === 'POST' && + req.headers['content-type']?.includes('application/json') && + req.headers['user-agent']?.includes('APIs-Google') +} + +function isCloudEventRequest (req) { + return req.method === 'POST' && !!req.headers['ce-specversion'] +} + +// Message parsing functions +function parseCloudEventMessage (json, req) { + // Eventarc only uses Binary Content Mode with ce-specversion header + const message = json.message || json + const attrs = { ...message?.attributes } + const subscription = json.subscription || req.headers['ce-subscription'] || 'cloud-event-subscription' + + // For Eventarc: prioritize message attributes (original trace) over transport headers + if (!attrs.traceparent) { + const ceTraceParent = req.headers['ce-traceparent'] || req.headers.traceparent + if (ceTraceParent) attrs.traceparent = ceTraceParent + } + if (!attrs.tracestate) { + const ceTraceState = req.headers['ce-tracestate'] || req.headers.tracestate + if (ceTraceState) attrs.tracestate = ceTraceState + } + + // Add Cloud Event context from headers to attributes for span tags + if (req.headers['ce-source']) attrs['ce-source'] = req.headers['ce-source'] + if (req.headers['ce-type']) attrs['ce-type'] = req.headers['ce-type'] + return { message, subscription, attrs } +} + +function parsePubSubMessage (json) { + // Traditional PubSub push format + const message = json.message + const subscription = json.subscription + const attrs = message?.attributes || {} + return { message, subscription, attrs } +} + +// Utility functions +function extractTraceContext (tracer, attrs) { + const carrier = {} + const traceHeaders = ['traceparent', 'tracestate', + 'x-datadog-trace-id', 'x-datadog-parent-id', + 'x-datadog-sampling-priority', 'x-datadog-tags'] + + for (const header of traceHeaders) { + if (attrs[header]) { + carrier[header] = attrs[header] + } + } + + return tracer.extract('text_map', carrier) || null +} + +function extractProjectAndTopic (attrs, subscription) { + let projectId = attrs['gcloud.project_id'] + let topicName = attrs['pubsub.topic'] + + if (!projectId && subscription) { + const match = subscription.match(/projects\/([^\\/]+)\/subscriptions/) + if (match) projectId = match[1] + } + + if (!topicName) { + topicName = 'push-subscription-topic' + } + + return { projectId, topicName } +} + +function createSpan (tracer, parent, topicName, projectId, subscription, message, attrs, req, isCloudEvent) { + const spanTags = { + component: 'google-cloud-pubsub', + 'span.kind': 'consumer', + 'gcloud.project_id': projectId || 'unknown', + 'pubsub.topic': topicName || 'unknown', + 'pubsub.subscription': subscription, + 'pubsub.message_id': message?.messageId, + 'pubsub.delivery_method': isCloudEvent ? 'eventarc' : 'push' + } + + // Add Cloud Event specific tags + if (isCloudEvent) { + if (attrs['ce-source']) spanTags['cloudevents.source'] = attrs['ce-source'] + if (attrs['ce-type']) spanTags['cloudevents.type'] = attrs['ce-type'] + if (req.headers['ce-id']) spanTags['cloudevents.id'] = req.headers['ce-id'] + if (req.headers['ce-specversion']) spanTags['cloudevents.specversion'] = req.headers['ce-specversion'] + if (req.headers['ce-time']) spanTags['cloudevents.time'] = req.headers['ce-time'] + spanTags['eventarc.trigger'] = 'pubsub' + } + // Try different approaches to set the parent using a ternary expression + const spanOptions = { + ...(parent && parent._spanId ? { childOf: parent } : {}), + resource: topicName, + type: 'worker', + tags: spanTags, + metrics: { + 'pubsub.ack': 1 + } + } + const span = tracer.startSpan('pubsub.receive', spanOptions) + + // CRITICAL FIX: If parent ID is still undefined, manually set it + if (!span.context().parentId && parent && parent._spanId) { + // Force the parent relationship + span.context()._parentId = parent._spanId + span.context()._traceId = parent._traceId + } + + return span +} + +// Main event processing function - creates spans but doesn't emit (wrapper handles that) +function processEventRequest (req, res, emit, server, originalArgs, isCloudEvent) { + const eventType = isCloudEvent ? 'Cloud Event' : 'PubSub push' + + // Get tracer from global reference + const tracer = global._ddtrace + if (!tracer) { + return // Let wrapper handle emit + } + + // Collect raw body for message parsing with error handling + const chunks = [] + const MAX_BODY_SIZE = 10 * 1024 * 1024 // 10MB limit + let bodySize = 0 + + const cleanup = () => { + req.removeAllListeners('data') + req.removeAllListeners('end') + req.removeAllListeners('error') + } + + const handleError = () => { + cleanup() + // Let wrapper handle emit + } + + req.on('error', handleError) + + req.on('data', chunk => { + bodySize += chunk.length + if (bodySize > MAX_BODY_SIZE) { + handleError(new Error(`Request body too large: ${bodySize} bytes (limit: ${MAX_BODY_SIZE})`)) + return + } + chunks.push(chunk) + }) + + req.on('end', () => { + try { + const body = Buffer.concat(chunks).toString('utf8') + const json = JSON.parse(body) + req.body = json + req._pubsubBodyParsed = true + + // Parse message based on event type + const parsedEvent = isCloudEvent + ? parseCloudEventMessage(json, req) + : parsePubSubMessage(json) + + if (!parsedEvent) { + cleanup() + return // Let wrapper handle emit + } + + const { message, subscription, attrs } = parsedEvent + + if (!attrs || typeof attrs !== 'object' || Object.keys(attrs).length === 0) { + cleanup() + return // Let wrapper handle emit + } + // Extract trace context and project/topic info + const parent = extractTraceContext(tracer, attrs) + const { projectId, topicName } = extractProjectAndTopic(attrs, subscription) + // Create span + let span + try { + span = createSpan(tracer, parent, topicName, projectId, subscription, message, attrs, req, isCloudEvent) + } catch { + cleanup() + return // Let wrapper handle emit + } + + // Attach span to request for application code + req._datadog = { span } + req._eventType = eventType + req._pubsubSpanCreated = true + + // Set PubSub span as parent for HTTP and Express spans + req._parentSpan = span + + // CRITICAL: Activate span scope and emit like the old working code + const scope = tracer.scope() + const finishSpan = () => { + try { + if (span && !span.finished) { + span.finish() + } + } catch {} + cleanup() + } + + // Set up span finishing when response completes + res.on('finish', finishSpan) + res.on('close', finishSpan) + res.on('error', (resError) => { + if (span && !span.finished) { + span.setTag('error', true) + span.setTag('error.message', resError.message) + } + finishSpan() + }) + + try { + scope.activate(span, () => { + // CRITICAL: Inject PubSub span context into request headers + // This ensures HTTP plugin creates HTTP span as child of PubSub span + const spanContext = span.context() + tracer.inject(spanContext, 'http_headers', req.headers) + + // CRITICAL: Manually create HTTP span as child of PubSub span + // Since plugin subscriptions run outside our activated context, we create it directly + const httpSpan = tracer.startSpan('http.request', { + childOf: span, + tags: { + 'http.method': req.method, + 'http.url': `${req.headers['x-forwarded-proto'] || 'http'}://${req.headers.host}${req.url}`, + 'span.kind': 'server', + component: 'http' + } + }) + + // CRITICAL: Set up web context to use our HTTP span so HTTP plugin doesn't create a new one + const web = require('../../dd-trace/src/plugins/util/web') + const context = web.patch(req) + context.span = httpSpan + context.tracer = tracer + context.res = res + + // Set up HTTP span finishing + const finishHttpSpan = () => { + if (httpSpan && !httpSpan.finished) { + httpSpan.setTag('http.status_code', res.statusCode) + httpSpan.finish() + } + } + res.on('finish', finishHttpSpan) + res.on('close', finishHttpSpan) + + // CRITICAL: Activate HTTP span and call emit so Express inherits from HTTP span + scope.activate(httpSpan, () => { + emit.apply(server, originalArgs) + }) + }) + } catch { + if (span && !span.finished) { + span.finish() + } + cleanup() + emit.apply(server, originalArgs) + } + } catch { + cleanup() + // Let wrapper handle emit + } + }) +} + +// Export functions for use by server.js +module.exports = { + isPubSubRequest, + isCloudEventRequest, + processEventRequest +} diff --git a/packages/datadog-instrumentations/src/http/server.js b/packages/datadog-instrumentations/src/http/server.js index 40dad004748..4b31f433a7e 100644 --- a/packages/datadog-instrumentations/src/http/server.js +++ b/packages/datadog-instrumentations/src/http/server.js @@ -6,6 +6,16 @@ const { } = require('../helpers/instrument') const shimmer = require('../../../datadog-shimmer') +const httpNames = ['http', 'node:http'] +const httpsNames = ['https', 'node:https'] + +// Import GCP PubSub Push & Cloud Events functionality +const { + isPubSubRequest, + isCloudEventRequest, + processEventRequest +} = require('../gcp-pubsub-push') + const startServerCh = channel('apm:http:server:request:start') const exitServerCh = channel('apm:http:server:request:exit') const errorServerCh = channel('apm:http:server:request:error') @@ -16,253 +26,11 @@ const startSetHeaderCh = channel('datadog:http:server:response:set-header:start' const requestFinishedSet = new WeakSet() -const httpNames = ['http', 'node:http'] -const httpsNames = ['https', 'node:https'] - -function parseCloudEventMessage (json, req) { - // Eventarc only uses Binary Content Mode with ce-specversion header - // Payload structure: {"message": {...}, "subscription": "..."} - const message = json.message || json - const attrs = { ...message?.attributes } - const subscription = json.subscription || req.headers['ce-subscription'] || 'cloud-event-subscription' - - // For Eventarc: prioritize message attributes (original trace) over transport headers - // Only use CE headers if message attributes don't have trace context - if (!attrs.traceparent) { - const ceTraceParent = req.headers['ce-traceparent'] || req.headers.traceparent - if (ceTraceParent) attrs.traceparent = ceTraceParent - } - if (!attrs.tracestate) { - const ceTraceState = req.headers['ce-tracestate'] || req.headers.tracestate - if (ceTraceState) attrs.tracestate = ceTraceState - } - - // Add Cloud Event context from headers to attributes for span tags - if (req.headers['ce-source']) attrs['ce-source'] = req.headers['ce-source'] - if (req.headers['ce-type']) attrs['ce-type'] = req.headers['ce-type'] - return { message, subscription, attrs } -} - -function parsePubSubMessage (json) { - // Traditional PubSub push format - const message = json.message - const subscription = json.subscription - const attrs = message?.attributes || {} - return { message, subscription, attrs } -} - -function createCloudEventSpan (tracer, parent, topicName, projectId, subscription, message, attrs, req) { - const spanTags = { - component: 'google-cloud-pubsub', - 'span.kind': 'consumer', - 'gcloud.project_id': projectId || 'unknown', - 'pubsub.topic': topicName || 'unknown', - 'pubsub.subscription': subscription, - 'pubsub.message_id': message?.messageId, - 'pubsub.delivery_method': 'eventarc', - 'eventarc.trigger': 'pubsub', - } - - // Add Cloud Event specific tags - if (attrs['ce-source']) spanTags['eventarc.source'] = attrs['ce-source'] - if (attrs['ce-type']) spanTags['eventarc.type'] = attrs['ce-type'] - if (req.headers['ce-id']) spanTags['eventarc.id'] = req.headers['ce-id'] - if (req.headers['ce-specversion']) spanTags['eventarc.specversion'] = req.headers['ce-specversion'] - if (req.headers['ce-time']) spanTags['eventarc.time'] = req.headers['ce-time'] - - return tracer.startSpan('google-cloud-pubsub.receive', { - childOf: parent, - resource: topicName, - type: 'worker', - tags: spanTags, - metrics: { - 'pubsub.ack': 1 - } - }) -} - -function createPubSubSpan (tracer, parent, topicName, projectId, subscription, message, attrs) { - const spanTags = { - component: 'google-cloud-pubsub', - 'span.kind': 'consumer', - 'gcloud.project_id': projectId || 'unknown', - 'pubsub.topic': topicName || 'unknown', - 'pubsub.subscription': subscription, - 'pubsub.message_id': message?.messageId, - 'pubsub.delivery_method': 'push' - } - return tracer.startSpan('google-cloud-pubsub.receive', { - childOf: parent, - resource: topicName, - type: 'worker', - tags: spanTags, - metrics: { - 'pubsub.ack': 1 - } - }) -} - -function handleCloudEvent (req, res, emit, server, originalArgs) { - // Get tracer from global reference (avoids circular dependencies) - const tracer = global._ddtrace - if (!tracer) { - return emit.apply(server, originalArgs) - } - - return processEventRequest(req, res, emit, server, originalArgs, tracer, true) -} - -function handlePubSubPush (req, res, emit, server, originalArgs) { - // Get tracer from global reference (avoids circular dependencies) - const tracer = global._ddtrace - if (!tracer) { - return emit.apply(server, originalArgs) - } - - return processEventRequest(req, res, emit, server, originalArgs, tracer, false) -} - -function processEventRequest (req, res, emit, server, originalArgs, tracer, isCloudEvent) { - const eventType = isCloudEvent ? 'Cloud Event' : 'PubSub push' - - // Collect raw body for PubSub message parsing with error handling - const chunks = [] - const MAX_BODY_SIZE = 10 * 1024 * 1024 // 10MB limit for large Pub/Sub payloads - let bodySize = 0 - - const cleanup = () => { - req.removeAllListeners('data') - req.removeAllListeners('end') - req.removeAllListeners('error') - } - - // eslint-disable-next-line n/handle-callback-err - const handleError = (error) => { - cleanup() - emit.apply(server, originalArgs) - } - - req.on('error', handleError) - - req.on('data', chunk => { - bodySize += chunk.length - if (bodySize > MAX_BODY_SIZE) { - handleError(new Error(`Request body too large: ${bodySize} bytes (limit: ${MAX_BODY_SIZE})`)) - return - } - chunks.push(chunk) - }) - - req.on('end', () => { - try { - // Efficiently combine chunks for large payloads - const body = Buffer.concat(chunks).toString('utf8') - const json = JSON.parse(body) - req.body = json // Set parsed body for framework use - req._pubsubBodyParsed = true // Flag to skip body-parser - - // Parse message based on event type - const parsedEvent = isCloudEvent - ? parseCloudEventMessage(json, req) - : parsePubSubMessage(json) - - if (!parsedEvent) { - cleanup() - return emit.apply(server, originalArgs) - } - - const { message, subscription, attrs } = parsedEvent - - if (!attrs || typeof attrs !== 'object' || Object.keys(attrs).length === 0) { - cleanup() - return emit.apply(server, originalArgs) - } - - // Extract trace context from PubSub message attributes (optimized) - const carrier = {} - const traceHeaders = ['traceparent', 'tracestate', - 'x-datadog-trace-id', 'x-datadog-parent-id', - 'x-datadog-sampling-priority', 'x-datadog-tags'] - for (const header of traceHeaders) { - if (attrs[header]) { - carrier[header] = attrs[header] - } - } - // Extract parent span context (key for distributed tracing!) - const parent = tracer.extract('text_map', carrier) - - // Extract project ID and topic from subscription path if not in attributes - let projectId = attrs['gcloud.project_id'] - let topicName = attrs['pubsub.topic'] - - if (!projectId && subscription) { - // Extract from subscription path: projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME - const match = subscription.match(/projects\/([^\\/]+)\/subscriptions/) - if (match) projectId = match[1] - } - - if (!topicName) { - topicName = 'push-subscription-topic' - } - - // Create span based on event type - let span - try { - span = isCloudEvent - ? createCloudEventSpan(tracer, parent, topicName, projectId, subscription, message, attrs, req) - : createPubSubSpan(tracer, parent, topicName, projectId, subscription, message, attrs) - } catch { - cleanup() - return emit.apply(server, originalArgs) - } - - // Attach span to request for application code - req._datadog = { span } - req._eventType = eventType - - // Activate span scope and continue with error handling - const scope = tracer.scope() - const finishSpan = () => { - try { - if (span && !span.finished) { - span.finish() - } - } catch {} - cleanup() - } - try { - scope.activate(span, () => { - // Finish span when response completes (with error handling) - res.on('finish', finishSpan) - res.on('close', finishSpan) - res.on('error', (resError) => { - if (span && !span.finished) { - span.setTag('error', true) - span.setTag('error.message', resError.message) - } - finishSpan() - }) - - // Continue with normal request processing - emit.apply(server, originalArgs) - }) - } catch { - if (span && !span.finished) { - span.finish() - } - cleanup() - emit.apply(server, originalArgs) - } - } catch { - cleanup() - emit.apply(server, originalArgs) - } - }) -} - addHook({ name: httpNames }, http => { shimmer.wrap(http.ServerResponse.prototype, 'emit', wrapResponseEmit) + shimmer.wrap(http.Server.prototype, 'emit', wrapEmitForGcp) shimmer.wrap(http.Server.prototype, 'emit', wrapEmit) + shimmer.wrap(http.ServerResponse.prototype, 'writeHead', wrapWriteHead) shimmer.wrap(http.ServerResponse.prototype, 'write', wrapWrite) shimmer.wrap(http.ServerResponse.prototype, 'end', wrapEnd) @@ -277,6 +45,7 @@ addHook({ name: httpNames }, http => { addHook({ name: httpsNames }, http => { // http.ServerResponse not present on https + shimmer.wrap(http.Server.prototype, 'emit', wrapEmitForGcp) shimmer.wrap(http.Server.prototype, 'emit', wrapEmit) return http }) @@ -296,6 +65,36 @@ function wrapResponseEmit (emit) { } } +// GCP PubSub/Cloud Events wrapper (first priority) +function wrapEmitForGcp (emit) { + return function (eventName, req, res) { + // Only process 'request' events + if (eventName !== 'request') { + return emit.apply(this, arguments) + } + + // Prioritize Cloud Events over PubSub push (Cloud Events can be delivered via PubSub) + if (isCloudEventRequest(req)) { + // Add Cloud Event specific flag for downstream processing + req._isCloudEvent = true + // Delegate entirely to processEventRequest - it handles emit internally, so we return early + processEventRequest(req, res, emit, this, arguments, true) + // Return early to prevent double emit - processEventRequest already called emit.apply() + return true + } else if (isPubSubRequest(req)) { + // Add PubSub specific flag for downstream processing + req._isPubSubPush = true + // Delegate entirely to processEventRequest - it handles emit internally, so we return early + processEventRequest(req, res, emit, this, arguments, false) + // Return early to prevent double emit - processEventRequest already called emit.apply() + return true + } + + // Continue to next wrapper (shimmer chains) - for regular HTTP only + return emit.apply(this, arguments) + } +} + function wrapEmit (emit) { return function (eventName, req, res) { if (!startServerCh.hasSubscribers) { @@ -303,24 +102,12 @@ function wrapEmit (emit) { } if (eventName === 'request') { - // Handle PubSub push AND Cloud Events at HTTP server level - works with ANY framework - if (req.method === 'POST') { - // Cloud Events detection (Eventarc uses Binary Content Mode with ce-specversion header) - const isCloudEvent = req.headers['ce-specversion'] - - // Traditional PubSub push detection - const isPubSubPush = req.headers['content-type']?.includes('application/json') && - req.headers['user-agent']?.includes('APIs-Google') - - if (isCloudEvent) { - return handleCloudEvent(req, res, emit, this, arguments) - } else if (isPubSubPush) { - return handlePubSubPush(req, res, emit, this, arguments) - } - } - res.req = req + if (req._isPubSubPush || req._isCloudEvent) { + return emit.apply(this, arguments) + } + // Normal HTTP request processing (not PubSub/Cloud Events) const abortController = new AbortController() startServerCh.publish({ req, res, abortController }) diff --git a/packages/datadog-instrumentations/test/http-server-pubsub.spec.js b/packages/datadog-instrumentations/test/http-server-pubsub.spec.js index 4605b54abc4..f266e97910a 100644 --- a/packages/datadog-instrumentations/test/http-server-pubsub.spec.js +++ b/packages/datadog-instrumentations/test/http-server-pubsub.spec.js @@ -5,14 +5,31 @@ const sinon = require('sinon') const dc = require('dc-polyfill') const http = require('http') +// Import the actual functions from our implementation +const { isPubSubRequest, isCloudEventRequest, processEventRequest } = require('../src/gcp-pubsub-push') + describe('HTTP Server PubSub Instrumentation Tests', () => { let tracer, startServerCh, startServerSpy before(() => { // Mock tracer for unit tests tracer = { - startSpan: sinon.stub(), - extract: sinon.stub(), + startSpan: sinon.stub().returns({ + context: sinon.stub().returns({ + parentId: 'parent-123', + toSpanId: sinon.stub().returns('span-456'), + toTraceId: sinon.stub().returns('trace-789') + }), + setTag: sinon.stub(), + finish: sinon.stub(), + finished: false + }), + extract: sinon.stub().returns({ + _spanId: 'extracted-parent-123', + _traceId: 'extracted-trace-456', + toTraceId: sinon.stub().returns('extracted-trace-456') + }), + inject: sinon.stub(), scope: sinon.stub().returns({ activate: sinon.stub().callsArg(1) }) @@ -49,13 +66,9 @@ describe('HTTP Server PubSub Instrumentation Tests', () => { } } - // Test the actual detection logic used in server.js - const isCloudEvent = req.headers['ce-specversion'] - const isPubSubPush = req.headers['content-type']?.includes('application/json') && - req.headers['user-agent']?.includes('APIs-Google') - - expect(isCloudEvent).to.be.undefined - expect(isPubSubPush).to.be.true + // Test the actual detection logic from gcp-pubsub-push + expect(isCloudEventRequest(req)).to.be.false + expect(isPubSubRequest(req)).to.be.true }) it('should detect Eventarc Cloud Events', () => { @@ -69,12 +82,8 @@ describe('HTTP Server PubSub Instrumentation Tests', () => { } } - const isCloudEvent = req.headers['ce-specversion'] - const isPubSubPush = req.headers['content-type']?.includes('application/json') && - req.headers['user-agent']?.includes('APIs-Google') - - expect(isCloudEvent).to.equal('1.0') - expect(isPubSubPush).to.be.false // Cloud Events take precedence + expect(isCloudEventRequest(req)).to.be.true + expect(isPubSubRequest(req)).to.be.false }) it('should not detect regular HTTP requests', () => { @@ -86,12 +95,8 @@ describe('HTTP Server PubSub Instrumentation Tests', () => { } } - const isCloudEvent = req.headers['ce-specversion'] - const isPubSubPush = req.headers['content-type']?.includes('application/json') && - req.headers['user-agent']?.includes('APIs-Google') - - expect(isCloudEvent).to.be.undefined - expect(isPubSubPush).to.be.false + expect(isCloudEventRequest(req)).to.be.false + expect(isPubSubRequest(req)).to.be.false }) }) @@ -320,7 +325,7 @@ describe('HTTP Server PubSub Instrumentation Tests', () => { const req = http.request({ hostname: 'localhost', - port, + port: port, path: '/', method: 'POST', headers: { @@ -332,9 +337,10 @@ describe('HTTP Server PubSub Instrumentation Tests', () => { expect(res.statusCode).to.equal(200) res.on('data', () => {}) res.on('end', () => { - // Verify channel was called for PubSub detection + // PubSub requests are now handled entirely by our custom logic + // and return early, so standard HTTP channels are NOT called setTimeout(() => { - expect(startServerSpy).to.have.been.called + // Just verify the request completed successfully done() }, 50) }) @@ -360,7 +366,7 @@ describe('HTTP Server PubSub Instrumentation Tests', () => { const req = http.request({ hostname: 'localhost', - port, + port: port, path: '/', method: 'POST', headers: { @@ -375,8 +381,10 @@ describe('HTTP Server PubSub Instrumentation Tests', () => { expect(res.statusCode).to.equal(200) res.on('data', () => {}) res.on('end', () => { + // Cloud Events are now handled entirely by our custom logic + // and return early, so standard HTTP channels are NOT called setTimeout(() => { - expect(startServerSpy).to.have.been.called + // Just verify the request completed successfully done() }, 50) }) @@ -390,7 +398,7 @@ describe('HTTP Server PubSub Instrumentation Tests', () => { it('should handle regular HTTP requests normally', (done) => { const req = http.request({ hostname: 'localhost', - port, + port: port, path: '/', method: 'GET' }, (res) => { @@ -411,7 +419,7 @@ describe('HTTP Server PubSub Instrumentation Tests', () => { it('should handle invalid JSON gracefully', (done) => { const req = http.request({ hostname: 'localhost', - port, + port: port, path: '/', method: 'POST', headers: { @@ -445,7 +453,7 @@ describe('HTTP Server PubSub Instrumentation Tests', () => { const req = http.request({ hostname: 'localhost', - port, + port: port, path: '/', method: 'POST', headers: { @@ -465,6 +473,111 @@ describe('HTTP Server PubSub Instrumentation Tests', () => { }) }) + describe('processEventRequest function', () => { + let mockReq, mockRes, mockEmit, mockServer, mockArgs + + beforeEach(() => { + mockReq = { + method: 'POST', + headers: { + 'content-type': 'application/json', + 'user-agent': 'APIs-Google; (+https://developers.google.com/webmasters/APIs-Google.html)' + }, + on: sinon.stub(), + removeAllListeners: sinon.stub() + } + + mockRes = { + on: sinon.stub(), + statusCode: 200 + } + + mockEmit = sinon.stub() + mockServer = {} + mockArgs = ['request', mockReq, mockRes] + + global._ddtrace = tracer + }) + + afterEach(() => { + delete global._ddtrace + }) + + it('should create PubSub span for traditional PubSub requests', () => { + // Setup mock request body parsing + const chunks = [Buffer.from(JSON.stringify({ + message: { + data: Buffer.from('test').toString('base64'), + messageId: 'test-id-123', + attributes: { + traceparent: '00-12345-67890-01', + 'pubsub.topic': 'test-topic' + } + }, + subscription: 'projects/test/subscriptions/test-sub' + }))] + + let dataCallback + mockReq.on.callsFake((event, callback) => { + if (event === 'data') dataCallback = callback + if (event === 'end') { + setTimeout(() => { + chunks.forEach(chunk => dataCallback(chunk)) + callback() + }, 0) + } + }) + + // Call processEventRequest + processEventRequest(mockReq, mockRes, mockEmit, mockServer, mockArgs, false) + + // Verify tracer methods were called + setTimeout(() => { + expect(tracer.extract).to.have.been.called + expect(tracer.startSpan).to.have.been.called + expect(tracer.inject).to.have.been.called + }, 10) + }) + + it('should create Cloud Event span for Eventarc requests', () => { + mockReq.headers['ce-specversion'] = '1.0' + mockReq.headers['ce-type'] = 'google.cloud.pubsub.topic.v1.messagePublished' + mockReq.headers['ce-source'] = '//pubsub.googleapis.com/projects/test/topics/test-topic' + + const chunks = [Buffer.from(JSON.stringify({ + message: { + data: Buffer.from('test').toString('base64'), + messageId: 'test-eventarc-id', + attributes: { + traceparent: '00-abc123-def456-01' + } + }, + subscription: 'projects/test/subscriptions/eventarc-sub' + }))] + + let dataCallback + mockReq.on.callsFake((event, callback) => { + if (event === 'data') dataCallback = callback + if (event === 'end') { + setTimeout(() => { + chunks.forEach(chunk => dataCallback(chunk)) + callback() + }, 0) + } + }) + + // Call processEventRequest for Cloud Event + processEventRequest(mockReq, mockRes, mockEmit, mockServer, mockArgs, true) + + // Verify tracer methods were called + setTimeout(() => { + expect(tracer.extract).to.have.been.called + expect(tracer.startSpan).to.have.been.called + expect(tracer.inject).to.have.been.called + }, 10) + }) + }) + describe('Utility Functions', () => { it('should validate body size limits', () => { const MAX_BODY_SIZE = 10 * 1024 * 1024 // 10MB @@ -473,5 +586,11 @@ describe('HTTP Server PubSub Instrumentation Tests', () => { expect(1024 * 1024).to.be.lessThan(MAX_BODY_SIZE) // 1MB < 10MB expect(MAX_BODY_SIZE + 1).to.be.greaterThan(MAX_BODY_SIZE) }) + + it('should have required functions exported', () => { + expect(typeof isPubSubRequest).to.equal('function') + expect(typeof isCloudEventRequest).to.equal('function') + expect(typeof processEventRequest).to.equal('function') + }) }) }) diff --git a/packages/datadog-instrumentations/test/http.spec.js b/packages/datadog-instrumentations/test/http.spec.js index c226df8a143..4251081693a 100644 --- a/packages/datadog-instrumentations/test/http.spec.js +++ b/packages/datadog-instrumentations/test/http.spec.js @@ -1,9 +1,13 @@ 'use strict' -const { assert } = require('chai') +const { assert, expect } = require('chai') +const sinon = require('sinon') const dc = require('dc-polyfill') const agent = require('../../dd-trace/test/plugins/agent') + +// Import the GCP PubSub Push functions for testing +const { isPubSubRequest, isCloudEventRequest } = require('../src/gcp-pubsub-push') describe('client', () => { let url, http, startChannelCb, endChannelCb, asyncStartChannelCb, errorChannelCb @@ -214,7 +218,7 @@ describe('server', () => { } }) - describe('PubSub detection integration', () => { + describe('GCP PubSub Push detection', () => { beforeEach((done) => { server = http.createServer((req, res) => { res.writeHead(200) @@ -226,7 +230,34 @@ describe('server', () => { }) }) - it('should publish to startServerCh for PubSub requests', (done) => { + it('should detect PubSub push requests correctly', () => { + const pubsubReq = { + method: 'POST', + headers: { + 'content-type': 'application/json', + 'user-agent': 'APIs-Google; (+https://developers.google.com/webmasters/APIs-Google.html)' + } + } + + expect(isPubSubRequest(pubsubReq)).to.be.true + expect(isCloudEventRequest(pubsubReq)).to.be.false + }) + + it('should detect Cloud Event requests correctly', () => { + const cloudEventReq = { + method: 'POST', + headers: { + 'ce-specversion': '1.0', + 'ce-type': 'google.cloud.pubsub.topic.v1.messagePublished', + 'content-type': 'application/json' + } + } + + expect(isCloudEventRequest(cloudEventReq)).to.be.true + expect(isPubSubRequest(cloudEventReq)).to.be.false + }) + + it('should handle PubSub requests via HTTP server', (done) => { const pubsubPayload = JSON.stringify({ message: { data: Buffer.from('test').toString('base64'), @@ -238,7 +269,7 @@ describe('server', () => { const req = http.request({ hostname: 'localhost', - port, + port: port, path: '/', method: 'POST', headers: { @@ -249,9 +280,10 @@ describe('server', () => { }, (res) => { res.on('data', () => {}) res.on('end', () => { - // Verify the channel was called + // PubSub requests are now handled entirely by our custom logic + // and return early, so standard HTTP channels are NOT called setTimeout(() => { - expect(startServerSpy).to.have.been.called + // Just verify the request completed successfully done() }, 50) }) @@ -261,7 +293,7 @@ describe('server', () => { req.end() }) - it('should publish to startServerCh for Eventarc Cloud Events', (done) => { + it('should handle Cloud Event requests via HTTP server', (done) => { const eventarcPayload = JSON.stringify({ message: { data: Buffer.from('test').toString('base64'), @@ -290,8 +322,10 @@ describe('server', () => { }, (res) => { res.on('data', () => {}) res.on('end', () => { + // Cloud Events are now handled entirely by our custom logic + // and return early, so standard HTTP channels are NOT called setTimeout(() => { - expect(startServerSpy).to.have.been.called + // Just verify the request completed successfully done() }, 50) }) @@ -301,10 +335,10 @@ describe('server', () => { req.end() }) - it('should publish to startServerCh for regular HTTP requests', (done) => { + it('should handle regular HTTP requests normally', (done) => { const req = http.request({ hostname: 'localhost', - port, + port: port, path: '/', method: 'GET' }, (res) => { @@ -319,6 +353,19 @@ describe('server', () => { req.end() }) + + it('should not detect regular requests as PubSub or Cloud Events', () => { + const regularReq = { + method: 'GET', + headers: { + 'content-type': 'text/html', + 'user-agent': 'Mozilla/5.0' + } + } + + expect(isPubSubRequest(regularReq)).to.be.false + expect(isCloudEventRequest(regularReq)).to.be.false + }) }) describe('error handling for server', () => { @@ -336,7 +383,7 @@ describe('server', () => { it('should handle request errors gracefully', (done) => { const req = http.request({ hostname: 'localhost', - port, + port: port, path: '/', method: 'POST', headers: { From dfe16d27fae5ba0c16ee4481bb1fce941854da06 Mon Sep 17 00:00:00 2001 From: nina9753 Date: Thu, 7 Aug 2025 18:45:12 -0400 Subject: [PATCH 07/10] remove unknown --- packages/datadog-instrumentations/src/gcp-pubsub-push.js | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/packages/datadog-instrumentations/src/gcp-pubsub-push.js b/packages/datadog-instrumentations/src/gcp-pubsub-push.js index 668328a554d..19b887e1880 100644 --- a/packages/datadog-instrumentations/src/gcp-pubsub-push.js +++ b/packages/datadog-instrumentations/src/gcp-pubsub-push.js @@ -78,8 +78,8 @@ function createSpan (tracer, parent, topicName, projectId, subscription, message const spanTags = { component: 'google-cloud-pubsub', 'span.kind': 'consumer', - 'gcloud.project_id': projectId || 'unknown', - 'pubsub.topic': topicName || 'unknown', + 'gcloud.project_id': projectId, + 'pubsub.topic': topicName, 'pubsub.subscription': subscription, 'pubsub.message_id': message?.messageId, 'pubsub.delivery_method': isCloudEvent ? 'eventarc' : 'push' From 7ee875d9efdb416ee2f8e569d157dcbd287c0753 Mon Sep 17 00:00:00 2001 From: nina9753 Date: Thu, 7 Aug 2025 19:14:58 -0400 Subject: [PATCH 08/10] fix test and lint --- .../datadog-instrumentations/src/gcp-pubsub-push.js | 4 ++-- .../test/http-server-pubsub.spec.js | 10 +++++----- packages/datadog-instrumentations/test/http.spec.js | 6 +++--- 3 files changed, 10 insertions(+), 10 deletions(-) diff --git a/packages/datadog-instrumentations/src/gcp-pubsub-push.js b/packages/datadog-instrumentations/src/gcp-pubsub-push.js index 19b887e1880..2a42877e10b 100644 --- a/packages/datadog-instrumentations/src/gcp-pubsub-push.js +++ b/packages/datadog-instrumentations/src/gcp-pubsub-push.js @@ -3,8 +3,8 @@ // Detection functions function isPubSubRequest (req) { return req.method === 'POST' && - req.headers['content-type']?.includes('application/json') && - req.headers['user-agent']?.includes('APIs-Google') + !!req.headers['content-type']?.includes('application/json') && + !!req.headers['user-agent']?.includes('APIs-Google') } function isCloudEventRequest (req) { diff --git a/packages/datadog-instrumentations/test/http-server-pubsub.spec.js b/packages/datadog-instrumentations/test/http-server-pubsub.spec.js index f266e97910a..4a61b4c5a50 100644 --- a/packages/datadog-instrumentations/test/http-server-pubsub.spec.js +++ b/packages/datadog-instrumentations/test/http-server-pubsub.spec.js @@ -325,7 +325,7 @@ describe('HTTP Server PubSub Instrumentation Tests', () => { const req = http.request({ hostname: 'localhost', - port: port, + port, path: '/', method: 'POST', headers: { @@ -366,7 +366,7 @@ describe('HTTP Server PubSub Instrumentation Tests', () => { const req = http.request({ hostname: 'localhost', - port: port, + port, path: '/', method: 'POST', headers: { @@ -398,7 +398,7 @@ describe('HTTP Server PubSub Instrumentation Tests', () => { it('should handle regular HTTP requests normally', (done) => { const req = http.request({ hostname: 'localhost', - port: port, + port, path: '/', method: 'GET' }, (res) => { @@ -419,7 +419,7 @@ describe('HTTP Server PubSub Instrumentation Tests', () => { it('should handle invalid JSON gracefully', (done) => { const req = http.request({ hostname: 'localhost', - port: port, + port, path: '/', method: 'POST', headers: { @@ -453,7 +453,7 @@ describe('HTTP Server PubSub Instrumentation Tests', () => { const req = http.request({ hostname: 'localhost', - port: port, + port, path: '/', method: 'POST', headers: { diff --git a/packages/datadog-instrumentations/test/http.spec.js b/packages/datadog-instrumentations/test/http.spec.js index 4251081693a..5058affa77e 100644 --- a/packages/datadog-instrumentations/test/http.spec.js +++ b/packages/datadog-instrumentations/test/http.spec.js @@ -269,7 +269,7 @@ describe('server', () => { const req = http.request({ hostname: 'localhost', - port: port, + port, path: '/', method: 'POST', headers: { @@ -338,7 +338,7 @@ describe('server', () => { it('should handle regular HTTP requests normally', (done) => { const req = http.request({ hostname: 'localhost', - port: port, + port, path: '/', method: 'GET' }, (res) => { @@ -383,7 +383,7 @@ describe('server', () => { it('should handle request errors gracefully', (done) => { const req = http.request({ hostname: 'localhost', - port: port, + port, path: '/', method: 'POST', headers: { From 5816e4225127d6d8aa6dec9e26baa39086273286 Mon Sep 17 00:00:00 2001 From: nina9753 Date: Fri, 8 Aug 2025 13:40:57 -0400 Subject: [PATCH 09/10] remove req._pubsubBodyParsed --- packages/datadog-instrumentations/src/body-parser.js | 8 ++++---- packages/datadog-instrumentations/src/gcp-pubsub-push.js | 1 - 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/packages/datadog-instrumentations/src/body-parser.js b/packages/datadog-instrumentations/src/body-parser.js index 2317dd43042..1b4e276f500 100644 --- a/packages/datadog-instrumentations/src/body-parser.js +++ b/packages/datadog-instrumentations/src/body-parser.js @@ -26,8 +26,8 @@ addHook({ versions: ['>=1.4.0 <1.20.0'] }, read => { return shimmer.wrapFunction(read, read => function (req, res, next) { - // Skip body parsing if PubSub/Cloud Event middleware already parsed it - if (req._pubsubBodyParsed) { + // Skip body parsing if body has already been parsed by any middleware + if (req.body !== undefined) { return next() } const nextResource = new AsyncResource('bound-anonymous-fn') @@ -42,8 +42,8 @@ addHook({ versions: ['>=1.20.0'] }, read => { return shimmer.wrapFunction(read, read => function (req, res, next) { - // Skip body parsing if PubSub/Cloud Event middleware already parsed it - if (req._pubsubBodyParsed) { + // Skip body parsing if body has already been parsed by any middleware + if (req.body !== undefined) { return next() } arguments[2] = publishRequestBodyAndNext(req, res, next) diff --git a/packages/datadog-instrumentations/src/gcp-pubsub-push.js b/packages/datadog-instrumentations/src/gcp-pubsub-push.js index 2a42877e10b..1cbeb5e6ccd 100644 --- a/packages/datadog-instrumentations/src/gcp-pubsub-push.js +++ b/packages/datadog-instrumentations/src/gcp-pubsub-push.js @@ -158,7 +158,6 @@ function processEventRequest (req, res, emit, server, originalArgs, isCloudEvent const body = Buffer.concat(chunks).toString('utf8') const json = JSON.parse(body) req.body = json - req._pubsubBodyParsed = true // Parse message based on event type const parsedEvent = isCloudEvent From b056b6af339c890cf684975c13c553564190ac55 Mon Sep 17 00:00:00 2001 From: nina9753 Date: Mon, 11 Aug 2025 11:29:20 -0400 Subject: [PATCH 10/10] fix req.body check and tests --- .../src/body-parser.js | 26 ++++++++++++++++--- 1 file changed, 22 insertions(+), 4 deletions(-) diff --git a/packages/datadog-instrumentations/src/body-parser.js b/packages/datadog-instrumentations/src/body-parser.js index 1b4e276f500..16645e8ddfb 100644 --- a/packages/datadog-instrumentations/src/body-parser.js +++ b/packages/datadog-instrumentations/src/body-parser.js @@ -26,8 +26,17 @@ addHook({ versions: ['>=1.4.0 <1.20.0'] }, read => { return shimmer.wrapFunction(read, read => function (req, res, next) { - // Skip body parsing if body has already been parsed by any middleware - if (req.body !== undefined) { + // Skip body parsing if body has already been meaningfully parsed by any middleware + if (req.body !== undefined && req.body !== null && + ((typeof req.body === 'object' && Object.keys(req.body).length > 0) || + (typeof req.body === 'string' && req.body.length > 0))) { + // Still publish the channel so AppSec and IAST can process the body + if (bodyParserReadCh.hasSubscribers && req) { + const abortController = new AbortController() + const body = req.body + bodyParserReadCh.publish({ req, res, body, abortController }) + if (abortController.signal.aborted) return + } return next() } const nextResource = new AsyncResource('bound-anonymous-fn') @@ -42,8 +51,17 @@ addHook({ versions: ['>=1.20.0'] }, read => { return shimmer.wrapFunction(read, read => function (req, res, next) { - // Skip body parsing if body has already been parsed by any middleware - if (req.body !== undefined) { + // Skip body parsing if body has already been meaningfully parsed by any middleware + if (req.body !== undefined && req.body !== null && + ((typeof req.body === 'object' && Object.keys(req.body).length > 0) || + (typeof req.body === 'string' && req.body.length > 0))) { + // Still publish the channel so AppSec and IAST can process the body + if (bodyParserReadCh.hasSubscribers && req) { + const abortController = new AbortController() + const body = req.body + bodyParserReadCh.publish({ req, res, body, abortController }) + if (abortController.signal.aborted) return + } return next() } arguments[2] = publishRequestBodyAndNext(req, res, next)