Skip to content

Commit 6be1e3d

Browse files
Small things
1 parent ab56e59 commit 6be1e3d

File tree

4 files changed

+21
-10
lines changed

4 files changed

+21
-10
lines changed

sdk-api-kotlin/src/main/kotlin/dev/restate/sdk/kotlin/ContextImpl.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -123,8 +123,8 @@ internal constructor(
123123
retryPolicy: RetryPolicy?,
124124
block: suspend () -> T
125125
): DurableFuture<T> {
126-
var serde: Serde<T> = resolveSerde(typeTag)
127-
var coroutineCtx = currentCoroutineContext()
126+
val serde: Serde<T> = resolveSerde(typeTag)
127+
val coroutineCtx = currentCoroutineContext()
128128
val javaRetryPolicy =
129129
retryPolicy?.let {
130130
dev.restate.sdk.common.RetryPolicy.exponential(

sdk-core/src/main/java/dev/restate/sdk/core/EndpointRequestHandler.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -173,6 +173,12 @@ public RequestProcessor processorForRequest(
173173

174174
String fullyQualifiedServiceMethod = serviceName + "/" + handlerName;
175175

176+
// If we got it, set it
177+
String invocationIdHeader = headersAccessor.get("x-restate-invocation-id");
178+
if (invocationIdHeader != null) {
179+
loggingContextSetter.set(LoggingContextSetter.INVOCATION_ID_KEY, invocationIdHeader);
180+
}
181+
176182
// Instantiate state machine
177183
StateMachine stateMachine = StateMachine.init(headersAccessor, loggingContextSetter);
178184

sdk-core/src/main/java/dev/restate/sdk/core/HandlerContextImpl.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,6 @@ class HandlerContextImpl implements HandlerContextInternal {
4242
private final @Nullable String objectKey;
4343
private final String fullyQualifiedHandlerName;
4444

45-
private CompletableFuture<Void> nextProcessedRun;
4645
private final List<AsyncResultInternal<String>> invocationIdsToCancel;
4746
private final HashMap<Integer, Consumer<RunCompleter>> scheduledRuns;
4847

@@ -416,7 +415,6 @@ private void pollAsyncResultInner(AsyncResultInternal<?> asyncResult) {
416415
// Let it loop now
417416
} else if (response instanceof StateMachine.DoProgressResponse.ReadFromInput
418417
|| response instanceof StateMachine.DoProgressResponse.WaitingPendingRun) {
419-
// LOG.info("Gonna need to wait here {}", response);
420418
this.stateMachine.onNextEvent(
421419
() -> {
422420
LOG.info("Triggered after wait {}", response);

sdk-core/src/main/java/dev/restate/sdk/core/statemachine/StateMachineImpl.java

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -74,16 +74,18 @@ public CompletableFuture<Void> waitForReady() {
7474

7575
@Override
7676
public void onNextEvent(Runnable runnable) {
77-
// LOG.info("Registering next event signal");
7877
this.nextEventListener =
7978
() -> {
8079
this.nextEventListener.run();
8180
runnable.run();
8281
};
82+
// Trigger this now
83+
if (this.stateContext.isInputClosed()) {
84+
this.triggerNextEventSignal();
85+
}
8386
}
8487

8588
private void triggerNextEventSignal() {
86-
// LOG.info("Triggering next event signal");
8789
Runnable listener = this.nextEventListener;
8890
this.nextEventListener = () -> {};
8991
listener.run();
@@ -163,6 +165,7 @@ public void onComplete() {
163165
this.stateContext.getCurrentState().onInputClosed(this.stateContext);
164166
} catch (Throwable e) {
165167
this.onError(e);
168+
return;
166169
}
167170
this.triggerNextEventSignal();
168171
this.cancelInputSubscription();
@@ -550,9 +553,11 @@ public void proposeRunCompletion(int handle, Slice value) {
550553
LOG.debug("Executing 'Run completed with success'");
551554
try {
552555
this.stateContext.getCurrentState().proposeRunCompletion(handle, value, this.stateContext);
553-
} finally {
554-
this.triggerNextEventSignal();
556+
} catch (Throwable e) {
557+
this.onError(e);
558+
return;
555559
}
560+
this.triggerNextEventSignal();
556561
}
557562

558563
@Override
@@ -566,9 +571,11 @@ public void proposeRunCompletion(
566571
this.stateContext
567572
.getCurrentState()
568573
.proposeRunCompletion(handle, exception, attemptDuration, retryPolicy, this.stateContext);
569-
} finally {
570-
this.triggerNextEventSignal();
574+
} catch (Throwable e) {
575+
this.onError(e);
576+
return;
571577
}
578+
this.triggerNextEventSignal();
572579
}
573580

574581
@Override

0 commit comments

Comments
 (0)