Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions AGENTS.md
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,13 @@ const log = require('../log')
if (NODE_MAJOR >= 20) { /* Use Node.js 20+ API */ }
```

### Event handlers

- Avoid adding new listeners, if possible
- Use monitor symbols like `events.errorMonitor` when available
- Use `.once()` methods instead of `.on()`, if the event is only needed once
- If new `beforeExit` events on `process` are needed, add them to `globalThis[Symbol.for('dd-trace')].beforeExitHandlers`

### Performance and Memory

**CRITICAL: Tracer runs in application hot paths - every operation counts.**
Expand Down
18 changes: 9 additions & 9 deletions integration-tests/helpers/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,13 @@ async function runAndCheckOutput (filename, cwd, expectedOut, expectedSource) {
assert(proc.pid !== undefined, 'Process PID is not available')
const pid = proc.pid
let out = await new Promise((resolve, reject) => {
proc.on('error', reject)
proc.once('error', reject)
let out = Buffer.alloc(0)
proc.stdout.on('data', data => {
out = Buffer.concat([out, data])
})
proc.stderr.pipe(process.stdout)
proc.on('exit', () => resolve(out.toString('utf8')))
proc.once('exit', () => resolve(out.toString('utf8')))
if (shouldKill) {
setTimeout(() => {
if (proc.exitCode === null) proc.kill()
Expand Down Expand Up @@ -223,8 +223,8 @@ function spawnProc (filename, options = {}, stdioHandler, stderrHandler) {
proc.url = `http://localhost:${port}`
resolve(proc)
})
.on('error', reject)
.on('exit', code => {
.once('error', reject)
.once('exit', code => {
reject(new Error(`Process exited with status code ${code}.`))
})
})
Expand All @@ -251,8 +251,8 @@ function spawnProcAndExpectExit (filename, options = {}, stdioHandler, stderrHan

return new Promise((resolve, reject) => {
proc
.on('error', reject)
.on('exit', code => {
.once('error', reject)
.once('exit', code => {
if (code !== 0) {
return reject(new Error(`Process exited with status code ${code}.`))
}
Expand Down Expand Up @@ -585,11 +585,11 @@ async function curl (url) {
http.get(url, res => {
const bufs = []
res.on('data', d => bufs.push(d))
res.on('end', () => {
res.once('end', () => {
resolve(Object.assign(res, { body: Buffer.concat(bufs).toString('utf8') }))
})
res.on('error', reject)
}).on('error', reject)
res.once('error', reject)
}).once('error', reject)
})
}

Expand Down
10 changes: 7 additions & 3 deletions packages/datadog-instrumentations/src/ai.js
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,14 @@ const noopTracer = {
}
}

const tracers = new WeakSet()

function wrapTracer (tracer) {
if (Object.hasOwn(tracer, Symbol.for('_dd.wrapped'))) return
if (tracers.has(tracer)) {
return
}

tracers.add(tracer)

shimmer.wrap(tracer, 'startActiveSpan', function (startActiveSpan) {
return function () {
Expand Down Expand Up @@ -90,8 +96,6 @@ function wrapTracer (tracer) {
})
}
})

Object.defineProperty(tracer, Symbol.for('_dd.wrapped'), { value: true })
}

function wrapWithTracer (fn) {
Expand Down
2 changes: 1 addition & 1 deletion packages/datadog-instrumentations/src/child_process.js
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ function wrapChildProcessAsyncMethod (ChildProcess, shell = false) {
childProcessChannel.error.publish(context)
})

childProcess.on('close', (code = 0) => {
childProcess.once('close', (code = 0) => {
if (!errorExecuted && code !== 0) {
childProcessChannel.error.publish(context)
}
Expand Down
2 changes: 1 addition & 1 deletion packages/datadog-instrumentations/src/graphql.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ const {
channel
} = require('./helpers/instrument')

const ddGlobal = globalThis[Symbol.for('dd-trace')] ??= {}
const ddGlobal = globalThis[Symbol.for('dd-trace')]

/** cached objects */

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
'use strict'

// Use a global symbol to prevent stealthy-require to interfere.
// TODO: Use the symbol from dd-trace instead and remove this file.
const sym = Symbol.for('_ddtrace_instrumentations')
globalThis[sym] ??= {}

global[sym] = global[sym] || {}

module.exports = global[sym]
module.exports = globalThis[sym]
4 changes: 2 additions & 2 deletions packages/datadog-instrumentations/src/http/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,8 @@ function patch (http, methodName) {
case 'response': {
const res = arg
ctx.res = res
res.on('end', finish)
res.on(errorMonitor, finish)
res.once('end', finish)
res.once(errorMonitor, finish)
break
}
case 'connect':
Expand Down
4 changes: 2 additions & 2 deletions packages/datadog-instrumentations/src/light-my-request.js
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,8 @@ function wrapDispatchFunc (dispatchFunc) {

// light-my-request Response emits 'finish' when done
if (res.on && typeof res.on === 'function') {
res.on('finish', onFinish)
res.on('close', onFinish)
res.once('finish', onFinish)
res.once('close', onFinish)
}

// Also wrap end() as fallback
Expand Down
2 changes: 1 addition & 1 deletion packages/datadog-instrumentations/src/mocha/worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ addHook({
return run.apply(this, arguments)
}
// We flush when the worker ends with its test file (a mocha instance in a worker runs a single test file)
this.on('end', () => {
this.once('end', () => {
workerFinishCh.publish()
})
this.on('test', getOnTestHandler(false))
Expand Down
2 changes: 1 addition & 1 deletion packages/datadog-instrumentations/src/mysql.js
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ addHook({ name: 'mysql', file: 'lib/Connection.js', versions: ['>=2'] }, Connect
return finishCh.runStores(ctx, cb, this, error, result)
})
} else {
res.on('end', () => finishCh.publish(ctx))
res.once('end', () => finishCh.publish(ctx))
}

return res
Expand Down
4 changes: 2 additions & 2 deletions packages/datadog-instrumentations/src/mysql2.js
Original file line number Diff line number Diff line change
Expand Up @@ -141,11 +141,11 @@ function wrapConnection (Connection, version) {
finishCh.runStores(ctx, onResult, this, ...arguments)
})
} else {
this.on(errorMonitor, error => {
this.once(errorMonitor, error => {
ctx.error = error
errorCh.publish(ctx)
})
this.on('end', () => finishCh.publish(ctx))
this.once('end', () => finishCh.publish(ctx))
}

this.execute = execute
Expand Down
18 changes: 13 additions & 5 deletions packages/datadog-instrumentations/src/net.js
Original file line number Diff line number Diff line change
Expand Up @@ -101,35 +101,43 @@ function getOptions (args) {
}

function setupListeners (socket, protocol, ctx, finishCh, errorCh) {
const events = ['connect', errorMonitor, 'close', 'timeout']
const events = [errorMonitor, 'close', 'timeout']

const wrapListener = function (error) {
if (error) {
ctx.error = error
errorCh.publish(ctx)
}
finishCh.runStores(ctx, () => {})
cleanupOtherListeners()
}

const localListener = function () {
const localListener = function (error) {
ctx.socket = socket
connectionCh.publish(ctx)
if (error) {
ctx.error = error
errorCh.publish(ctx)
}
finishCh.runStores(ctx, () => {})
cleanupOtherListeners()
}

const cleanupListener = function () {
const cleanupOtherListeners = function () {
socket.removeListener('connect', localListener)
events.forEach(event => {
socket.removeListener(event, wrapListener)
socket.removeListener(event, cleanupListener)
})
}

// TODO: Identify why the connect listener should remove the other listeners.
if (protocol === 'tcp') {
socket.once('connect', localListener)
} else {
events.push('connect')
}

events.forEach(event => {
socket.once(event, wrapListener)
socket.once(event, cleanupListener)
})
}
7 changes: 6 additions & 1 deletion packages/datadog-plugin-net/test/index.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ const { ERROR_MESSAGE, ERROR_TYPE, ERROR_STACK } = require('../../dd-trace/src/c
const agent = require('../../dd-trace/test/plugins/agent')
const { expectSomeSpan } = require('../../dd-trace/test/plugins/helpers')
const { withPeerService } = require('../../dd-trace/test/setup/mocha')

describe('Plugin', () => {
let net
let tcp
Expand All @@ -18,7 +19,11 @@ describe('Plugin', () => {
let tracer
let parent

['net', 'node:net'].forEach(pluginToBeLoaded => {
before(() => {
require('events').defaultMaxListeners = 5
})

;['net', 'node:net'].forEach(pluginToBeLoaded => {
describe(pluginToBeLoaded, () => {
afterEach(() => {
return agent.close()
Expand Down
4 changes: 3 additions & 1 deletion packages/datadog-plugin-pino/test/index.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,14 @@ describe('Plugin', () => {
if (semver.intersects(version, '>=8') && options.prettyPrint) {
delete options.prettyPrint // deprecated

// pino-pretty uses `on-exit-leak-free` and that adds a listener to process.
process.setMaxListeners(process.getMaxListeners() + 1)
const pretty = require('../../../versions/[email protected]').get()

stream = pretty().pipe(stream)
}

logger = pino && pino(options, stream)
logger = pino(options, stream)
}

describe('without configuration', () => {
Expand Down
19 changes: 19 additions & 0 deletions packages/dd-trace/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,25 @@ if (!global._ddtrace) {
writable: true
})

const ddTraceSymbol = Symbol.for('dd-trace')

Object.defineProperty(globalThis, ddTraceSymbol, {
value: {
beforeExitHandlers: new Set(),
},
enumerable: false,
configurable: true, // Allow this to be overridden by loading the tracer
writable: false
})

process.once('beforeExit', function mainBeforeExit () {
if (globalThis[ddTraceSymbol]?.beforeExitHandlers) {
for (const handler of globalThis[ddTraceSymbol].beforeExitHandlers) {
handler()
}
}
})

global._ddtrace.default = global._ddtrace
global._ddtrace.tracer = global._ddtrace
}
Expand Down
6 changes: 2 additions & 4 deletions packages/dd-trace/src/appsec/rasp/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -73,16 +73,14 @@ function handleUncaughtExceptionMonitor (error) {
}
} else {
const cleanUp = removeAllListeners(process, 'uncaughtException')
const handler = () => {
process.removeListener('uncaughtException', handler)
}
const handler = () => {}

setTimeout(() => {
process.removeListener('uncaughtException', handler)
cleanUp()
})

process.on('uncaughtException', handler)
process.once('uncaughtException', handler)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ class CiVisibilityExporter extends AgentInfoExporter {
}
})

process.once('beforeExit', () => {
const flush = () => {
if (this._writer) {
this._writer.flush()
}
Expand All @@ -78,7 +78,8 @@ class CiVisibilityExporter extends AgentInfoExporter {
if (this._logsWriter) {
this._logsWriter.flush()
}
})
}
globalThis[Symbol.for('dd-trace')].beforeExitHandlers.add(flush.bind(this))
}

shouldRequestSkippableSuites () {
Expand Down
2 changes: 1 addition & 1 deletion packages/dd-trace/src/datastreams/processor.js
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ class DataStreamsProcessor {
this.timer = setInterval(this.onInterval.bind(this), flushInterval)
this.timer.unref()
}
process.once('beforeExit', () => this.onInterval())
globalThis[Symbol.for('dd-trace')].beforeExitHandlers.add(this.onInterval.bind(this))
}

onInterval () {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ function isTypedArray (variable) {
}

function isInstanceOfCoreType (type, variable, fallback = `${variable} instanceof ${type}`) {
return `(process[Symbol.for('datadog:node:util:types')]?.is${type}?.(${variable}) ?? ${fallback})`
return `(globalThis[Symbol.for('dd-trace')].utilTypes?.is${type}?.(${variable}) ?? ${fallback})`
}

function getSize (variable) {
Expand Down
4 changes: 2 additions & 2 deletions packages/dd-trace/src/debugger/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ function start (config, rc) {
const logChannel = new MessageChannel()
configChannel = new MessageChannel()

process[Symbol.for('datadog:node:util:types')] = types
globalThis[Symbol.for('dd-trace')].utilTypes = types

readProbeFile(config.dynamicInstrumentation.probeFile, (probes) => {
const action = 'apply'
Expand Down Expand Up @@ -84,7 +84,7 @@ function start (config, rc) {
worker.on('error', (err) => log.error('[debugger] worker thread error', err))
worker.on('messageerror', (err) => log.error('[debugger] received "messageerror" from worker', err))

worker.on('exit', (code) => {
worker.once('exit', (code) => {
const error = new Error(`Dynamic Instrumentation worker thread exited unexpectedly with code ${code}`)

log.error('[debugger] worker thread exited unexpectedly', error)
Expand Down
2 changes: 1 addition & 1 deletion packages/dd-trace/src/dogstatsd.js
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,7 @@ class CustomMetrics {
// TODO(bengl) this magic number should be configurable
setInterval(flush, 10 * 1000).unref()

process.once('beforeExit', flush)
globalThis[Symbol.for('dd-trace')].beforeExitHandlers.add(flush)
}

increment (stat, value = 1, tags) {
Expand Down
4 changes: 1 addition & 3 deletions packages/dd-trace/src/exporters/agent/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,7 @@ class AgentExporter {
config
})

process.once('beforeExit', () => {
this.flush()
})
globalThis[Symbol.for('dd-trace')].beforeExitHandlers.add(this.flush.bind(this))
}

setUrl (url) {
Expand Down
2 changes: 1 addition & 1 deletion packages/dd-trace/src/exporters/common/request.js
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ function request (data, options, callback) {
res.on('data', chunk => {
chunks.push(chunk)
})
res.on('end', () => {
res.once('end', () => {
activeRequests--
const buffer = Buffer.concat(chunks)

Expand Down
2 changes: 1 addition & 1 deletion packages/dd-trace/src/guardrails/telemetry.js
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ function sendTelemetry (name, tags, resultMetadata) {
proc.on('error', function () {
log.error('Failed to spawn telemetry forwarder')
})
proc.on('exit', function (code) {
proc.once('exit', function (code) {
if (code !== 0) {
log.error('Telemetry forwarder exited with code', code)
}
Expand Down
Loading