diff --git a/core/src/main/java/com/google/adk/runner/Runner.java b/core/src/main/java/com/google/adk/runner/Runner.java index f89e49d6b..00a3a18f5 100644 --- a/core/src/main/java/com/google/adk/runner/Runner.java +++ b/core/src/main/java/com/google/adk/runner/Runner.java @@ -38,6 +38,7 @@ import io.reactivex.rxjava3.core.Flowable; import io.reactivex.rxjava3.core.Maybe; import io.reactivex.rxjava3.core.Single; +import io.reactivex.rxjava3.core.Completable; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -83,7 +84,7 @@ public BaseSessionService sessionService() { * * @throws IllegalArgumentException if message has no parts. */ - private void appendNewMessageToSession( + private Completable appendNewMessageToSession( Session session, Content newMessage, InvocationContext invocationContext, @@ -123,7 +124,7 @@ private void appendNewMessageToSession( .author("user") .content(Optional.of(newMessage)) .build(); - this.sessionService.appendEvent(session, event); + return this.sessionService.appendEvent(session, event).ignoreElement(); } /** @@ -190,14 +191,20 @@ public Flowable runAsync(Session session, Content newMessage, RunConfig r newMessage, runConfig); + Completable newMessageCompletable = Completable.complete(); if (newMessage != null) { - appendNewMessageToSession( - sess, newMessage, invocationContext, runConfig.saveInputBlobsAsArtifacts()); + newMessageCompletable = + appendNewMessageToSession( + sess, + newMessage, + invocationContext, + runConfig.saveInputBlobsAsArtifacts()); } invocationContext.agent(this.findAgentToRun(sess, rootAgent)); Flowable events = invocationContext.agent().runAsync(invocationContext); - return events.doOnNext(event -> this.sessionService.appendEvent(sess, event)); + return newMessageCompletable.andThen( + events.concatMapSingle(event -> this.sessionService.appendEvent(sess, event))); }) .doOnError( throwable -> {