From 085fa4d86fe7bf146a351b5c714477371311c0b9 Mon Sep 17 00:00:00 2001 From: Anuraag Agrawal Date: Thu, 24 Jul 2025 17:01:52 +0900 Subject: [PATCH 1/4] Instrument openai async client --- .../OpenAiClientAsyncInstrumentation.java | 40 +++++ .../v1_1/OpenAiInstrumentationModule.java | 4 +- .../instrumentation/openai/v1_1/ChatTest.java | 6 + .../v1_1/ChatCompletionEventsHelper.java | 22 ++- .../openai/v1_1/CompletableFutureWrapper.java | 30 ++++ .../InstrumentedChatCompletionService.java | 34 ++-- ...nstrumentedChatCompletionServiceAsync.java | 147 ++++++++++++++++ .../v1_1/InstrumentedChatServiceAsync.java | 49 ++++++ .../openai/v1_1/InstrumentedOpenAiClient.java | 5 + .../v1_1/InstrumentedOpenAiClientAsync.java | 54 ++++++ .../openai/v1_1/OpenAITelemetry.java | 8 + .../openai/v1_1/StreamListener.java | 112 ++++++++++++ .../v1_1/TracingAsyncStreamedResponse.java | 70 ++++++++ .../openai/v1_1/TracingStreamedResponse.java | 107 +----------- .../instrumentation/openai/v1_1/ChatTest.java | 80 ++++----- .../openai/v1_1/AbstractChatTest.java | 162 ++++++++++++++---- 16 files changed, 719 insertions(+), 211 deletions(-) create mode 100644 instrumentation/openai/openai-java-1.1/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/openai/v1_1/OpenAiClientAsyncInstrumentation.java create mode 100644 instrumentation/openai/openai-java-1.1/library/src/main/java/io/opentelemetry/instrumentation/openai/v1_1/CompletableFutureWrapper.java create mode 100644 instrumentation/openai/openai-java-1.1/library/src/main/java/io/opentelemetry/instrumentation/openai/v1_1/InstrumentedChatCompletionServiceAsync.java create mode 100644 instrumentation/openai/openai-java-1.1/library/src/main/java/io/opentelemetry/instrumentation/openai/v1_1/InstrumentedChatServiceAsync.java create mode 100644 instrumentation/openai/openai-java-1.1/library/src/main/java/io/opentelemetry/instrumentation/openai/v1_1/InstrumentedOpenAiClientAsync.java create mode 100644 instrumentation/openai/openai-java-1.1/library/src/main/java/io/opentelemetry/instrumentation/openai/v1_1/StreamListener.java create mode 100644 instrumentation/openai/openai-java-1.1/library/src/main/java/io/opentelemetry/instrumentation/openai/v1_1/TracingAsyncStreamedResponse.java diff --git a/instrumentation/openai/openai-java-1.1/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/openai/v1_1/OpenAiClientAsyncInstrumentation.java b/instrumentation/openai/openai-java-1.1/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/openai/v1_1/OpenAiClientAsyncInstrumentation.java new file mode 100644 index 000000000000..b37eef3c54e9 --- /dev/null +++ b/instrumentation/openai/openai-java-1.1/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/openai/v1_1/OpenAiClientAsyncInstrumentation.java @@ -0,0 +1,40 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.openai.v1_1; + +import static io.opentelemetry.javaagent.instrumentation.openai.v1_1.OpenAiSingletons.TELEMETRY; +import static net.bytebuddy.matcher.ElementMatchers.named; +import static net.bytebuddy.matcher.ElementMatchers.returns; + +import com.openai.client.OpenAIClientAsync; +import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; +import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; +import net.bytebuddy.asm.Advice; +import net.bytebuddy.description.type.TypeDescription; +import net.bytebuddy.matcher.ElementMatcher; + +public class OpenAiClientAsyncInstrumentation implements TypeInstrumentation { + @Override + public ElementMatcher typeMatcher() { + return named("com.openai.client.okhttp.OpenAIOkHttpClientAsync$Builder"); + } + + @Override + public void transform(TypeTransformer transformer) { + transformer.applyAdviceToMethod( + named("build").and(returns(named("com.openai.client.OpenAIClientAsync"))), + OpenAiClientAsyncInstrumentation.class.getName() + "$BuildAdvice"); + } + + @SuppressWarnings("unused") + public static class BuildAdvice { + @Advice.OnMethodExit(suppress = Throwable.class) + @Advice.AssignReturned.ToReturned + public static OpenAIClientAsync onExit(@Advice.Return OpenAIClientAsync client) { + return TELEMETRY.wrap(client); + } + } +} diff --git a/instrumentation/openai/openai-java-1.1/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/openai/v1_1/OpenAiInstrumentationModule.java b/instrumentation/openai/openai-java-1.1/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/openai/v1_1/OpenAiInstrumentationModule.java index dc1d61d9c4f7..c2af04e5e730 100644 --- a/instrumentation/openai/openai-java-1.1/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/openai/v1_1/OpenAiInstrumentationModule.java +++ b/instrumentation/openai/openai-java-1.1/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/openai/v1_1/OpenAiInstrumentationModule.java @@ -6,7 +6,7 @@ package io.opentelemetry.javaagent.instrumentation.openai.v1_1; import static io.opentelemetry.javaagent.extension.matcher.AgentElementMatchers.hasClassesNamed; -import static java.util.Collections.singletonList; +import static java.util.Arrays.asList; import com.google.auto.service.AutoService; import io.opentelemetry.javaagent.extension.instrumentation.InstrumentationModule; @@ -27,6 +27,6 @@ public ElementMatcher.Junction classLoaderMatcher() { @Override public List typeInstrumentations() { - return singletonList(new OpenAiClientInstrumentation()); + return asList(new OpenAiClientInstrumentation(), new OpenAiClientAsyncInstrumentation()); } } diff --git a/instrumentation/openai/openai-java-1.1/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/openai/v1_1/ChatTest.java b/instrumentation/openai/openai-java-1.1/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/openai/v1_1/ChatTest.java index e66921251783..adf82527a999 100644 --- a/instrumentation/openai/openai-java-1.1/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/openai/v1_1/ChatTest.java +++ b/instrumentation/openai/openai-java-1.1/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/openai/v1_1/ChatTest.java @@ -6,6 +6,7 @@ package io.opentelemetry.javaagent.instrumentation.openai.v1_1; import com.openai.client.OpenAIClient; +import com.openai.client.OpenAIClientAsync; import io.opentelemetry.instrumentation.openai.v1_1.AbstractChatTest; import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension; import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; @@ -31,6 +32,11 @@ protected OpenAIClient wrap(OpenAIClient client) { return client; } + @Override + protected OpenAIClientAsync wrap(OpenAIClientAsync client) { + return client; + } + @Override protected final List> maybeWithTransportSpan( Consumer span) { diff --git a/instrumentation/openai/openai-java-1.1/library/src/main/java/io/opentelemetry/instrumentation/openai/v1_1/ChatCompletionEventsHelper.java b/instrumentation/openai/openai-java-1.1/library/src/main/java/io/opentelemetry/instrumentation/openai/v1_1/ChatCompletionEventsHelper.java index 454fb546a3fe..0703c2cc4e0c 100644 --- a/instrumentation/openai/openai-java-1.1/library/src/main/java/io/opentelemetry/instrumentation/openai/v1_1/ChatCompletionEventsHelper.java +++ b/instrumentation/openai/openai-java-1.1/library/src/main/java/io/opentelemetry/instrumentation/openai/v1_1/ChatCompletionEventsHelper.java @@ -29,14 +29,16 @@ import java.util.Map; import java.util.Objects; import java.util.stream.Collectors; -import javax.annotation.Nullable; final class ChatCompletionEventsHelper { private static final AttributeKey EVENT_NAME = stringKey("event.name"); public static void emitPromptLogEvents( - Logger eventLogger, ChatCompletionCreateParams request, boolean captureMessageContent) { + Context ctx, + Logger eventLogger, + ChatCompletionCreateParams request, + boolean captureMessageContent) { for (ChatCompletionMessageParam msg : request.messages()) { String eventType; Map> body = new HashMap<>(); @@ -84,7 +86,7 @@ public static void emitPromptLogEvents( } else { continue; } - newEvent(eventLogger, eventType).setBody(Value.of(body)).emit(); + newEvent(eventLogger, eventType).setContext(ctx).setBody(Value.of(body)).emit(); } } @@ -160,7 +162,7 @@ private static String joinContentParts(List conte } public static void emitCompletionLogEvents( - Logger eventLogger, ChatCompletion completion, boolean captureMessageContent) { + Context ctx, Logger eventLogger, ChatCompletion completion, boolean captureMessageContent) { for (ChatCompletion.Choice choice : completion.choices()) { ChatCompletionMessage choiceMsg = choice.message(); Map> message = new HashMap<>(); @@ -179,25 +181,21 @@ public static void emitCompletionLogEvents( .collect(Collectors.toList()))); }); emitCompletionLogEvent( - eventLogger, choice.index(), choice.finishReason().toString(), Value.of(message), null); + ctx, eventLogger, choice.index(), choice.finishReason().toString(), Value.of(message)); } } public static void emitCompletionLogEvent( + Context ctx, Logger eventLogger, long index, String finishReason, - Value eventMessageObject, - @Nullable Context contextOverride) { + Value eventMessageObject) { Map> body = new HashMap<>(); body.put("finish_reason", Value.of(finishReason)); body.put("index", Value.of(index)); body.put("message", eventMessageObject); - LogRecordBuilder builder = newEvent(eventLogger, "gen_ai.choice").setBody(Value.of(body)); - if (contextOverride != null) { - builder.setContext(contextOverride); - } - builder.emit(); + newEvent(eventLogger, "gen_ai.choice").setContext(ctx).setBody(Value.of(body)).emit(); } private static LogRecordBuilder newEvent(Logger eventLogger, String name) { diff --git a/instrumentation/openai/openai-java-1.1/library/src/main/java/io/opentelemetry/instrumentation/openai/v1_1/CompletableFutureWrapper.java b/instrumentation/openai/openai-java-1.1/library/src/main/java/io/opentelemetry/instrumentation/openai/v1_1/CompletableFutureWrapper.java new file mode 100644 index 000000000000..96b1b565daa7 --- /dev/null +++ b/instrumentation/openai/openai-java-1.1/library/src/main/java/io/opentelemetry/instrumentation/openai/v1_1/CompletableFutureWrapper.java @@ -0,0 +1,30 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.openai.v1_1; + +import io.opentelemetry.context.Context; +import io.opentelemetry.context.Scope; +import java.util.concurrent.CompletableFuture; + +final class CompletableFutureWrapper { + private CompletableFutureWrapper() {} + + public static CompletableFuture wrap(CompletableFuture future, Context context) { + CompletableFuture result = new CompletableFuture<>(); + future.whenComplete( + (T value, Throwable throwable) -> { + try (Scope ignored = context.makeCurrent()) { + if (throwable != null) { + result.completeExceptionally(throwable); + } else { + result.complete(value); + } + } + }); + + return result; + } +} diff --git a/instrumentation/openai/openai-java-1.1/library/src/main/java/io/opentelemetry/instrumentation/openai/v1_1/InstrumentedChatCompletionService.java b/instrumentation/openai/openai-java-1.1/library/src/main/java/io/opentelemetry/instrumentation/openai/v1_1/InstrumentedChatCompletionService.java index a0d7b88af421..ec171203631b 100644 --- a/instrumentation/openai/openai-java-1.1/library/src/main/java/io/opentelemetry/instrumentation/openai/v1_1/InstrumentedChatCompletionService.java +++ b/instrumentation/openai/openai-java-1.1/library/src/main/java/io/opentelemetry/instrumentation/openai/v1_1/InstrumentedChatCompletionService.java @@ -75,13 +75,13 @@ private ChatCompletion create( ChatCompletionCreateParams chatCompletionCreateParams, RequestOptions requestOptions) { Context parentCtx = Context.current(); if (!instrumenter.shouldStart(parentCtx, chatCompletionCreateParams)) { - return createWithLogs(chatCompletionCreateParams, requestOptions); + return createWithLogs(parentCtx, chatCompletionCreateParams, requestOptions); } Context ctx = instrumenter.start(parentCtx, chatCompletionCreateParams); ChatCompletion completion; try (Scope ignored = ctx.makeCurrent()) { - completion = createWithLogs(chatCompletionCreateParams, requestOptions); + completion = createWithLogs(ctx, chatCompletionCreateParams, requestOptions); } catch (Throwable t) { instrumenter.end(ctx, chatCompletionCreateParams, null, t); throw t; @@ -92,11 +92,14 @@ private ChatCompletion create( } private ChatCompletion createWithLogs( - ChatCompletionCreateParams chatCompletionCreateParams, RequestOptions requestOptions) { + Context ctx, + ChatCompletionCreateParams chatCompletionCreateParams, + RequestOptions requestOptions) { ChatCompletionEventsHelper.emitPromptLogEvents( - eventLogger, chatCompletionCreateParams, captureMessageContent); + ctx, eventLogger, chatCompletionCreateParams, captureMessageContent); ChatCompletion result = delegate.create(chatCompletionCreateParams, requestOptions); - ChatCompletionEventsHelper.emitCompletionLogEvents(eventLogger, result, captureMessageContent); + ChatCompletionEventsHelper.emitCompletionLogEvents( + ctx, eventLogger, result, captureMessageContent); return result; } @@ -104,12 +107,12 @@ private StreamResponse createStreaming( ChatCompletionCreateParams chatCompletionCreateParams, RequestOptions requestOptions) { Context parentCtx = Context.current(); if (!instrumenter.shouldStart(parentCtx, chatCompletionCreateParams)) { - return createStreamingWithLogs(chatCompletionCreateParams, requestOptions, parentCtx, false); + return createStreamingWithLogs(parentCtx, chatCompletionCreateParams, requestOptions, false); } Context ctx = instrumenter.start(parentCtx, chatCompletionCreateParams); try (Scope ignored = ctx.makeCurrent()) { - return createStreamingWithLogs(chatCompletionCreateParams, requestOptions, ctx, true); + return createStreamingWithLogs(ctx, chatCompletionCreateParams, requestOptions, true); } catch (Throwable t) { instrumenter.end(ctx, chatCompletionCreateParams, null, t); throw t; @@ -117,21 +120,22 @@ private StreamResponse createStreaming( } private StreamResponse createStreamingWithLogs( + Context ctx, ChatCompletionCreateParams chatCompletionCreateParams, RequestOptions requestOptions, - Context parentCtx, boolean newSpan) { ChatCompletionEventsHelper.emitPromptLogEvents( - eventLogger, chatCompletionCreateParams, captureMessageContent); + ctx, eventLogger, chatCompletionCreateParams, captureMessageContent); StreamResponse result = delegate.createStreaming(chatCompletionCreateParams, requestOptions); return new TracingStreamedResponse( result, - parentCtx, - chatCompletionCreateParams, - instrumenter, - eventLogger, - captureMessageContent, - newSpan); + new StreamListener( + ctx, + chatCompletionCreateParams, + instrumenter, + eventLogger, + captureMessageContent, + newSpan)); } } diff --git a/instrumentation/openai/openai-java-1.1/library/src/main/java/io/opentelemetry/instrumentation/openai/v1_1/InstrumentedChatCompletionServiceAsync.java b/instrumentation/openai/openai-java-1.1/library/src/main/java/io/opentelemetry/instrumentation/openai/v1_1/InstrumentedChatCompletionServiceAsync.java new file mode 100644 index 000000000000..f8e8d88903a7 --- /dev/null +++ b/instrumentation/openai/openai-java-1.1/library/src/main/java/io/opentelemetry/instrumentation/openai/v1_1/InstrumentedChatCompletionServiceAsync.java @@ -0,0 +1,147 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.openai.v1_1; + +import com.openai.core.RequestOptions; +import com.openai.core.http.AsyncStreamResponse; +import com.openai.models.chat.completions.ChatCompletion; +import com.openai.models.chat.completions.ChatCompletionChunk; +import com.openai.models.chat.completions.ChatCompletionCreateParams; +import com.openai.services.async.chat.ChatCompletionServiceAsync; +import io.opentelemetry.api.logs.Logger; +import io.opentelemetry.context.Context; +import io.opentelemetry.context.Scope; +import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; +import java.lang.reflect.Method; +import java.util.concurrent.CompletableFuture; + +final class InstrumentedChatCompletionServiceAsync + extends DelegatingInvocationHandler< + ChatCompletionServiceAsync, InstrumentedChatCompletionServiceAsync> { + + private final Instrumenter instrumenter; + private final Logger eventLogger; + private final boolean captureMessageContent; + + InstrumentedChatCompletionServiceAsync( + ChatCompletionServiceAsync delegate, + Instrumenter instrumenter, + Logger eventLogger, + boolean captureMessageContent) { + super(delegate); + this.instrumenter = instrumenter; + this.eventLogger = eventLogger; + this.captureMessageContent = captureMessageContent; + } + + @Override + protected Class getProxyType() { + return ChatCompletionServiceAsync.class; + } + + @Override + public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { + String methodName = method.getName(); + Class[] parameterTypes = method.getParameterTypes(); + + switch (methodName) { + case "create": + if (parameterTypes.length >= 1 && parameterTypes[0] == ChatCompletionCreateParams.class) { + if (parameterTypes.length == 1) { + return create((ChatCompletionCreateParams) args[0], RequestOptions.none()); + } else if (parameterTypes.length == 2 && parameterTypes[1] == RequestOptions.class) { + return create((ChatCompletionCreateParams) args[0], (RequestOptions) args[1]); + } + } + break; + case "createStreaming": + if (parameterTypes.length >= 1 && parameterTypes[0] == ChatCompletionCreateParams.class) { + if (parameterTypes.length == 1) { + return createStreaming((ChatCompletionCreateParams) args[0], RequestOptions.none()); + } else if (parameterTypes.length == 2 && parameterTypes[1] == RequestOptions.class) { + return createStreaming((ChatCompletionCreateParams) args[0], (RequestOptions) args[1]); + } + } + break; + default: + // fallthrough + } + + return super.invoke(proxy, method, args); + } + + private CompletableFuture create( + ChatCompletionCreateParams chatCompletionCreateParams, RequestOptions requestOptions) { + Context parentCtx = Context.current(); + if (!instrumenter.shouldStart(parentCtx, chatCompletionCreateParams)) { + return createWithLogs(parentCtx, chatCompletionCreateParams, requestOptions); + } + + Context ctx = instrumenter.start(parentCtx, chatCompletionCreateParams); + CompletableFuture future; + try (Scope ignored = ctx.makeCurrent()) { + future = createWithLogs(ctx, chatCompletionCreateParams, requestOptions); + } catch (Throwable t) { + instrumenter.end(ctx, chatCompletionCreateParams, null, t); + throw t; + } + + future = + future.whenComplete((res, t) -> instrumenter.end(ctx, chatCompletionCreateParams, res, t)); + return CompletableFutureWrapper.wrap(future, ctx); + } + + private CompletableFuture createWithLogs( + Context ctx, + ChatCompletionCreateParams chatCompletionCreateParams, + RequestOptions requestOptions) { + ChatCompletionEventsHelper.emitPromptLogEvents( + ctx, eventLogger, chatCompletionCreateParams, captureMessageContent); + CompletableFuture future = + delegate.create(chatCompletionCreateParams, requestOptions); + future.thenAccept( + r -> + ChatCompletionEventsHelper.emitCompletionLogEvents( + ctx, eventLogger, r, captureMessageContent)); + return future; + } + + private AsyncStreamResponse createStreaming( + ChatCompletionCreateParams chatCompletionCreateParams, RequestOptions requestOptions) { + Context parentCtx = Context.current(); + if (!instrumenter.shouldStart(parentCtx, chatCompletionCreateParams)) { + return createStreamingWithLogs(parentCtx, chatCompletionCreateParams, requestOptions, false); + } + + Context ctx = instrumenter.start(parentCtx, chatCompletionCreateParams); + try (Scope ignored = ctx.makeCurrent()) { + return createStreamingWithLogs(ctx, chatCompletionCreateParams, requestOptions, true); + } catch (Throwable t) { + instrumenter.end(ctx, chatCompletionCreateParams, null, t); + throw t; + } + } + + private AsyncStreamResponse createStreamingWithLogs( + Context ctx, + ChatCompletionCreateParams chatCompletionCreateParams, + RequestOptions requestOptions, + boolean newSpan) { + ChatCompletionEventsHelper.emitPromptLogEvents( + ctx, eventLogger, chatCompletionCreateParams, captureMessageContent); + AsyncStreamResponse result = + delegate.createStreaming(chatCompletionCreateParams, requestOptions); + return new TracingAsyncStreamedResponse( + result, + new StreamListener( + ctx, + chatCompletionCreateParams, + instrumenter, + eventLogger, + captureMessageContent, + newSpan)); + } +} diff --git a/instrumentation/openai/openai-java-1.1/library/src/main/java/io/opentelemetry/instrumentation/openai/v1_1/InstrumentedChatServiceAsync.java b/instrumentation/openai/openai-java-1.1/library/src/main/java/io/opentelemetry/instrumentation/openai/v1_1/InstrumentedChatServiceAsync.java new file mode 100644 index 000000000000..8dc8bbc09ed4 --- /dev/null +++ b/instrumentation/openai/openai-java-1.1/library/src/main/java/io/opentelemetry/instrumentation/openai/v1_1/InstrumentedChatServiceAsync.java @@ -0,0 +1,49 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.openai.v1_1; + +import com.openai.models.chat.completions.ChatCompletion; +import com.openai.models.chat.completions.ChatCompletionCreateParams; +import com.openai.services.async.ChatServiceAsync; +import io.opentelemetry.api.logs.Logger; +import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; +import java.lang.reflect.Method; + +final class InstrumentedChatServiceAsync + extends DelegatingInvocationHandler { + + private final Instrumenter instrumenter; + private final Logger eventLogger; + private final boolean captureMessageContent; + + public InstrumentedChatServiceAsync( + ChatServiceAsync delegate, + Instrumenter instrumenter, + Logger eventLogger, + boolean captureMessageContent) { + super(delegate); + this.instrumenter = instrumenter; + this.eventLogger = eventLogger; + this.captureMessageContent = captureMessageContent; + } + + @Override + protected Class getProxyType() { + return ChatServiceAsync.class; + } + + @Override + public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { + String methodName = method.getName(); + Class[] parameterTypes = method.getParameterTypes(); + if (methodName.equals("completions") && parameterTypes.length == 0) { + return new InstrumentedChatCompletionServiceAsync( + delegate.completions(), instrumenter, eventLogger, captureMessageContent) + .createProxy(); + } + return super.invoke(proxy, method, args); + } +} diff --git a/instrumentation/openai/openai-java-1.1/library/src/main/java/io/opentelemetry/instrumentation/openai/v1_1/InstrumentedOpenAiClient.java b/instrumentation/openai/openai-java-1.1/library/src/main/java/io/opentelemetry/instrumentation/openai/v1_1/InstrumentedOpenAiClient.java index cc118a85cf33..754c5e8ba7f2 100644 --- a/instrumentation/openai/openai-java-1.1/library/src/main/java/io/opentelemetry/instrumentation/openai/v1_1/InstrumentedOpenAiClient.java +++ b/instrumentation/openai/openai-java-1.1/library/src/main/java/io/opentelemetry/instrumentation/openai/v1_1/InstrumentedOpenAiClient.java @@ -44,6 +44,11 @@ public Object invoke(Object proxy, Method method, Object[] args) throws Throwabl delegate.chat(), chatInstrumenter, eventLogger, captureMessageContent) .createProxy(); } + if (methodName.equals("async") && parameterTypes.length == 0) { + return new InstrumentedOpenAiClientAsync( + delegate.async(), chatInstrumenter, eventLogger, captureMessageContent) + .createProxy(); + } return super.invoke(proxy, method, args); } } diff --git a/instrumentation/openai/openai-java-1.1/library/src/main/java/io/opentelemetry/instrumentation/openai/v1_1/InstrumentedOpenAiClientAsync.java b/instrumentation/openai/openai-java-1.1/library/src/main/java/io/opentelemetry/instrumentation/openai/v1_1/InstrumentedOpenAiClientAsync.java new file mode 100644 index 000000000000..bc7635590c79 --- /dev/null +++ b/instrumentation/openai/openai-java-1.1/library/src/main/java/io/opentelemetry/instrumentation/openai/v1_1/InstrumentedOpenAiClientAsync.java @@ -0,0 +1,54 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.openai.v1_1; + +import com.openai.client.OpenAIClientAsync; +import com.openai.models.chat.completions.ChatCompletion; +import com.openai.models.chat.completions.ChatCompletionCreateParams; +import io.opentelemetry.api.logs.Logger; +import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; +import java.lang.reflect.Method; + +final class InstrumentedOpenAiClientAsync + extends DelegatingInvocationHandler { + + private final Instrumenter chatInstrumenter; + private final Logger eventLogger; + private final boolean captureMessageContent; + + InstrumentedOpenAiClientAsync( + OpenAIClientAsync delegate, + Instrumenter chatInstrumenter, + Logger eventLogger, + boolean captureMessageContent) { + super(delegate); + this.chatInstrumenter = chatInstrumenter; + this.eventLogger = eventLogger; + this.captureMessageContent = captureMessageContent; + } + + @Override + protected Class getProxyType() { + return OpenAIClientAsync.class; + } + + @Override + public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { + String methodName = method.getName(); + Class[] parameterTypes = method.getParameterTypes(); + if (methodName.equals("chat") && parameterTypes.length == 0) { + return new InstrumentedChatServiceAsync( + delegate.chat(), chatInstrumenter, eventLogger, captureMessageContent) + .createProxy(); + } + if (methodName.equals("sync") && parameterTypes.length == 0) { + return new InstrumentedOpenAiClient( + delegate.sync(), chatInstrumenter, eventLogger, captureMessageContent) + .createProxy(); + } + return super.invoke(proxy, method, args); + } +} diff --git a/instrumentation/openai/openai-java-1.1/library/src/main/java/io/opentelemetry/instrumentation/openai/v1_1/OpenAITelemetry.java b/instrumentation/openai/openai-java-1.1/library/src/main/java/io/opentelemetry/instrumentation/openai/v1_1/OpenAITelemetry.java index ca9f26d1f2ff..7a44eec4a581 100644 --- a/instrumentation/openai/openai-java-1.1/library/src/main/java/io/opentelemetry/instrumentation/openai/v1_1/OpenAITelemetry.java +++ b/instrumentation/openai/openai-java-1.1/library/src/main/java/io/opentelemetry/instrumentation/openai/v1_1/OpenAITelemetry.java @@ -6,6 +6,7 @@ package io.opentelemetry.instrumentation.openai.v1_1; import com.openai.client.OpenAIClient; +import com.openai.client.OpenAIClientAsync; import com.openai.models.chat.completions.ChatCompletion; import com.openai.models.chat.completions.ChatCompletionCreateParams; import io.opentelemetry.api.OpenTelemetry; @@ -48,4 +49,11 @@ public OpenAIClient wrap(OpenAIClient client) { client, chatInstrumenter, eventLogger, captureMessageContent) .createProxy(); } + + /** Wraps the provided OpenAIClientAsync, enabling telemetry for it. */ + public OpenAIClientAsync wrap(OpenAIClientAsync client) { + return new InstrumentedOpenAiClientAsync( + client, chatInstrumenter, eventLogger, captureMessageContent) + .createProxy(); + } } diff --git a/instrumentation/openai/openai-java-1.1/library/src/main/java/io/opentelemetry/instrumentation/openai/v1_1/StreamListener.java b/instrumentation/openai/openai-java-1.1/library/src/main/java/io/opentelemetry/instrumentation/openai/v1_1/StreamListener.java new file mode 100644 index 000000000000..533ca3214baf --- /dev/null +++ b/instrumentation/openai/openai-java-1.1/library/src/main/java/io/opentelemetry/instrumentation/openai/v1_1/StreamListener.java @@ -0,0 +1,112 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.openai.v1_1; + +import com.openai.models.chat.completions.ChatCompletion; +import com.openai.models.chat.completions.ChatCompletionChunk; +import com.openai.models.chat.completions.ChatCompletionCreateParams; +import com.openai.models.completions.CompletionUsage; +import io.opentelemetry.api.logs.Logger; +import io.opentelemetry.context.Context; +import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Collectors; +import javax.annotation.Nullable; + +final class StreamListener { + + private final Context parentCtx; + private final ChatCompletionCreateParams request; + private final List choiceBuffers; + + private final Instrumenter instrumenter; + private final Logger eventLogger; + private final boolean captureMessageContent; + private final boolean newSpan; + private final AtomicBoolean hasEnded; + + @Nullable private CompletionUsage usage; + @Nullable private String model; + @Nullable private String responseId; + + StreamListener( + Context parentCtx, + ChatCompletionCreateParams request, + Instrumenter instrumenter, + Logger eventLogger, + boolean captureMessageContent, + boolean newSpan) { + this.parentCtx = parentCtx; + this.request = request; + this.instrumenter = instrumenter; + this.eventLogger = eventLogger; + this.captureMessageContent = captureMessageContent; + this.newSpan = newSpan; + choiceBuffers = new ArrayList<>(); + hasEnded = new AtomicBoolean(); + } + + void onChunk(ChatCompletionChunk chunk) { + model = chunk.model(); + responseId = chunk.id(); + chunk.usage().ifPresent(u -> usage = u); + + for (ChatCompletionChunk.Choice choice : chunk.choices()) { + while (choiceBuffers.size() <= choice.index()) { + choiceBuffers.add(null); + } + StreamedMessageBuffer buffer = choiceBuffers.get((int) choice.index()); + if (buffer == null) { + buffer = new StreamedMessageBuffer(choice.index(), captureMessageContent); + choiceBuffers.set((int) choice.index(), buffer); + } + buffer.append(choice.delta()); + if (choice.finishReason().isPresent()) { + buffer.finishReason = choice.finishReason().get().toString(); + + // message has ended, let's emit + ChatCompletionEventsHelper.emitCompletionLogEvent( + parentCtx, eventLogger, choice.index(), buffer.finishReason, buffer.toEventBody()); + } + } + } + + void endSpan(@Nullable Throwable error) { + // Use an atomic operation since close() type of methods are exposed to the user + // and can come from any thread. + if (!hasEnded.compareAndSet(false, true)) { + return; + } + + if (model == null || responseId == null) { + // Only happens if we got no chunks, so we have no response. + if (newSpan) { + instrumenter.end(parentCtx, request, null, error); + } + return; + } + + ChatCompletion.Builder result = + ChatCompletion.builder() + .created(0) + .model(model) + .id(responseId) + .choices( + choiceBuffers.stream() + .map(StreamedMessageBuffer::toChoice) + .collect(Collectors.toList())); + + if (usage != null) { + result.usage(usage); + } + + if (newSpan) { + instrumenter.end(parentCtx, request, result.build(), error); + } + } +} diff --git a/instrumentation/openai/openai-java-1.1/library/src/main/java/io/opentelemetry/instrumentation/openai/v1_1/TracingAsyncStreamedResponse.java b/instrumentation/openai/openai-java-1.1/library/src/main/java/io/opentelemetry/instrumentation/openai/v1_1/TracingAsyncStreamedResponse.java new file mode 100644 index 000000000000..2c3ca8ee3231 --- /dev/null +++ b/instrumentation/openai/openai-java-1.1/library/src/main/java/io/opentelemetry/instrumentation/openai/v1_1/TracingAsyncStreamedResponse.java @@ -0,0 +1,70 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.openai.v1_1; + +import com.openai.core.http.AsyncStreamResponse; +import com.openai.models.chat.completions.ChatCompletionChunk; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; + +final class TracingAsyncStreamedResponse implements AsyncStreamResponse { + + private final AsyncStreamResponse delegate; + private final StreamListener listener; + + TracingAsyncStreamedResponse( + AsyncStreamResponse delegate, StreamListener listener) { + this.delegate = delegate; + this.listener = listener; + } + + @Override + public void close() { + listener.endSpan(null); + delegate.close(); + } + + @Override + public AsyncStreamResponse subscribe( + Handler handler) { + delegate.subscribe(new TracingHandler(handler)); + return this; + } + + @Override + public AsyncStreamResponse subscribe( + Handler handler, Executor executor) { + delegate.subscribe(new TracingHandler(handler), executor); + return this; + } + + @Override + public CompletableFuture onCompleteFuture() { + return delegate.onCompleteFuture(); + } + + private class TracingHandler implements Handler { + + private final Handler delegate; + + private TracingHandler(Handler delegate) { + this.delegate = delegate; + } + + @Override + public void onNext(ChatCompletionChunk chunk) { + listener.onChunk(chunk); + delegate.onNext(chunk); + } + + @Override + public void onComplete(Optional error) { + listener.endSpan(error.orElse(null)); + delegate.onComplete(error); + } + } +} diff --git a/instrumentation/openai/openai-java-1.1/library/src/main/java/io/opentelemetry/instrumentation/openai/v1_1/TracingStreamedResponse.java b/instrumentation/openai/openai-java-1.1/library/src/main/java/io/opentelemetry/instrumentation/openai/v1_1/TracingStreamedResponse.java index e5793c4ed840..5795bb2d1b3f 100644 --- a/instrumentation/openai/openai-java-1.1/library/src/main/java/io/opentelemetry/instrumentation/openai/v1_1/TracingStreamedResponse.java +++ b/instrumentation/openai/openai-java-1.1/library/src/main/java/io/opentelemetry/instrumentation/openai/v1_1/TracingStreamedResponse.java @@ -5,21 +5,11 @@ package io.opentelemetry.instrumentation.openai.v1_1; -import com.openai.core.JsonField; import com.openai.core.http.StreamResponse; -import com.openai.models.chat.completions.ChatCompletion; import com.openai.models.chat.completions.ChatCompletionChunk; -import com.openai.models.chat.completions.ChatCompletionCreateParams; -import com.openai.models.completions.CompletionUsage; -import io.opentelemetry.api.logs.Logger; -import io.opentelemetry.context.Context; -import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; -import java.util.ArrayList; import java.util.Comparator; -import java.util.List; import java.util.Spliterator; import java.util.function.Consumer; -import java.util.stream.Collectors; import java.util.stream.Stream; import java.util.stream.StreamSupport; import javax.annotation.Nullable; @@ -27,36 +17,11 @@ final class TracingStreamedResponse implements StreamResponse { private final StreamResponse delegate; - private final Context parentCtx; - private final ChatCompletionCreateParams request; - private final List choiceBuffers; - - private final Instrumenter instrumenter; - private final Logger eventLogger; - private final boolean captureMessageContent; - private final boolean newSpan; - - @Nullable private CompletionUsage usage; - @Nullable private String model; - @Nullable private String responseId; - private boolean hasEnded = false; - - TracingStreamedResponse( - StreamResponse delegate, - Context parentCtx, - ChatCompletionCreateParams request, - Instrumenter instrumenter, - Logger eventLogger, - boolean captureMessageContent, - boolean newSpan) { + private final StreamListener listener; + + TracingStreamedResponse(StreamResponse delegate, StreamListener listener) { this.delegate = delegate; - this.parentCtx = parentCtx; - this.request = request; - this.instrumenter = instrumenter; - this.eventLogger = eventLogger; - this.captureMessageContent = captureMessageContent; - this.newSpan = newSpan; - choiceBuffers = new ArrayList<>(); + this.listener = listener; } @Override @@ -66,42 +31,10 @@ public Stream stream() { @Override public void close() { - endSpan(); + listener.endSpan(null); delegate.close(); } - private synchronized void endSpan() { - if (hasEnded) { - return; - } - hasEnded = true; - - ChatCompletion.Builder result = - ChatCompletion.builder() - .created(0) - .choices( - choiceBuffers.stream() - .map(StreamedMessageBuffer::toChoice) - .collect(Collectors.toList())); - if (model != null) { - result.model(model); - } else { - result.model(JsonField.ofNullable(null)); - } - if (responseId != null) { - result.id(responseId); - } else { - result.id(JsonField.ofNullable(null)); - } - if (usage != null) { - result.usage(usage); - } - - if (newSpan) { - instrumenter.end(parentCtx, request, result.build(), null); - } - } - private class TracingSpliterator implements Spliterator { private final Spliterator delegateSpliterator; @@ -115,37 +48,11 @@ public boolean tryAdvance(Consumer action) { boolean chunkReceived = delegateSpliterator.tryAdvance( chunk -> { - model = chunk.model(); - responseId = chunk.id(); - chunk.usage().ifPresent(u -> usage = u); - - for (ChatCompletionChunk.Choice choice : chunk.choices()) { - while (choiceBuffers.size() <= choice.index()) { - choiceBuffers.add(null); - } - StreamedMessageBuffer buffer = choiceBuffers.get((int) choice.index()); - if (buffer == null) { - buffer = new StreamedMessageBuffer(choice.index(), captureMessageContent); - choiceBuffers.set((int) choice.index(), buffer); - } - buffer.append(choice.delta()); - if (choice.finishReason().isPresent()) { - buffer.finishReason = choice.finishReason().get().toString(); - - // message has ended, let's emit - ChatCompletionEventsHelper.emitCompletionLogEvent( - eventLogger, - choice.index(), - buffer.finishReason, - buffer.toEventBody(), - parentCtx); - } - } - + listener.onChunk(chunk); action.accept(chunk); }); if (!chunkReceived) { - endSpan(); + listener.endSpan(null); } return chunkReceived; } diff --git a/instrumentation/openai/openai-java-1.1/library/src/test/java/io/opentelemetry/instrumentation/openai/v1_1/ChatTest.java b/instrumentation/openai/openai-java-1.1/library/src/test/java/io/opentelemetry/instrumentation/openai/v1_1/ChatTest.java index 5b8e78edb066..1937c11f75d7 100644 --- a/instrumentation/openai/openai-java-1.1/library/src/test/java/io/opentelemetry/instrumentation/openai/v1_1/ChatTest.java +++ b/instrumentation/openai/openai-java-1.1/library/src/test/java/io/opentelemetry/instrumentation/openai/v1_1/ChatTest.java @@ -26,7 +26,7 @@ import static org.assertj.core.api.Assertions.assertThat; import com.openai.client.OpenAIClient; -import com.openai.core.http.StreamResponse; +import com.openai.client.OpenAIClientAsync; import com.openai.models.chat.completions.ChatCompletion; import com.openai.models.chat.completions.ChatCompletionChunk; import com.openai.models.chat.completions.ChatCompletionCreateParams; @@ -72,6 +72,11 @@ protected OpenAIClient wrap(OpenAIClient client) { return telemetry.wrap(client); } + @Override + protected OpenAIClientAsync wrap(OpenAIClientAsync client) { + return telemetry.wrap(client); + } + @Override // OpenAI SDK does not expose OkHttp client in a way we can wrap. protected final List> maybeWithTransportSpan( @@ -79,18 +84,24 @@ protected final List> maybeWithTransportSpan( return singletonList(span); } + private OpenAIClient clientNoCaptureContent() { + return OpenAITelemetry.builder(testing.getOpenTelemetry()).build().wrap(getRawClient()); + } + + private OpenAIClientAsync clientAsyncNoCaptureContent() { + return OpenAITelemetry.builder(testing.getOpenTelemetry()).build().wrap(getRawClientAsync()); + } + @Test void basicNoCaptureContent() { - OpenAIClient client = - OpenAITelemetry.builder(testing.getOpenTelemetry()).build().wrap(getRawClient()); - ChatCompletionCreateParams params = ChatCompletionCreateParams.builder() .messages(singletonList(createUserMessage(TEST_CHAT_INPUT))) .model(TEST_CHAT_MODEL) .build(); - ChatCompletion response = client.chat().completions().create(params); + ChatCompletion response = + doCompletions(params, clientNoCaptureContent(), clientAsyncNoCaptureContent()); String content = "Atlantic Ocean"; assertThat(response.choices().get(0).message().content()).hasValue(content); @@ -179,9 +190,6 @@ void basicNoCaptureContent() { @Test void multipleChoicesNoCaptureContent() { - OpenAIClient client = - OpenAITelemetry.builder(testing.getOpenTelemetry()).build().wrap(getRawClient()); - ChatCompletionCreateParams params = ChatCompletionCreateParams.builder() .messages(Collections.singletonList(createUserMessage(TEST_CHAT_INPUT))) @@ -189,7 +197,8 @@ void multipleChoicesNoCaptureContent() { .n(2) .build(); - ChatCompletion response = client.chat().completions().create(params); + ChatCompletion response = + doCompletions(params, clientNoCaptureContent(), clientAsyncNoCaptureContent()); String content1 = "South Atlantic Ocean."; assertThat(response.choices().get(0).message().content()).hasValue(content1); String content2 = "Atlantic Ocean."; @@ -288,9 +297,6 @@ void multipleChoicesNoCaptureContent() { @Test void toolCallsNoCaptureContent() { - OpenAIClient client = - OpenAITelemetry.builder(testing.getOpenTelemetry()).build().wrap(getRawClient()); - List chatMessages = new ArrayList<>(); chatMessages.add(createSystemMessage("You are a helpful assistant providing weather updates.")); chatMessages.add(createUserMessage("What is the weather in New York City and London?")); @@ -302,7 +308,8 @@ void toolCallsNoCaptureContent() { .addTool(buildGetWeatherToolDefinition()) .build(); - ChatCompletion response = client.chat().completions().create(params); + ChatCompletion response = + doCompletions(params, clientNoCaptureContent(), clientAsyncNoCaptureContent()); assertThat(response.choices().get(0).message().content()).isEmpty(); @@ -441,14 +448,10 @@ void toolCallsNoCaptureContent() { chatMessages.add(createToolMessage("25 degrees and sunny", newYorkCallId)); chatMessages.add(createToolMessage("15 degrees and raining", londonCallId)); - client - .chat() - .completions() - .create( - ChatCompletionCreateParams.builder() - .messages(chatMessages) - .model(TEST_CHAT_MODEL) - .build()); + doCompletions( + ChatCompletionCreateParams.builder().messages(chatMessages).model(TEST_CHAT_MODEL).build(), + clientNoCaptureContent(), + clientAsyncNoCaptureContent()); getTesting() .waitAndAssertTraces( @@ -572,20 +575,14 @@ void toolCallsNoCaptureContent() { @Test void streamNoCaptureContent() { - OpenAIClient client = - OpenAITelemetry.builder(testing.getOpenTelemetry()).build().wrap(getRawClient()); - ChatCompletionCreateParams params = ChatCompletionCreateParams.builder() .messages(Collections.singletonList(createUserMessage(TEST_CHAT_INPUT))) .model(TEST_CHAT_MODEL) .build(); - List chunks; - try (StreamResponse result = - client.chat().completions().createStreaming(params)) { - chunks = result.stream().collect(Collectors.toList()); - } + List chunks = + doCompletionsStreaming(params, clientNoCaptureContent(), clientAsyncNoCaptureContent()); String fullMessage = chunks.stream() @@ -662,9 +659,6 @@ void streamNoCaptureContent() { @Test void streamMultipleChoicesNoCaptureContent() { - OpenAIClient client = - OpenAITelemetry.builder(testing.getOpenTelemetry()).build().wrap(getRawClient()); - ChatCompletionCreateParams params = ChatCompletionCreateParams.builder() .messages(Collections.singletonList(createUserMessage(TEST_CHAT_INPUT))) @@ -672,11 +666,8 @@ void streamMultipleChoicesNoCaptureContent() { .n(2) .build(); - List chunks; - try (StreamResponse result = - client.chat().completions().createStreaming(params)) { - chunks = result.stream().collect(Collectors.toList()); - } + List chunks = + doCompletionsStreaming(params, clientNoCaptureContent(), clientAsyncNoCaptureContent()); StringBuilder content1Builder = new StringBuilder(); StringBuilder content2Builder = new StringBuilder(); @@ -769,9 +760,6 @@ void streamMultipleChoicesNoCaptureContent() { @Test void streamToolCallsNoCaptureContent() { - OpenAIClient client = - OpenAITelemetry.builder(testing.getOpenTelemetry()).build().wrap(getRawClient()); - List chatMessages = new ArrayList<>(); chatMessages.add(createSystemMessage("You are a helpful assistant providing weather updates.")); chatMessages.add(createUserMessage("What is the weather in New York City and London?")); @@ -783,11 +771,8 @@ void streamToolCallsNoCaptureContent() { .addTool(buildGetWeatherToolDefinition()) .build(); - List chunks; - try (StreamResponse result = - client.chat().completions().createStreaming(params)) { - chunks = result.stream().collect(Collectors.toList()); - } + List chunks = + doCompletionsStreaming(params, clientNoCaptureContent(), clientAsyncNoCaptureContent()); List toolCalls = new ArrayList<>(); @@ -949,10 +934,7 @@ void streamToolCallsNoCaptureContent() { .addTool(buildGetWeatherToolDefinition()) .build(); - try (StreamResponse result = - client.chat().completions().createStreaming(params)) { - chunks = result.stream().collect(Collectors.toList()); - } + doCompletionsStreaming(params, clientNoCaptureContent(), clientAsyncNoCaptureContent()); getTesting() .waitAndAssertTraces( diff --git a/instrumentation/openai/openai-java-1.1/testing/src/main/java/io/opentelemetry/instrumentation/openai/v1_1/AbstractChatTest.java b/instrumentation/openai/openai-java-1.1/testing/src/main/java/io/opentelemetry/instrumentation/openai/v1_1/AbstractChatTest.java index 8e4a27f26ca2..e2a6ad65dae9 100644 --- a/instrumentation/openai/openai-java-1.1/testing/src/main/java/io/opentelemetry/instrumentation/openai/v1_1/AbstractChatTest.java +++ b/instrumentation/openai/openai-java-1.1/testing/src/main/java/io/opentelemetry/instrumentation/openai/v1_1/AbstractChatTest.java @@ -33,9 +33,12 @@ import static org.assertj.core.api.Assertions.catchThrowable; import com.openai.client.OpenAIClient; +import com.openai.client.OpenAIClientAsync; import com.openai.client.okhttp.OpenAIOkHttpClient; +import com.openai.client.okhttp.OpenAIOkHttpClientAsync; import com.openai.core.JsonObject; import com.openai.core.JsonValue; +import com.openai.core.http.AsyncStreamResponse; import com.openai.core.http.StreamResponse; import com.openai.errors.OpenAIIoException; import com.openai.models.FunctionDefinition; @@ -67,12 +70,25 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.concurrent.CompletionException; import java.util.function.Consumer; import java.util.stream.Collectors; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; +import org.junit.jupiter.params.Parameter; +import org.junit.jupiter.params.ParameterizedClass; +import org.junit.jupiter.params.provider.EnumSource; +@ParameterizedClass +@EnumSource(AbstractChatTest.TestType.class) public abstract class AbstractChatTest { + enum TestType { + SYNC, + SYNC_FROM_ASYNC, + ASYNC, + ASYNC_FROM_SYNC, + } + protected static final String INSTRUMENTATION_NAME = "io.opentelemetry.openai-java-1.1"; private static final String API_URL = "https://api.openai.com/v1"; @@ -90,7 +106,9 @@ public abstract class AbstractChatTest { protected abstract OpenAIClient wrap(OpenAIClient client); - protected OpenAIClient getRawClient() { + protected abstract OpenAIClientAsync wrap(OpenAIClientAsync client); + + protected final OpenAIClient getRawClient() { OpenAIOkHttpClient.Builder builder = OpenAIOkHttpClient.builder().baseUrl("http://localhost:" + recording.getPort()); if (recording.isRecording()) { @@ -101,13 +119,96 @@ protected OpenAIClient getRawClient() { return builder.build(); } - protected OpenAIClient getClient() { + protected final OpenAIClientAsync getRawClientAsync() { + OpenAIOkHttpClientAsync.Builder builder = + OpenAIOkHttpClientAsync.builder().baseUrl("http://localhost:" + recording.getPort()); + if (recording.isRecording()) { + builder.apiKey(System.getenv("OPENAI_API_KEY")); + } else { + builder.apiKey("unused"); + } + return builder.build(); + } + + protected final OpenAIClient getClient() { return wrap(getRawClient()); } + protected final OpenAIClientAsync getClientAsync() { + return wrap(getRawClientAsync()); + } + protected abstract List> maybeWithTransportSpan( Consumer span); + @Parameter TestType testType; + + protected final ChatCompletion doCompletions(ChatCompletionCreateParams params) { + return doCompletions(params, getClient(), getClientAsync()); + } + + protected final ChatCompletion doCompletions( + ChatCompletionCreateParams params, OpenAIClient client, OpenAIClientAsync clientAsync) { + switch (testType) { + case SYNC: + return client.chat().completions().create(params); + case SYNC_FROM_ASYNC: + return clientAsync.sync().chat().completions().create(params); + case ASYNC: + case ASYNC_FROM_SYNC: + OpenAIClientAsync cl = testType == TestType.ASYNC ? clientAsync : client.async(); + try { + return cl.chat().completions().create(params).join(); + } catch (CompletionException e) { + if (e.getCause() instanceof OpenAIIoException) { + throw ((OpenAIIoException) e.getCause()); + } + throw e; + } + } + throw new AssertionError(); + } + + protected final List doCompletionsStreaming( + ChatCompletionCreateParams params) { + return doCompletionsStreaming(params, getClient(), getClientAsync()); + } + + protected final List doCompletionsStreaming( + ChatCompletionCreateParams params, OpenAIClient client, OpenAIClientAsync clientAsync) { + switch (testType) { + case SYNC: + try (StreamResponse result = + client.chat().completions().createStreaming(params)) { + return result.stream().collect(Collectors.toList()); + } + case SYNC_FROM_ASYNC: + try (StreamResponse result = + clientAsync.sync().chat().completions().createStreaming(params)) { + return result.stream().collect(Collectors.toList()); + } + case ASYNC: + case ASYNC_FROM_SYNC: + { + OpenAIClientAsync cl = testType == TestType.ASYNC ? clientAsync : client.async(); + AsyncStreamResponse stream = + cl.chat().completions().createStreaming(params); + List result = new ArrayList<>(); + stream.subscribe(result::add); + try { + stream.onCompleteFuture().join(); + } catch (CompletionException e) { + if (e.getCause() instanceof OpenAIIoException) { + throw ((OpenAIIoException) e.getCause()); + } + throw e; + } + return result; + } + } + throw new AssertionError(); + } + @Test void basic() { ChatCompletionCreateParams params = @@ -116,7 +217,7 @@ void basic() { .model(TEST_CHAT_MODEL) .build(); - ChatCompletion response = getClient().chat().completions().create(params); + ChatCompletion response = doCompletions(params); String content = "Atlantic Ocean"; assertThat(response.choices().get(0).message().content()).hasValue(content); @@ -217,7 +318,7 @@ void testDeveloperMessage() { .model(TEST_CHAT_MODEL) .build(); - ChatCompletion response = getClient().chat().completions().create(params); + ChatCompletion response = doCompletions(params); String content = "Tomato."; assertThat(response.choices().get(0).message().content()).hasValue(content); @@ -271,7 +372,7 @@ void allTheClientOptions() { .responseFormat(ResponseFormatText.builder().build()) .build(); - ChatCompletion response = getClient().chat().completions().create(params); + ChatCompletion response = doCompletions(params); String content = "Southern Ocean."; assertThat(response.choices().get(0).message().content()).hasValue(content); @@ -376,7 +477,7 @@ void multipleChoices() { .n(2) .build(); - ChatCompletion response = getClient().chat().completions().create(params); + ChatCompletion response = doCompletions(params); String content1 = "South Atlantic Ocean."; assertThat(response.choices().get(0).message().content()).hasValue(content1); String content2 = "Atlantic Ocean."; @@ -489,7 +590,7 @@ void toolCalls() { .addTool(buildGetWeatherToolDefinition()) .build(); - ChatCompletion response = getClient().chat().completions().create(params); + ChatCompletion response = doCompletions(params); assertThat(response.choices().get(0).message().content()).isEmpty(); @@ -815,6 +916,13 @@ void connectionError() { .apiKey("testing") .maxRetries(0) .build()); + OpenAIClientAsync clientAsync = + wrap( + OpenAIOkHttpClientAsync.builder() + .baseUrl("http://localhost:9999/v5") + .apiKey("testing") + .maxRetries(0) + .build()); ChatCompletionCreateParams params = ChatCompletionCreateParams.builder() @@ -822,7 +930,7 @@ void connectionError() { .model(TEST_CHAT_MODEL) .build(); - Throwable thrown = catchThrowable(() -> client.chat().completions().create(params)); + Throwable thrown = catchThrowable(() -> doCompletions(params, client, clientAsync)); assertThat(thrown).isInstanceOf(OpenAIIoException.class); getTesting() @@ -873,11 +981,7 @@ void stream() { .model(TEST_CHAT_MODEL) .build(); - List chunks; - try (StreamResponse result = - getClient().chat().completions().createStreaming(params)) { - chunks = result.stream().collect(Collectors.toList()); - } + List chunks = doCompletionsStreaming(params); String fullMessage = chunks.stream() @@ -962,11 +1066,7 @@ void streamIncludeUsage() { .streamOptions(ChatCompletionStreamOptions.builder().includeUsage(true).build()) .build(); - List chunks; - try (StreamResponse result = - getClient().chat().completions().createStreaming(params)) { - chunks = result.stream().collect(Collectors.toList()); - } + List chunks = doCompletionsStreaming(params); String fullMessage = chunks.stream() @@ -1078,11 +1178,7 @@ void streamMultipleChoices() { .n(2) .build(); - List chunks; - try (StreamResponse result = - getClient().chat().completions().createStreaming(params)) { - chunks = result.stream().collect(Collectors.toList()); - } + List chunks = doCompletionsStreaming(params); StringBuilder content1Builder = new StringBuilder(); StringBuilder content2Builder = new StringBuilder(); @@ -1188,11 +1284,7 @@ void streamToolCalls() { .addTool(buildGetWeatherToolDefinition()) .build(); - List chunks; - try (StreamResponse result = - getClient().chat().completions().createStreaming(params)) { - chunks = result.stream().collect(Collectors.toList()); - } + List chunks = doCompletionsStreaming(params); List toolCalls = new ArrayList<>(); @@ -1363,10 +1455,7 @@ void streamToolCalls() { .addTool(buildGetWeatherToolDefinition()) .build(); - try (StreamResponse result = - getClient().chat().completions().createStreaming(params)) { - chunks = result.stream().collect(Collectors.toList()); - } + chunks = doCompletionsStreaming(params); String finalAnswer = chunks.stream() @@ -1510,6 +1599,13 @@ void streamConnectionError() { .apiKey("testing") .maxRetries(0) .build()); + OpenAIClientAsync clientAsync = + wrap( + OpenAIOkHttpClientAsync.builder() + .baseUrl("http://localhost:9999/v5") + .apiKey("testing") + .maxRetries(0) + .build()); ChatCompletionCreateParams params = ChatCompletionCreateParams.builder() @@ -1517,7 +1613,7 @@ void streamConnectionError() { .model(TEST_CHAT_MODEL) .build(); - Throwable thrown = catchThrowable(() -> client.chat().completions().createStreaming(params)); + Throwable thrown = catchThrowable(() -> doCompletionsStreaming(params, client, clientAsync)); assertThat(thrown).isInstanceOf(OpenAIIoException.class); getTesting() From 4b78428fed68bb9282f1ff70e2ca864b1ea8f4df Mon Sep 17 00:00:00 2001 From: Anuraag Agrawal Date: Thu, 24 Jul 2025 17:05:38 +0900 Subject: [PATCH 2/4] Cleanup --- .../instrumentation/openai/v1_1/CompletableFutureWrapper.java | 2 +- .../instrumentation/openai/v1_1/InstrumentedChatService.java | 2 +- .../openai/v1_1/InstrumentedChatServiceAsync.java | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/instrumentation/openai/openai-java-1.1/library/src/main/java/io/opentelemetry/instrumentation/openai/v1_1/CompletableFutureWrapper.java b/instrumentation/openai/openai-java-1.1/library/src/main/java/io/opentelemetry/instrumentation/openai/v1_1/CompletableFutureWrapper.java index 96b1b565daa7..3f634b856b49 100644 --- a/instrumentation/openai/openai-java-1.1/library/src/main/java/io/opentelemetry/instrumentation/openai/v1_1/CompletableFutureWrapper.java +++ b/instrumentation/openai/openai-java-1.1/library/src/main/java/io/opentelemetry/instrumentation/openai/v1_1/CompletableFutureWrapper.java @@ -12,7 +12,7 @@ final class CompletableFutureWrapper { private CompletableFutureWrapper() {} - public static CompletableFuture wrap(CompletableFuture future, Context context) { + static CompletableFuture wrap(CompletableFuture future, Context context) { CompletableFuture result = new CompletableFuture<>(); future.whenComplete( (T value, Throwable throwable) -> { diff --git a/instrumentation/openai/openai-java-1.1/library/src/main/java/io/opentelemetry/instrumentation/openai/v1_1/InstrumentedChatService.java b/instrumentation/openai/openai-java-1.1/library/src/main/java/io/opentelemetry/instrumentation/openai/v1_1/InstrumentedChatService.java index 11c9dba134da..0397279f119a 100644 --- a/instrumentation/openai/openai-java-1.1/library/src/main/java/io/opentelemetry/instrumentation/openai/v1_1/InstrumentedChatService.java +++ b/instrumentation/openai/openai-java-1.1/library/src/main/java/io/opentelemetry/instrumentation/openai/v1_1/InstrumentedChatService.java @@ -19,7 +19,7 @@ final class InstrumentedChatService private final Logger eventLogger; private final boolean captureMessageContent; - public InstrumentedChatService( + InstrumentedChatService( ChatService delegate, Instrumenter instrumenter, Logger eventLogger, diff --git a/instrumentation/openai/openai-java-1.1/library/src/main/java/io/opentelemetry/instrumentation/openai/v1_1/InstrumentedChatServiceAsync.java b/instrumentation/openai/openai-java-1.1/library/src/main/java/io/opentelemetry/instrumentation/openai/v1_1/InstrumentedChatServiceAsync.java index 8dc8bbc09ed4..4bbf888b50c3 100644 --- a/instrumentation/openai/openai-java-1.1/library/src/main/java/io/opentelemetry/instrumentation/openai/v1_1/InstrumentedChatServiceAsync.java +++ b/instrumentation/openai/openai-java-1.1/library/src/main/java/io/opentelemetry/instrumentation/openai/v1_1/InstrumentedChatServiceAsync.java @@ -19,7 +19,7 @@ final class InstrumentedChatServiceAsync private final Logger eventLogger; private final boolean captureMessageContent; - public InstrumentedChatServiceAsync( + InstrumentedChatServiceAsync( ChatServiceAsync delegate, Instrumenter instrumenter, Logger eventLogger, From df19b40919da9d50345aae62b33d542920ce4373 Mon Sep 17 00:00:00 2001 From: Anuraag Agrawal Date: Fri, 25 Jul 2025 09:14:35 +0900 Subject: [PATCH 3/4] Rename context --- .../v1_1/ChatCompletionEventsHelper.java | 19 +++++--- .../InstrumentedChatCompletionService.java | 43 ++++++++--------- ...nstrumentedChatCompletionServiceAsync.java | 46 ++++++++++--------- .../openai/v1_1/StreamListener.java | 12 ++--- 4 files changed, 65 insertions(+), 55 deletions(-) diff --git a/instrumentation/openai/openai-java-1.1/library/src/main/java/io/opentelemetry/instrumentation/openai/v1_1/ChatCompletionEventsHelper.java b/instrumentation/openai/openai-java-1.1/library/src/main/java/io/opentelemetry/instrumentation/openai/v1_1/ChatCompletionEventsHelper.java index 0703c2cc4e0c..219bd86b4e6d 100644 --- a/instrumentation/openai/openai-java-1.1/library/src/main/java/io/opentelemetry/instrumentation/openai/v1_1/ChatCompletionEventsHelper.java +++ b/instrumentation/openai/openai-java-1.1/library/src/main/java/io/opentelemetry/instrumentation/openai/v1_1/ChatCompletionEventsHelper.java @@ -35,7 +35,7 @@ final class ChatCompletionEventsHelper { private static final AttributeKey EVENT_NAME = stringKey("event.name"); public static void emitPromptLogEvents( - Context ctx, + Context context, Logger eventLogger, ChatCompletionCreateParams request, boolean captureMessageContent) { @@ -86,7 +86,7 @@ public static void emitPromptLogEvents( } else { continue; } - newEvent(eventLogger, eventType).setContext(ctx).setBody(Value.of(body)).emit(); + newEvent(eventLogger, eventType).setContext(context).setBody(Value.of(body)).emit(); } } @@ -162,7 +162,10 @@ private static String joinContentParts(List conte } public static void emitCompletionLogEvents( - Context ctx, Logger eventLogger, ChatCompletion completion, boolean captureMessageContent) { + Context context, + Logger eventLogger, + ChatCompletion completion, + boolean captureMessageContent) { for (ChatCompletion.Choice choice : completion.choices()) { ChatCompletionMessage choiceMsg = choice.message(); Map> message = new HashMap<>(); @@ -181,12 +184,16 @@ public static void emitCompletionLogEvents( .collect(Collectors.toList()))); }); emitCompletionLogEvent( - ctx, eventLogger, choice.index(), choice.finishReason().toString(), Value.of(message)); + context, + eventLogger, + choice.index(), + choice.finishReason().toString(), + Value.of(message)); } } public static void emitCompletionLogEvent( - Context ctx, + Context context, Logger eventLogger, long index, String finishReason, @@ -195,7 +202,7 @@ public static void emitCompletionLogEvent( body.put("finish_reason", Value.of(finishReason)); body.put("index", Value.of(index)); body.put("message", eventMessageObject); - newEvent(eventLogger, "gen_ai.choice").setContext(ctx).setBody(Value.of(body)).emit(); + newEvent(eventLogger, "gen_ai.choice").setContext(context).setBody(Value.of(body)).emit(); } private static LogRecordBuilder newEvent(Logger eventLogger, String name) { diff --git a/instrumentation/openai/openai-java-1.1/library/src/main/java/io/opentelemetry/instrumentation/openai/v1_1/InstrumentedChatCompletionService.java b/instrumentation/openai/openai-java-1.1/library/src/main/java/io/opentelemetry/instrumentation/openai/v1_1/InstrumentedChatCompletionService.java index ec171203631b..cef6b2ba2f68 100644 --- a/instrumentation/openai/openai-java-1.1/library/src/main/java/io/opentelemetry/instrumentation/openai/v1_1/InstrumentedChatCompletionService.java +++ b/instrumentation/openai/openai-java-1.1/library/src/main/java/io/opentelemetry/instrumentation/openai/v1_1/InstrumentedChatCompletionService.java @@ -73,65 +73,66 @@ public Object invoke(Object proxy, Method method, Object[] args) throws Throwabl private ChatCompletion create( ChatCompletionCreateParams chatCompletionCreateParams, RequestOptions requestOptions) { - Context parentCtx = Context.current(); - if (!instrumenter.shouldStart(parentCtx, chatCompletionCreateParams)) { - return createWithLogs(parentCtx, chatCompletionCreateParams, requestOptions); + Context parentContext = Context.current(); + if (!instrumenter.shouldStart(parentContext, chatCompletionCreateParams)) { + return createWithLogs(parentContext, chatCompletionCreateParams, requestOptions); } - Context ctx = instrumenter.start(parentCtx, chatCompletionCreateParams); + Context context = instrumenter.start(parentContext, chatCompletionCreateParams); ChatCompletion completion; - try (Scope ignored = ctx.makeCurrent()) { - completion = createWithLogs(ctx, chatCompletionCreateParams, requestOptions); + try (Scope ignored = context.makeCurrent()) { + completion = createWithLogs(context, chatCompletionCreateParams, requestOptions); } catch (Throwable t) { - instrumenter.end(ctx, chatCompletionCreateParams, null, t); + instrumenter.end(context, chatCompletionCreateParams, null, t); throw t; } - instrumenter.end(ctx, chatCompletionCreateParams, completion, null); + instrumenter.end(context, chatCompletionCreateParams, completion, null); return completion; } private ChatCompletion createWithLogs( - Context ctx, + Context context, ChatCompletionCreateParams chatCompletionCreateParams, RequestOptions requestOptions) { ChatCompletionEventsHelper.emitPromptLogEvents( - ctx, eventLogger, chatCompletionCreateParams, captureMessageContent); + context, eventLogger, chatCompletionCreateParams, captureMessageContent); ChatCompletion result = delegate.create(chatCompletionCreateParams, requestOptions); ChatCompletionEventsHelper.emitCompletionLogEvents( - ctx, eventLogger, result, captureMessageContent); + context, eventLogger, result, captureMessageContent); return result; } private StreamResponse createStreaming( ChatCompletionCreateParams chatCompletionCreateParams, RequestOptions requestOptions) { - Context parentCtx = Context.current(); - if (!instrumenter.shouldStart(parentCtx, chatCompletionCreateParams)) { - return createStreamingWithLogs(parentCtx, chatCompletionCreateParams, requestOptions, false); + Context parentContext = Context.current(); + if (!instrumenter.shouldStart(parentContext, chatCompletionCreateParams)) { + return createStreamingWithLogs( + parentContext, chatCompletionCreateParams, requestOptions, false); } - Context ctx = instrumenter.start(parentCtx, chatCompletionCreateParams); - try (Scope ignored = ctx.makeCurrent()) { - return createStreamingWithLogs(ctx, chatCompletionCreateParams, requestOptions, true); + Context context = instrumenter.start(parentContext, chatCompletionCreateParams); + try (Scope ignored = context.makeCurrent()) { + return createStreamingWithLogs(context, chatCompletionCreateParams, requestOptions, true); } catch (Throwable t) { - instrumenter.end(ctx, chatCompletionCreateParams, null, t); + instrumenter.end(context, chatCompletionCreateParams, null, t); throw t; } } private StreamResponse createStreamingWithLogs( - Context ctx, + Context context, ChatCompletionCreateParams chatCompletionCreateParams, RequestOptions requestOptions, boolean newSpan) { ChatCompletionEventsHelper.emitPromptLogEvents( - ctx, eventLogger, chatCompletionCreateParams, captureMessageContent); + context, eventLogger, chatCompletionCreateParams, captureMessageContent); StreamResponse result = delegate.createStreaming(chatCompletionCreateParams, requestOptions); return new TracingStreamedResponse( result, new StreamListener( - ctx, + context, chatCompletionCreateParams, instrumenter, eventLogger, diff --git a/instrumentation/openai/openai-java-1.1/library/src/main/java/io/opentelemetry/instrumentation/openai/v1_1/InstrumentedChatCompletionServiceAsync.java b/instrumentation/openai/openai-java-1.1/library/src/main/java/io/opentelemetry/instrumentation/openai/v1_1/InstrumentedChatCompletionServiceAsync.java index f8e8d88903a7..d8908d6229ce 100644 --- a/instrumentation/openai/openai-java-1.1/library/src/main/java/io/opentelemetry/instrumentation/openai/v1_1/InstrumentedChatCompletionServiceAsync.java +++ b/instrumentation/openai/openai-java-1.1/library/src/main/java/io/opentelemetry/instrumentation/openai/v1_1/InstrumentedChatCompletionServiceAsync.java @@ -75,69 +75,71 @@ public Object invoke(Object proxy, Method method, Object[] args) throws Throwabl private CompletableFuture create( ChatCompletionCreateParams chatCompletionCreateParams, RequestOptions requestOptions) { - Context parentCtx = Context.current(); - if (!instrumenter.shouldStart(parentCtx, chatCompletionCreateParams)) { - return createWithLogs(parentCtx, chatCompletionCreateParams, requestOptions); + Context parentContext = Context.current(); + if (!instrumenter.shouldStart(parentContext, chatCompletionCreateParams)) { + return createWithLogs(parentContext, chatCompletionCreateParams, requestOptions); } - Context ctx = instrumenter.start(parentCtx, chatCompletionCreateParams); + Context context = instrumenter.start(parentContext, chatCompletionCreateParams); CompletableFuture future; - try (Scope ignored = ctx.makeCurrent()) { - future = createWithLogs(ctx, chatCompletionCreateParams, requestOptions); + try (Scope ignored = context.makeCurrent()) { + future = createWithLogs(context, chatCompletionCreateParams, requestOptions); } catch (Throwable t) { - instrumenter.end(ctx, chatCompletionCreateParams, null, t); + instrumenter.end(context, chatCompletionCreateParams, null, t); throw t; } future = - future.whenComplete((res, t) -> instrumenter.end(ctx, chatCompletionCreateParams, res, t)); - return CompletableFutureWrapper.wrap(future, ctx); + future.whenComplete( + (res, t) -> instrumenter.end(context, chatCompletionCreateParams, res, t)); + return CompletableFutureWrapper.wrap(future, context); } private CompletableFuture createWithLogs( - Context ctx, + Context context, ChatCompletionCreateParams chatCompletionCreateParams, RequestOptions requestOptions) { ChatCompletionEventsHelper.emitPromptLogEvents( - ctx, eventLogger, chatCompletionCreateParams, captureMessageContent); + context, eventLogger, chatCompletionCreateParams, captureMessageContent); CompletableFuture future = delegate.create(chatCompletionCreateParams, requestOptions); future.thenAccept( r -> ChatCompletionEventsHelper.emitCompletionLogEvents( - ctx, eventLogger, r, captureMessageContent)); + context, eventLogger, r, captureMessageContent)); return future; } private AsyncStreamResponse createStreaming( ChatCompletionCreateParams chatCompletionCreateParams, RequestOptions requestOptions) { - Context parentCtx = Context.current(); - if (!instrumenter.shouldStart(parentCtx, chatCompletionCreateParams)) { - return createStreamingWithLogs(parentCtx, chatCompletionCreateParams, requestOptions, false); + Context parentContext = Context.current(); + if (!instrumenter.shouldStart(parentContext, chatCompletionCreateParams)) { + return createStreamingWithLogs( + parentContext, chatCompletionCreateParams, requestOptions, false); } - Context ctx = instrumenter.start(parentCtx, chatCompletionCreateParams); - try (Scope ignored = ctx.makeCurrent()) { - return createStreamingWithLogs(ctx, chatCompletionCreateParams, requestOptions, true); + Context context = instrumenter.start(parentContext, chatCompletionCreateParams); + try (Scope ignored = context.makeCurrent()) { + return createStreamingWithLogs(context, chatCompletionCreateParams, requestOptions, true); } catch (Throwable t) { - instrumenter.end(ctx, chatCompletionCreateParams, null, t); + instrumenter.end(context, chatCompletionCreateParams, null, t); throw t; } } private AsyncStreamResponse createStreamingWithLogs( - Context ctx, + Context context, ChatCompletionCreateParams chatCompletionCreateParams, RequestOptions requestOptions, boolean newSpan) { ChatCompletionEventsHelper.emitPromptLogEvents( - ctx, eventLogger, chatCompletionCreateParams, captureMessageContent); + context, eventLogger, chatCompletionCreateParams, captureMessageContent); AsyncStreamResponse result = delegate.createStreaming(chatCompletionCreateParams, requestOptions); return new TracingAsyncStreamedResponse( result, new StreamListener( - ctx, + context, chatCompletionCreateParams, instrumenter, eventLogger, diff --git a/instrumentation/openai/openai-java-1.1/library/src/main/java/io/opentelemetry/instrumentation/openai/v1_1/StreamListener.java b/instrumentation/openai/openai-java-1.1/library/src/main/java/io/opentelemetry/instrumentation/openai/v1_1/StreamListener.java index 533ca3214baf..9cda4c1f01f7 100644 --- a/instrumentation/openai/openai-java-1.1/library/src/main/java/io/opentelemetry/instrumentation/openai/v1_1/StreamListener.java +++ b/instrumentation/openai/openai-java-1.1/library/src/main/java/io/opentelemetry/instrumentation/openai/v1_1/StreamListener.java @@ -20,7 +20,7 @@ final class StreamListener { - private final Context parentCtx; + private final Context context; private final ChatCompletionCreateParams request; private final List choiceBuffers; @@ -35,13 +35,13 @@ final class StreamListener { @Nullable private String responseId; StreamListener( - Context parentCtx, + Context context, ChatCompletionCreateParams request, Instrumenter instrumenter, Logger eventLogger, boolean captureMessageContent, boolean newSpan) { - this.parentCtx = parentCtx; + this.context = context; this.request = request; this.instrumenter = instrumenter; this.eventLogger = eventLogger; @@ -71,7 +71,7 @@ void onChunk(ChatCompletionChunk chunk) { // message has ended, let's emit ChatCompletionEventsHelper.emitCompletionLogEvent( - parentCtx, eventLogger, choice.index(), buffer.finishReason, buffer.toEventBody()); + context, eventLogger, choice.index(), buffer.finishReason, buffer.toEventBody()); } } } @@ -86,7 +86,7 @@ void endSpan(@Nullable Throwable error) { if (model == null || responseId == null) { // Only happens if we got no chunks, so we have no response. if (newSpan) { - instrumenter.end(parentCtx, request, null, error); + instrumenter.end(context, request, null, error); } return; } @@ -106,7 +106,7 @@ void endSpan(@Nullable Throwable error) { } if (newSpan) { - instrumenter.end(parentCtx, request, result.build(), error); + instrumenter.end(context, request, result.build(), error); } } } From 87820df3b064ea2b80503268127a47f4653dea7b Mon Sep 17 00:00:00 2001 From: Anuraag Agrawal Date: Fri, 25 Jul 2025 15:05:05 +0900 Subject: [PATCH 4/4] Fix context --- .../v1_1/InstrumentedChatCompletionServiceAsync.java | 2 +- .../openai/v1_1/AbstractChatTest.java | 12 +++++++++++- 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/instrumentation/openai/openai-java-1.1/library/src/main/java/io/opentelemetry/instrumentation/openai/v1_1/InstrumentedChatCompletionServiceAsync.java b/instrumentation/openai/openai-java-1.1/library/src/main/java/io/opentelemetry/instrumentation/openai/v1_1/InstrumentedChatCompletionServiceAsync.java index d8908d6229ce..5b2887b304eb 100644 --- a/instrumentation/openai/openai-java-1.1/library/src/main/java/io/opentelemetry/instrumentation/openai/v1_1/InstrumentedChatCompletionServiceAsync.java +++ b/instrumentation/openai/openai-java-1.1/library/src/main/java/io/opentelemetry/instrumentation/openai/v1_1/InstrumentedChatCompletionServiceAsync.java @@ -92,7 +92,7 @@ private CompletableFuture create( future = future.whenComplete( (res, t) -> instrumenter.end(context, chatCompletionCreateParams, res, t)); - return CompletableFutureWrapper.wrap(future, context); + return CompletableFutureWrapper.wrap(future, parentContext); } private CompletableFuture createWithLogs( diff --git a/instrumentation/openai/openai-java-1.1/testing/src/main/java/io/opentelemetry/instrumentation/openai/v1_1/AbstractChatTest.java b/instrumentation/openai/openai-java-1.1/testing/src/main/java/io/opentelemetry/instrumentation/openai/v1_1/AbstractChatTest.java index e2a6ad65dae9..e1f11a464405 100644 --- a/instrumentation/openai/openai-java-1.1/testing/src/main/java/io/opentelemetry/instrumentation/openai/v1_1/AbstractChatTest.java +++ b/instrumentation/openai/openai-java-1.1/testing/src/main/java/io/opentelemetry/instrumentation/openai/v1_1/AbstractChatTest.java @@ -59,7 +59,9 @@ import io.opentelemetry.api.common.AttributeKey; import io.opentelemetry.api.common.KeyValue; import io.opentelemetry.api.common.Value; +import io.opentelemetry.api.trace.Span; import io.opentelemetry.api.trace.SpanContext; +import io.opentelemetry.context.Context; import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; import io.opentelemetry.instrumentation.testing.recording.RecordingExtension; import io.opentelemetry.sdk.testing.assertj.SpanDataAssert; @@ -158,7 +160,15 @@ protected final ChatCompletion doCompletions( case ASYNC_FROM_SYNC: OpenAIClientAsync cl = testType == TestType.ASYNC ? clientAsync : client.async(); try { - return cl.chat().completions().create(params).join(); + return cl.chat() + .completions() + .create(params) + .thenApply( + res -> { + assertThat(Span.fromContextOrNull(Context.current())).isNull(); + return res; + }) + .join(); } catch (CompletionException e) { if (e.getCause() instanceof OpenAIIoException) { throw ((OpenAIIoException) e.getCause());