Skip to content

Commit 085fa4d

Browse files
committed
Instrument openai async client
1 parent 3e1647c commit 085fa4d

File tree

16 files changed

+719
-211
lines changed

16 files changed

+719
-211
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.javaagent.instrumentation.openai.v1_1;
7+
8+
import static io.opentelemetry.javaagent.instrumentation.openai.v1_1.OpenAiSingletons.TELEMETRY;
9+
import static net.bytebuddy.matcher.ElementMatchers.named;
10+
import static net.bytebuddy.matcher.ElementMatchers.returns;
11+
12+
import com.openai.client.OpenAIClientAsync;
13+
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
14+
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
15+
import net.bytebuddy.asm.Advice;
16+
import net.bytebuddy.description.type.TypeDescription;
17+
import net.bytebuddy.matcher.ElementMatcher;
18+
19+
public class OpenAiClientAsyncInstrumentation implements TypeInstrumentation {
20+
@Override
21+
public ElementMatcher<TypeDescription> typeMatcher() {
22+
return named("com.openai.client.okhttp.OpenAIOkHttpClientAsync$Builder");
23+
}
24+
25+
@Override
26+
public void transform(TypeTransformer transformer) {
27+
transformer.applyAdviceToMethod(
28+
named("build").and(returns(named("com.openai.client.OpenAIClientAsync"))),
29+
OpenAiClientAsyncInstrumentation.class.getName() + "$BuildAdvice");
30+
}
31+
32+
@SuppressWarnings("unused")
33+
public static class BuildAdvice {
34+
@Advice.OnMethodExit(suppress = Throwable.class)
35+
@Advice.AssignReturned.ToReturned
36+
public static OpenAIClientAsync onExit(@Advice.Return OpenAIClientAsync client) {
37+
return TELEMETRY.wrap(client);
38+
}
39+
}
40+
}

instrumentation/openai/openai-java-1.1/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/openai/v1_1/OpenAiInstrumentationModule.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
package io.opentelemetry.javaagent.instrumentation.openai.v1_1;
77

88
import static io.opentelemetry.javaagent.extension.matcher.AgentElementMatchers.hasClassesNamed;
9-
import static java.util.Collections.singletonList;
9+
import static java.util.Arrays.asList;
1010

1111
import com.google.auto.service.AutoService;
1212
import io.opentelemetry.javaagent.extension.instrumentation.InstrumentationModule;
@@ -27,6 +27,6 @@ public ElementMatcher.Junction<ClassLoader> classLoaderMatcher() {
2727

2828
@Override
2929
public List<TypeInstrumentation> typeInstrumentations() {
30-
return singletonList(new OpenAiClientInstrumentation());
30+
return asList(new OpenAiClientInstrumentation(), new OpenAiClientAsyncInstrumentation());
3131
}
3232
}

instrumentation/openai/openai-java-1.1/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/openai/v1_1/ChatTest.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
package io.opentelemetry.javaagent.instrumentation.openai.v1_1;
77

88
import com.openai.client.OpenAIClient;
9+
import com.openai.client.OpenAIClientAsync;
910
import io.opentelemetry.instrumentation.openai.v1_1.AbstractChatTest;
1011
import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension;
1112
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
@@ -31,6 +32,11 @@ protected OpenAIClient wrap(OpenAIClient client) {
3132
return client;
3233
}
3334

