Skip to content

Commit ad951ba

Browse files
Mateusz Krawieccopybara-github
authored andcommitted
refactor: avoid using Flowable.fromFuture() which uses blocking get under the hood
PiperOrigin-RevId: 856180759
1 parent 3bf183a commit ad951ba

File tree

3 files changed

+52
-12
lines changed

3 files changed

+52
-12
lines changed

core/pom.xml

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@
5555
<artifactId>docker-java-transport-httpclient5</artifactId>
5656
</dependency>
5757
<dependency>
58-
<groupId> io.modelcontextprotocol.sdk</groupId>
58+
<groupId>io.modelcontextprotocol.sdk</groupId>
5959
<artifactId>mcp</artifactId>
6060
</dependency>
6161
<dependency>
@@ -189,6 +189,10 @@
189189
<artifactId>opentelemetry-sdk-testing</artifactId>
190190
<scope>test</scope>
191191
</dependency>
192+
<dependency>
193+
<groupId>net.javacrumbs.future-converter</groupId>
194+
<artifactId>future-converter-java8-guava</artifactId>
195+
</dependency>
192196
</dependencies>
193197
<build>
194198
<resources>

core/src/main/java/com/google/adk/models/Gemini.java

Lines changed: 41 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,12 @@
1717
package com.google.adk.models;
1818

1919
import static com.google.common.base.StandardSystemProperty.JAVA_VERSION;
20+
import static net.javacrumbs.futureconverter.java8guava.FutureConverter.toListenableFuture;
2021

2122
import com.google.adk.Version;
2223
import com.google.common.collect.ImmutableMap;
24+
import com.google.common.util.concurrent.Futures;
25+
import com.google.common.util.concurrent.ListenableFuture;
2326
import com.google.errorprone.annotations.CanIgnoreReturnValue;
2427
import com.google.genai.Client;
2528
import com.google.genai.ResponseStream;
@@ -32,11 +35,14 @@
3235
import com.google.genai.types.LiveConnectConfig;
3336
import com.google.genai.types.Part;
3437
import io.reactivex.rxjava3.core.Flowable;
38+
import io.reactivex.rxjava3.core.Scheduler;
39+
import io.reactivex.rxjava3.core.Single;
40+
import io.reactivex.rxjava3.schedulers.Schedulers;
3541
import java.util.ArrayList;
3642
import java.util.List;
3743
import java.util.Objects;
3844
import java.util.Optional;
39-
import java.util.concurrent.CompletableFuture;
45+
import java.util.concurrent.ExecutionException;
4046
import org.slf4j.Logger;
4147
import org.slf4j.LoggerFactory;
4248

@@ -205,6 +211,23 @@ public Gemini build() {
205211
}
206212
}
207213

214+
private static <T> Single<T> toSingle(ListenableFuture<T> future, Scheduler scheduler) {
215+
return Single.create(
216+
emitter -> {
217+
future.addListener(
218+
() -> {
219+
try {
220+
emitter.onSuccess(Futures.getDone(future));
221+
} catch (ExecutionException e) {
222+
emitter.onError(e.getCause());
223+
}
224+
},
225+
scheduler::scheduleDirect);
226+
227+
emitter.setCancellable(() -> future.cancel(false));
228+
});
229+
}
230+
208231
@Override
209232
public Flowable<LlmResponse> generateContent(LlmRequest llmRequest, boolean stream) {
210233
llmRequest =
@@ -218,14 +241,17 @@ public Flowable<LlmResponse> generateContent(LlmRequest llmRequest, boolean stre
218241

219242
if (stream) {
220243
logger.debug("Sending streaming generateContent request to model {}", effectiveModelName);
221-
CompletableFuture<ResponseStream<GenerateContentResponse>> streamFuture =
222-
apiClient.async.models.generateContentStream(
223-
effectiveModelName, llmRequest.contents(), config);
244+
ListenableFuture<ResponseStream<GenerateContentResponse>> streamFuture =
245+
toListenableFuture(
246+
apiClient.async.models.generateContentStream(
247+
effectiveModelName, llmRequest.contents(), config));
224248

225249
return Flowable.defer(
226250
() ->
227251
processRawResponses(
228-
Flowable.fromFuture(streamFuture).flatMapIterable(iterable -> iterable)))
252+
toSingle(streamFuture, Schedulers.io())
253+
.toFlowable()
254+
.flatMapIterable(iterable -> iterable)))
229255
.filter(
230256
llmResponse ->
231257
llmResponse
@@ -243,12 +269,16 @@ public Flowable<LlmResponse> generateContent(LlmRequest llmRequest, boolean stre
243269
.orElse(false));
244270
} else {
245271
logger.debug("Sending generateContent request to model {}", effectiveModelName);
246-
return Flowable.fromFuture(
247-
apiClient
248-
.async
249-
.models
250-
.generateContent(effectiveModelName, llmRequest.contents(), config)
251-
.thenApplyAsync(LlmResponse::create));
272+
final LlmRequest finalLlmRequest = llmRequest;
273+
return toSingle(
274+
toListenableFuture(
275+
apiClient
276+
.async
277+
.models
278+
.generateContent(effectiveModelName, finalLlmRequest.contents(), config)
279+
.thenApplyAsync(LlmResponse::create)),
280+
Schedulers.io())
281+
.toFlowable();
252282
}
253283
}
254284

pom.xml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@
7272
<anthropic.version>1.4.0</anthropic.version>
7373
<maven-plugin-tools.version>3.9.0</maven-plugin-tools.version>
7474
<httpclient5.version>5.4.3</httpclient5.version>
75+
<future-converter-java8-guava.version>1.2.0</future-converter-java8-guava.version>
7576
</properties>
7677

7778
<dependencyManagement>
@@ -274,6 +275,11 @@
274275
<artifactId>assertj-core</artifactId>
275276
<version>${assertj.version}</version>
276277
</dependency>
278+
<dependency>
279+
<groupId>net.javacrumbs.future-converter</groupId>
280+
<artifactId>future-converter-java8-guava</artifactId>
281+
<version>${future-converter-java8-guava.version}</version>
282+
</dependency>
277283
</dependencies>
278284
</dependencyManagement>
279285

0 commit comments

Comments
 (0)