Skip to content

Commit b862ad4

Browse files
google-genai-botcopybara-github
authored andcommitted
fix: Runner now includes Singles from appendEvent in the Rx graph
Previously, the return values from sessionManager.appendEvent were disarded, leading to issues if that Single did not represent an immediate value. For example, if a SessionManager implementation performed async work such as persisting the new event to an external datastore that work would never occur. PiperOrigin-RevId: 782076407
1 parent c4e363a commit b862ad4

File tree

1 file changed

+12
-5
lines changed

1 file changed

+12
-5
lines changed

core/src/main/java/com/google/adk/runner/Runner.java

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import io.reactivex.rxjava3.core.Flowable;
3939
import io.reactivex.rxjava3.core.Maybe;
4040
import io.reactivex.rxjava3.core.Single;
41+
import io.reactivex.rxjava3.core.Completable;
4142
import java.util.ArrayList;
4243
import java.util.Collections;
4344
import java.util.List;
@@ -83,7 +84,7 @@ public BaseSessionService sessionService() {
8384
*
8485
* @throws IllegalArgumentException if message has no parts.
8586
*/
86-
private void appendNewMessageToSession(
87+
private Completable appendNewMessageToSession(
8788
Session session,
8889
Content newMessage,
8990
InvocationContext invocationContext,
@@ -123,7 +124,7 @@ private void appendNewMessageToSession(
123124
.author("user")
124125
.content(Optional.of(newMessage))
125126
.build();
126-
this.sessionService.appendEvent(session, event);
127+
return this.sessionService.appendEvent(session, event).ignoreElement();
127128
}
128129

129130
/**
@@ -190,14 +191,20 @@ public Flowable<Event> runAsync(Session session, Content newMessage, RunConfig r
190191
newMessage,
191192
runConfig);
192193

194+
Completable newMessageCompletable = Completable.complete();
193195
if (newMessage != null) {
194-
appendNewMessageToSession(
195-
sess, newMessage, invocationContext, runConfig.saveInputBlobsAsArtifacts());
196+
newMessageCompletable =
197+
appendNewMessageToSession(
198+
sess,
199+
newMessage,
200+
invocationContext,
201+
runConfig.saveInputBlobsAsArtifacts());
196202
}
197203

198204
invocationContext.agent(this.findAgentToRun(sess, rootAgent));
199205
Flowable<Event> events = invocationContext.agent().runAsync(invocationContext);
200-
return events.doOnNext(event -> this.sessionService.appendEvent(sess, event));
206+
return newMessageCompletable.andThen(
207+
events.concatMapSingle(event -> this.sessionService.appendEvent(sess, event)));
201208
})
202209
.doOnError(
203210
throwable -> {

0 commit comments

Comments
 (0)