Skip to content

Commit 2aba3f9

Browse files
Expose API to disable bidirectional streaming (#522)
1 parent df461e1 commit 2aba3f9

File tree

2 files changed

+39
-4
lines changed

2 files changed

+39
-4
lines changed

sdk-http-vertx/src/main/java/dev/restate/sdk/http/vertx/HttpEndpointRequestHandler.java

Lines changed: 27 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99
package dev.restate.sdk.http.vertx;
1010

1111
import static io.netty.handler.codec.http.HttpHeaderNames.CONTENT_TYPE;
12-
import static io.netty.handler.codec.http.HttpResponseStatus.*;
1312

1413
import dev.restate.sdk.core.EndpointRequestHandler;
1514
import dev.restate.sdk.core.ProtocolException;
@@ -40,9 +39,11 @@ public class HttpEndpointRequestHandler implements Handler<HttpServerRequest> {
4039
AsciiString.cached(Version.X_RESTATE_SERVER);
4140

4241
private final EndpointRequestHandler endpoint;
42+
private final boolean enableBidirectionalStreaming;
4343

44-
private HttpEndpointRequestHandler(Endpoint endpoint) {
44+
private HttpEndpointRequestHandler(Endpoint endpoint, boolean enableBidirectionalStreaming) {
4545
this.endpoint = EndpointRequestHandler.create(endpoint);
46+
this.enableBidirectionalStreaming = enableBidirectionalStreaming;
4647
}
4748

4849
@Override
@@ -68,7 +69,7 @@ public Iterable<String> keys() {
6869
},
6970
ContextualData::put,
7071
currentContextExecutor(vertxCurrentContext),
71-
request.version() == HttpVersion.HTTP_2);
72+
enableBidirectionalStreaming && request.version() == HttpVersion.HTTP_2);
7273
} catch (ProtocolException e) {
7374
LOG.warn("Error when handling the request", e);
7475
request
@@ -101,7 +102,29 @@ private Executor currentContextExecutor(Context currentContext) {
101102
return runnable -> currentContext.runOnContext(v -> runnable.run());
102103
}
103104

105+
/**
106+
* Create a {@link HttpEndpointRequestHandler}
107+
*
108+
* @param endpoint the endpoint to wrap
109+
* @return the built handler
110+
*/
104111
public static HttpEndpointRequestHandler fromEndpoint(Endpoint endpoint) {
105-
return new HttpEndpointRequestHandler(endpoint);
112+
return new HttpEndpointRequestHandler(endpoint, true);
113+
}
114+
115+
/**
116+
* Create a {@link HttpEndpointRequestHandler}
117+
*
118+
* @param endpoint the endpoint to wrap
119+
* @param disableBidirectionalStreaming if true, disable bidirectional streaming with HTTP/2
120+
* requests. Restate initiates for each invocation a bidirectional streaming using HTTP/2
121+
* between restate-server and the SDK. In some network setups, for example when using a load
122+
* balancers that buffer request/response, bidirectional streaming will not work correctly.
123+
* Only in these scenarios, we suggest disabling bidirectional streaming.
124+
* @return the built handler
125+
*/
126+
public static HttpEndpointRequestHandler fromEndpoint(
127+
Endpoint endpoint, boolean disableBidirectionalStreaming) {
128+
return new HttpEndpointRequestHandler(endpoint, !disableBidirectionalStreaming);
106129
}
107130
}

sdk-http-vertx/src/main/java/dev/restate/sdk/http/vertx/RestateHttpServer.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,18 @@ public static int listen(Endpoint.Builder endpointBuilder, int port) {
8383
return listen(endpointBuilder.build(), port);
8484
}
8585

86+
/** Like {@link #listen(Endpoint)}, with an already built request handler */
87+
public static int listen(HttpEndpointRequestHandler requestHandler) {
88+
return listen(requestHandler, DEFAULT_PORT);
89+
}
90+
91+
/** Like {@link #listen(Endpoint, int)}, with an already built request handler */
92+
public static int listen(HttpEndpointRequestHandler requestHandler, int port) {
93+
HttpServer server = Vertx.vertx().createHttpServer(DEFAULT_OPTIONS);
94+
server.requestHandler(requestHandler);
95+
return handleStart(server.listen(port));
96+
}
97+
8698
/** Create a Vert.x {@link HttpServer} from the provided endpoint. */
8799
public static HttpServer fromEndpoint(Endpoint endpoint) {
88100
return fromEndpoint(endpoint, DEFAULT_OPTIONS);

0 commit comments

Comments
 (0)