diff --git a/core/src/main/java/com/google/adk/flows/llmflows/BaseLlmFlow.java b/core/src/main/java/com/google/adk/flows/llmflows/BaseLlmFlow.java index 5ec10309..616ab519 100644 --- a/core/src/main/java/com/google/adk/flows/llmflows/BaseLlmFlow.java +++ b/core/src/main/java/com/google/adk/flows/llmflows/BaseLlmFlow.java @@ -454,45 +454,16 @@ public Flowable runLive(InvocationContext invocationContext) { String eventIdForSendData = Event.generateEventId(); LlmAgent agent = (LlmAgent) invocationContext.agent(); - BaseLlm llm = - agent.resolvedModel().model().isPresent() - ? agent.resolvedModel().model().get() - : LlmRegistry.getLlm(agent.resolvedModel().modelName().get()); + BaseLlm llm = agent.resolvedModel().model() + .orElseGet(() -> LlmRegistry.getLlm(agent.resolvedModel().modelName() + .orElseThrow())); + BaseLlmConnection connection = llm.connect(llmRequestAfterPreprocess); Completable historySent = llmRequestAfterPreprocess.contents().isEmpty() ? Completable.complete() : Completable.defer( - () -> { - Span sendDataSpan = - Telemetry.getTracer().spanBuilder("send_data").startSpan(); - try (Scope scope = sendDataSpan.makeCurrent()) { - return connection - .sendHistory(llmRequestAfterPreprocess.contents()) - .doOnComplete( - () -> { - try (Scope innerScope = sendDataSpan.makeCurrent()) { - Telemetry.traceSendData( - invocationContext, - eventIdForSendData, - llmRequestAfterPreprocess.contents()); - } - }) - .doOnError( - error -> { - sendDataSpan.setStatus( - StatusCode.ERROR, error.getMessage()); - sendDataSpan.recordException(error); - try (Scope innerScope = sendDataSpan.makeCurrent()) { - Telemetry.traceSendData( - invocationContext, - eventIdForSendData, - llmRequestAfterPreprocess.contents()); - } - }) - .doFinally(sendDataSpan::end); - } - }); + () -> sendHistory(invocationContext, connection, llmRequestAfterPreprocess, eventIdForSendData)); Flowable liveRequests = invocationContext.liveRequestQueue().get().get(); Disposable sendTask = @@ -588,7 +559,33 @@ public void onError(Throwable e) { }); } - /** + private static Completable sendHistory( + final InvocationContext invocationContext, + final BaseLlmConnection connection, + final LlmRequest llmRequestAfterPreprocess, + final String eventIdForSendData) { + Span sendDataSpan = Telemetry.getTracer().spanBuilder("send_data").startSpan(); + try (Scope scope = sendDataSpan.makeCurrent()) { + return connection + .sendHistory(llmRequestAfterPreprocess.contents()) + .doOnEvent( + error -> { + if (error != null) { + sendDataSpan.setStatus(StatusCode.ERROR, error.getMessage()); + sendDataSpan.recordException(error); + } + try (Scope innerScope = sendDataSpan.makeCurrent()) { + Telemetry.traceSendData( + invocationContext, + eventIdForSendData, + llmRequestAfterPreprocess.contents()); + } + }) + .doFinally(sendDataSpan::end); + } + } + + /** * Builds an {@link Event} from LLM response, request, and base event data. * *

Populates the event with LLM output and tool function call metadata.