Skip to content

Commit 970fb2e

Browse files
committed
Merge branch 'openai-async' into openai-embeddings
2 parents 6406945 + bfa7202 commit 970fb2e

File tree

16 files changed

+720
-223
lines changed

16 files changed

+720
-223
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.javaagent.instrumentation.openai.v1_1;
7+
8+
import static io.opentelemetry.javaagent.instrumentation.openai.v1_1.OpenAiSingletons.TELEMETRY;
9+
import static net.bytebuddy.matcher.ElementMatchers.named;
10+
import static net.bytebuddy.matcher.ElementMatchers.returns;
11+
12+
import com.openai.client.OpenAIClientAsync;
13+
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
14+
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
15+
import net.bytebuddy.asm.Advice;
16+
import net.bytebuddy.description.type.TypeDescription;
17+
import net.bytebuddy.matcher.ElementMatcher;
18+
19+
public class OpenAiClientAsyncInstrumentation implements TypeInstrumentation {
20+
@Override
21+
public ElementMatcher<TypeDescription> typeMatcher() {
22+
return named("com.openai.client.okhttp.OpenAIOkHttpClientAsync$Builder");
23+
}
24+
25+
@Override
26+
public void transform(TypeTransformer transformer) {
27+
transformer.applyAdviceToMethod(
28+
named("build").and(returns(named("com.openai.client.OpenAIClientAsync"))),
29+
OpenAiClientAsyncInstrumentation.class.getName() + "$BuildAdvice");
30+
}
31+
32+
@SuppressWarnings("unused")
33+
public static class BuildAdvice {
34+
@Advice.OnMethodExit(suppress = Throwable.class)
35+
@Advice.AssignReturned.ToReturned
36+
public static OpenAIClientAsync onExit(@Advice.Return OpenAIClientAsync client) {
37+
return TELEMETRY.wrap(client);
38+
}
39+
}
40+
}

instrumentation/openai/openai-java-1.1/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/openai/v1_1/OpenAiInstrumentationModule.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
package io.opentelemetry.javaagent.instrumentation.openai.v1_1;
77

88
import static io.opentelemetry.javaagent.extension.matcher.AgentElementMatchers.hasClassesNamed;
9-
import static java.util.Collections.singletonList;
9+
import static java.util.Arrays.asList;
1010

1111
import com.google.auto.service.AutoService;
1212
import io.opentelemetry.javaagent.extension.instrumentation.InstrumentationModule;
@@ -27,6 +27,6 @@ public ElementMatcher.Junction<ClassLoader> classLoaderMatcher() {
2727

2828
@Override
2929
public List<TypeInstrumentation> typeInstrumentations() {
30-
return singletonList(new OpenAiClientInstrumentation());
30+
return asList(new OpenAiClientInstrumentation(), new OpenAiClientAsyncInstrumentation());
3131
}
3232
}

instrumentation/openai/openai-java-1.1/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/openai/v1_1/ChatTest.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
package io.opentelemetry.javaagent.instrumentation.openai.v1_1;
77

88
import com.openai.client.OpenAIClient;
9+
import com.openai.client.OpenAIClientAsync;
910
import io.opentelemetry.instrumentation.openai.v1_1.AbstractChatTest;
1011
import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension;
1112
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
@@ -31,6 +32,11 @@ protected OpenAIClient wrap(OpenAIClient client) {
3132
return client;
3233
}
3334

35+
@Override
36+
protected OpenAIClientAsync wrap(OpenAIClientAsync client) {
37+
return client;
38+
}
39+
3440
@Override
3541
protected final List<Consumer<SpanDataAssert>> maybeWithTransportSpan(
3642
Consumer<SpanDataAssert> span) {

instrumentation/openai/openai-java-1.1/library/src/main/java/io/opentelemetry/instrumentation/openai/v1_1/ChatCompletionEventsHelper.java

Lines changed: 17 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -29,14 +29,16 @@
2929
import java.util.Map;
3030
import java.util.Objects;
3131
import java.util.stream.Collectors;
32-
import javax.annotation.Nullable;
3332

3433
final class ChatCompletionEventsHelper {
3534

3635
private static final AttributeKey<String> EVENT_NAME = stringKey("event.name");
3736

3837
public static void emitPromptLogEvents(
39-
Logger eventLogger, ChatCompletionCreateParams request, boolean captureMessageContent) {
38+
Context context,
39+
Logger eventLogger,
40+
ChatCompletionCreateParams request,
41+
boolean captureMessageContent) {
4042
for (ChatCompletionMessageParam msg : request.messages()) {
4143
String eventType;
4244
Map<String, Value<?>> body = new HashMap<>();
@@ -84,7 +86,7 @@ public static void emitPromptLogEvents(
8486
} else {
8587
continue;
8688
}
87-
newEvent(eventLogger, eventType).setBody(Value.of(body)).emit();
89+
newEvent(eventLogger, eventType).setContext(context).setBody(Value.of(body)).emit();
8890
}
8991
}
9092

@@ -160,7 +162,10 @@ private static String joinContentParts(List<ChatCompletionContentPartText> conte
160162
}
161163

162164
public static void emitCompletionLogEvents(
163-
Logger eventLogger, ChatCompletion completion, boolean captureMessageContent) {
165+
Context context,
166+
Logger eventLogger,
167+
ChatCompletion completion,
168+
boolean captureMessageContent) {
164169
for (ChatCompletion.Choice choice : completion.choices()) {
165170
ChatCompletionMessage choiceMsg = choice.message();
166171
Map<String, Value<?>> message = new HashMap<>();
@@ -179,25 +184,25 @@ public static void emitCompletionLogEvents(
179184
.collect(Collectors.toList())));
180185
});
181186
emitCompletionLogEvent(
182-
eventLogger, choice.index(), choice.finishReason().toString(), Value.of(message), null);
187+
context,
188+
eventLogger,
189+
choice.index(),
190+
choice.finishReason().toString(),
191+
Value.of(message));
183192
}
184193
}
185194

