Skip to content

Commit 79fc8a6

Browse files
google-genai-botcopybara-github
authored andcommitted
Remove blocking calls from Runner and GcsArtifactService
PiperOrigin-RevId: 776723027
1 parent bab4bd9 commit 79fc8a6

File tree

2 files changed

+63
-59
lines changed

2 files changed

+63
-59
lines changed

core/src/main/java/com/google/adk/artifacts/GcsArtifactService.java

Lines changed: 52 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -75,62 +75,63 @@ private String getBlobName(
7575
@Override
7676
public Single<Integer> saveArtifact(
7777
String appName, String userId, String sessionId, String filename, Part artifact) {
78-
ImmutableList<Integer> versions =
79-
listVersions(appName, userId, sessionId, filename).blockingGet();
80-
int nextVersion = versions.isEmpty() ? 0 : max(versions) + 1;
81-
82-
String blobName = getBlobName(appName, userId, sessionId, filename, nextVersion);
83-
BlobId blobId = BlobId.of(bucketName, blobName);
84-
85-
BlobInfo blobInfo =
86-
BlobInfo.newBuilder(blobId)
87-
.setContentType(artifact.inlineData().get().mimeType().orElse(null))
88-
.build();
89-
90-
try {
91-
byte[] dataToSave =
92-
artifact
93-
.inlineData()
94-
.get()
95-
.data()
96-
.orElseThrow(
97-
() -> new IllegalArgumentException("Saveable artifact data must be non-empty."));
98-
storageClient.create(blobInfo, dataToSave);
99-
return Single.just(nextVersion);
100-
} catch (StorageException e) {
101-
throw new VerifyException("Failed to save artifact to GCS", e);
102-
}
78+
return listVersions(appName, userId, sessionId, filename)
79+
.map(versions -> versions.isEmpty() ? 0 : max(versions) + 1)
80+
.map(
81+
nextVersion -> {
82+
String blobName = getBlobName(appName, userId, sessionId, filename, nextVersion);
83+
BlobId blobId = BlobId.of(bucketName, blobName);
84+
85+
BlobInfo blobInfo =
86+
BlobInfo.newBuilder(blobId)
87+
.setContentType(artifact.inlineData().get().mimeType().orElse(null))
88+
.build();
89+
90+
try {
91+
byte[] dataToSave =
92+
artifact
93+
.inlineData()
94+
.get()
95+
.data()
96+
.orElseThrow(
97+
() ->
98+
new IllegalArgumentException(
99+
"Saveable artifact data must be non-empty."));
100+
storageClient.create(blobInfo, dataToSave);
101+
return nextVersion;
102+
} catch (StorageException e) {
103+
throw new VerifyException("Failed to save artifact to GCS", e);
104+
}
105+
});
103106
}
104107

105108
@Override
106109
public Maybe<Part> loadArtifact(
107110
String appName, String userId, String sessionId, String filename, Optional<Integer> version) {
108-
int versionToLoad;
109-
if (version.isPresent()) {
110-
versionToLoad = version.get();
111-
} else {
112-
ImmutableList<Integer> versions =
113-
listVersions(appName, userId, sessionId, filename).blockingGet();
114-
if (versions.isEmpty()) {
115-
return Maybe.empty();
116-
}
117-
versionToLoad = max(versions);
118-
}
119-
120-
String blobName = getBlobName(appName, userId, sessionId, filename, versionToLoad);
121-
BlobId blobId = BlobId.of(bucketName, blobName);
122-
123-
try {
124-
Blob blob = storageClient.get(blobId);
125-
if (blob == null || !blob.exists()) {
126-
return Maybe.empty();
127-
}
128-
byte[] data = blob.getContent();
129-
String mimeType = blob.getContentType();
130-
return Maybe.just(Part.fromBytes(data, mimeType));
131-
} catch (StorageException e) {
132-
return Maybe.empty();
133-
}
111+
return version
112+
.map(Maybe::just)
113+
.orElseGet(
114+
() ->
115+
listVersions(appName, userId, sessionId, filename)
116+
.flatMapMaybe(
117+
versions -> versions.isEmpty() ? Maybe.empty() : Maybe.just(max(versions))))
118+
.flatMap(
119+
versionToLoad -> {
120+
String blobName = getBlobName(appName, userId, sessionId, filename, versionToLoad);
121+
BlobId blobId = BlobId.of(bucketName, blobName);
122+
123+
try {
124+
Blob blob = storageClient.get(blobId);
125+
if (blob == null || !blob.exists()) {
126+
return Maybe.empty();
127+
}
128+
byte[] data = blob.getContent();
129+
String mimeType = blob.getContentType();
130+
return Maybe.just(Part.fromBytes(data, mimeType));
131+
} catch (StorageException e) {
132+
return Maybe.empty();
133+
}
134+
});
134135
}
135136

136137
@Override

core/src/main/java/com/google/adk/runner/Runner.java

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -269,14 +269,17 @@ public Flowable<Event> runLive(
269269

270270
public Flowable<Event> runLive(
271271
String userId, String sessionId, LiveRequestQueue liveRequestQueue, RunConfig runConfig) {
272-
Session session =
273-
this.sessionService.getSession(appName, userId, sessionId, Optional.empty()).blockingGet();
274-
if (session == null) {
275-
return Flowable.error(
276-
new IllegalArgumentException(
277-
String.format("Session not found: %s for user %s", sessionId, userId)));
278-
}
279-
return this.runLive(session, liveRequestQueue, runConfig);
272+
return this.sessionService
273+
.getSession(appName, userId, sessionId, Optional.empty())
274+
.flatMapPublisher(
275+
session -> {
276+
if (session == null) {
277+
return Flowable.error(
278+
new IllegalArgumentException(
279+
String.format("Session not found: %s for user %s", sessionId, userId)));
280+
}
281+
return this.runLive(session, liveRequestQueue, runConfig);
282+
});
280283
}
281284

282285
public Flowable<Event> runWithSessionId(

0 commit comments

Comments
 (0)