Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
3f6f0e1
feat(routing): implement multi-tenant routing context support
PROFeNoM Dec 23, 2025
f15d74a
Removed default parameters from getCurrentRouting, now returns null i…
PROFeNoM Dec 23, 2025
21aec03
restore comment + use size AFTER truncation to make flushing decision
PROFeNoM Dec 23, 2025
4f9e357
refactor(routing-context): simplify getCurrentRouting function and en…
PROFeNoM Dec 23, 2025
ff7c821
Backward compat comment + Nested context test
PROFeNoM Dec 23, 2025
34fcd38
fix: API Key Logged in Debug Output
PROFeNoM Jan 5, 2026
d5647ea
chore: remove unnecessary comments
PROFeNoM Jan 5, 2026
06bc6bd
Converted to private # syntax, revised tests, and added rule to AGENT…
PROFeNoM Jan 6, 2026
d8b5417
fix
PROFeNoM Jan 6, 2026
5cfc108
#originalEndpoint
PROFeNoM Jan 6, 2026
bc41f53
remove some redundancies
PROFeNoM Jan 6, 2026
8396b85
add comment for legacy
PROFeNoM Jan 6, 2026
dd64483
refactor(routing-context): implement withRoutingContext directly in L…
PROFeNoM Jan 7, 2026
e1f460d
refactor
PROFeNoM Jan 7, 2026
b38fe4a
refactor
PROFeNoM Jan 7, 2026
ecaa34e
warn + private methods
PROFeNoM Jan 7, 2026
145ebac
refactor(llmobs): simplify warning message for nested routing context…
PROFeNoM Jan 8, 2026
3e253a8
Update AGENTS.md
PROFeNoM Jan 8, 2026
e6eee74
test(llmobs): enhance multi-tenant routing tests with improved assert…
PROFeNoM Jan 8, 2026
dbb1fde
refactor(llmobs): introduce LLMObsBuffer for improved event handling …
PROFeNoM Jan 15, 2026
df1542a
trim redundancy
PROFeNoM Jan 15, 2026
5a94a16
avoid double /evp_proxy/v2/... prefix
PROFeNoM Jan 15, 2026
5cd9405
feat(llmobs): add warning for routing context usage in agent proxy mo…
PROFeNoM Jan 15, 2026
ee4c068
lint
PROFeNoM Jan 15, 2026
ad914c9
Sam's code enhancements
PROFeNoM Jan 15, 2026
93a9c48
rename withRoutingContext to routingContext
PROFeNoM Jan 15, 2026
a9c0186
add support for submitEvaluation
PROFeNoM Jan 20, 2026
a3d89ff
handle signature change
PROFeNoM Jan 20, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions AGENTS.md
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,12 @@ Never use the `doesNotThrow()` assertion. Instead, execute the method directly.
- Only define types inside of a method if they cannot be inferred otherwise
- Only rewrite code for better types in case it was explicitly requested by the user

### Class Properties and Methods
- Use `#privateField` syntax for private properties and methods in new code
- Do not use underscore prefix (`_property`) for new private members
- Existing underscore-prefixed properties should be refactored when a file is touched for other
code changes, provided no other files rely on these properties (test files do not count).

### Import Ordering