35+
@Override
36+
protected OpenAIClientAsync wrap(OpenAIClientAsync client) {
37+
return client;
38+
}
39+
3440
@Override
3541
protected final List<Consumer<SpanDataAssert>> maybeWithTransportSpan(
3642
Consumer<SpanDataAssert> span) {

instrumentation/openai/openai-java-1.1/library/src/main/java/io/opentelemetry/instrumentation/openai/v1_1/ChatCompletionEventsHelper.java

Lines changed: 10 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -29,14 +29,16 @@
2929
import java.util.Map;
3030
import java.util.Objects;
3131
import java.util.stream.Collectors;
32-
import javax.annotation.Nullable;
3332

3433
final class ChatCompletionEventsHelper {
3534

3635
private static final AttributeKey<String> EVENT_NAME = stringKey("event.name");
3736

3837
public static void emitPromptLogEvents(
39-
Logger eventLogger, ChatCompletionCreateParams request, boolean captureMessageContent) {
38+
Context ctx,
39+
Logger eventLogger,
40+
ChatCompletionCreateParams request,
41+
boolean captureMessageContent) {
4042
for (ChatCompletionMessageParam msg : request.messages()) {
4143
String eventType;
4244
Map<String, Value<?>> body = new HashMap<>();
@@ -84,7 +86,7 @@ public static void emitPromptLogEvents(
8486
} else {
8587
continue;
8688
}
87-
newEvent(eventLogger, eventType).setBody(Value.of(body)).emit();
89+
newEvent(eventLogger, eventType).setContext(ctx).setBody(Value.of(body)).emit();
8890
}
8991
}
9092

@@ -160,7 +162,7 @@ private static String joinContentParts(List<ChatCompletionContentPartText> conte
160162
}
161163

162164
public static void emitCompletionLogEvents(
163-
Logger eventLogger, ChatCompletion completion, boolean captureMessageContent) {
165+
Context ctx, Logger eventLogger, ChatCompletion completion, boolean captureMessageContent) {
164166
for (ChatCompletion.Choice choice : completion.choices()) {
165167
ChatCompletionMessage choiceMsg = choice.message();
166168
Map<String, Value<?>> message = new HashMap<>();
@@ -179,25 +181,21 @@ public static void emitCompletionLogEvents(
179181
.collect(Collectors.toList())));
180182
});
181183
emitCompletionLogEvent(
182-
eventLogger, choice.index(), choice.finishReason().toString(), Value.of(message), null);
184+
ctx, eventLogger, choice.index(), choice.finishReason().toString(), Value.of(message));
183185
}
184186
}
185187

186188
public static void emitCompletionLogEvent(
189+
Context ctx,
187190
Logger eventLogger,
188191
long index,
189192
String finishReason,
190-
Value<?> eventMessageObject,
191-
@Nullable Context contextOverride) {
193+
Value<?> eventMessageObject) {
192194
Map<String, Value<?>> body = new HashMap<>();
193195
body.put("finish_reason", Value.of(finishReason));
194196
body.put("index", Value.of(index));
195197
body.put("message", eventMessageObject);
196-
LogRecordBuilder builder = newEvent(eventLogger, "gen_ai.choice").setBody(Value.of(body));
197-
if (contextOverride != null) {
198-
builder.setContext(contextOverride);
199-
}
200-
builder.emit();
198+
newEvent(eventLogger, "gen_ai.choice").setContext(ctx).setBody(Value.of(body)).emit();
201199
}
202200

