Skip to content

Commit 8ddce65

Browse files
authored
Instrument openai async client (#14322)
1 parent c965b09 commit 8ddce65

File tree

17 files changed

+751
-223
lines changed

17 files changed

+751
-223
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.javaagent.instrumentation.openai.v1_1;
7+
8+
import static io.opentelemetry.javaagent.instrumentation.openai.v1_1.OpenAiSingletons.TELEMETRY;
9+
import static net.bytebuddy.matcher.ElementMatchers.named;
10+
import static net.bytebuddy.matcher.ElementMatchers.returns;
11+
12+
import com.openai.client.OpenAIClientAsync;
13+
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
14+
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
15+
import net.bytebuddy.asm.Advice;
16+
import net.bytebuddy.description.type.TypeDescription;
17+
import net.bytebuddy.matcher.ElementMatcher;
18+
19+
public class OpenAiClientAsyncInstrumentation implements TypeInstrumentation {
20+
@Override
21+
public ElementMatcher<TypeDescription> typeMatcher() {
22+
return named("com.openai.client.okhttp.OpenAIOkHttpClientAsync$Builder");
23+
}
24+
25+
@Override
26+
public void transform(TypeTransformer transformer) {
27+
transformer.applyAdviceToMethod(
28+
named("build").and(returns(named("com.openai.client.OpenAIClientAsync"))),
29+
OpenAiClientAsyncInstrumentation.class.getName() + "$BuildAdvice");
30+
}
31+
32+
@SuppressWarnings("unused")
33+
public static class BuildAdvice {
34+
@Advice.OnMethodExit(suppress = Throwable.class)
35+
@Advice.AssignReturned.ToReturned
36+
public static OpenAIClientAsync onExit(@Advice.Return OpenAIClientAsync client) {
37+
return TELEMETRY.wrap(client);
38+
}
39+
}
40+
}

instrumentation/openai/openai-java-1.1/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/openai/v1_1/OpenAiInstrumentationModule.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
package io.opentelemetry.javaagent.instrumentation.openai.v1_1;
77

88
import static io.opentelemetry.javaagent.extension.matcher.AgentElementMatchers.hasClassesNamed;
9-
import static java.util.Collections.singletonList;
9+
import static java.util.Arrays.asList;
1010

1111
import com.google.auto.service.AutoService;
1212
import io.opentelemetry.javaagent.extension.instrumentation.InstrumentationModule;
@@ -27,6 +27,6 @@ public ElementMatcher.Junction<ClassLoader> classLoaderMatcher() {
2727

2828
@Override
2929
public List<TypeInstrumentation> typeInstrumentations() {
30-
return singletonList(new OpenAiClientInstrumentation());
30+
return asList(new OpenAiClientInstrumentation(), new OpenAiClientAsyncInstrumentation());
3131
}
3232
}

instrumentation/openai/openai-java-1.1/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/openai/v1_1/ChatTest.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
package io.opentelemetry.javaagent.instrumentation.openai.v1_1;
77

88
import com.openai.client.OpenAIClient;
9+
import com.openai.client.OpenAIClientAsync;
910
import io.opentelemetry.instrumentation.openai.v1_1.AbstractChatTest;
1011
import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension;
1112
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
@@ -31,6 +32,11 @@ protected OpenAIClient wrap(OpenAIClient client) {
3132
return client;
3233
}
3334

35+
@Override
36+
protected OpenAIClientAsync wrap(OpenAIClientAsync client) {
37+
return client;
38+
}
39+
3440
@Override
3541
protected final List<Consumer<SpanDataAssert>> maybeWithTransportSpan(
3642
Consumer<SpanDataAssert> span) {

instrumentation/openai/openai-java-1.1/library/src/main/java/io/opentelemetry/instrumentation/openai/v1_1/ChatCompletionEventsHelper.java

Lines changed: 17 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -29,14 +29,16 @@
2929
import java.util.Map;
3030
import java.util.Objects;
3131
import java.util.stream.Collectors;
32-
import javax.annotation.Nullable;
3332

3433
final class ChatCompletionEventsHelper {
3534

3635
private static final AttributeKey<String> EVENT_NAME = stringKey("event.name");
3736

3837
public static void emitPromptLogEvents(
39-
Logger eventLogger, ChatCompletionCreateParams request, boolean captureMessageContent) {
38+
Context context,
39+
Logger eventLogger,
40+
ChatCompletionCreateParams request,
41+
boolean captureMessageContent) {
4042
for (ChatCompletionMessageParam msg : request.messages()) {
4143
String eventType;
4244
Map<String, Value<?>> body = new HashMap<>();
@@ -84,7 +86,7 @@ public static void emitPromptLogEvents(
8486
} else {
8587
continue;
8688
}
87-
newEvent(eventLogger, eventType).setBody(Value.of(body)).emit();
89+
newEvent(eventLogger, eventType).setContext(context).setBody(Value.of(body)).emit();
8890
}
8991
}
9092

@@ -160,7 +162,10 @@ private static String joinContentParts(List<ChatCompletionContentPartText> conte
160162
}
161163

162164
public static void emitCompletionLogEvents(
163-
Logger eventLogger, ChatCompletion completion, boolean captureMessageContent) {
165+
Context context,
166+
Logger eventLogger,
167+
ChatCompletion completion,
168+
boolean captureMessageContent) {
164169
for (ChatCompletion.Choice choice : completion.choices()) {
165170
ChatCompletionMessage choiceMsg = choice.message();
166171
Map<String, Value<?>> message = new HashMap<>();
@@ -179,25 +184,25 @@ public static void emitCompletionLogEvents(
179184
.collect(Collectors.toList())));
180185
});
181186
emitCompletionLogEvent(
182-
eventLogger, choice.index(), choice.finishReason().toString(), Value.of(message), null);
187+
context,
188+
eventLogger,
189+
choice.index(),
190+
choice.finishReason().toString(),
191+
Value.of(message));
183192
}
184193
}
185194