Separate groups with empty line, sort alphabetically within each:
Expand Down
20 changes: 20 additions & 0 deletions index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3033,6 +3033,14 @@ declare namespace tracer {
*/
annotationContext<T> (options: llmobs.AnnotationContextOptions, fn: () => T): T

/**
* Execute a function within a routing context, directing all LLMObs spans to a specific Datadog organization.
* Evaluation metrics submitted inside the callback are routed with the same context.
* When routing contexts are nested, the inner context overrides the outer one and a warning is logged.
* @param options The routing context options containing the target API key and optional site.
* @param fn The callback over which to apply the routing context.
* @returns The result of the function.
* @throws If LLM Observability is enabled and `options.ddApiKey` is missing.
*/
routingContext<T> (options: llmobs.RoutingContextOptions, fn: () => T): T

/**
* Flushes any remaining spans and evaluation metrics to LLM Observability.
*/
Expand Down Expand Up @@ -3206,6 +3214,18 @@ declare namespace tracer {
name?: string,
}

/**
* Options accepted by `routingContext`, identifying the Datadog organization that receives the data.
*/
interface RoutingContextOptions {
/**
* The Datadog API key for the target organization.
* Required: `routingContext` throws when it is missing and LLM Observability is enabled.
*/
ddApiKey: string,

/**
* The Datadog site for the target organization (e.g., 'datadoghq.eu').
* When omitted, the site from the tracer configuration is used.
*/
ddSite?: string,
}

/**
* An object containing the span ID and trace ID of interest
*/
Expand Down
5 changes: 4 additions & 1 deletion packages/dd-trace/src/llmobs/constants/tags.js
Original file line number Diff line number Diff line change
Expand Up @@ -42,5 +42,8 @@ module.exports = {
PROMPT_MULTIMODAL: 'prompt_multimodal',
INSTRUMENTATION_METHOD_AUTO: 'auto',
INSTRUMENTATION_METHOD_ANNOTATED: 'annotated',
INSTRUMENTATION_METHOD_UNKNOWN: 'unknown'
INSTRUMENTATION_METHOD_UNKNOWN: 'unknown',

ROUTING_API_KEY: '_dd.llmobs.routing.api_key',
ROUTING_SITE: '_dd.llmobs.routing.site'
}
4 changes: 2 additions & 2 deletions packages/dd-trace/src/llmobs/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -136,9 +136,9 @@ function handleSpanProcess (span) {
spanProcessor.process(span)
}

function handleEvalMetricAppend (payload) {
function handleEvalMetricAppend ({ payload, routing }) {
try {
evalWriter.append(payload)
evalWriter.append(payload, routing)
} catch (e) {
log.warn(
// eslint-disable-next-line @stylistic/max-len
Expand Down
2 changes: 2 additions & 0 deletions packages/dd-trace/src/llmobs/noop.js
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ class NoopLLMObs {
deregisterProcessor () {}

annotationContext (options, fn) { return fn() }

routingContext (options, fn) { return fn() }
}

module.exports = NoopLLMObs
26 changes: 25 additions & 1 deletion packages/dd-trace/src/llmobs/sdk.js
Original file line number Diff line number Diff line change
Expand Up @@ -418,7 +418,9 @@ class LLMObs extends NoopLLMObs {
timestamp_ms: timestampMs,
tags: Object.entries(evaluationTags).map(([key, value]) => `${key}:${value}`)
}
evalMetricAppendCh.publish(payload)
const currentStore = storage.getStore()
const routing = currentStore?.routingContext
evalMetricAppendCh.publish({ payload, routing })
} finally {
telemetry.recordSubmitEvaluation(options, err)
}
Expand All @@ -440,6 +442,28 @@ class LLMObs extends NoopLLMObs {
return storage.run(store, fn)
}

routingContext (options, fn) {
if (!this.enabled) return fn()
if (!options?.ddApiKey) {
throw new Error('ddApiKey is required for routing context')
}
const currentStore = storage.getStore()
if (currentStore?.routingContext) {
logger.warn(
'[LLM Observability] Nested routing context detected. Inner context will override outer context. ' +
'Spans created in the inner context will only be sent to the inner context.'
)
}
const store = {
...currentStore,
routingContext: {
apiKey: options.ddApiKey,
site: options.ddSite
}
}
return storage.run(store, fn)
}

flush () {
if (!this.enabled) return

Expand Down
12 changes: 10 additions & 2 deletions packages/dd-trace/src/llmobs/span_processor.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@ const {
TAGS,
PARENT_ID_KEY,
SESSION_ID,
NAME
NAME,
ROUTING_API_KEY,
ROUTING_SITE
} = require('./constants/tags')
const { UNSERIALIZABLE_VALUE_TEXT } = require('./constants/text')
const telemetry = require('./telemetry')
Expand Down Expand Up @@ -78,7 +80,13 @@ class LLMObsSpanProcessor {
telemetry.incrementLLMObsSpanFinishedCount(span)
if (formattedEvent == null) return

this.#writer.append(formattedEvent)
const mlObsTags = LLMObsTagger.tagMap.get(span)
const routing = {
apiKey: mlObsTags[ROUTING_API_KEY],
site: mlObsTags[ROUTING_SITE]
}

this.#writer.append(formattedEvent, routing)
} catch (e) {
// this should be a rare case
// we protect against unserializable properties in the format function, and in
Expand Down
12 changes: 11 additions & 1 deletion packages/dd-trace/src/llmobs/tagger.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@ const {
REASONING_OUTPUT_TOKENS_METRIC_KEY,
INTEGRATION,
DECORATOR,
PROPAGATED_ML_APP_KEY
PROPAGATED_ML_APP_KEY,
ROUTING_API_KEY,
ROUTING_SITE
} = require('./constants/tags')
const { storage } = require('./storage')

Expand Down Expand Up @@ -110,6 +112,14 @@ class LLMObsTagger {
// apply annotation context name
const annotationContextName = annotationContext?.name
if (annotationContextName) this._setTag(span, NAME, annotationContextName)

const routing = storage.getStore()?.routingContext
if (routing) {
this._setTag(span, ROUTING_API_KEY, routing.apiKey)
if (routing.site) {
this._setTag(span, ROUTING_SITE, routing.site)
}
}
}

// TODO: similarly for the following `tag` methods,
Expand Down
136 changes: 108 additions & 28 deletions packages/dd-trace/src/llmobs/writers/base.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,37 @@ const {
} = require('../constants/writers')
const { parseResponseAndLog } = require('./util')

class LLMObsBuffer {
constructor ({ events, size, routing = {}, isDefault = false, limit = 1000 }) {
this.events = events
this.size = size
this.routing = routing
this.isDefault = isDefault
this.limit = limit
}

clear () {
this.events = []
this.size = 0
}
}

class BaseLLMObsWriter {
#destroyer
/** @type {Map<string, LLMObsBuffer>} */
#multiTenantBuffers = new Map()

constructor ({ interval, timeout, eventType, config, endpoint, intake }) {
this._interval = interval ?? getEnvironmentVariable('_DD_LLMOBS_FLUSH_INTERVAL') ?? 1000 // 1s
this._timeout = timeout ?? getEnvironmentVariable('_DD_LLMOBS_TIMEOUT') ?? 5000 // 5s
this._eventType = eventType

this._buffer = []
this._bufferLimit = 1000
this._bufferSize = 0
/** @type {LLMObsBuffer} */
this._buffer = new LLMObsBuffer({ events: [], size: 0, isDefault: true })

this._config = config
this._endpoint = endpoint
this._baseEndpoint = endpoint // should not be unset
this._intake = intake

this._periodic = setInterval(() => {
Expand All @@ -41,48 +59,109 @@ class BaseLLMObsWriter {
this.#destroyer = destroyer
}

// Split on protocol separator to preserve it
// path.join will remove some slashes unnecessarily
#buildUrl (baseUrl, endpoint) {
const [protocol, rest] = baseUrl.split('://')
return protocol + '://' + path.join(rest, endpoint)
}

/**
* URL built from the writer's base URL and its endpoint.
* @returns {string|null} `null` while `_agentless` is unset, otherwise the joined URL.
*/
get url () {
// Loose equality on purpose: matches both null and undefined.
if (this._agentless == null) return null
return this.#buildUrl(this._baseUrl.href, this._endpoint)
}

const baseUrl = this._baseUrl.href
const endpoint = this._endpoint

// Split on protocol separator to preserve it
// path.join will remove some slashes unnecessarily
const [protocol, rest] = baseUrl.split('://')
return protocol + '://' + path.join(rest, endpoint)
_getBuffer (routing) {
if (!routing?.apiKey) {
return this._buffer
}
const apiKey = routing.apiKey
let buffer = this.#multiTenantBuffers.get(apiKey)
if (!buffer) {
buffer = new LLMObsBuffer({ events: [], size: 0, routing })
this.#multiTenantBuffers.set(apiKey, buffer)
}
return buffer
}

append (event, byteLength) {
if (this._buffer.length >= this._bufferLimit) {
logger.warn(`${this.constructor.name} event buffer full (limit is ${this._bufferLimit}), dropping event`)
append (event, routing, byteLength) {
const buffer = this._getBuffer(routing)

if (buffer.events.length >= buffer.limit) {
logger.warn(`${this.constructor.name} event buffer full (limit is ${buffer.limit}), dropping event`)
telemetry.recordDroppedPayload(1, this._eventType, 'buffer_full')
return
}

this._bufferSize += byteLength || Buffer.byteLength(JSON.stringify(event))
this._buffer.push(event)
const eventSize = byteLength || Buffer.byteLength(JSON.stringify(event))

buffer.size += eventSize
buffer.events.push(event)
}

flush () {
const noAgentStrategy = this._agentless == null

if (this._buffer.length === 0 || noAgentStrategy) {
if (this._agentless == null) {
return
}

const events = this._buffer
this._buffer = []
this._bufferSize = 0
const payload = this._encode(this.makePayload(events))
// Flush default buffer
if (this._buffer.events.length > 0) {
const events = this._buffer.events
this._buffer.clear()

const payload = this._encode(this.makePayload(events))

log.debug('Encoded LLMObs payload: %s', payload)
log.debug('Encoded LLMObs payload: %s', payload)

const options = this._getOptions()
const options = this._getOptions()

request(payload, options, (err, resp, code) => {
parseResponseAndLog(err, code, events.length, this.url, this._eventType)
})
}

request(payload, options, (err, resp, code) => {
parseResponseAndLog(err, code, events.length, this.url, this._eventType)
})
// Flush multi-tenant buffers
for (const [apiKey, buffer] of this.#multiTenantBuffers) {
if (buffer.events.length === 0) continue

const events = buffer.events
buffer.clear()

const payload = this._encode(this.makePayload(events))
const site = buffer.routing.site || this._config.site
const options = {
headers: {
'Content-Type': 'application/json',
'DD-API-KEY': apiKey
},
method: 'POST',
timeout: this._timeout,
url: new URL(format({
protocol: 'https:',
hostname: `${this._intake}.${site}`
})),
path: this._baseEndpoint
}
const url = this.#buildUrl(options.url.href, options.path)
const maskedApiKey = apiKey ? `****${apiKey.slice(-4)}` : ''

log.debug('Encoding and flushing multi-tenant buffer for %s', maskedApiKey)
log.debug('Encoded LLMObs payload: %s', payload)

request(payload, options, (err, resp, code) => {
parseResponseAndLog(err, code, events.length, url, this._eventType)
})
}

this.#cleanupEmptyBuffers()
}

#cleanupEmptyBuffers () {
for (const [key, buffer] of this.#multiTenantBuffers) {
if (buffer.events.length === 0) {
this.#multiTenantBuffers.delete(key)
}
}
}

makePayload (events) {}
Expand All @@ -109,10 +188,11 @@ class BaseLLMObsWriter {

_getUrlAndPath () {
if (this._agentless) {
const site = this._config.site
return {
url: new URL(format({
protocol: 'https:',
hostname: `${this._intake}.${this._config.site}`
hostname: `${this._intake}.${site}`
})),
endpoint: this._endpoint
}
Expand Down
7 changes: 4 additions & 3 deletions packages/dd-trace/src/llmobs/writers/spans.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ class LLMObsSpanWriter extends BaseWriter {
})
}

append (event) {
append (event, routing) {
const eventSizeBytes = Buffer.byteLength(JSON.stringify(event))
telemetry.recordLLMObsRawSpanSize(event, eventSizeBytes)

Expand All @@ -39,12 +39,13 @@ class LLMObsSpanWriter extends BaseWriter {

telemetry.recordLLMObsSpanSize(event, processedEventSizeBytes, shouldTruncate)

if (this._bufferSize + eventSizeBytes > EVP_PAYLOAD_SIZE_LIMIT) {
const buffer = this._getBuffer(routing)
if (buffer.size + processedEventSizeBytes > EVP_PAYLOAD_SIZE_LIMIT) {
logger.debug('Flushing queue because queuing next event will exceed EvP payload limit')
this.flush()
}

super.append(event, processedEventSizeBytes)
super.append(event, routing, processedEventSizeBytes)
}

makePayload (events) {
Expand Down
4 changes: 2 additions & 2 deletions packages/dd-trace/test/llmobs/index.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -256,9 +256,9 @@ describe('module', () => {

const payload = {}

evalMetricAppendCh.publish(payload)
evalMetricAppendCh.publish({ payload })

sinon.assert.calledWith(LLMObsEvalMetricsWriterSpy().append, payload)
sinon.assert.calledWith(LLMObsEvalMetricsWriterSpy().append, payload, undefined)
})

it('removes all subscribers when disabling', () => {
Expand Down
Loading
Loading