Skip to content

Commit 6dd06e5

Browse files
authored
Instrument streaming chat for openai SDK (#14271)
1 parent bc1c384 commit 6dd06e5

File tree

9 files changed

+2617
-9
lines changed

9 files changed

+2617
-9
lines changed

instrumentation/openai/openai-java-1.1/library/src/main/java/io/opentelemetry/instrumentation/openai/v1_1/InstrumentedChatCompletionService.java

Lines changed: 58 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,9 @@
66
package io.opentelemetry.instrumentation.openai.v1_1;
77

88
import com.openai.core.RequestOptions;
9+
import com.openai.core.http.StreamResponse;
910
import com.openai.models.chat.completions.ChatCompletion;
11+
import com.openai.models.chat.completions.ChatCompletionChunk;
1012
import com.openai.models.chat.completions.ChatCompletionCreateParams;
1113
import com.openai.services.blocking.chat.ChatCompletionService;
1214
import io.opentelemetry.api.logs.Logger;
@@ -43,22 +45,34 @@ public Object invoke(Object proxy, Method method, Object[] args) throws Throwabl
4345
String methodName = method.getName();
4446
Class<?>[] parameterTypes = method.getParameterTypes();
4547

46-
if (methodName.equals("create")
47-
&& parameterTypes.length >= 1
48-
&& parameterTypes[0] == ChatCompletionCreateParams.class) {
49-
if (parameterTypes.length == 1) {
50-
return create((ChatCompletionCreateParams) args[0], RequestOptions.none());
51-
} else if (parameterTypes.length == 2 && parameterTypes[1] == RequestOptions.class) {
52-
return create((ChatCompletionCreateParams) args[0], (RequestOptions) args[1]);
53-
}
48+
switch (methodName) {
49+
case "create":
50+
if (parameterTypes.length >= 1 && parameterTypes[0] == ChatCompletionCreateParams.class) {
51+
if (parameterTypes.length == 1) {
52+
return create((ChatCompletionCreateParams) args[0], RequestOptions.none());
53+
} else if (parameterTypes.length == 2 && parameterTypes[1] == RequestOptions.class) {
54+
return create((ChatCompletionCreateParams) args[0], (RequestOptions) args[1]);
55+
}
56+
}
57+
break;
58+
case "createStreaming":
59+
if (parameterTypes.length >= 1 && parameterTypes[0] == ChatCompletionCreateParams.class) {
60+
if (parameterTypes.length == 1) {
61+
return createStreaming((ChatCompletionCreateParams) args[0], RequestOptions.none());
62+
} else if (parameterTypes.length == 2 && parameterTypes[1] == RequestOptions.class) {
63+
return createStreaming((ChatCompletionCreateParams) args[0], (RequestOptions) args[1]);
64+
}
65+
}
66+
break;
67+
default:
68+
// fallthrough
5469
}
5570

5671
return super.invoke(proxy, method, args);
5772
}
5873

5974
private ChatCompletion create(
6075
ChatCompletionCreateParams chatCompletionCreateParams, RequestOptions requestOptions) {
61-
6276
Context parentCtx = Context.current();
6377
if (!instrumenter.shouldStart(parentCtx, chatCompletionCreateParams)) {
6478
return createWithLogs(chatCompletionCreateParams, requestOptions);
@@ -85,4 +99,39 @@ private ChatCompletion createWithLogs(
8599
ChatCompletionEventsHelper.emitCompletionLogEvents(eventLogger, result, captureMessageContent);
86100
return result;
87101
}
102+
103+
private StreamResponse<ChatCompletionChunk> createStreaming(
104+
ChatCompletionCreateParams chatCompletionCreateParams, RequestOptions requestOptions) {
105+
Context parentCtx = Context.current();
106+
if (!instrumenter.shouldStart(parentCtx, chatCompletionCreateParams)) {
107+
return createStreamingWithLogs(chatCompletionCreateParams, requestOptions, parentCtx, false);
108+
}
109+
110+
Context ctx = instrumenter.start(parentCtx, chatCompletionCreateParams);
111+
try (Scope ignored = ctx.makeCurrent()) {
112+
return createStreamingWithLogs(chatCompletionCreateParams, requestOptions, ctx, true);
113+
} catch (Throwable t) {
114+
instrumenter.end(ctx, chatCompletionCreateParams, null, t);
115+
throw t;
116+
}
117+
}
118+
119+
private StreamResponse<ChatCompletionChunk> createStreamingWithLogs(
120+
ChatCompletionCreateParams chatCompletionCreateParams,
121+
RequestOptions requestOptions,
122+
Context parentCtx,
123+
boolean newSpan) {
124+
ChatCompletionEventsHelper.emitPromptLogEvents(
125+
eventLogger, chatCompletionCreateParams, captureMessageContent);
126+
StreamResponse<ChatCompletionChunk> result =
127+
delegate.createStreaming(chatCompletionCreateParams, requestOptions);
128+
return new TracingStreamedResponse(
129+
result,
130+
parentCtx,
131+
chatCompletionCreateParams,
132+
instrumenter,
133+
eventLogger,
134+
captureMessageContent,
135+
newSpan);
136+
}
88137
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,144 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.instrumentation.openai.v1_1;
7+
8+
import com.openai.core.JsonField;
9+
import com.openai.models.chat.completions.ChatCompletion;
10+
import com.openai.models.chat.completions.ChatCompletionChunk;
11+
import com.openai.models.chat.completions.ChatCompletionMessage;
12+
import io.opentelemetry.api.common.Value;
13+
import java.util.HashMap;
14+
import java.util.List;
15+
import java.util.Map;
16+
import java.util.Optional;
17+
import java.util.stream.Collectors;
18+
import javax.annotation.Nullable;
19+
20+
final class StreamedMessageBuffer {
21+
private final long index;
22+
private final boolean captureMessageContent;
23+
24+
@Nullable String finishReason;
25+
26+
@Nullable private StringBuilder message;
27+
@Nullable private Map<Long, ToolCallBuffer> toolCalls;
28+
29+
StreamedMessageBuffer(long index, boolean captureMessageContent) {
30+
this.index = index;
31+
this.captureMessageContent = captureMessageContent;
32+
}
33+
34+
ChatCompletion.Choice toChoice() {
35+
ChatCompletion.Choice.Builder choice =
36+
ChatCompletion.Choice.builder().index(index).logprobs(Optional.empty());
37+
if (finishReason != null) {
38+
choice.finishReason(ChatCompletion.Choice.FinishReason.of(finishReason));
39+
} else {
40+
// Can't happen in practice, mostly to satisfy null check
41+
choice.finishReason(JsonField.ofNullable(null));
42+
}
43+
if (message != null) {
44+
choice.message(
45+
ChatCompletionMessage.builder()
46+
.content(message.toString())
47+
.refusal(Optional.empty())
48+
.build());
49+
} else {
50+
choice.message(JsonField.ofNullable(null));
51+
}
52+
return choice.build();
53+
}
54+
55+
Value<?> toEventBody() {
56+
Map<String, Value<?>> body = new HashMap<>();
57+
if (message != null) {
58+
body.put("content", Value.of(message.toString()));
59+
}
60+
if (toolCalls != null) {
61+
List<Value<?>> toolCallsJson =
62+
toolCalls.values().stream()
63+
.map(StreamedMessageBuffer::buildToolCallEventObject)
64+
.collect(Collectors.toList());
65+
body.put("tool_calls", Value.of(toolCallsJson));
66+
}
67+
return Value.of(body);
68+
}
69+
70+
void append(ChatCompletionChunk.Choice.Delta delta) {
71+
if (captureMessageContent) {
72+
if (delta.content().isPresent()) {
73+
if (message == null) {
74+
message = new StringBuilder();
75+
}
76+
message.append(delta.content().get());
77+
}
78+
}
79+
80+
if (delta.toolCalls().isPresent()) {
81+
if (toolCalls == null) {
82+
toolCalls = new HashMap<>();
83+
}
84+
85+
for (ChatCompletionChunk.Choice.Delta.ToolCall toolCall : delta.toolCalls().get()) {
86+
ToolCallBuffer buffer =
87+
toolCalls.computeIfAbsent(
88+
toolCall.index(), unused -> new ToolCallBuffer(toolCall.id().orElse("")));
89+
toolCall.type().ifPresent(type -> buffer.type = type.toString());
90+
toolCall
91+
.function()
92+
.ifPresent(
93+
function -> {
94+
function.name().ifPresent(name -> buffer.function.name = name);
95+
if (captureMessageContent) {
96+
function
97+
.arguments()
98+
.ifPresent(
99+
args -> {
100+
if (buffer.function.arguments == null) {
101+
buffer.function.arguments = new StringBuilder();
102+
}
103+
buffer.function.arguments.append(args);
104+
});
105+
}
106+
});
107+
}
108+
}
109+
}
110+
111+
private static Value<?> buildToolCallEventObject(ToolCallBuffer call) {
112+
Map<String, Value<?>> result = new HashMap<>();
113+
result.put("id", Value.of(call.id));
114+
if (call.type != null) {
115+
result.put("type", Value.of(call.type));
116+
}
117+
118+
Map<String, Value<?>> function = new HashMap<>();
119+
if (call.function.name != null) {
120+
function.put("name", Value.of(call.function.name));
121+
}
122+
if (call.function.arguments != null) {
123+
function.put("arguments", Value.of(call.function.arguments.toString()));
124+
}
125+
result.put("function", Value.of(function));
126+
127+
return Value.of(result);
128+
}
129+
130+
private static class FunctionBuffer {
131+
@Nullable String name;
132+
@Nullable StringBuilder arguments;
133+
}
134+
135+
private static class ToolCallBuffer {
136+
final String id;
137+
final FunctionBuffer function = new FunctionBuffer();
138+
@Nullable String type;
139+
140+
ToolCallBuffer(String id) {
141+
this.id = id;
142+
}
143+
}
144+
}

0 commit comments

Comments
 (0)