Skip to content
This repository was archived by the owner on Sep 26, 2023. It is now read-only.

Commit 98f9575

Browse files
authored
augment bidi streaming (#540)
This allows pressure-aware streaming and reuses patterns in server streaming.
1 parent 1cbbeb6 commit 98f9575

21 files changed

+1061
-300
lines changed
Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
/*
2+
* Copyright 2018 Google LLC
3+
*
4+
* Redistribution and use in source and binary forms, with or without
5+
* modification, are permitted provided that the following conditions are
6+
* met:
7+
*
8+
* * Redistributions of source code must retain the above copyright
9+
* notice, this list of conditions and the following disclaimer.
10+
* * Redistributions in binary form must reproduce the above
11+
* copyright notice, this list of conditions and the following disclaimer
12+
* in the documentation and/or other materials provided with the
13+
* distribution.
14+
* * Neither the name of Google LLC nor the names of its
15+
* contributors may be used to endorse or promote products derived from
16+
* this software without specific prior written permission.
17+
*
18+
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
19+
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
20+
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
21+
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
22+
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
23+
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
24+
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
25+
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
26+
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
27+
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28+
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29+
*/
30+
package com.google.api.gax.grpc;
31+
32+
import com.google.api.gax.rpc.ResponseObserver;
33+
import com.google.api.gax.rpc.StateCheckingResponseObserver;
34+
import com.google.api.gax.rpc.StreamController;
35+
import java.util.concurrent.CancellationException;
36+
37+
/** Package-private for internal use. */
38+
class ExceptionResponseObserver<RequestT, ResponseT>
39+
extends StateCheckingResponseObserver<ResponseT> {
40+
private ResponseObserver<ResponseT> innerObserver;
41+
private volatile CancellationException cancellationException;
42+
private final GrpcApiExceptionFactory exceptionFactory;
43+
44+
public ExceptionResponseObserver(
45+
ResponseObserver<ResponseT> innerObserver, GrpcApiExceptionFactory exceptionFactory) {
46+
this.innerObserver = innerObserver;
47+
this.exceptionFactory = exceptionFactory;
48+
}
49+
50+
@Override
51+
protected void onStartImpl(final StreamController controller) {
52+
innerObserver.onStart(
53+
new StreamController() {
54+
@Override
55+
public void cancel() {
56+
cancellationException = new CancellationException("User cancelled stream");
57+
controller.cancel();
58+
}
59+
60+
@Override
61+
public void disableAutoInboundFlowControl() {
62+
controller.disableAutoInboundFlowControl();
63+
}
64+
65+
@Override
66+
public void request(int count) {
67+
controller.request(count);
68+
}
69+
});
70+
}
71+
72+
@Override
73+
protected void onResponseImpl(ResponseT response) {
74+
innerObserver.onResponse(response);
75+
}
76+
77+
@Override
78+
protected void onErrorImpl(Throwable t) {
79+
if (cancellationException != null) {
80+
t = cancellationException;
81+
} else {
82+
t = exceptionFactory.create(t);
83+
}
84+
innerObserver.onError(t);
85+
}
86+
87+
@Override
88+
protected void onCompleteImpl() {
89+
innerObserver.onComplete();
90+
}
91+
}

gax-grpc/src/main/java/com/google/api/gax/grpc/GrpcDirectBidiStreamingCallable.java

Lines changed: 44 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -30,12 +30,13 @@
3030
package com.google.api.gax.grpc;
3131

3232
import com.google.api.gax.rpc.ApiCallContext;
33-
import com.google.api.gax.rpc.ApiStreamObserver;
3433
import com.google.api.gax.rpc.BidiStreamingCallable;
34+
import com.google.api.gax.rpc.ClientStream;
35+
import com.google.api.gax.rpc.ClientStreamReadyObserver;
36+
import com.google.api.gax.rpc.ResponseObserver;
3537
import com.google.common.base.Preconditions;
3638
import io.grpc.ClientCall;
3739
import io.grpc.MethodDescriptor;
38-
import io.grpc.stub.ClientCalls;
3940

4041
/**
4142
* {@code GrpcDirectBidiStreamingCallable} creates bidirectional streaming gRPC calls.
@@ -53,12 +54,47 @@ class GrpcDirectBidiStreamingCallable<RequestT, ResponseT>
5354
}
5455

5556
@Override
56-
public ApiStreamObserver<RequestT> bidiStreamingCall(
57-
ApiStreamObserver<ResponseT> responseObserver, ApiCallContext context) {
57+
public ClientStream<RequestT> internalCall(
58+
ResponseObserver<ResponseT> responseObserver,
59+
final ClientStreamReadyObserver<RequestT> onReady,
60+
ApiCallContext context) {
5861
Preconditions.checkNotNull(responseObserver);
59-
ClientCall<RequestT, ResponseT> call = GrpcClientCalls.newCall(descriptor, context);
60-
return new StreamObserverDelegate<>(
61-
ClientCalls.asyncBidiStreamingCall(
62-
call, new ApiStreamObserverDelegate<>(responseObserver)));
62+
final ClientCall<RequestT, ResponseT> call = GrpcClientCalls.newCall(descriptor, context);
63+
final ClientStream<RequestT> clientStream =
64+
new ClientStream<RequestT>() {
65+
@Override
66+
public void send(RequestT request) {
67+
call.sendMessage(request);
68+
}
69+
70+
@Override
71+
public void closeSendWithError(Throwable t) {
72+
call.cancel(null, t);
73+
}
74+
75+
@Override
76+
public void closeSend() {
77+
call.halfClose();
78+
}
79+
80+
@Override
81+
public boolean isSendReady() {
82+
return call.isReady();
83+
}
84+
};
85+
86+
GrpcDirectStreamController<RequestT, ResponseT> controller =
87+
new GrpcDirectStreamController<>(
88+
call,
89+
responseObserver,
90+
new Runnable() {
91+
@Override
92+
public void run() {
93+
onReady.onReady(clientStream);
94+
}
95+
});
96+
controller.startBidi();
97+
98+
return clientStream;
6399
}
64100
}

gax-grpc/src/main/java/com/google/api/gax/grpc/GrpcDirectStreamController.java

Lines changed: 30 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -44,17 +44,32 @@
4444
* <p>Package-private for internal use.
4545
*/
4646
class GrpcDirectStreamController<RequestT, ResponseT> implements StreamController {
47+
private static final Runnable NOOP_RUNNABLE =
48+
new Runnable() {
49+
@Override
50+
public void run() {}
51+
};
52+
4753
private final ClientCall<RequestT, ResponseT> clientCall;
4854
private final ResponseObserver<ResponseT> responseObserver;
55+
private final Runnable onReady;
4956
private boolean hasStarted;
5057
private boolean autoflowControl = true;
5158
private int numRequested;
5259
private volatile CancellationException cancellationException;
5360

5461
GrpcDirectStreamController(
5562
ClientCall<RequestT, ResponseT> clientCall, ResponseObserver<ResponseT> responseObserver) {
63+
this(clientCall, responseObserver, NOOP_RUNNABLE);
64+
}
65+
66+
GrpcDirectStreamController(
67+
ClientCall<RequestT, ResponseT> clientCall,
68+
ResponseObserver<ResponseT> responseObserver,
69+
Runnable onReady) {
5670
this.clientCall = clientCall;
5771
this.responseObserver = responseObserver;
72+
this.onReady = onReady;
5873
}
5974

6075
@Override
@@ -83,15 +98,22 @@ public void request(int count) {
8398
}
8499

85100
void start(RequestT request) {
101+
startCommon();
102+
clientCall.sendMessage(request);
103+
clientCall.halfClose();
104+
}
105+
106+
void startBidi() {
107+
startCommon();
108+
}
109+
110+
private void startCommon() {
86111
responseObserver.onStart(this);
87112

88113
this.hasStarted = true;
89114

90115
clientCall.start(new ResponseObserverAdapter(), new Metadata());
91116

92-
clientCall.sendMessage(request);
93-
clientCall.halfClose();
94-
95117
if (autoflowControl) {
96118
clientCall.request(1);
97119
} else if (numRequested > 0) {
@@ -127,5 +149,10 @@ public void onClose(Status status, Metadata trailers) {
127149
responseObserver.onError(status.asRuntimeException(trailers));
128150
}
129151
}
152+
153+
@Override
154+
public void onReady() {
155+
onReady.run();
156+
}
130157
}
131158
}

gax-grpc/src/main/java/com/google/api/gax/grpc/GrpcExceptionBidiStreamingCallable.java

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,10 @@
3131

3232
import com.google.api.gax.rpc.ApiCallContext;
3333
import com.google.api.gax.rpc.ApiException;
34-
import com.google.api.gax.rpc.ApiStreamObserver;
3534
import com.google.api.gax.rpc.BidiStreamingCallable;
35+
import com.google.api.gax.rpc.ClientStream;
36+
import com.google.api.gax.rpc.ClientStreamReadyObserver;
37+
import com.google.api.gax.rpc.ResponseObserver;
3638
import com.google.api.gax.rpc.StatusCode;
3739
import java.util.Set;
3840

@@ -54,12 +56,11 @@ final class GrpcExceptionBidiStreamingCallable<RequestT, ResponseT>
5456
}
5557

