Skip to content

Commit df461e1

Browse files
Fix HTTP/1.1 support (#521)
* Fix HTTP/1.1 support. This makes sure we correctly notify Request Response mode when using HTTP/1.1
1 parent 3259edb commit df461e1

File tree

11 files changed

+129
-83
lines changed

11 files changed

+129
-83
lines changed

examples/src/main/java/my/restate/sdk/examples/Greeter.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@
1313
import dev.restate.sdk.annotation.Service;
1414
import dev.restate.sdk.endpoint.Endpoint;
1515
import dev.restate.sdk.http.vertx.RestateHttpServer;
16-
import java.time.Duration;
1716

1817
@Service
1918
public class Greeter {
@@ -29,8 +28,6 @@ public GreetingResponse greet(Context ctx, Greeting req) {
2928
}
3029

3130
public static void main(String[] args) {
32-
RestateHttpServer.listen(
33-
Endpoint.bind(
34-
new Greeter(), configurator -> configurator.inactivityTimeout(Duration.ofSeconds(1))));
31+
RestateHttpServer.listen(Endpoint.bind(new Greeter()));
3532
}
3633
}

sdk-common/build.gradle.kts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import org.jetbrains.dokka.gradle.AbstractDokkaTask
2+
import org.jetbrains.kotlin.gradle.tasks.KotlinCompile
23

34
plugins {
45
`java-library`
@@ -83,6 +84,7 @@ val generateVersionClass =
8384

8485
tasks {
8586
withType<JavaCompile>().configureEach { dependsOn(generateVersionClass) }
87+
withType<KotlinCompile>().configureEach { dependsOn(generateVersionClass) }
8688
withType<org.gradle.jvm.tasks.Jar>().configureEach { dependsOn(generateVersionClass) }
8789
withType<AbstractDokkaTask>().configureEach { dependsOn(generateVersionClass) }
8890
}

sdk-core/build.gradle.kts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
1+
import org.gradle.kotlin.dsl.withType
12
import org.jetbrains.dokka.gradle.AbstractDokkaTask
3+
import org.jetbrains.kotlin.gradle.tasks.KotlinCompile
24

35
plugins {
46
`java-library`
@@ -100,6 +102,7 @@ protobuf { protoc { artifact = "com.google.protobuf:protoc:$protobufVersion" } }
100102

101103
tasks {
102104
withType<JavaCompile> { dependsOn(generateJsonSchema2Pojo, generateProto) }
105+
withType<KotlinCompile>().configureEach { dependsOn(generateJsonSchema2Pojo, generateProto) }
103106
withType<org.gradle.jvm.tasks.Jar>().configureEach {
104107
dependsOn(generateJsonSchema2Pojo, generateProto)
105108
}

sdk-core/src/main/java/dev/restate/sdk/core/EndpointManifest.java

Lines changed: 54 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import dev.restate.sdk.endpoint.definition.ServiceDefinition;
2121
import dev.restate.sdk.endpoint.definition.ServiceType;
2222
import dev.restate.serde.Serde;
23+
import java.util.List;
2324
import java.util.Objects;
2425
import java.util.function.Predicate;
2526
import java.util.stream.Collectors;
@@ -30,65 +31,63 @@ final class EndpointManifest {
3031
private static final Input EMPTY_INPUT = new Input();
3132
private static final Output EMPTY_OUTPUT = new Output().withSetContentTypeIfEmpty(false);
3233

33-
private final EndpointManifestSchema manifest;
34+
private final List<Service> services;
35+
36+
EndpointManifest(Stream<ServiceDefinition> components, boolean experimentalContextEnabled) {
37+
this.services =
38+
components
39+
.map(
40+
svc ->
41+
new Service()
42+
.withName(svc.getServiceName())
43+
.withTy(convertServiceType(svc.getServiceType()))
44+
.withDocumentation(svc.getDocumentation())
45+
.withIdempotencyRetention(
46+
svc.getIdempotencyRetention() != null
47+
? svc.getIdempotencyRetention().toMillis()
48+
: null)
49+
.withJournalRetention(
50+
svc.getJournalRetention() != null
51+
? svc.getJournalRetention().toMillis()
52+
: null)
53+
.withInactivityTimeout(
54+
svc.getInactivityTimeout() != null
55+
? svc.getInactivityTimeout().toMillis()
56+
: null)
57+
.withAbortTimeout(
58+
svc.getAbortTimeout() != null ? svc.getAbortTimeout().toMillis() : null)
59+
.withEnableLazyState(svc.getEnableLazyState())
60+
.withIngressPrivate(svc.getIngressPrivate())
61+
.withMetadata(
62+
svc.getMetadata().entrySet().stream()
63+
.reduce(
64+
new Metadata__1(),
65+
(meta, entry) ->
66+
meta.withAdditionalProperty(
67+
entry.getKey(), entry.getValue()),
68+
(m1, m2) -> {
69+
m2.getAdditionalProperties()
70+
.forEach(m1::setAdditionalProperty);
71+
return m1;
72+
}))
73+
.withHandlers(
74+
svc.getHandlers().stream()
75+
.map(EndpointManifest::convertHandler)
76+
.collect(Collectors.toList())))
77+
.collect(Collectors.toList());
78+
}
3479

35-
EndpointManifest(
36-
EndpointManifestSchema.ProtocolMode protocolMode,
37-
Stream<ServiceDefinition> components,
38-
boolean experimentalContextEnabled) {
39-
this.manifest =
80+
EndpointManifestSchema manifest(
81+
Discovery.ServiceDiscoveryProtocolVersion version,
82+
EndpointManifestSchema.ProtocolMode protocolMode) {
83+
EndpointManifestSchema manifest =
4084
new EndpointManifestSchema()
85+
.withProtocolMode(protocolMode)
4186
.withMinProtocolVersion((long) MIN_SERVICE_PROTOCOL_VERSION.getNumber())
4287
.withMaxProtocolVersion((long) MAX_SERVICE_PROTOCOL_VERSION.getNumber())
43-
.withProtocolMode(protocolMode)
44-
.withServices(
45-
components
46-
.map(
47-
svc ->
48-
new Service()
49-
.withName(svc.getServiceName())
50-
.withTy(convertServiceType(svc.getServiceType()))
51-
.withDocumentation(svc.getDocumentation())
52-
.withIdempotencyRetention(
53-
svc.getIdempotencyRetention() != null
54-
? svc.getIdempotencyRetention().toMillis()
55-
: null)
56-
.withJournalRetention(
57-
svc.getJournalRetention() != null
58-
? svc.getJournalRetention().toMillis()
59-
: null)
60-
.withInactivityTimeout(
61-
svc.getInactivityTimeout() != null
62-
? svc.getInactivityTimeout().toMillis()
63-
: null)
64-
.withAbortTimeout(
65-
svc.getAbortTimeout() != null
66-
? svc.getAbortTimeout().toMillis()
67-
: null)
68-
.withEnableLazyState(svc.getEnableLazyState())
69-
.withIngressPrivate(svc.getIngressPrivate())
70-
.withMetadata(
71-
svc.getMetadata().entrySet().stream()
72-
.reduce(
73-
new Metadata__1(),
74-
(meta, entry) ->
75-
meta.withAdditionalProperty(
76-
entry.getKey(), entry.getValue()),
77-
(m1, m2) -> {
78-
m2.getAdditionalProperties()
79-
.forEach(m1::setAdditionalProperty);
80-
return m1;
81-
}))
82-
.withHandlers(
83-
svc.getHandlers().stream()
84-
.map(EndpointManifest::convertHandler)
85-
.collect(Collectors.toList())))
86-
.collect(Collectors.toList()));
87-
}
88-
89-
EndpointManifestSchema manifest(Discovery.ServiceDiscoveryProtocolVersion version) {
88+
.withServices(this.services);
9089
// Verify that the user didn't set fields that we don't support in the discovery version we set
91-
for (var service : this.manifest.getServices()) {
90+
for (var service : manifest.getServices()) {
9291
if (version.getNumber() < Discovery.ServiceDiscoveryProtocolVersion.V2.getNumber()) {
9392
verifyFieldNotSet(
9493
"metadata",
@@ -121,7 +120,7 @@ EndpointManifestSchema manifest(Discovery.ServiceDiscoveryProtocolVersion versio
121120
}
122121
}
123122

124-
return this.manifest;
123+
return manifest;
125124
}
126125

127126
private static Service.Ty convertServiceType(ServiceType serviceType) {

sdk-core/src/main/java/dev/restate/sdk/core/EndpointRequestHandler.java

Lines changed: 50 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -54,21 +54,36 @@ public String get(@Nullable HeadersAccessor carrier, @NonNull String key) {
5454

5555
private final Endpoint endpoint;
5656
private final EndpointManifest deploymentManifest;
57+
private final boolean deprecatedSupportsBidirectionalStreaming;
5758

5859
private EndpointRequestHandler(
59-
EndpointManifestSchema.ProtocolMode protocolMode, Endpoint endpoint) {
60+
EndpointManifestSchema.@Nullable ProtocolMode protocolMode, Endpoint endpoint) {
6061
this.endpoint = endpoint;
6162
this.deploymentManifest =
6263
new EndpointManifest(
63-
protocolMode,
64-
this.endpoint.getServiceDefinitions(),
65-
this.endpoint.isExperimentalContextEnabled());
64+
this.endpoint.getServiceDefinitions(), this.endpoint.isExperimentalContextEnabled());
65+
this.deprecatedSupportsBidirectionalStreaming =
66+
protocolMode != EndpointManifestSchema.ProtocolMode.REQUEST_RESPONSE;
6667
}
6768

69+
public static EndpointRequestHandler create(Endpoint endpoint) {
70+
return new EndpointRequestHandler(null, endpoint);
71+
}
72+
73+
/**
74+
* @deprecated The protocol mode is now established on request basis, use {@link
75+
* #create(Endpoint)} instead.
76+
*/
77+
@Deprecated
6878
public static EndpointRequestHandler forBidiStream(Endpoint endpoint) {
6979
return new EndpointRequestHandler(EndpointManifestSchema.ProtocolMode.BIDI_STREAM, endpoint);
7080
}
7181

82+
/**
83+
* @deprecated The protocol mode is now established on request basis, use {@link
84+
* #create(Endpoint)} instead.
85+
*/
86+
@Deprecated
7287
public static EndpointRequestHandler forRequestResponse(Endpoint endpoint) {
7388
return new EndpointRequestHandler(
7489
EndpointManifestSchema.ProtocolMode.REQUEST_RESPONSE, endpoint);
@@ -93,17 +108,37 @@ public interface LoggingContextSetter {
93108
void set(String key, String value);
94109
}
95110

111+
/**
112+
* @deprecated Use {@link #processorForRequest(String, HeadersAccessor, LoggingContextSetter,
113+
* Executor, boolean)} instead.
114+
*/
115+
@Deprecated
116+
public RequestProcessor processorForRequest(
117+
String path,
118+
HeadersAccessor headersAccessor,
119+
LoggingContextSetter loggingContextSetter,
120+
Executor coreExecutor)
121+
throws ProtocolException {
122+
return processorForRequest(
123+
path,
124+
headersAccessor,
125+
loggingContextSetter,
126+
coreExecutor,
127+
this.deprecatedSupportsBidirectionalStreaming);
128+
}
129+
96130
/**
97131
* @param coreExecutor This executor MUST serialize the execution of all scheduled tasks. For
98132
* example {@link Executors#newSingleThreadExecutor()} can be used.
133+
* @param supportsBidirectionalStreaming true if the server supports bidirectional streaming.
99134
* @return The request processor
100-
* @throws ProtocolException in
101135
*/
102136
public RequestProcessor processorForRequest(
103137
String path,
104138
HeadersAccessor headersAccessor,
105139
LoggingContextSetter loggingContextSetter,
106-
Executor coreExecutor)
140+
Executor coreExecutor,
141+
boolean supportsBidirectionalStreaming)
107142
throws ProtocolException {
108143
if (path.endsWith(HEALTH_PATH)) {
109144
return new StaticResponseRequestProcessor(200, "text/plain", Slice.wrap("OK"));
@@ -120,7 +155,7 @@ public RequestProcessor processorForRequest(
120155

121156
// Discovery request
122157
if (path.endsWith(DISCOVER_PATH)) {
123-
return this.handleDiscoveryRequest(headersAccessor);
158+
return this.handleDiscoveryRequest(supportsBidirectionalStreaming, headersAccessor);
124159
}
125160

126161
// Parse request
@@ -142,7 +177,6 @@ public RequestProcessor processorForRequest(
142177
StateMachine stateMachine = StateMachine.init(headersAccessor, loggingContextSetter);
143178

144179
// Resolve the service method definition
145-
@SuppressWarnings("unchecked")
146180
ServiceDefinition svc = this.endpoint.resolveService(serviceName);
147181
if (svc == null) {
148182
throw ProtocolException.methodNotFound(serviceName, handlerName);
@@ -182,7 +216,8 @@ public RequestProcessor processorForRequest(
182216
coreExecutor);
183217
}
184218

185-
StaticResponseRequestProcessor handleDiscoveryRequest(HeadersAccessor headersAccessor)
219+
StaticResponseRequestProcessor handleDiscoveryRequest(
220+
boolean supportsBidirectionalStreaming, HeadersAccessor headersAccessor)
186221
throws ProtocolException {
187222
String acceptContentType = headersAccessor.get(ACCEPT);
188223

@@ -195,7 +230,12 @@ StaticResponseRequestProcessor handleDiscoveryRequest(HeadersAccessor headersAcc
195230
ProtocolException.UNSUPPORTED_MEDIA_TYPE_CODE);
196231
}
197232

198-
EndpointManifestSchema response = this.deploymentManifest.manifest(version);
233+
EndpointManifestSchema response =
234+
this.deploymentManifest.manifest(
235+
version,
236+
supportsBidirectionalStreaming
237+
? EndpointManifestSchema.ProtocolMode.BIDI_STREAM
238+
: EndpointManifestSchema.ProtocolMode.REQUEST_RESPONSE);
199239
LOG.info(
200240
"Replying to discovery request with services [{}]",
201241
response.getServices().stream().map(Service::getName).collect(Collectors.joining(",")));

sdk-core/src/test/java/dev/restate/sdk/core/AssertUtils.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -92,11 +92,10 @@ public static EndpointManifestSchemaAssert assertThatDiscovery(Endpoint.Builder
9292

9393
public static EndpointManifestSchemaAssert assertThatDiscovery(Endpoint endpoint) {
9494
return new EndpointManifestSchemaAssert(
95-
new EndpointManifest(
96-
EndpointManifestSchema.ProtocolMode.BIDI_STREAM,
97-
endpoint.getServiceDefinitions(),
98-
true)
99-
.manifest(DiscoveryProtocol.MAX_SERVICE_DISCOVERY_PROTOCOL_VERSION),
95+
new EndpointManifest(endpoint.getServiceDefinitions(), true)
96+
.manifest(
97+
DiscoveryProtocol.MAX_SERVICE_DISCOVERY_PROTOCOL_VERSION,
98+
EndpointManifestSchema.ProtocolMode.BIDI_STREAM),
10099
EndpointManifestSchemaAssert.class);
101100
}
102101

sdk-core/src/test/java/dev/restate/sdk/core/ComponentDiscoveryHandlerTest.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@ class ComponentDiscoveryHandlerTest {
2727
void handleWithMultipleServices() {
2828
EndpointManifest deploymentManifest =
2929
new EndpointManifest(
30-
EndpointManifestSchema.ProtocolMode.REQUEST_RESPONSE,
3130
Stream.of(
3231
ServiceDefinition.of(
3332
"MyGreeter",
@@ -38,7 +37,9 @@ void handleWithMultipleServices() {
3837
false);
3938

4039
EndpointManifestSchema manifest =
41-
deploymentManifest.manifest(DiscoveryProtocol.MAX_SERVICE_DISCOVERY_PROTOCOL_VERSION);
40+
deploymentManifest.manifest(
41+
DiscoveryProtocol.MAX_SERVICE_DISCOVERY_PROTOCOL_VERSION,
42+
EndpointManifestSchema.ProtocolMode.REQUEST_RESPONSE);
4243

4344
assertThat(manifest.getServices()).extracting(Service::getName).containsOnly("MyGreeter");
4445
assertThat(manifest.getProtocolMode())

sdk-core/src/test/java/dev/restate/sdk/core/MockBidiStream.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ public void executeTest(TestDefinitions.TestDefinition definition) {
5353
if (definition.isEnablePreviewContext()) {
5454
builder.enablePreviewContext();
5555
}
56-
EndpointRequestHandler server = EndpointRequestHandler.forBidiStream(builder.build());
56+
EndpointRequestHandler server = EndpointRequestHandler.create(builder.build());
5757

5858
// Start invocation
5959
RequestProcessor handler =
@@ -62,7 +62,8 @@ public void executeTest(TestDefinitions.TestDefinition definition) {
6262
HeadersAccessor.wrap(
6363
Map.of("content-type", ProtoUtils.serviceProtocolContentTypeHeader())),
6464
EndpointRequestHandler.LoggingContextSetter.THREAD_LOCAL_INSTANCE,
65-
coreExecutor);
65+
coreExecutor,
66+
true);
6667

6768
// Wire invocation
6869
AssertSubscriber<Slice> assertSubscriber = AssertSubscriber.create(Long.MAX_VALUE);

sdk-core/src/test/java/dev/restate/sdk/core/MockRequestResponse.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ public void executeTest(TestDefinition definition) {
5151
if (definition.isEnablePreviewContext()) {
5252
builder.enablePreviewContext();
5353
}
54-
EndpointRequestHandler server = EndpointRequestHandler.forRequestResponse(builder.build());
54+
EndpointRequestHandler server = EndpointRequestHandler.create(builder.build());
5555

5656
// Start invocation
5757
RequestProcessor handler =
@@ -60,7 +60,8 @@ public void executeTest(TestDefinition definition) {
6060
HeadersAccessor.wrap(
6161
Map.of("content-type", ProtoUtils.serviceProtocolContentTypeHeader())),
6262
EndpointRequestHandler.LoggingContextSetter.THREAD_LOCAL_INSTANCE,
63-
syscallsExecutor);
63+
syscallsExecutor,
64+
false);
6465

6566
// Wire invocation
6667
AssertSubscriber<Slice> assertSubscriber = AssertSubscriber.create(Long.MAX_VALUE);

0 commit comments

Comments
 (0)