186195
public static void emitCompletionLogEvent(
196+
Context context,
187197
Logger eventLogger,
188198
long index,
189199
String finishReason,
190-
Value<?> eventMessageObject,
191-
@Nullable Context contextOverride) {
200+
Value<?> eventMessageObject) {
192201
Map<String, Value<?>> body = new HashMap<>();
193202
body.put("finish_reason", Value.of(finishReason));
194203
body.put("index", Value.of(index));
195204
body.put("message", eventMessageObject);
196-
LogRecordBuilder builder = newEvent(eventLogger, "gen_ai.choice").setBody(Value.of(body));
197-
if (contextOverride != null) {
198-
builder.setContext(contextOverride);
199-
}
200-
builder.emit();
205+
newEvent(eventLogger, "gen_ai.choice").setContext(context).setBody(Value.of(body)).emit();
201206
}
202207

203208
private static LogRecordBuilder newEvent(Logger eventLogger, String name) {

instrumentation/openai/openai-java-1.1/library/src/main/java/io/opentelemetry/instrumentation/openai/v1_1/InstrumentedChatCompletionService.java

Lines changed: 31 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -73,65 +73,70 @@ public Object invoke(Object proxy, Method method, Object[] args) throws Throwabl
7373

7474
private ChatCompletion create(
7575
ChatCompletionCreateParams chatCompletionCreateParams, RequestOptions requestOptions) {
76-
Context parentCtx = Context.current();
77-
if (!instrumenter.shouldStart(parentCtx, chatCompletionCreateParams)) {
78-
return createWithLogs(chatCompletionCreateParams, requestOptions);
76+
Context parentContext = Context.current();
77+
if (!instrumenter.shouldStart(parentContext, chatCompletionCreateParams)) {
78+
return createWithLogs(parentContext, chatCompletionCreateParams, requestOptions);
7979
}
8080

81-
Context ctx = instrumenter.start(parentCtx, chatCompletionCreateParams);
81+
Context context = instrumenter.start(parentContext, chatCompletionCreateParams);
8282
ChatCompletion completion;
83-
try (Scope ignored = ctx.makeCurrent()) {
84-
completion = createWithLogs(chatCompletionCreateParams, requestOptions);
83+
try (Scope ignored = context.makeCurrent()) {
84+
completion = createWithLogs(context, chatCompletionCreateParams, requestOptions);
8585
} catch (Throwable t) {
86-
instrumenter.end(ctx, chatCompletionCreateParams, null, t);
86+
instrumenter.end(context, chatCompletionCreateParams, null, t);
8787
throw t;
8888
}
8989

90-
instrumenter.end(ctx, chatCompletionCreateParams, completion, null);
90+
instrumenter.end(context, chatCompletionCreateParams, completion, null);
9191
return completion;
9292
}
9393

9494
private ChatCompletion createWithLogs(
95-
ChatCompletionCreateParams chatCompletionCreateParams, RequestOptions requestOptions) {
95+
Context context,
96+
ChatCompletionCreateParams chatCompletionCreateParams,
97+
RequestOptions requestOptions) {
9698
ChatCompletionEventsHelper.emitPromptLogEvents(
97-
eventLogger, chatCompletionCreateParams, captureMessageContent);
99+
context, eventLogger, chatCompletionCreateParams, captureMessageContent);
98100
ChatCompletion result = delegate.create(chatCompletionCreateParams, requestOptions);
99-
ChatCompletionEventsHelper.emitCompletionLogEvents(eventLogger, result, captureMessageContent);
101+
ChatCompletionEventsHelper.emitCompletionLogEvents(
102+
context, eventLogger, result, captureMessageContent);
100103
return result;
101104
}
102105

103106
private StreamResponse<ChatCompletionChunk> createStreaming(
104107
ChatCompletionCreateParams chatCompletionCreateParams, RequestOptions requestOptions) {
105-
Context parentCtx = Context.current();
106-
if (!instrumenter.shouldStart(parentCtx, chatCompletionCreateParams)) {
107-
return createStreamingWithLogs(chatCompletionCreateParams, requestOptions, parentCtx, false);
108+
Context parentContext = Context.current();
109+
if (!instrumenter.shouldStart(parentContext, chatCompletionCreateParams)) {
110+
return createStreamingWithLogs(
111+
parentContext, chatCompletionCreateParams, requestOptions, false);
108112
}
109113

110-
Context ctx = instrumenter.start(parentCtx, chatCompletionCreateParams);
111-
try (Scope ignored = ctx.makeCurrent()) {
112-
return createStreamingWithLogs(chatCompletionCreateParams, requestOptions, ctx, true);
114+
Context context = instrumenter.start(parentContext, chatCompletionCreateParams);
115+
try (Scope ignored = context.makeCurrent()) {
116+
return createStreamingWithLogs(context, chatCompletionCreateParams, requestOptions, true);
113117
} catch (Throwable t) {
114-
instrumenter.end(ctx, chatCompletionCreateParams, null, t);
118+
instrumenter.end(context, chatCompletionCreateParams, null, t);
115119
throw t;
116120
}
117121
}
118122

119123
private StreamResponse<ChatCompletionChunk> createStreamingWithLogs(
124+
Context context,
120125
ChatCompletionCreateParams chatCompletionCreateParams,
121126
RequestOptions requestOptions,
122-
Context parentCtx,
123127
boolean newSpan) {
124128
ChatCompletionEventsHelper.emitPromptLogEvents(
125-
eventLogger, chatCompletionCreateParams, captureMessageContent);
129+
context, eventLogger, chatCompletionCreateParams, captureMessageContent);
126130
StreamResponse<ChatCompletionChunk> result =
127131
delegate.createStreaming(chatCompletionCreateParams, requestOptions);
128132
return new TracingStreamedResponse(
129133
result,
130-
parentCtx,
131-
chatCompletionCreateParams,
132-
instrumenter,
133-
eventLogger,
134-
captureMessageContent,
135-
newSpan);
134+
new StreamListener(
135+
context,
136+
chatCompletionCreateParams,
137+
instrumenter,
138+
eventLogger,
139+
captureMessageContent,
140+
newSpan));
136141
}
137142
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,149 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.instrumentation.openai.v1_1;
7+
8+
import com.openai.core.RequestOptions;
9+
import com.openai.core.http.AsyncStreamResponse;
10+
import com.openai.models.chat.completions.ChatCompletion;
11+
import com.openai.models.chat.completions.ChatCompletionChunk;
12+
import com.openai.models.chat.completions.ChatCompletionCreateParams;
13+
import com.openai.services.async.chat.ChatCompletionServiceAsync;
14+
import io.opentelemetry.api.logs.Logger;
15+
import io.opentelemetry.context.Context;
16+
import io.opentelemetry.context.Scope;
17+
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
18+
import java.lang.reflect.Method;
19+
import java.util.concurrent.CompletableFuture;
20+
21+
final class InstrumentedChatCompletionServiceAsync
22+
extends DelegatingInvocationHandler<
23+
ChatCompletionServiceAsync, InstrumentedChatCompletionServiceAsync> {
24+
25+
private final Instrumenter<ChatCompletionCreateParams, ChatCompletion> instrumenter;
26+
private final Logger eventLogger;
27+
private final boolean captureMessageContent;
28+
29+
InstrumentedChatCompletionServiceAsync(
30+
ChatCompletionServiceAsync delegate,
31+
Instrumenter<ChatCompletionCreateParams, ChatCompletion> instrumenter,
32+
Logger eventLogger,
33+
boolean captureMessageContent) {
34+
super(delegate);
35+
this.instrumenter = instrumenter;
36+
this.eventLogger = eventLogger;
37+
this.captureMessageContent = captureMessageContent;
38+
}
39+
40+
@Override
41+
protected Class<ChatCompletionServiceAsync> getProxyType() {
42+
return ChatCompletionServiceAsync.class;
43+
}
44+
45+
@Override
46+
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
47+
String methodName = method.getName();
48+
Class<?>[] parameterTypes = method.getParameterTypes();
49+
50+
switch (methodName) {
51+
case "create":
52+
if (parameterTypes.length >= 1 && parameterTypes[0] == ChatCompletionCreateParams.class) {
53+
if (parameterTypes.length == 1) {
54+
return create((ChatCompletionCreateParams) args[0], RequestOptions.none());
55+
} else if (parameterTypes.length == 2 && parameterTypes[1] == RequestOptions.class) {
56+
return create((ChatCompletionCreateParams) args[0], (RequestOptions) args[1]);
57+
}
58+
}
59+
break;
60+
case "createStreaming":
61+
if (parameterTypes.length >= 1 && parameterTypes[0] == ChatCompletionCreateParams.class) {
62+
if (parameterTypes.length == 1) {
63+
return createStreaming((ChatCompletionCreateParams) args[0], RequestOptions.none());
64+
} else if (parameterTypes.length == 2 && parameterTypes[1] == RequestOptions.class) {
65+
return createStreaming((ChatCompletionCreateParams) args[0], (RequestOptions) args[1]);
66+
}
67+
}
68+
break;
69+
default:
70+
// fallthrough
71+
}
72+
73+
return super.invoke(proxy, method, args);
74+
}
75+
76+
private CompletableFuture<ChatCompletion> create(
77+
ChatCompletionCreateParams chatCompletionCreateParams, RequestOptions requestOptions) {
78+
Context parentContext = Context.current();
79+
if (!instrumenter.shouldStart(parentContext, chatCompletionCreateParams)) {
80+
return createWithLogs(parentContext, chatCompletionCreateParams, requestOptions);
81+
}
82+
83+
Context context = instrumenter.start(parentContext, chatCompletionCreateParams);
84+
CompletableFuture<ChatCompletion> future;
85+
try (Scope ignored = context.makeCurrent()) {
86+
future = createWithLogs(context, chatCompletionCreateParams, requestOptions);
87+
} catch (Throwable t) {
88+
instrumenter.end(context, chatCompletionCreateParams, null, t);
89+
throw t;
90+
}
91+
92+
future =
93+
future.whenComplete(
94+
(res, t) -> instrumenter.end(context, chatCompletionCreateParams, res, t));
95+
return CompletableFutureWrapper.wrap(future, parentContext);
96+
}
97+
98+
private CompletableFuture<ChatCompletion> createWithLogs(
99+
Context context,
100+
ChatCompletionCreateParams chatCompletionCreateParams,
101+
RequestOptions requestOptions) {
102+
ChatCompletionEventsHelper.emitPromptLogEvents(
103+
context, eventLogger, chatCompletionCreateParams, captureMessageContent);
104+
CompletableFuture<ChatCompletion> future =
105+
delegate.create(chatCompletionCreateParams, requestOptions);
106+
future.thenAccept(
107+
r ->
108+
ChatCompletionEventsHelper.emitCompletionLogEvents(
109+
context, eventLogger, r, captureMessageContent));
110+
return future;
111+
}
112+
113+
private AsyncStreamResponse<ChatCompletionChunk> createStreaming(
114+
ChatCompletionCreateParams chatCompletionCreateParams, RequestOptions requestOptions) {
115+
Context parentContext = Context.current();
116+
if (!instrumenter.shouldStart(parentContext, chatCompletionCreateParams)) {
117+
return createStreamingWithLogs(
118+
parentContext, chatCompletionCreateParams, requestOptions, false);
119+
}
120+
121+
Context context = instrumenter.start(parentContext, chatCompletionCreateParams);
122+
try (Scope ignored = context.makeCurrent()) {
123+
return createStreamingWithLogs(context, chatCompletionCreateParams, requestOptions, true);
124+
} catch (Throwable t) {
125+
instrumenter.end(context, chatCompletionCreateParams, null, t);
126+
throw t;
127+
}
128+
}
129+
130+
private AsyncStreamResponse<ChatCompletionChunk> createStreamingWithLogs(
131+
Context context,
132+
ChatCompletionCreateParams chatCompletionCreateParams,
133+
RequestOptions requestOptions,
134+
boolean newSpan) {
135+
ChatCompletionEventsHelper.emitPromptLogEvents(
136+
context, eventLogger, chatCompletionCreateParams, captureMessageContent);
137+
AsyncStreamResponse<ChatCompletionChunk> result =
138+
delegate.createStreaming(chatCompletionCreateParams, requestOptions);
139+
return new TracingAsyncStreamedResponse(
140+
result,
141+
new StreamListener(
142+
context,
143+
chatCompletionCreateParams,
144+
instrumenter,
145+
eventLogger,
146+
captureMessageContent,
147+
newSpan));
148+
}
149+
}

0 commit comments

Comments
 (0)