diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index 9802ea84..87f2f488 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -230,5 +230,5 @@ restate = '2.5.0-SNAPSHOT' schema-kenerator = '2.1.2' spring-boot = '3.4.9' - vertx = '4.5.18' + vertx = '4.5.22' victools-json-schema = '4.38.0' diff --git a/sdk-api-kotlin/src/main/kotlin/dev/restate/sdk/kotlin/ContextImpl.kt b/sdk-api-kotlin/src/main/kotlin/dev/restate/sdk/kotlin/ContextImpl.kt index b8dfc7d5..601540ec 100644 --- a/sdk-api-kotlin/src/main/kotlin/dev/restate/sdk/kotlin/ContextImpl.kt +++ b/sdk-api-kotlin/src/main/kotlin/dev/restate/sdk/kotlin/ContextImpl.kt @@ -123,8 +123,8 @@ internal constructor( retryPolicy: RetryPolicy?, block: suspend () -> T ): DurableFuture { - var serde: Serde = resolveSerde(typeTag) - var coroutineCtx = currentCoroutineContext() + val serde: Serde = resolveSerde(typeTag) + val coroutineCtx = currentCoroutineContext() val javaRetryPolicy = retryPolicy?.let { dev.restate.sdk.common.RetryPolicy.exponential( diff --git a/sdk-core/src/main/java/dev/restate/sdk/core/EndpointRequestHandler.java b/sdk-core/src/main/java/dev/restate/sdk/core/EndpointRequestHandler.java index 435dd241..aeb8043d 100644 --- a/sdk-core/src/main/java/dev/restate/sdk/core/EndpointRequestHandler.java +++ b/sdk-core/src/main/java/dev/restate/sdk/core/EndpointRequestHandler.java @@ -173,6 +173,12 @@ public RequestProcessor processorForRequest( String fullyQualifiedServiceMethod = serviceName + "/" + handlerName; + // If we got it, set it + String invocationIdHeader = headersAccessor.get("x-restate-invocation-id"); + if (invocationIdHeader != null) { + loggingContextSetter.set(LoggingContextSetter.INVOCATION_ID_KEY, invocationIdHeader); + } + // Instantiate state machine StateMachine stateMachine = StateMachine.init(headersAccessor, loggingContextSetter); diff --git a/sdk-core/src/main/java/dev/restate/sdk/core/HandlerContextImpl.java b/sdk-core/src/main/java/dev/restate/sdk/core/HandlerContextImpl.java index 55840cf1..ff6910f6 100644 --- a/sdk-core/src/main/java/dev/restate/sdk/core/HandlerContextImpl.java +++ b/sdk-core/src/main/java/dev/restate/sdk/core/HandlerContextImpl.java @@ -42,7 +42,6 @@ class HandlerContextImpl implements HandlerContextInternal { private final @Nullable String objectKey; private final String fullyQualifiedHandlerName; - private CompletableFuture nextProcessedRun; private final List> invocationIdsToCancel; private final HashMap> scheduledRuns; @@ -349,6 +348,10 @@ public void pollAsyncResult(AsyncResultInternal asyncResult) { private void pollAsyncResultInner(AsyncResultInternal asyncResult) { while (true) { + if (this.stateMachine.state() == InvocationState.CLOSED) { + asyncResult.publicFuture().completeExceptionally(AbortedExecutionException.INSTANCE); + return; + } if (asyncResult.isDone()) { return; } @@ -399,21 +402,26 @@ private void pollAsyncResultInner(AsyncResultInternal asyncResult) { } // Not ready yet, let's try to do some progress - StateMachine.DoProgressResponse response = this.stateMachine.doProgress(uncompletedLeaves); + StateMachine.DoProgressResponse response; + try { + response = this.stateMachine.doProgress(uncompletedLeaves); + } catch (Throwable e) { + this.failWithoutContextSwitch(e); + asyncResult.publicFuture().completeExceptionally(AbortedExecutionException.INSTANCE); + return; + } if (response instanceof StateMachine.DoProgressResponse.AnyCompleted) { // Let it loop now - } else if (response instanceof StateMachine.DoProgressResponse.ReadFromInput) { - this.stateMachine - .waitNextInputSignal() - .thenAccept(v -> this.pollAsyncResultInner(asyncResult)); + } else if (response instanceof StateMachine.DoProgressResponse.ReadFromInput + || response instanceof StateMachine.DoProgressResponse.WaitingPendingRun) { + this.stateMachine.onNextEvent( + () -> this.pollAsyncResultInner(asyncResult), + response instanceof StateMachine.DoProgressResponse.ReadFromInput); return; } else if (response instanceof StateMachine.DoProgressResponse.ExecuteRun) { triggerScheduledRun(((StateMachine.DoProgressResponse.ExecuteRun) response).handle()); // Let it loop now - } else if (response instanceof StateMachine.DoProgressResponse.WaitingPendingRun) { - this.waitNextProcessedRun().thenAccept(v -> this.pollAsyncResultInner(asyncResult)); - return; } } } @@ -425,7 +433,6 @@ public void proposeRunSuccess(int runHandle, Slice toWrite) { } catch (Exception e) { this.failWithoutContextSwitch(e); } - triggerNextProcessedRun(); } @Override @@ -439,15 +446,6 @@ public void proposeRunFailure( } catch (Exception e) { this.failWithoutContextSwitch(e); } - triggerNextProcessedRun(); - } - - private void triggerNextProcessedRun() { - if (this.nextProcessedRun != null) { - var fut = this.nextProcessedRun; - this.nextProcessedRun = null; - fut.complete(null); - } } private void triggerScheduledRun(int handle) { @@ -470,13 +468,6 @@ public void proposeFailure(Throwable toWrite, @Nullable RetryPolicy retryPolicy) }); } - private CompletableFuture waitNextProcessedRun() { - if (this.nextProcessedRun == null) { - this.nextProcessedRun = new CompletableFuture<>(); - } - return this.nextProcessedRun; - } - @Override public void close() { this.stateMachine.end(); diff --git a/sdk-core/src/main/java/dev/restate/sdk/core/statemachine/StateMachine.java b/sdk-core/src/main/java/dev/restate/sdk/core/statemachine/StateMachine.java index c915deb5..14d810c1 100644 --- a/sdk-core/src/main/java/dev/restate/sdk/core/statemachine/StateMachine.java +++ b/sdk-core/src/main/java/dev/restate/sdk/core/statemachine/StateMachine.java @@ -42,9 +42,9 @@ static StateMachine init( CompletableFuture waitForReady(); - // --- Await next input + // --- Await next event - CompletableFuture waitNextInputSignal(); + void onNextEvent(Runnable runnable, boolean triggerNowIfInputClosed); // --- Async results diff --git a/sdk-core/src/main/java/dev/restate/sdk/core/statemachine/StateMachineImpl.java b/sdk-core/src/main/java/dev/restate/sdk/core/statemachine/StateMachineImpl.java index 317ed2b7..4253a1ba 100644 --- a/sdk-core/src/main/java/dev/restate/sdk/core/statemachine/StateMachineImpl.java +++ b/sdk-core/src/main/java/dev/restate/sdk/core/statemachine/StateMachineImpl.java @@ -28,6 +28,7 @@ import java.util.function.Consumer; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.jspecify.annotations.NonNull; import org.jspecify.annotations.Nullable; class StateMachineImpl implements StateMachine { @@ -38,7 +39,7 @@ class StateMachineImpl implements StateMachine { // Callbacks private final CompletableFuture waitForReadyFuture = new CompletableFuture<>(); - private CompletableFuture waitNextProcessedInput; + private @NonNull Runnable nextEventListener = () -> {}; // Java Flow and message handling private final MessageDecoder messageDecoder = new MessageDecoder(); @@ -72,22 +73,22 @@ public CompletableFuture waitForReady() { } @Override - public CompletableFuture waitNextInputSignal() { - if (this.stateContext.isInputClosed()) { - return CompletableFuture.completedFuture(null); + public void onNextEvent(Runnable runnable, boolean triggerNowIfInputClosed) { + this.nextEventListener = + () -> { + this.nextEventListener.run(); + runnable.run(); + }; + // Trigger this now + if (triggerNowIfInputClosed && this.stateContext.isInputClosed()) { + this.triggerNextEventSignal(); } - if (waitNextProcessedInput == null) { - this.waitNextProcessedInput = new CompletableFuture<>(); - } - return this.waitNextProcessedInput; } - private void triggerWaitNextInputSignal() { - if (this.waitNextProcessedInput != null) { - CompletableFuture fut = this.waitNextProcessedInput; - this.waitNextProcessedInput = null; - fut.complete(null); - } + private void triggerNextEventSignal() { + Runnable listener = this.nextEventListener; + this.nextEventListener = () -> {}; + listener.run(); } // -- IO @@ -142,7 +143,7 @@ public void onNext(Slice slice) { } if (shouldTriggerInputListener) { - this.triggerWaitNextInputSignal(); + this.triggerNextEventSignal(); } } catch (Throwable e) { @@ -152,8 +153,8 @@ public void onNext(Slice slice) { @Override public void onError(Throwable throwable) { - LOG.trace("Got failure", throwable); this.stateContext.getCurrentState().hitError(throwable, null, null, this.stateContext); + this.triggerNextEventSignal(); cancelInputSubscription(); } @@ -164,8 +165,9 @@ public void onComplete() { this.stateContext.getCurrentState().onInputClosed(this.stateContext); } catch (Throwable e) { this.onError(e); + return; } - this.triggerWaitNextInputSignal(); + this.triggerNextEventSignal(); this.cancelInputSubscription(); } @@ -549,7 +551,13 @@ public int run(String name) { @Override public void proposeRunCompletion(int handle, Slice value) { LOG.debug("Executing 'Run completed with success'"); - this.stateContext.getCurrentState().proposeRunCompletion(handle, value, this.stateContext); + try { + this.stateContext.getCurrentState().proposeRunCompletion(handle, value, this.stateContext); + } catch (Throwable e) { + this.onError(e); + return; + } + this.triggerNextEventSignal(); } @Override @@ -559,9 +567,15 @@ public void proposeRunCompletion( Duration attemptDuration, @Nullable RetryPolicy retryPolicy) { LOG.debug("Executing 'Run completed with failure'"); - this.stateContext - .getCurrentState() - .proposeRunCompletion(handle, exception, attemptDuration, retryPolicy, this.stateContext); + try { + this.stateContext + .getCurrentState() + .proposeRunCompletion(handle, exception, attemptDuration, retryPolicy, this.stateContext); + } catch (Throwable e) { + this.onError(e); + return; + } + this.triggerNextEventSignal(); } @Override