203201
private static LogRecordBuilder newEvent(Logger eventLogger, String name) {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.instrumentation.openai.v1_1;
7+
8+
import io.opentelemetry.context.Context;
9+
import io.opentelemetry.context.Scope;
10+
import java.util.concurrent.CompletableFuture;
11+
12+
final class CompletableFutureWrapper {
13+
private CompletableFutureWrapper() {}
14+
15+
public static <T> CompletableFuture<T> wrap(CompletableFuture<T> future, Context context) {
16+
CompletableFuture<T> result = new CompletableFuture<>();
17+
future.whenComplete(
18+
(T value, Throwable throwable) -> {
19+
try (Scope ignored = context.makeCurrent()) {
20+
if (throwable != null) {
21+
result.completeExceptionally(throwable);
22+
} else {
23+
result.complete(value);
24+
}
25+
}
26+
});
27+
28+
return result;
29+
}
30+
}

instrumentation/openai/openai-java-1.1/library/src/main/java/io/opentelemetry/instrumentation/openai/v1_1/InstrumentedChatCompletionService.java

Lines changed: 19 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -75,13 +75,13 @@ private ChatCompletion create(
7575
ChatCompletionCreateParams chatCompletionCreateParams, RequestOptions requestOptions) {
7676
Context parentCtx = Context.current();
7777
if (!instrumenter.shouldStart(parentCtx, chatCompletionCreateParams)) {
78-
return createWithLogs(chatCompletionCreateParams, requestOptions);
78+
return createWithLogs(parentCtx, chatCompletionCreateParams, requestOptions);
7979
}
8080

8181
Context ctx = instrumenter.start(parentCtx, chatCompletionCreateParams);
8282
ChatCompletion completion;
8383
try (Scope ignored = ctx.makeCurrent()) {
84-
completion = createWithLogs(chatCompletionCreateParams, requestOptions);
84+
completion = createWithLogs(ctx, chatCompletionCreateParams, requestOptions);
8585
} catch (Throwable t) {
8686
instrumenter.end(ctx, chatCompletionCreateParams, null, t);
8787
throw t;
@@ -92,46 +92,50 @@ private ChatCompletion create(
9292
}
9393

9494
private ChatCompletion createWithLogs(
95-
ChatCompletionCreateParams chatCompletionCreateParams, RequestOptions requestOptions) {
95+
Context ctx,
96+
ChatCompletionCreateParams chatCompletionCreateParams,
97+
RequestOptions requestOptions) {
9698
ChatCompletionEventsHelper.emitPromptLogEvents(
97-
eventLogger, chatCompletionCreateParams, captureMessageContent);
99+
ctx, eventLogger, chatCompletionCreateParams, captureMessageContent);
98100
ChatCompletion result = delegate.create(chatCompletionCreateParams, requestOptions);
99-
ChatCompletionEventsHelper.emitCompletionLogEvents(eventLogger, result, captureMessageContent);
101+
ChatCompletionEventsHelper.emitCompletionLogEvents(
102+
ctx, eventLogger, result, captureMessageContent);
100103
return result;
101104
}
102105

103106
private StreamResponse<ChatCompletionChunk> createStreaming(
104107
ChatCompletionCreateParams chatCompletionCreateParams, RequestOptions requestOptions) {
105108
Context parentCtx = Context.current();
106109
if (!instrumenter.shouldStart(parentCtx, chatCompletionCreateParams)) {
107-
return createStreamingWithLogs(chatCompletionCreateParams, requestOptions, parentCtx, false);
110+
return createStreamingWithLogs(parentCtx, chatCompletionCreateParams, requestOptions, false);
108111
}
109112

110113
Context ctx = instrumenter.start(parentCtx, chatCompletionCreateParams);
111114
try (Scope ignored = ctx.makeCurrent()) {
112-
return createStreamingWithLogs(chatCompletionCreateParams, requestOptions, ctx, true);
115+
return createStreamingWithLogs(ctx, chatCompletionCreateParams, requestOptions, true);
113116
} catch (Throwable t) {
114117
instrumenter.end(ctx, chatCompletionCreateParams, null, t);
115118
throw t;
116119
}
117120
}
118121

119122
private StreamResponse<ChatCompletionChunk> createStreamingWithLogs(
123+
Context ctx,
120124
ChatCompletionCreateParams chatCompletionCreateParams,
121125
RequestOptions requestOptions,
122-
Context parentCtx,
123126
boolean newSpan) {
124127
ChatCompletionEventsHelper.emitPromptLogEvents(
125-
eventLogger, chatCompletionCreateParams, captureMessageContent);
128+
ctx, eventLogger, chatCompletionCreateParams, captureMessageContent);
126129
StreamResponse<ChatCompletionChunk> result =
127130
delegate.createStreaming(chatCompletionCreateParams, requestOptions);
128131
return new TracingStreamedResponse(
129132
result,
130-
parentCtx,
131-
chatCompletionCreateParams,
132-
instrumenter,
133-
eventLogger,
134-
captureMessageContent,
135-
newSpan);
133+
new StreamListener(
134+
ctx,
135+
chatCompletionCreateParams,
136+
instrumenter,
137+
eventLogger,
138+
captureMessageContent,
139+
newSpan));
136140
}
137141
}

0 commit comments

Comments
 (0)