Skip to content

Commit d685c65

Browse files
committed
Fixed data race in ProxyStream
1 parent fe1b8ee commit d685c65

File tree

1 file changed

+10
-5
lines changed

1 file changed

+10
-5
lines changed

table/src/main/java/tech/ydb/table/impl/BaseSession.java

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1398,6 +1398,10 @@ private abstract class ProxyStream<R, T> implements GrpcReadStream<T> {
13981398
abstract T readValue(R message);
13991399

14001400
private void onClose(Status status, Throwable th) {
1401+
if (result.isDone()) {
1402+
return;
1403+
}
1404+
14011405
if (th != null) {
14021406
updateSessionState(th, null, false);
14031407
result.completeExceptionally(th);
@@ -1415,14 +1419,15 @@ public CompletableFuture<Status> start(Observer<T> observer) {
14151419
if (statusCode == StatusIds.StatusCode.SUCCESS) {
14161420
try {
14171421
observer.onNext(readValue(message));
1418-
} catch (Throwable t) {
1419-
result.completeExceptionally(t);
1422+
} catch (Throwable th) {
1423+
updateSessionState(th, null, false);
1424+
result.completeExceptionally(th);
14201425
origin.cancel();
14211426
}
14221427
} else {
1423-
Issue[] issues = Issue.fromPb(readIssues(message));
1424-
StatusCode code = StatusCode.fromProto(statusCode);
1425-
result.complete(Status.of(code, issues));
1428+
Status status = Status.of(StatusCode.fromProto(statusCode), Issue.fromPb(readIssues(message)));
1429+
updateSessionState(null, status.getCode(), false);
1430+
result.complete(status);
14261431
origin.cancel();
14271432
}
14281433
}).whenComplete(this::onClose);

0 commit comments

Comments
 (0)