Skip to content

Commit d4458b1

Browse files
committed
Depracate incorrect grpc stream proxy implementation
1 parent 35af522 commit d4458b1

File tree

1 file changed

+12
-10
lines changed

1 file changed

+12
-10
lines changed

core/src/main/java/tech/ydb/core/impl/call/ProxyReadStream.java

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
* @param <BaseR> type of origin stream message
1212
* @param <DestR> new stream message type
1313
*/
14+
@Deprecated
1415
public class ProxyReadStream<BaseR, DestR> implements GrpcReadStream<DestR> {
1516
public interface MessageFunctor<BaseR, DestR> {
1617
void apply(BaseR message, CompletableFuture<Status> promise, Observer<DestR> observer);
@@ -25,18 +26,19 @@ public ProxyReadStream(GrpcReadStream<BaseR> origin, MessageFunctor<BaseR, DestR
2526
this.functor = functor;
2627
}
2728

29+
protected void onClose(Status status, Throwable th) {
30+
// promise may be completed by functor and in that case this code will be ignored
31+
if (th != null) {
32+
future.completeExceptionally(th);
33+
}
34+
if (status != null) {
35+
future.complete(status);
36+
}
37+
}
38+
2839
@Override
2940
public CompletableFuture<Status> start(Observer<DestR> observer) {
30-
origin.start(response -> functor.apply(response, future, observer)).whenComplete((status, th) -> {
31-
// promise may be completed by functor and in that case this code will be ignored
32-
if (th != null) {
33-
future.completeExceptionally(th);
34-
}
35-
if (status != null) {
36-
future.complete(status);
37-
}
38-
});
39-
41+
origin.start(response -> functor.apply(response, future, observer)).whenComplete(this::onClose);
4042
return future;
4143
}
4244

0 commit comments

Comments
 (0)