5658
@Override
57-
public ApiStreamObserver<RequestT> bidiStreamingCall(
58-
ApiStreamObserver<ResponseT> responseObserver, ApiCallContext context) {
59-
60-
GrpcExceptionTranslatingStreamObserver<ResponseT> innerObserver =
61-
new GrpcExceptionTranslatingStreamObserver<>(responseObserver, exceptionFactory);
62-
63-
return innerCallable.bidiStreamingCall(innerObserver, context);
59+
public ClientStream<RequestT> internalCall(
60+
final ResponseObserver<ResponseT> responseObserver,
61+
ClientStreamReadyObserver<RequestT> onReady,
62+
ApiCallContext context) {
63+
return innerCallable.internalCall(
64+
new ExceptionResponseObserver<>(responseObserver, exceptionFactory), onReady, context);
6465
}
6566
}

gax-grpc/src/main/java/com/google/api/gax/grpc/GrpcExceptionServerStreamingCallable.java

Lines changed: 2 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -33,11 +33,8 @@
3333
import com.google.api.gax.rpc.ApiException;
3434
import com.google.api.gax.rpc.ResponseObserver;
3535
import com.google.api.gax.rpc.ServerStreamingCallable;
36-
import com.google.api.gax.rpc.StateCheckingResponseObserver;
3736
import com.google.api.gax.rpc.StatusCode.Code;
38-
import com.google.api.gax.rpc.StreamController;
3937
import java.util.Set;
40-
import java.util.concurrent.CancellationException;
4138

