Skip to content

Commit 665f394

Browse files
Protocol V6 + random_seed support (#542)
1 parent 3393460 commit 665f394

File tree

14 files changed

+71
-30
lines changed

14 files changed

+71
-30
lines changed

sdk-core/src/main/java/dev/restate/sdk/core/statemachine/InvocationIdImpl.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,15 +13,17 @@
1313
import java.security.MessageDigest;
1414
import java.security.NoSuchAlgorithmException;
1515
import java.util.Objects;
16+
import org.jspecify.annotations.Nullable;
1617

1718
final class InvocationIdImpl implements InvocationId {
1819

1920
private final String id;
2021
private Long seed;
2122

22-
InvocationIdImpl(String debugId) {
23+
InvocationIdImpl(String debugId, @Nullable Long seed) {
2324
this.id = debugId;
24-
this.seed = null;
25+
// If random seed null, it will be computed
26+
this.seed = seed;
2527
}
2628

2729
@Override

sdk-core/src/main/java/dev/restate/sdk/core/statemachine/ReplayingState.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,8 @@ public StateMachine.Input processInputCommand(StateContext stateContext) {
101101

102102
//noinspection unchecked
103103
return new StateMachine.Input(
104-
new InvocationIdImpl(stateContext.getStartInfo().debugId()),
104+
new InvocationIdImpl(
105+
stateContext.getStartInfo().debugId(), stateContext.getStartInfo().randomSeed()),
105106
byteStringToSlice(inputCommandMessage.getValue().getContent()),
106107
Map.ofEntries(
107108
inputCommandMessage.getHeadersList().stream()

sdk-core/src/main/java/dev/restate/sdk/core/statemachine/ServiceProtocol.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ public class ServiceProtocol {
1515
public static final Protocol.ServiceProtocolVersion MIN_SERVICE_PROTOCOL_VERSION =
1616
Protocol.ServiceProtocolVersion.V5;
1717
public static final Protocol.ServiceProtocolVersion MAX_SERVICE_PROTOCOL_VERSION =
18-
Protocol.ServiceProtocolVersion.V5;
18+
Protocol.ServiceProtocolVersion.V6;
1919

2020
static final String CONTENT_TYPE = "content-type";
2121

@@ -40,6 +40,9 @@ static Protocol.ServiceProtocolVersion parseServiceProtocolVersion(String versio
4040
if (version.equals("application/vnd.restate.invocation.v5")) {
4141
return Protocol.ServiceProtocolVersion.V5;
4242
}
43+
if (version.equals("application/vnd.restate.invocation.v6")) {
44+
return Protocol.ServiceProtocolVersion.V6;
45+
}
4346
return Protocol.ServiceProtocolVersion.SERVICE_PROTOCOL_VERSION_UNSPECIFIED;
4447
}
4548

@@ -59,6 +62,9 @@ static String serviceProtocolVersionToHeaderValue(Protocol.ServiceProtocolVersio
5962
if (Objects.requireNonNull(version) == Protocol.ServiceProtocolVersion.V5) {
6063
return "application/vnd.restate.invocation.v5";
6164
}
65+
if (Objects.requireNonNull(version) == Protocol.ServiceProtocolVersion.V6) {
66+
return "application/vnd.restate.invocation.v6";
67+
}
6268
throw new IllegalArgumentException(
6369
String.format("Service protocol version '%s' has no header value", version.getNumber()));
6470
}

sdk-core/src/main/java/dev/restate/sdk/core/statemachine/StartInfo.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,4 +17,5 @@ record StartInfo(
1717
String objectKey,
1818
int entriesToReplay,
1919
int retryCountSinceLastStoredEntry,
20-
Duration durationSinceLastStoredEntry) {}
20+
Duration durationSinceLastStoredEntry,
21+
Long randomSeed) {}

sdk-core/src/main/java/dev/restate/sdk/core/statemachine/StateContext.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,24 +10,33 @@
1010

1111
import com.google.protobuf.MessageLite;
1212
import dev.restate.sdk.core.EndpointRequestHandler;
13+
import dev.restate.sdk.core.generated.protocol.Protocol;
1314
import java.util.Objects;
1415
import java.util.concurrent.Flow;
1516

1617
final class StateContext {
1718

19+
private final Protocol.ServiceProtocolVersion negotiatedProtocolVersion;
1820
private final StateHolder stateHolder;
1921
private final Journal journal;
2022
private EagerState eagerState;
2123
private transient StartInfo startInfo;
2224
private boolean inputClosed;
2325
private Flow.Subscriber<MessageLite> outputSubscriber;
2426

25-
StateContext(EndpointRequestHandler.LoggingContextSetter loggingContextSetter) {
27+
StateContext(
28+
EndpointRequestHandler.LoggingContextSetter loggingContextSetter,
29+
Protocol.ServiceProtocolVersion negotiatedProtocolVersion) {
2630
this.stateHolder = new StateHolder(loggingContextSetter);
31+
this.negotiatedProtocolVersion = negotiatedProtocolVersion;
2732
this.journal = new Journal();
2833
this.inputClosed = false;
2934
}
3035

36+
public Protocol.ServiceProtocolVersion getNegotiatedProtocolVersion() {
37+
return negotiatedProtocolVersion;
38+
}
39+
3140
public State getCurrentState() {
3241
return stateHolder.getState();
3342
}

sdk-core/src/main/java/dev/restate/sdk/core/statemachine/StateMachineImpl.java

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,6 @@ class StateMachineImpl implements StateMachine {
3636
private static final String AWAKEABLE_IDENTIFIER_PREFIX = "sign_1";
3737
private static final int CANCEL_SIGNAL_ID = 1;
3838

39-
private final Protocol.ServiceProtocolVersion serviceProtocolVersion;
40-
4139
// Callbacks
4240
private final CompletableFuture<Void> waitForReadyFuture = new CompletableFuture<>();
4341
private CompletableFuture<Void> waitNextProcessedInput;
@@ -54,17 +52,16 @@ class StateMachineImpl implements StateMachine {
5452
EndpointRequestHandler.LoggingContextSetter loggingContextSetter) {
5553
String contentTypeHeader = headersAccessor.get(ServiceProtocol.CONTENT_TYPE);
5654

57-
this.serviceProtocolVersion = ServiceProtocol.parseServiceProtocolVersion(contentTypeHeader);
58-
59-
if (!ServiceProtocol.isSupported(this.serviceProtocolVersion)) {
55+
var serviceProtocolVersion = ServiceProtocol.parseServiceProtocolVersion(contentTypeHeader);
56+
if (!ServiceProtocol.isSupported(serviceProtocolVersion)) {
6057
throw new ProtocolException(
6158
String.format(
6259
"Service endpoint does not support the service protocol version '%s'.",
6360
contentTypeHeader),
6461
ProtocolException.UNSUPPORTED_MEDIA_TYPE_CODE);
6562
}
6663

67-
this.stateContext = new StateContext(loggingContextSetter);
64+
this.stateContext = new StateContext(loggingContextSetter, serviceProtocolVersion);
6865
}
6966

7067
// -- Few callbacks
@@ -176,7 +173,8 @@ public void onComplete() {
176173

177174
@Override
178175
public String getResponseContentType() {
179-
return ServiceProtocol.serviceProtocolVersionToHeaderValue(serviceProtocolVersion);
176+
return ServiceProtocol.serviceProtocolVersionToHeaderValue(
177+
stateContext.getNegotiatedProtocolVersion());
180178
}
181179

182180
@Override

sdk-core/src/main/java/dev/restate/sdk/core/statemachine/WaitingStartState.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,12 @@ public void onNewMessage(
4141
startMessage.getKey(),
4242
startMessage.getKnownEntries(),
4343
startMessage.getRetryCountSinceLastStoredEntry(),
44-
Duration.ofMillis(startMessage.getDurationSinceLastStoredEntry())));
44+
Duration.ofMillis(startMessage.getDurationSinceLastStoredEntry()),
45+
// Random seed from start message will be set only if protocol >= 6
46+
stateContext.getNegotiatedProtocolVersion().getNumber()
47+
>= Protocol.ServiceProtocolVersion.V6_VALUE
48+
? startMessage.getRandomSeed()
49+
: null));
4550
stateContext.setEagerState(new EagerState(startMessage));
4651

4752
// Tracing and logging setup

sdk-core/src/main/service-protocol/dev/restate/service/protocol.proto

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,9 @@ enum ServiceProtocolVersion {
3232
// * New command to attach to existing invocation
3333
// * New command to get output of existing invocation
3434
V5 = 5;
35+
// Added:
36+
// * StartMessage.random_seed
37+
V6 = 6;
3538
}
3639

3740
// --- Core frames ---
@@ -73,6 +76,10 @@ message StartMessage {
7376
// Please note this duration might not be accurate,
7477
// and might change depending on which Restate replica executes the request.
7578
uint64 duration_since_last_stored_entry = 8;
79+
80+
// Random seed to use to seed the deterministic RNG exposed in the context API.
81+
// This will be stable across restarts.
82+
uint64 random_seed = 9;
7683
}
7784

7885
// Type: 0x0000 + 1

sdk-core/src/test/java/dev/restate/sdk/core/MockBidiStream.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,10 @@ public void executeTest(TestDefinitions.TestDefinition definition) {
6060
server.processorForRequest(
6161
"/" + serviceDefinition.getServiceName() + "/" + definition.getMethod(),
6262
HeadersAccessor.wrap(
63-
Map.of("content-type", ProtoUtils.serviceProtocolContentTypeHeader())),
63+
Map.of(
64+
"content-type",
65+
ProtoUtils.serviceProtocolContentTypeHeader(
66+
definition.isEnablePreviewContext()))),
6467
EndpointRequestHandler.LoggingContextSetter.THREAD_LOCAL_INSTANCE,
6568
coreExecutor,
6669
true);

sdk-core/src/test/java/dev/restate/sdk/core/MockRequestResponse.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,10 @@ public void executeTest(TestDefinition definition) {
5858
server.processorForRequest(
5959
"/" + serviceDefinition.getServiceName() + "/" + definition.getMethod(),
6060
HeadersAccessor.wrap(
61-
Map.of("content-type", ProtoUtils.serviceProtocolContentTypeHeader())),
61+
Map.of(
62+
"content-type",
63+
ProtoUtils.serviceProtocolContentTypeHeader(
64+
definition.isEnablePreviewContext()))),
6265
EndpointRequestHandler.LoggingContextSetter.THREAD_LOCAL_INSTANCE,
6366
syscallsExecutor,
6467
false);

0 commit comments

Comments
 (0)