Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions packages/datadog-instrumentations/src/body-parser.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,19 @@ addHook({
versions: ['>=1.4.0 <1.20.0']
}, read => {
return shimmer.wrapFunction(read, read => function (req, res, next) {
// Skip body parsing if body has already been meaningfully parsed by any middleware
if (req.body !== undefined && req.body !== null &&
((typeof req.body === 'object' && Object.keys(req.body).length > 0) ||
(typeof req.body === 'string' && req.body.length > 0))) {
// Still publish the channel so AppSec and IAST can process the body
if (bodyParserReadCh.hasSubscribers && req) {
const abortController = new AbortController()
const body = req.body
bodyParserReadCh.publish({ req, res, body, abortController })
if (abortController.signal.aborted) return
}
return next()
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure I understand this change.
I don't think we should ever skip that functionality, since it would also interfere with user's code, if I am not mistaken.

const nextResource = new AsyncResource('bound-anonymous-fn')
arguments[2] = nextResource.bind(publishRequestBodyAndNext(req, res, next))
return read.apply(this, arguments)
Expand All @@ -38,6 +51,19 @@ addHook({
versions: ['>=1.20.0']
}, read => {
return shimmer.wrapFunction(read, read => function (req, res, next) {
// Skip body parsing if body has already been meaningfully parsed by any middleware
if (req.body !== undefined && req.body !== null &&
((typeof req.body === 'object' && Object.keys(req.body).length > 0) ||
(typeof req.body === 'string' && req.body.length > 0))) {
// Still publish the channel so AppSec and IAST can process the body
if (bodyParserReadCh.hasSubscribers && req) {
const abortController = new AbortController()
const body = req.body
bodyParserReadCh.publish({ req, res, body, abortController })
if (abortController.signal.aborted) return
}
return next()
}
arguments[2] = publishRequestBodyAndNext(req, res, next)
return read.apply(this, arguments)
})
Expand Down
14 changes: 14 additions & 0 deletions packages/datadog-instrumentations/src/http.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,18 @@
'use strict'

try {
// Load the Pub/Sub Transit Handler plugin directly to ensure it gets instantiated
const TransitHandlerPlugin = require('../../datadog-plugin-google-cloud-pubsub/src/pubsub-transit-handler')

// Get tracer instance and instantiate the plugin
const tracer = require('../../dd-trace')
if (tracer && tracer._tracer && !global._dd_gcp_pubsub_transit_handler) {
// Keep a reference to avoid GC and satisfy no-new side-effect rule
global._dd_gcp_pubsub_transit_handler = new TransitHandlerPlugin(tracer)
}
} catch {
// Silently handle plugin loading errors
}

require('./http/client')
require('./http/server')
57 changes: 53 additions & 4 deletions packages/datadog-instrumentations/src/http/server.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,12 @@ const {
addHook
} = require('../helpers/instrument')
const shimmer = require('../../../datadog-shimmer')
const { getSharedChannel } = require('../shared-channels')

const httpNames = ['http', 'node:http']
const httpsNames = ['https', 'node:https']

// Generic HTTP server instrumentation - no product-specific logic

const startServerCh = channel('apm:http:server:request:start')
const exitServerCh = channel('apm:http:server:request:exit')
Expand All @@ -14,14 +20,16 @@ const startWriteHeadCh = channel('apm:http:server:response:writeHead:start')
const finishSetHeaderCh = channel('datadog:http:server:response:set-header:finish')
const startSetHeaderCh = channel('datadog:http:server:response:set-header:start')

const requestFinishedSet = new WeakSet()
// Generic channel for request interception - use shared channel to ensure same instance
const requestInterceptCh = getSharedChannel('apm:http:server:request:intercept')

const httpNames = ['http', 'node:http']
const httpsNames = ['https', 'node:https']
const requestFinishedSet = new WeakSet()

addHook({ name: httpNames }, http => {
shimmer.wrap(http.ServerResponse.prototype, 'emit', wrapResponseEmit)
shimmer.wrap(http.Server.prototype, 'emit', wrapEmitForInterception)
shimmer.wrap(http.Server.prototype, 'emit', wrapEmit)

shimmer.wrap(http.ServerResponse.prototype, 'writeHead', wrapWriteHead)
shimmer.wrap(http.ServerResponse.prototype, 'write', wrapWrite)
shimmer.wrap(http.ServerResponse.prototype, 'end', wrapEnd)
Expand All @@ -36,6 +44,7 @@ addHook({ name: httpNames }, http => {

addHook({ name: httpsNames }, http => {
// http.ServerResponse not present on https
shimmer.wrap(http.Server.prototype, 'emit', wrapEmitForInterception)
shimmer.wrap(http.Server.prototype, 'emit', wrapEmit)
return http
})
Expand All @@ -54,6 +63,40 @@ function wrapResponseEmit (emit) {
return emit.apply(this, arguments)
}
}

// Generic request interceptor - allows any plugin to intercept requests
function wrapEmitForInterception (emit) {
return function (eventName, req, res) {
// Only process 'request' events
if (eventName !== 'request') {
return emit.apply(this, arguments)
}

// Check if any plugin wants to intercept this request
if (requestInterceptCh.hasSubscribers) {
const interceptData = {
req,
res,
emit,
server: this,
originalArgs: arguments,
handled: false // Plugin sets this to true if it handles the request
}

// Publish to generic intercept channel - any plugin can subscribe
requestInterceptCh.publish(interceptData)

// If a plugin handled it, don't continue with normal processing
if (interceptData.handled) {
return true
}
}

// No plugin intercepted, continue with normal HTTP processing
return emit.apply(this, arguments)
}
}

function wrapEmit (emit) {
return function (eventName, req, res) {
if (!startServerCh.hasSubscribers) {
Expand All @@ -62,9 +105,12 @@ function wrapEmit (emit) {

if (eventName === 'request') {
res.req = req
if (req._isPubSubPush || req._isCloudEvent) {
return emit.apply(this, arguments)
}

// Normal HTTP request processing (not PubSub/Cloud Events)
const abortController = new AbortController()

startServerCh.publish({ req, res, abortController })

try {
Expand Down Expand Up @@ -221,3 +267,6 @@ function wrapEnd (end) {
return end.apply(this, arguments)
}
}

// Export the channel for plugins to use the same instance
module.exports = { requestInterceptCh }
17 changes: 17 additions & 0 deletions packages/datadog-instrumentations/src/shared-channels.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
'use strict'

const { channel } = require('dc-polyfill')

// Shared channel registry to ensure all modules use the same channel instances
const channels = {}

function getSharedChannel (name) {
if (!channels[name]) {
channels[name] = channel(name)
}
return channels[name]
}

module.exports = {
getSharedChannel
}
Loading
Loading