Skip to content

Commit ab554a7

Browse files
authored
chore(openai): refactor streamed response handling to the tracing plugin (#6107)
* refactor to tracing plugin
1 parent f8434e9 commit ab554a7

File tree

3 files changed

+166
-114
lines changed

3 files changed

+166
-114
lines changed

packages/datadog-instrumentations/src/openai.js

Lines changed: 13 additions & 114 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ const shimmer = require('../../datadog-shimmer')
55

66
const dc = require('dc-polyfill')
77
const ch = dc.tracingChannel('apm:openai:request')
8+
const onStreamedChunkCh = dc.channel('apm:openai:request:chunk')
89

910
const V4_PACKAGE_SHIMS = [
1011
{
@@ -160,119 +161,24 @@ addHook({ name: 'openai', file: 'dist/api.js', versions: ['>=3.0.0 <4'] }, expor
160161
return exports
161162
})
162163

163-
function addStreamedChunk (content, chunk) {
164-
content.usage = chunk.usage // add usage if it was specified to be returned
165-
for (const choice of chunk.choices) {
166-
const choiceIdx = choice.index
167-
const oldChoice = content.choices.find(choice => choice?.index === choiceIdx)
168-
if (oldChoice) {
169-
if (!oldChoice.finish_reason) {
170-
oldChoice.finish_reason = choice.finish_reason
171-
}
172-
173-
// delta exists on chat completions
174-
const delta = choice.delta
175-
176-
if (delta) {
177-
const content = delta.content
178-
if (content) {
179-
if (oldChoice.delta.content) { // we don't want to append to undefined
180-
oldChoice.delta.content += content
181-
} else {
182-
oldChoice.delta.content = content
183-
}
184-
}
185-
} else {
186-
const text = choice.text
187-
if (text) {
188-
if (oldChoice.text) {
189-
oldChoice.text += text
190-
} else {
191-
oldChoice.text = text
192-
}
193-
}
194-
}
195-
196-
// tools only exist on chat completions
197-
const tools = delta && choice.delta.tool_calls
198-
199-
if (tools) {
200-
oldChoice.delta.tool_calls = tools.map((newTool, toolIdx) => {
201-
const oldTool = oldChoice.delta.tool_calls?.[toolIdx]
202-
203-
if (oldTool) {
204-
oldTool.function.arguments += newTool.function.arguments
205-
return oldTool
206-
}
207-
208-
return newTool
209-
})
210-
}
211-
} else {
212-
// we don't know which choices arrive in which order
213-
content.choices[choiceIdx] = choice
214-
}
215-
}
216-
}
217-
218-
function convertBufferstoObjects (chunks) {
219-
return Buffer
220-
.concat(chunks) // combine the buffers
221-
.toString() // stringify
222-
.split(/(?=data:)/) // split on "data:"
223-
.map(chunk => chunk.replaceAll('\n', '').slice(6)) // remove newlines and 'data: ' from the front
224-
.slice(0, -1) // remove the last [DONE] message
225-
.map(JSON.parse) // parse all of the returned objects
226-
}
227-
228164
/**
229165
* For streamed responses, we need to accumulate all of the content in
230166
* the chunks, and let the combined content be the final response.
231167
* This way, spans look the same as when not streamed.
232168
*/
233-
function wrapStreamIterator (response, options, n, ctx) {
234-
let processChunksAsBuffers = false
235-
let chunks = []
169+
function wrapStreamIterator (response, options, ctx) {
236170
return function (itr) {
237171
return function () {
238172
const iterator = itr.apply(this, arguments)
239173
shimmer.wrap(iterator, 'next', next => function () {
240174
return next.apply(this, arguments)
241175
.then(res => {
242176
const { done, value: chunk } = res
243-
244-
if (chunk) {
245-
chunks.push(chunk)
246-
// TODO(BridgeAR): It likely depends on the options being passed
247-
// through if the stream returns buffers or not. By reading that,
248-
// we don't have to do the instanceof check anymore, which is
249-
// relatively expensive.
250-
if (chunk instanceof Buffer) {
251-
// this operation should be safe
252-
// if one chunk is a buffer (versus a plain object), the rest should be as well
253-
processChunksAsBuffers = true
254-
}
255-
}
177+
onStreamedChunkCh.publish({ ctx, chunk, done })
256178

257179
if (done) {
258-
let body = {}
259-
if (processChunksAsBuffers) {
260-
chunks = convertBufferstoObjects(chunks)
261-
}
262-
263-
if (chunks.length) {
264-
// Define the initial body having all the content outside of choices from the first chunk
265-
// this will include important data like created, id, model, etc.
266-
body = { ...chunks[0], choices: Array.from({ length: n }) }
267-
// Start from the first chunk, and add its choices into the body
268-
for (const chunk_ of chunks) {
269-
addStreamedChunk(body, chunk_)
270-
}
271-
}
272-
273180
finish(ctx, {
274181
headers: response.headers,
275-
data: body,
276182
request: {
277183
path: response.url,
278184
method: options.method
@@ -312,17 +218,6 @@ for (const extension of extensions) {
312218
// chat.completions and completions
313219
const stream = streamedResponse && getOption(arguments, 'stream', false)
314220

315-
// we need to compute how many prompts we are sending in streamed cases for completions
316-
// not applicable for chat completions
317-
let n
318-
if (stream) {
319-
n = getOption(arguments, 'n', 1)
320-
const prompt = getOption(arguments, 'prompt')
321-
if (Array.isArray(prompt) && typeof prompt[0] !== 'number') {
322-
n *= prompt.length
323-
}
324-
}
325-
326221
const client = this._client || this.client
327222

328223
const ctx = {
@@ -348,7 +243,7 @@ for (const extension of extensions) {
348243
const parsedPromise = origApiPromParse.apply(this, arguments)
349244
.then(body => Promise.all([this.responsePromise, body]))
350245

351-
return handleUnwrappedAPIPromise(parsedPromise, ctx, stream, n)
246+
return handleUnwrappedAPIPromise(parsedPromise, ctx, stream)
352247
})
353248

354249
return unwrappedPromise
@@ -361,7 +256,7 @@ for (const extension of extensions) {
361256
const parsedPromise = origApiPromParse.apply(this, arguments)
362257
.then(body => Promise.all([this.responsePromise, body]))
363258

364-
return handleUnwrappedAPIPromise(parsedPromise, ctx, stream, n)
259+
return handleUnwrappedAPIPromise(parsedPromise, ctx, stream)
365260
})
366261

367262
ch.end.publish(ctx)
@@ -375,15 +270,15 @@ for (const extension of extensions) {
375270
}
376271
}
377272

378-
function handleUnwrappedAPIPromise (apiProm, ctx, stream, n) {
273+
function handleUnwrappedAPIPromise (apiProm, ctx, stream) {
379274
return apiProm
380275
.then(([{ response, options }, body]) => {
381276
if (stream) {
382277
if (body.iterator) {
383-
shimmer.wrap(body, 'iterator', wrapStreamIterator(response, options, n, ctx))
278+
shimmer.wrap(body, 'iterator', wrapStreamIterator(response, options, ctx))
384279
} else {
385280
shimmer.wrap(
386-
body.response.body, Symbol.asyncIterator, wrapStreamIterator(response, options, n, ctx)
281+
body.response.body, Symbol.asyncIterator, wrapStreamIterator(response, options, ctx)
387282
)
388283
}
389284
} else {
@@ -412,7 +307,11 @@ function finish (ctx, response, error) {
412307
ch.error.publish(ctx)
413308
}
414309

415-
ctx.result = response
310+
// for successful streamed responses, we've already set the result on ctx.body,
311+
// so we don't want to override it here
312+
ctx.result ??= {}
313+
Object.assign(ctx.result, response)
314+
416315
ch.asyncEnd.publish(ctx)
417316
}
418317

Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
'use strict'
2+
3+
/**
 * Combines legacy OpenAI streamed chunks into a single object.
 * These legacy chunks are returned as buffers instead of individual objects.
 * @param {readonly Uint8Array[]} chunks
 * @returns {Array<Record<string, any>>}
 */
function convertBuffersToObjects (chunks) {
  const combined = Buffer.concat(chunks).toString()
  // each SSE event starts with "data: "; the lookahead split keeps that
  // prefix attached to the front of every piece
  const events = combined.split(/(?=data:)/)
  // strip newlines and the 6-character "data: " prefix from each event
  const payloads = events.map(piece => piece.replaceAll('\n', '').slice(6))
  // the final event is the "[DONE]" sentinel — drop it, then parse the rest
  return payloads.slice(0, -1).map(payload => JSON.parse(payload))
}
18+
19+
/**
 * Constructs the entire response from a stream of OpenAI completion chunks,
 * mainly combining the text choices of each chunk into a single string per choice.
 * Choices are copied before accumulation so the caller-owned chunk objects
 * (the same objects yielded to the application) are never mutated.
 * @param {Array<Record<string, any>>} chunks
 * @param {number} n the number of choices to expect in the response
 * @returns {Record<string, any>}
 */
function constructCompletionResponseFromStreamedChunks (chunks, n) {
  // seed the body with top-level metadata (id, model, created, ...) from the
  // first chunk; choices start as an n-slot sparse array filled below
  const body = { ...chunks[0], choices: Array.from({ length: n }) }

  for (const chunk of chunks) {
    body.usage = chunk.usage // usage, when requested, arrives on the last chunk
    for (const choice of chunk.choices) {
      const choiceIdx = choice.index
      const oldChoice = body.choices.find(c => c?.index === choiceIdx)
      if (!oldChoice) {
        // chunks may arrive in any choice order — copy so the text
        // concatenation below never mutates the caller's chunk objects
        body.choices[choiceIdx] = { ...choice }
        continue
      }

      // keep the first non-empty finish_reason seen for this choice
      if (!oldChoice.finish_reason) {
        oldChoice.finish_reason = choice.finish_reason
      }

      if (choice.text) {
        // don't append to undefined on the first text fragment
        oldChoice.text = (oldChoice.text || '') + choice.text
      }
    }
  }

  return body
}
55+
56+
/**
 * Constructs the entire response from a stream of OpenAI chat completion chunks,
 * mainly combining the text choices of each chunk into a single string per choice.
 * Choices (and their nested delta/tool_calls) are copied before accumulation so
 * the caller-owned chunk objects (the same objects yielded to the application)
 * are never mutated.
 * @param {Array<Record<string, any>>} chunks
 * @param {number} n the number of choices to expect in the response
 * @returns {Record<string, any>}
 */
function constructChatCompletionResponseFromStreamedChunks (chunks, n) {
  // copy a streamed tool call so argument concatenation below cannot
  // write into the caller's chunk objects
  const copyTool = tool => ({
    ...tool,
    function: tool.function && { ...tool.function }
  })

  // copy a choice together with its nested delta and tool_calls
  const copyChoice = choice => {
    const copy = { ...choice }
    if (choice.delta) {
      copy.delta = { ...choice.delta }
      if (choice.delta.tool_calls) {
        copy.delta.tool_calls = choice.delta.tool_calls.map(copyTool)
      }
    }
    return copy
  }

  // seed the body with top-level metadata (id, model, created, ...) from the
  // first chunk; choices start as an n-slot sparse array filled below
  const body = { ...chunks[0], choices: Array.from({ length: n }) }

  for (const chunk of chunks) {
    body.usage = chunk.usage // usage, when requested, arrives on the last chunk
    for (const choice of chunk.choices) {
      const choiceIdx = choice.index
      const oldChoice = body.choices.find(c => c?.index === choiceIdx)
      if (!oldChoice) {
        // chunks may arrive in any choice order
        body.choices[choiceIdx] = copyChoice(choice)
        continue
      }

      // keep the first non-empty finish_reason seen for this choice
      if (!oldChoice.finish_reason) {
        oldChoice.finish_reason = choice.finish_reason
      }

      const delta = choice.delta
      if (!delta) continue

      if (delta.content) {
        // don't append to undefined on the first content fragment
        oldChoice.delta.content = (oldChoice.delta.content || '') + delta.content
      }

      // tool calls only exist on chat completions
      const tools = delta.tool_calls
      if (!tools) continue

      oldChoice.delta.tool_calls = tools.map((newTool, toolIdx) => {
        const oldTool = oldChoice.delta.tool_calls?.[toolIdx]
        if (oldTool) {
          // oldTool is our own copy, so in-place concatenation is safe
          oldTool.function.arguments += newTool.function.arguments
          return oldTool
        }
        return copyTool(newTool)
      })
    }
  }

  return body
}
108+
109+
// Helpers consumed by the OpenAI tracing plugin (see tracing.js) to rebuild
// a complete response body from accumulated streamed chunks.
module.exports = {
  convertBuffersToObjects,
  constructCompletionResponseFromStreamedChunks,
  constructChatCompletionResponseFromStreamedChunks
}

packages/datadog-plugin-openai/src/tracing.js

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,11 @@ const { MEASURED } = require('../../../ext/tags')
1010
const { estimateTokens } = require('./token-estimator')
1111

1212
const makeUtilities = require('../../dd-trace/src/plugins/util/llm')
13+
const {
14+
convertBuffersToObjects,
15+
constructCompletionResponseFromStreamedChunks,
16+
constructChatCompletionResponseFromStreamedChunks
17+
} = require('./stream-helpers')
1318

1419
let normalize
1520

@@ -48,6 +53,41 @@ class OpenAiTracingPlugin extends TracingPlugin {
4853

4954
normalize = utilities.normalize
5055
}
56+
57+
// Accumulates streamed chunks published by the openai instrumentation and,
// once the stream is done, rebuilds the combined response body so streamed
// spans look the same as non-streamed ones.
this.addSub('apm:openai:request:chunk', ({ ctx, chunk, done }) => {
  // chunks are collected on the shared tracing context across publishes
  if (!ctx.chunks) ctx.chunks = []

  if (chunk) ctx.chunks.push(chunk)
  if (!done) return

  let chunks = ctx.chunks
  if (chunks.length === 0) return

  const firstChunk = chunks[0]
  // TODO(BridgeAR): It likely depends on the options being passed
  // through if the stream returns buffers or not. By reading that,
  // we don't have to do the instanceof check anymore, which is
  // relatively expensive.
  if (firstChunk instanceof Buffer) {
    // legacy streams yield raw SSE buffers rather than parsed objects;
    // if one chunk is a buffer, the rest are assumed to be as well
    chunks = convertBuffersToObjects(chunks)
  }

  const methodName = ctx.currentStore.normalizedMethodName

  // For completions, the response carries one choice per prompt entry.
  // NOTE(review): assumes ctx.args[0] is the request options object — confirm
  // against the instrumentation that publishes this channel.
  // NOTE(review): the previous instrumentation also multiplied by the
  // request's `n` option (getOption(arguments, 'n', 1)); that factor is no
  // longer applied here — confirm this is intentional.
  let n = 1
  const prompt = ctx.args[0].prompt
  if (Array.isArray(prompt) && typeof prompt[0] !== 'number') {
    n *= prompt.length
  }

  let response = {}
  if (methodName === 'createCompletion') {
    response = constructCompletionResponseFromStreamedChunks(chunks, n)
  } else if (methodName === 'createChatCompletion') {
    response = constructChatCompletionResponseFromStreamedChunks(chunks, n)
  }

  // finish() in the instrumentation only merges into an existing ctx.result,
  // so the combined streamed body set here is preserved
  ctx.result = { data: response }
})
5191
}
5292

5393
configure (config) {

0 commit comments

Comments
 (0)