4239
/**
4340
* Transforms all {@code Throwable}s thrown during a call into an instance of {@link ApiException}.
@@ -59,57 +56,7 @@ public GrpcExceptionServerStreamingCallable(
5956
public void call(
6057
RequestT request, ResponseObserver<ResponseT> responseObserver, ApiCallContext context) {
6158

62-
inner.call(request, new ExceptionResponseObserver(responseObserver), context);
63-
}
64-
65-
private class ExceptionResponseObserver extends StateCheckingResponseObserver<ResponseT> {
66-
private ResponseObserver<ResponseT> innerObserver;
67-
private volatile CancellationException cancellationException;
68-
69-
public ExceptionResponseObserver(ResponseObserver<ResponseT> innerObserver) {
70-
this.innerObserver = innerObserver;
71-
}
72-
73-
@Override
74-
protected void onStartImpl(final StreamController controller) {
75-
innerObserver.onStart(
76-
new StreamController() {
77-
@Override
78-
public void cancel() {
79-
cancellationException = new CancellationException("User cancelled stream");
80-
controller.cancel();
81-
}
82-
83-
@Override
84-
public void disableAutoInboundFlowControl() {
85-
controller.disableAutoInboundFlowControl();
86-
}
87-
88-
@Override
89-
public void request(int count) {
90-
controller.request(count);
91-
}
92-
});
93-
}
94-
95-
@Override
96-
protected void onResponseImpl(ResponseT response) {
97-
innerObserver.onResponse(response);
98-
}
99-
100-
@Override
101-
protected void onErrorImpl(Throwable t) {
102-
if (cancellationException != null) {
103-
t = cancellationException;
104-
} else {
105-
t = exceptionFactory.create(t);
106-
}
107-
innerObserver.onError(t);
108-
}
109-
110-
@Override
111-
protected void onCompleteImpl() {
112-
innerObserver.onComplete();
113-
}
59+
inner.call(
60+
request, new ExceptionResponseObserver<>(responseObserver, exceptionFactory), context);
11461
}
11562
}

gax-grpc/src/test/java/com/google/api/gax/grpc/GrpcDirectServerStreamingCallableTest.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -180,7 +180,7 @@ public void testCancelClientCall() throws Exception {
180180
latch.await(500, TimeUnit.MILLISECONDS);
181181

182182
Truth.assertThat(moneyObserver.error).isInstanceOf(CancellationException.class);
183-
Truth.assertThat(moneyObserver.error).hasMessage("User cancelled stream");
183+
Truth.assertThat(moneyObserver.error).hasMessageThat().isEqualTo("User cancelled stream");
184184
}
185185

186186
@Test
@@ -195,7 +195,8 @@ public void testOnResponseError() throws Throwable {
195195
Truth.assertThat(((ApiException) moneyObserver.error).getStatusCode().getCode())
196196
.isEqualTo(StatusCode.Code.INVALID_ARGUMENT);
197197
Truth.assertThat(moneyObserver.error)
198-
.hasMessage("io.grpc.StatusRuntimeException: INVALID_ARGUMENT: red must be positive");
198+
.hasMessageThat()
199+
.isEqualTo("io.grpc.StatusRuntimeException: INVALID_ARGUMENT: red must be positive");
199200
}
200201

201202
@Test
@@ -247,7 +248,7 @@ public void testBlockingServerStreaming() throws Exception {
247248
Truth.assertThat(responseData).containsExactly(expected);
248249
}
249250

250-
private static class MoneyObserver extends StateCheckingResponseObserver<Money> {
251+
static class MoneyObserver extends StateCheckingResponseObserver<Money> {
251252
private final boolean autoFlowControl;
252253
private final CountDownLatch latch;
253254

0 commit comments

Comments
 (0)