186195
public static void emitCompletionLogEvent(
196+
Context context,
187197
Logger eventLogger,
188198
long index,
189199
String finishReason,
190-
Value<?> eventMessageObject,
191-
@Nullable Context contextOverride) {
200+
Value<?> eventMessageObject) {
192201
Map<String, Value<?>> body = new HashMap<>();
193202
body.put("finish_reason", Value.of(finishReason));
194203
body.put("index", Value.of(index));
195204
body.put("message", eventMessageObject);
196-
LogRecordBuilder builder = newEvent(eventLogger, "gen_ai.choice").setBody(Value.of(body));
197-
if (contextOverride != null) {
198-
builder.setContext(contextOverride);
199-
}
200-
builder.emit();
205+
newEvent(eventLogger, "gen_ai.choice").setContext(context).setBody(Value.of(body)).emit();
201206
}
202207

203208
private static LogRecordBuilder newEvent(Logger eventLogger, String name) {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.instrumentation.openai.v1_1;
7+
8+
import io.opentelemetry.context.Context;
9+
import io.opentelemetry.context.Scope;
10+
import java.util.concurrent.CompletableFuture;
11+
12+
final class CompletableFutureWrapper {
13+
private CompletableFutureWrapper() {}
14+
15+
static <T> CompletableFuture<T> wrap(CompletableFuture<T> future, Context context) {
16+
CompletableFuture<T> result = new CompletableFuture<>();
17+
future.whenComplete(
18+
(T value, Throwable throwable) -> {
19+
try (Scope ignored = context.makeCurrent()) {
20+
if (throwable != null) {
21+
result.completeExceptionally(throwable);
22+
} else {
23+
result.complete(value);
24+
}
25+
}
26+
});
27+
28+
return result;
29+
}
30+
}

instrumentation/openai/openai-java-1.1/library/src/main/java/io/opentelemetry/instrumentation/openai/v1_1/InstrumentedChatCompletionService.java

Lines changed: 31 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -73,65 +73,70 @@ public Object invoke(Object proxy, Method method, Object[] args) throws Throwabl
7373

7474
private ChatCompletion create(
7575
ChatCompletionCreateParams chatCompletionCreateParams, RequestOptions requestOptions) {
76-
Context parentCtx = Context.current();
77-
if (!instrumenter.shouldStart(parentCtx, chatCompletionCreateParams)) {
78-
return createWithLogs(chatCompletionCreateParams, requestOptions);
76+
Context parentContext = Context.current();
77+
if (!instrumenter.shouldStart(parentContext, chatCompletionCreateParams)) {
78+
return createWithLogs(parentContext, chatCompletionCreateParams, requestOptions);
7979
}
8080

81-
Context ctx = instrumenter.start(parentCtx, chatCompletionCreateParams);
81+
Context context = instrumenter.start(parentContext, chatCompletionCreateParams);
8282
ChatCompletion completion;
83-
try (Scope ignored = ctx.makeCurrent()) {
84-
completion = createWithLogs(chatCompletionCreateParams, requestOptions);
83+
try (Scope ignored = context.makeCurrent()) {
84+
completion = createWithLogs(context, chatCompletionCreateParams, requestOptions);
8585
} catch (Throwable t) {
86-
instrumenter.end(ctx, chatCompletionCreateParams, null, t);
86+
instrumenter.end(context, chatCompletionCreateParams, null, t);
8787
throw t;
8888
}
8989

90-
instrumenter.end(ctx, chatCompletionCreateParams, completion, null);
90+
instrumenter.end(context, chatCompletionCreateParams, completion, null);
9191
return completion;
9292
}
9393

9494
private ChatCompletion createWithLogs(
95-
ChatCompletionCreateParams chatCompletionCreateParams, RequestOptions requestOptions) {
95+
Context context,
96+
ChatCompletionCreateParams chatCompletionCreateParams,
97+
RequestOptions requestOptions) {
9698
ChatCompletionEventsHelper.emitPromptLogEvents(
97-
eventLogger, chatCompletionCreateParams, captureMessageContent);
99+
context, eventLogger, chatCompletionCreateParams, captureMessageContent);
98100
ChatCompletion result = delegate.create(chatCompletionCreateParams, requestOptions);
99-
ChatCompletionEventsHelper.emitCompletionLogEvents(eventLogger, result, captureMessageContent);
101+
ChatCompletionEventsHelper.emitCompletionLogEvents(
102+
context, eventLogger, result, captureMessageContent);
100103
return result;
101104
}
102105

103106
private StreamResponse<ChatCompletionChunk> createStreaming(
104107
ChatCompletionCreateParams chatCompletionCreateParams, RequestOptions requestOptions) {
105-
Context parentCtx = Context.current();
106-
if (!instrumenter.shouldStart(parentCtx, chatCompletionCreateParams)) {
107-
return createStreamingWithLogs(chatCompletionCreateParams, requestOptions, parentCtx, false);
108+
Context parentContext = Context.current();
109+
if (!instrumenter.shouldStart(parentContext, chatCompletionCreateParams)) {
110+
return createStreamingWithLogs(
111+
parentContext, chatCompletionCreateParams, requestOptions, false);
108112
}
109113

110-
Context ctx = instrumenter.start(parentCtx, chatCompletionCreateParams);
111-
try (Scope ignored = ctx.makeCurrent()) {
112-
return createStreamingWithLogs(chatCompletionCreateParams, requestOptions, ctx, true);
114+
Context context = instrumenter.start(parentContext, chatCompletionCreateParams);
115+
try (Scope ignored = context.makeCurrent()) {
116+
return createStreamingWithLogs(context, chatCompletionCreateParams, requestOptions, true);
113117
} catch (Throwable t) {
114-
instrumenter.end(ctx, chatCompletionCreateParams, null, t);
118+
instrumenter.end(context, chatCompletionCreateParams, null, t);
115119
throw t;
116120
}
117121
}
118122

119123
private StreamResponse<ChatCompletionChunk> createStreamingWithLogs(
124+
Context context,
120125
ChatCompletionCreateParams chatCompletionCreateParams,
121126
RequestOptions requestOptions,
122-
Context parentCtx,
123127
boolean newSpan) {
124128
ChatCompletionEventsHelper.emitPromptLogEvents(
125-
eventLogger, chatCompletionCreateParams, captureMessageContent);
129+
context, eventLogger, chatCompletionCreateParams, captureMessageContent);
126130
StreamResponse<ChatCompletionChunk> result =
127131
delegate.createStreaming(chatCompletionCreateParams, requestOptions);
128132
return new TracingStreamedResponse(
129133
result,
130-
parentCtx,
131-
chatCompletionCreateParams,
132-
instrumenter,
133-
eventLogger,
134-
captureMessageContent,
135-
newSpan);
134+
new StreamListener(
135+
context,
136+
chatCompletionCreateParams,
137+
instrumenter,
138+
eventLogger,
139+
captureMessageContent,
140+
newSpan));
136141
}
137142
}

0 commit comments

Comments
 (0)