diff --git a/AGENTS.md b/AGENTS.md index d3815ab9aed..2623adfdf16 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -144,6 +144,12 @@ Never use the `doesNotThrow()` assertion. Instead, execute the method directly. - Only define types inside of a method, if it can not be inferred otherwise - Only rewrite code for better types in case it was explicitly requested by the user +### Class Properties and Methods +- Use `#privateField` syntax for private properties and methods in new code +- Do not use underscore prefix (`_property`) for new private members +- Existing underscore properties should be refactored, when a file is touched for other + code changes, if no other files rely on these properties (test files do not count). + ### Import Ordering Separate groups with empty line, sort alphabetically within each: diff --git a/index.d.ts b/index.d.ts index c9b0bce07e8..1e0809745bd 100644 --- a/index.d.ts +++ b/index.d.ts @@ -3033,6 +3033,14 @@ declare namespace tracer { */ annotationContext (options: llmobs.AnnotationContextOptions, fn: () => T): T + /** + * Execute a function within a routing context, directing all LLMObs spans to a specific Datadog organization. + * @param options The routing context options containing the target API key and optional site. + * @param fn The callback over which to apply the routing context. + * @returns The result of the function. + */ + routingContext (options: llmobs.RoutingContextOptions, fn: () => T): T + /** * Flushes any remaining spans and evaluation metrics to LLM Observability. */ @@ -3206,6 +3214,18 @@ declare namespace tracer { name?: string, } + interface RoutingContextOptions { + /** + * The Datadog API key for the target organization. + */ + ddApiKey: string, + + /** + * The Datadog site for the target organization (e.g., 'datadoghq.eu'). + */ + ddSite?: string, + } + /** * An object containing the span ID and trace ID of interest */ diff --git a/packages/dd-trace/src/llmobs/constants/tags.js b/packages/dd-trace/src/llmobs/constants/tags.js index a2f904ffa84..c86705bb63c 100644 --- a/packages/dd-trace/src/llmobs/constants/tags.js +++ b/packages/dd-trace/src/llmobs/constants/tags.js @@ -42,5 +42,8 @@ module.exports = { PROMPT_MULTIMODAL: 'prompt_multimodal', INSTRUMENTATION_METHOD_AUTO: 'auto', INSTRUMENTATION_METHOD_ANNOTATED: 'annotated', - INSTRUMENTATION_METHOD_UNKNOWN: 'unknown' + INSTRUMENTATION_METHOD_UNKNOWN: 'unknown', + + ROUTING_API_KEY: '_dd.llmobs.routing.api_key', + ROUTING_SITE: '_dd.llmobs.routing.site' } diff --git a/packages/dd-trace/src/llmobs/index.js b/packages/dd-trace/src/llmobs/index.js index 1fe01dcbbba..32dd8504f1c 100644 --- a/packages/dd-trace/src/llmobs/index.js +++ b/packages/dd-trace/src/llmobs/index.js @@ -136,9 +136,9 @@ function handleSpanProcess (span) { spanProcessor.process(span) } -function handleEvalMetricAppend (payload) { +function handleEvalMetricAppend ({ payload, routing }) { try { - evalWriter.append(payload) + evalWriter.append(payload, routing) } catch (e) { log.warn( // eslint-disable-next-line @stylistic/max-len diff --git a/packages/dd-trace/src/llmobs/noop.js b/packages/dd-trace/src/llmobs/noop.js index 227ea9620a2..305d4a761bb 100644 --- a/packages/dd-trace/src/llmobs/noop.js +++ b/packages/dd-trace/src/llmobs/noop.js @@ -81,6 +81,8 @@ class NoopLLMObs { deregisterProcessor () {} annotationContext (options, fn) { return fn() } + + routingContext (options, fn) { return fn() } } module.exports = NoopLLMObs diff --git a/packages/dd-trace/src/llmobs/sdk.js b/packages/dd-trace/src/llmobs/sdk.js index 040e03e78aa..ffd92dec7ac 100644 --- a/packages/dd-trace/src/llmobs/sdk.js +++ b/packages/dd-trace/src/llmobs/sdk.js @@ -418,7 +418,9 @@ class LLMObs extends NoopLLMObs { timestamp_ms: timestampMs, tags: Object.entries(evaluationTags).map(([key, value]) => `${key}:${value}`) } - evalMetricAppendCh.publish(payload) + const currentStore = storage.getStore() + const routing = currentStore?.routingContext + evalMetricAppendCh.publish({ payload, routing }) } finally { telemetry.recordSubmitEvaluation(options, err) } @@ -440,6 +442,28 @@ class LLMObs extends NoopLLMObs { return storage.run(store, fn) } + routingContext (options, fn) { + if (!this.enabled) return fn() + if (!options?.ddApiKey) { + throw new Error('ddApiKey is required for routing context') + } + const currentStore = storage.getStore() + if (currentStore?.routingContext) { + logger.warn( + '[LLM Observability] Nested routing context detected. Inner context will override outer context. ' + + 'Spans created in the inner context will only be sent to the inner context.' + ) + } + const store = { + ...currentStore, + routingContext: { + apiKey: options.ddApiKey, + site: options.ddSite + } + } + return storage.run(store, fn) + } + flush () { if (!this.enabled) return diff --git a/packages/dd-trace/src/llmobs/span_processor.js b/packages/dd-trace/src/llmobs/span_processor.js index 74d74e4e52b..e6db3a794dd 100644 --- a/packages/dd-trace/src/llmobs/span_processor.js +++ b/packages/dd-trace/src/llmobs/span_processor.js @@ -26,7 +26,9 @@ const { TAGS, PARENT_ID_KEY, SESSION_ID, - NAME + NAME, + ROUTING_API_KEY, + ROUTING_SITE } = require('./constants/tags') const { UNSERIALIZABLE_VALUE_TEXT } = require('./constants/text') const telemetry = require('./telemetry') @@ -78,7 +80,13 @@ class LLMObsSpanProcessor { telemetry.incrementLLMObsSpanFinishedCount(span) if (formattedEvent == null) return - this.#writer.append(formattedEvent) + const mlObsTags = LLMObsTagger.tagMap.get(span) + const routing = { + apiKey: mlObsTags[ROUTING_API_KEY], + site: mlObsTags[ROUTING_SITE] + } + + this.#writer.append(formattedEvent, routing) } catch (e) { // this should be a rare case // we protect against unserializable properties in the format function, and in diff --git a/packages/dd-trace/src/llmobs/tagger.js b/packages/dd-trace/src/llmobs/tagger.js index a8a3d7ba978..6fe17906205 100644 --- a/packages/dd-trace/src/llmobs/tagger.js +++ b/packages/dd-trace/src/llmobs/tagger.js @@ -28,7 +28,9 @@ const { REASONING_OUTPUT_TOKENS_METRIC_KEY, INTEGRATION, DECORATOR, - PROPAGATED_ML_APP_KEY + PROPAGATED_ML_APP_KEY, + ROUTING_API_KEY, + ROUTING_SITE } = require('./constants/tags') const { storage } = require('./storage') @@ -110,6 +112,14 @@ class LLMObsTagger { // apply annotation context name const annotationContextName = annotationContext?.name if (annotationContextName) this._setTag(span, NAME, annotationContextName) + + const routing = storage.getStore()?.routingContext + if (routing) { + this._setTag(span, ROUTING_API_KEY, routing.apiKey) + if (routing.site) { + this._setTag(span, ROUTING_SITE, routing.site) + } + } } // TODO: similarly for the following `tag` methods, diff --git a/packages/dd-trace/src/llmobs/writers/base.js b/packages/dd-trace/src/llmobs/writers/base.js index d2c19047d3b..312c6f16162 100644 --- a/packages/dd-trace/src/llmobs/writers/base.js +++ b/packages/dd-trace/src/llmobs/writers/base.js @@ -16,19 +16,37 @@ const { } = require('../constants/writers') const { parseResponseAndLog } = require('./util') +class LLMObsBuffer { + constructor ({ events, size, routing = {}, isDefault = false, limit = 1000 }) { + this.events = events + this.size = size + this.routing = routing + this.isDefault = isDefault + this.limit = limit + } + + clear () { + this.events = [] + this.size = 0 + } +} + class BaseLLMObsWriter { #destroyer + /** @type {Map} */ + #multiTenantBuffers = new Map() + constructor ({ interval, timeout, eventType, config, endpoint, intake }) { this._interval = interval ?? getEnvironmentVariable('_DD_LLMOBS_FLUSH_INTERVAL') ?? 1000 // 1s this._timeout = timeout ?? getEnvironmentVariable('_DD_LLMOBS_TIMEOUT') ?? 5000 // 5s this._eventType = eventType - this._buffer = [] - this._bufferLimit = 1000 - this._bufferSize = 0 + /** @type {LLMObsBuffer} */ + this._buffer = new LLMObsBuffer({ events: [], size: 0, isDefault: true }) this._config = config this._endpoint = endpoint + this._baseEndpoint = endpoint // should not be unset this._intake = intake this._periodic = setInterval(() => { @@ -41,48 +59,109 @@ class BaseLLMObsWriter { this.#destroyer = destroyer } + // Split on protocol separator to preserve it + // path.join will remove some slashes unnecessarily + #buildUrl (baseUrl, endpoint) { + const [protocol, rest] = baseUrl.split('://') + return protocol + '://' + path.join(rest, endpoint) + } + get url () { if (this._agentless == null) return null + return this.#buildUrl(this._baseUrl.href, this._endpoint) + } - const baseUrl = this._baseUrl.href - const endpoint = this._endpoint - - // Split on protocol separator to preserve it - // path.join will remove some slashes unnecessarily - const [protocol, rest] = baseUrl.split('://') - return protocol + '://' + path.join(rest, endpoint) + _getBuffer (routing) { + if (!routing?.apiKey) { + return this._buffer + } + const apiKey = routing.apiKey + let buffer = this.#multiTenantBuffers.get(apiKey) + if (!buffer) { + buffer = new LLMObsBuffer({ events: [], size: 0, routing }) + this.#multiTenantBuffers.set(apiKey, buffer) + } + return buffer } - append (event, byteLength) { - if (this._buffer.length >= this._bufferLimit) { - logger.warn(`${this.constructor.name} event buffer full (limit is ${this._bufferLimit}), dropping event`) + append (event, routing, byteLength) { + const buffer = this._getBuffer(routing) + + if (buffer.events.length >= buffer.limit) { + logger.warn(`${this.constructor.name} event buffer full (limit is ${buffer.limit}), dropping event`) telemetry.recordDroppedPayload(1, this._eventType, 'buffer_full') return } - this._bufferSize += byteLength || Buffer.byteLength(JSON.stringify(event)) - this._buffer.push(event) + const eventSize = byteLength || Buffer.byteLength(JSON.stringify(event)) + + buffer.size += eventSize + buffer.events.push(event) } flush () { - const noAgentStrategy = this._agentless == null - - if (this._buffer.length === 0 || noAgentStrategy) { + if (this._agentless == null) { return } - const events = this._buffer - this._buffer = [] - this._bufferSize = 0 - const payload = this._encode(this.makePayload(events)) + // Flush default buffer + if (this._buffer.events.length > 0) { + const events = this._buffer.events + this._buffer.clear() + + const payload = this._encode(this.makePayload(events)) - log.debug('Encoded LLMObs payload: %s', payload) + log.debug('Encoded LLMObs payload: %s', payload) - const options = this._getOptions() + const options = this._getOptions() + + request(payload, options, (err, resp, code) => { + parseResponseAndLog(err, code, events.length, this.url, this._eventType) + }) + } - request(payload, options, (err, resp, code) => { - parseResponseAndLog(err, code, events.length, this.url, this._eventType) - }) + // Flush multi-tenant buffers + for (const [apiKey, buffer] of this.#multiTenantBuffers) { + if (buffer.events.length === 0) continue + + const events = buffer.events + buffer.clear() + + const payload = this._encode(this.makePayload(events)) + const site = buffer.routing.site || this._config.site + const options = { + headers: { + 'Content-Type': 'application/json', + 'DD-API-KEY': apiKey + }, + method: 'POST', + timeout: this._timeout, + url: new URL(format({ + protocol: 'https:', + hostname: `${this._intake}.${site}` + })), + path: this._baseEndpoint + } + const url = this.#buildUrl(options.url.href, options.path) + const maskedApiKey = apiKey ? `****${apiKey.slice(-4)}` : '' + + log.debug('Encoding and flushing multi-tenant buffer for %s', maskedApiKey) + log.debug('Encoded LLMObs payload: %s', payload) + + request(payload, options, (err, resp, code) => { + parseResponseAndLog(err, code, events.length, url, this._eventType) + }) + } + + this.#cleanupEmptyBuffers() + } + + #cleanupEmptyBuffers () { + for (const [key, buffer] of this.#multiTenantBuffers) { + if (buffer.events.length === 0) { + this.#multiTenantBuffers.delete(key) + } + } } makePayload (events) {} @@ -109,10 +188,11 @@ class BaseLLMObsWriter { _getUrlAndPath () { if (this._agentless) { + const site = this._config.site return { url: new URL(format({ protocol: 'https:', - hostname: `${this._intake}.${this._config.site}` + hostname: `${this._intake}.${site}` })), endpoint: this._endpoint } diff --git a/packages/dd-trace/src/llmobs/writers/spans.js b/packages/dd-trace/src/llmobs/writers/spans.js index 7bcfb07fc4c..dc83bd3b007 100644 --- a/packages/dd-trace/src/llmobs/writers/spans.js +++ b/packages/dd-trace/src/llmobs/writers/spans.js @@ -24,7 +24,7 @@ class LLMObsSpanWriter extends BaseWriter { }) } - append (event) { + append (event, routing) { const eventSizeBytes = Buffer.byteLength(JSON.stringify(event)) telemetry.recordLLMObsRawSpanSize(event, eventSizeBytes) @@ -39,12 +39,13 @@ class LLMObsSpanWriter extends BaseWriter { telemetry.recordLLMObsSpanSize(event, processedEventSizeBytes, shouldTruncate) - if (this._bufferSize + eventSizeBytes > EVP_PAYLOAD_SIZE_LIMIT) { + const buffer = this._getBuffer(routing) + if (buffer.size + processedEventSizeBytes > EVP_PAYLOAD_SIZE_LIMIT) { logger.debug('Flushing queue because queuing next event will exceed EvP payload limit') this.flush() } - super.append(event, processedEventSizeBytes) + super.append(event, routing, processedEventSizeBytes) } makePayload (events) { diff --git a/packages/dd-trace/test/llmobs/index.spec.js b/packages/dd-trace/test/llmobs/index.spec.js index 740d2bc2b3a..44ef15d6f60 100644 --- a/packages/dd-trace/test/llmobs/index.spec.js +++ b/packages/dd-trace/test/llmobs/index.spec.js @@ -256,9 +256,9 @@ describe('module', () => { const payload = {} - evalMetricAppendCh.publish(payload) + evalMetricAppendCh.publish({ payload }) - sinon.assert.calledWith(LLMObsEvalMetricsWriterSpy().append, payload) + sinon.assert.calledWith(LLMObsEvalMetricsWriterSpy().append, payload, undefined) }) it('removes all subscribers when disabling', () => { diff --git a/packages/dd-trace/test/llmobs/writers/base.spec.js b/packages/dd-trace/test/llmobs/writers/base.spec.js index c788491e9b2..23c6d51136c 100644 --- a/packages/dd-trace/test/llmobs/writers/base.spec.js +++ b/packages/dd-trace/test/llmobs/writers/base.spec.js @@ -159,9 +159,9 @@ describe('BaseLLMObsWriter', () => { const event = { foo: 'bar–' } writer.append(event) - assert.strictEqual(writer._buffer.length, 1) - assert.deepStrictEqual(writer._buffer[0], event) - assert.strictEqual(writer._bufferSize, 16) + assert.strictEqual(writer._buffer.events.length, 1) + assert.deepStrictEqual(writer._buffer.events[0], event) + assert.strictEqual(writer._buffer.size, 16) }) it('does not append an event if the buffer is full', () => { @@ -173,7 +173,7 @@ describe('BaseLLMObsWriter', () => { } writer.append({ foo: 'bar' }) - assert.strictEqual(writer._buffer.length, 1000) + assert.strictEqual(writer._buffer.events.length, 1000) sinon.assert.calledWith(logger.warn, 'BaseLLMObsWriter event buffer full (limit is 1000), dropping event') }) @@ -208,6 +208,20 @@ describe('BaseLLMObsWriter', () => { assert.strictEqual(requestOptions.headers['X-Datadog-EVP-Subdomain'], 'intake') }) + it('flushes routed buffers directly to intake in agent proxy mode', () => { + writer = new BaseLLMObsWriter(options) + writer.setAgentless(false) + writer.makePayload = (events) => ({ events }) + + writer.append({ foo: 'bar' }, { apiKey: 'key-a', site: 'site-a.com' }) + writer.flush() + + const requestOptions = request.getCall(0).args[1] + assert.strictEqual(requestOptions.url.href, 'https://intake.site-a.com/') + assert.strictEqual(requestOptions.path, '/endpoint') + assert.strictEqual(requestOptions.headers['DD-API-KEY'], 'key-a') + }) + it('does not flush when agentless property is not set', () => { writer = new BaseLLMObsWriter(options) writer.makePayload = (events) => ({ events }) @@ -217,8 +231,8 @@ describe('BaseLLMObsWriter', () => { writer.flush() sinon.assert.notCalled(request) - assert.strictEqual(writer._buffer.length, 1) - assert.deepStrictEqual(writer._buffer[0], event) + assert.strictEqual(writer._buffer.events.length, 1) + assert.deepStrictEqual(writer._buffer.events[0], event) writer.setAgentless(true) writer.flush() diff --git a/packages/dd-trace/test/llmobs/writers/multi-tenant.spec.js b/packages/dd-trace/test/llmobs/writers/multi-tenant.spec.js new file mode 100644 index 00000000000..474e01e53c6 --- /dev/null +++ b/packages/dd-trace/test/llmobs/writers/multi-tenant.spec.js @@ -0,0 +1,259 @@ +'use strict' + +const assert = require('node:assert/strict') +const { after, afterEach, before, beforeEach, describe, it } = require('mocha') +const proxyquire = require('proxyquire') +const sinon = require('sinon') + +const LLMObsSpanWriter = require('../../../src/llmobs/writers/spans') +const LLMObsEvalMetricsWriter = require('../../../src/llmobs/writers/evaluations') +const log = require('../../../src/log') +const agent = require('../../plugins/agent') + +describe('Multi-Tenant Routing', () => { + let BaseLLMObsWriter + let request + let logger + let writer + + const config = { + site: 'default-site.com', + hostname: 'localhost', + port: 8126, + apiKey: 'default-key' + } + + beforeEach(() => { + request = sinon.stub() + logger = { debug: sinon.stub(), warn: sinon.stub(), error: sinon.stub() } + + BaseLLMObsWriter = proxyquire('../../../src/llmobs/writers/base', { + '../../exporters/common/request': request, + '../../log': logger, + './util': proxyquire('../../../src/llmobs/writers/util', { '../../log': logger }) + }) + + writer = new BaseLLMObsWriter({ endpoint: '/endpoint', intake: 'intake', config }) + writer.setAgentless(true) + writer.makePayload = (events) => ({ events }) + }) + + afterEach(() => { + writer.destroy() + process.removeAllListeners('beforeExit') + }) + + it('routes events to correct endpoints with correct API keys', () => { + writer.append({ id: 1 }, { apiKey: 'key-a', site: 'site-a.com' }) + writer.append({ id: 2 }, { apiKey: 'key-b', site: 'site-b.com' }) + writer.append({ id: 3 }) // default routing + + writer.flush() + + assert.strictEqual(request.callCount, 3) + + const calls = request.getCalls() + const byKey = (key) => calls.find(c => c.args[1].headers['DD-API-KEY'] === key).args[1].url.href + + assert.strictEqual(byKey('key-a'), 'https://intake.site-a.com/') + assert.strictEqual(byKey('key-b'), 'https://intake.site-b.com/') + assert.strictEqual(byKey('default-key'), 'https://intake.default-site.com/') + }) + + it('isolates events between tenants', () => { + writer.append({ tenant: 'A', secret: 'A-data' }, { apiKey: 'key-a', site: 'site-a.com' }) + writer.append({ tenant: 'B', secret: 'B-data' }, { apiKey: 'key-b', site: 'site-b.com' }) + + writer.flush() + + const payloads = request.getCalls().map(c => ({ + apiKey: c.args[1].headers['DD-API-KEY'], + events: JSON.parse(c.args[0]).events + })) + + const payloadA = payloads.find(p => p.apiKey === 'key-a') + const payloadB = payloads.find(p => p.apiKey === 'key-b') + + assert.strictEqual(payloadA.events.length, 1) + assert.strictEqual(payloadA.events[0].secret, 'A-data') + assert.strictEqual(payloadB.events.length, 1) + assert.strictEqual(payloadB.events[0].secret, 'B-data') + }) + + it('enforces buffer limit per routing key', () => { + const routing = { apiKey: 'key-a', site: 'site-a.com' } + + for (let i = 0; i < 1001; i++) { + writer.append({ id: i }, routing) + } + + writer.flush() + + const payload = JSON.parse(request.getCall(0).args[0]) + assert.strictEqual(payload.events.length, 1000) + sinon.assert.calledOnce(logger.warn) + }) + + it('clears buffers after flush', () => { + writer.append({ id: 1 }, { apiKey: 'key-a', site: 'site-a.com' }) + + writer.flush() + assert.strictEqual(request.callCount, 1) + + writer.flush() + assert.strictEqual(request.callCount, 1) // no new requests + }) + + it('does not include API key in payload body', () => { + writer.append({ data: 'test' }, { apiKey: 'secret-tenant-key', site: 'tenant.com' }) + + writer.flush() + + const payload = request.getCall(0).args[0] + assert.ok(!payload.includes('secret-tenant-key')) + assert.ok(!payload.includes('default-key')) + }) + + describe('routing context behavior', () => { + let tracer + let llmobs + let appendSpy + let flushStub + let logWarnSpy + + before(() => { + process.env.DD_API_KEY = 'test-api-key' + process.env.DD_SITE = 'datadoghq.com' + + agent.wipe() + + tracer = require('../../../../dd-trace') + tracer.init({ + service: 'service', + llmobs: { + mlApp: 'mlApp', + agentlessEnabled: true + } + }) + llmobs = tracer.llmobs + }) + + let evalAppendSpy + let evalFlushStub + + beforeEach(() => { + appendSpy = sinon.spy(LLMObsSpanWriter.prototype, 'append') + flushStub = sinon.stub(LLMObsSpanWriter.prototype, 'flush') + evalAppendSpy = sinon.spy(LLMObsEvalMetricsWriter.prototype, 'append') + evalFlushStub = sinon.stub(LLMObsEvalMetricsWriter.prototype, 'flush') + logWarnSpy = sinon.spy(log, 'warn') + }) + + afterEach(() => { + appendSpy.restore() + flushStub.restore() + evalAppendSpy.restore() + evalFlushStub.restore() + logWarnSpy.restore() + }) + + after(() => { + delete process.env.DD_API_KEY + delete process.env.DD_SITE + agent.wipe() + }) + + it('nested contexts route spans correctly and log warning', () => { + llmobs.routingContext({ ddApiKey: 'outer-key', ddSite: 'outer-site.com' }, () => { + llmobs.trace({ kind: 'workflow', name: 'outer-span' }, () => {}) + + llmobs.routingContext({ ddApiKey: 'inner-key', ddSite: 'inner-site.com' }, () => { + llmobs.trace({ kind: 'workflow', name: 'inner-span' }, () => {}) + }) + + llmobs.trace({ kind: 'workflow', name: 'after-inner-span' }, () => {}) + }) + + const calls = appendSpy.getCalls() + assert.strictEqual(calls.length, 3) + + const routingFor = (name) => calls.find(c => c.args[0].name === name).args[1] + + assert.deepStrictEqual(routingFor('outer-span'), { apiKey: 'outer-key', site: 'outer-site.com' }) + assert.deepStrictEqual(routingFor('inner-span'), { apiKey: 'inner-key', site: 'inner-site.com' }) + assert.deepStrictEqual(routingFor('after-inner-span'), { apiKey: 'outer-key', site: 'outer-site.com' }) + + const warningMessages = logWarnSpy.getCalls().map(call => call.args[0]) + const nestedWarnings = warningMessages.filter(message => /Nested routing context detected/.test(message)) + assert.strictEqual(nestedWarnings.length, 1) + }) + + it('concurrent contexts are isolated', async () => { + let resolveA + let resolveB + const gateA = new Promise(resolve => { resolveA = resolve }) + const gateB = new Promise(resolve => { resolveB = resolve }) + + const taskA = llmobs.routingContext({ ddApiKey: 'key-a', ddSite: 'site-a.com' }, async () => { + await gateA + llmobs.trace({ kind: 'workflow', name: 'span-a' }, () => {}) + }) + const taskB = llmobs.routingContext({ ddApiKey: 'key-b', ddSite: 'site-b.com' }, async () => { + await gateB + llmobs.trace({ kind: 'workflow', name: 'span-b' }, () => {}) + }) + + resolveB() + resolveA() + + await Promise.all([taskA, taskB]) + + const calls = appendSpy.getCalls() + + // explicit assertion that span-b is appended before span-a + const callNames = calls.map(call => call.args[0].name) + const spanBIndex = callNames.indexOf('span-b') + const spanAIndex = callNames.indexOf('span-a') + assert.ok(spanBIndex !== -1) + assert.ok(spanAIndex !== -1) + assert.ok(spanBIndex < spanAIndex) + + const routingFor = (name) => calls.find(c => c.args[0].name === name).args[1] + + assert.deepStrictEqual(routingFor('span-a'), { apiKey: 'key-a', site: 'site-a.com' }) + assert.deepStrictEqual(routingFor('span-b'), { apiKey: 'key-b', site: 'site-b.com' }) + }) + + it('routes evaluations to correct tenant', () => { + const spanContext = { traceId: '123', spanId: '456' } + + llmobs.routingContext({ ddApiKey: 'eval-key', ddSite: 'eval-site.com' }, () => { + llmobs.submitEvaluation(spanContext, { + label: 'test-label', + metricType: 'score', + value: 0.9 + }) + }) + + assert.strictEqual(evalAppendSpy.callCount, 1) + const [payload, routing] = evalAppendSpy.firstCall.args + assert.strictEqual(payload.label, 'test-label') + assert.deepStrictEqual(routing, { apiKey: 'eval-key', site: 'eval-site.com' }) + }) + + it('evaluations outside routing context have no routing', () => { + const spanContext = { traceId: '123', spanId: '456' } + + llmobs.submitEvaluation(spanContext, { + label: 'default-label', + metricType: 'categorical', + value: 'good' + }) + + assert.strictEqual(evalAppendSpy.callCount, 1) + const [payload, routing] = evalAppendSpy.firstCall.args + assert.strictEqual(payload.label, 'default-label') + assert.strictEqual(routing, undefined) + }) + }) +}) diff --git a/packages/dd-trace/test/llmobs/writers/spans.spec.js b/packages/dd-trace/test/llmobs/writers/spans.spec.js index d033e5ee641..a7de0e2f2b1 100644 --- a/packages/dd-trace/test/llmobs/writers/spans.spec.js +++ b/packages/dd-trace/test/llmobs/writers/spans.spec.js @@ -62,7 +62,7 @@ describe('LLMObsSpanWriter', () => { writer.append(event) - assert.strictEqual(writer._bufferSize, eventSizeBytes) + assert.strictEqual(writer._buffer.size, eventSizeBytes) }) it('truncates the event if it exceeds the size limit', () => { @@ -78,7 +78,7 @@ describe('LLMObsSpanWriter', () => { writer.append(event) - const bufferEvent = writer._buffer[0] + const bufferEvent = writer._buffer.events[0] assert.deepStrictEqual(bufferEvent, { name: 'test', meta: { @@ -93,8 +93,8 @@ describe('LLMObsSpanWriter', () => { writer = new LLMObsSpanWriter(config) writer.flush = sinon.stub() - writer._bufferSize = (5 << 20) - 1 - writer._buffer = Array.from({ length: 10 }) + writer._buffer.size = (5 << 20) - 1 + writer._buffer.events = Array.from({ length: 10 }) const event = { name: 'test', value: 'a'.repeat(1024) } writer.append(event)