|
38 | 38 | import io.reactivex.rxjava3.core.Flowable; |
39 | 39 | import io.reactivex.rxjava3.core.Maybe; |
40 | 40 | import io.reactivex.rxjava3.core.Single; |
41 | | -import io.reactivex.rxjava3.core.Completable; |
42 | 41 | import java.util.ArrayList; |
43 | 42 | import java.util.Collections; |
44 | 43 | import java.util.List; |
@@ -84,7 +83,7 @@ public BaseSessionService sessionService() { |
84 | 83 | * |
85 | 84 | * @throws IllegalArgumentException if message has no parts. |
86 | 85 | */ |
87 | | - private Completable appendNewMessageToSession( |
| 86 | + private void appendNewMessageToSession( |
88 | 87 | Session session, |
89 | 88 | Content newMessage, |
90 | 89 | InvocationContext invocationContext, |
@@ -124,7 +123,7 @@ private Completable appendNewMessageToSession( |
124 | 123 | .author("user") |
125 | 124 | .content(Optional.of(newMessage)) |
126 | 125 | .build(); |
127 | | - return this.sessionService.appendEvent(session, event).ignoreElement(); |
| 126 | + this.sessionService.appendEvent(session, event); |
128 | 127 | } |
129 | 128 |
|
130 | 129 | /** |
@@ -191,20 +190,14 @@ public Flowable<Event> runAsync(Session session, Content newMessage, RunConfig r |
191 | 190 | newMessage, |
192 | 191 | runConfig); |
193 | 192 |
|
194 | | - Completable newMessageCompletable = Completable.complete(); |
195 | 193 | if (newMessage != null) { |
196 | | - newMessageCompletable = |
197 | | - appendNewMessageToSession( |
198 | | - sess, |
199 | | - newMessage, |
200 | | - invocationContext, |
201 | | - runConfig.saveInputBlobsAsArtifacts()); |
| 194 | + appendNewMessageToSession( |
| 195 | + sess, newMessage, invocationContext, runConfig.saveInputBlobsAsArtifacts()); |
202 | 196 | } |
203 | 197 |
|
204 | 198 | invocationContext.agent(this.findAgentToRun(sess, rootAgent)); |
205 | 199 | Flowable<Event> events = invocationContext.agent().runAsync(invocationContext); |
206 | | - return newMessageCompletable.andThen( |
207 | | - events.concatMapSingle(event -> this.sessionService.appendEvent(sess, event))); |
| 200 | + return events.doOnNext(event -> this.sessionService.appendEvent(sess, event)); |
208 | 201 | }) |
209 | 202 | .doOnError( |
210 | 203 | throwable -> { |
|
0 commit comments