Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -230,5 +230,5 @@
restate = '2.5.0-SNAPSHOT'
schema-kenerator = '2.1.2'
spring-boot = '3.4.9'
vertx = '4.5.18'
vertx = '4.5.22'
victools-json-schema = '4.38.0'
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,8 @@ internal constructor(
retryPolicy: RetryPolicy?,
block: suspend () -> T
): DurableFuture<T> {
var serde: Serde<T> = resolveSerde(typeTag)
var coroutineCtx = currentCoroutineContext()
val serde: Serde<T> = resolveSerde(typeTag)
val coroutineCtx = currentCoroutineContext()
val javaRetryPolicy =
retryPolicy?.let {
dev.restate.sdk.common.RetryPolicy.exponential(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,12 @@ public RequestProcessor processorForRequest(

String fullyQualifiedServiceMethod = serviceName + "/" + handlerName;

// If we got it, set it
String invocationIdHeader = headersAccessor.get("x-restate-invocation-id");
if (invocationIdHeader != null) {
loggingContextSetter.set(LoggingContextSetter.INVOCATION_ID_KEY, invocationIdHeader);
}

// Instantiate state machine
StateMachine stateMachine = StateMachine.init(headersAccessor, loggingContextSetter);

Expand Down
43 changes: 17 additions & 26 deletions sdk-core/src/main/java/dev/restate/sdk/core/HandlerContextImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ class HandlerContextImpl implements HandlerContextInternal {
private final @Nullable String objectKey;
private final String fullyQualifiedHandlerName;

private CompletableFuture<Void> nextProcessedRun;
private final List<AsyncResultInternal<String>> invocationIdsToCancel;
private final HashMap<Integer, Consumer<RunCompleter>> scheduledRuns;

Expand Down Expand Up @@ -349,6 +348,10 @@ public void pollAsyncResult(AsyncResultInternal<?> asyncResult) {

private void pollAsyncResultInner(AsyncResultInternal<?> asyncResult) {
while (true) {
if (this.stateMachine.state() == InvocationState.CLOSED) {
asyncResult.publicFuture().completeExceptionally(AbortedExecutionException.INSTANCE);
return;
}
if (asyncResult.isDone()) {
return;
}
Expand Down Expand Up @@ -399,21 +402,26 @@ private void pollAsyncResultInner(AsyncResultInternal<?> asyncResult) {
}

// Not ready yet, let's try to do some progress
StateMachine.DoProgressResponse response = this.stateMachine.doProgress(uncompletedLeaves);
StateMachine.DoProgressResponse response;
try {
response = this.stateMachine.doProgress(uncompletedLeaves);
} catch (Throwable e) {
this.failWithoutContextSwitch(e);
asyncResult.publicFuture().completeExceptionally(AbortedExecutionException.INSTANCE);
return;
}

if (response instanceof StateMachine.DoProgressResponse.AnyCompleted) {
// Let it loop now
} else if (response instanceof StateMachine.DoProgressResponse.ReadFromInput) {
this.stateMachine
.waitNextInputSignal()
.thenAccept(v -> this.pollAsyncResultInner(asyncResult));
} else if (response instanceof StateMachine.DoProgressResponse.ReadFromInput
|| response instanceof StateMachine.DoProgressResponse.WaitingPendingRun) {
this.stateMachine.onNextEvent(
() -> this.pollAsyncResultInner(asyncResult),
response instanceof StateMachine.DoProgressResponse.ReadFromInput);
return;
} else if (response instanceof StateMachine.DoProgressResponse.ExecuteRun) {
triggerScheduledRun(((StateMachine.DoProgressResponse.ExecuteRun) response).handle());
// Let it loop now
} else if (response instanceof StateMachine.DoProgressResponse.WaitingPendingRun) {
this.waitNextProcessedRun().thenAccept(v -> this.pollAsyncResultInner(asyncResult));
return;
}
}
}
Expand All @@ -425,7 +433,6 @@ public void proposeRunSuccess(int runHandle, Slice toWrite) {
} catch (Exception e) {
this.failWithoutContextSwitch(e);
}
triggerNextProcessedRun();
}

@Override
Expand All @@ -439,15 +446,6 @@ public void proposeRunFailure(
} catch (Exception e) {
this.failWithoutContextSwitch(e);
}
triggerNextProcessedRun();
}

private void triggerNextProcessedRun() {
if (this.nextProcessedRun != null) {
var fut = this.nextProcessedRun;
this.nextProcessedRun = null;
fut.complete(null);
}
}

private void triggerScheduledRun(int handle) {
Expand All @@ -470,13 +468,6 @@ public void proposeFailure(Throwable toWrite, @Nullable RetryPolicy retryPolicy)
});
}

private CompletableFuture<Void> waitNextProcessedRun() {
if (this.nextProcessedRun == null) {
this.nextProcessedRun = new CompletableFuture<>();
}
return this.nextProcessedRun;
}

@Override
public void close() {
this.stateMachine.end();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,9 @@ static StateMachine init(

CompletableFuture<Void> waitForReady();

// --- Await next input
// --- Await next event

CompletableFuture<Void> waitNextInputSignal();
void onNextEvent(Runnable runnable, boolean triggerNowIfInputClosed);

// --- Async results

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.function.Consumer;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.jspecify.annotations.NonNull;
import org.jspecify.annotations.Nullable;

class StateMachineImpl implements StateMachine {
Expand All @@ -38,7 +39,7 @@ class StateMachineImpl implements StateMachine {

// Callbacks
private final CompletableFuture<Void> waitForReadyFuture = new CompletableFuture<>();
private CompletableFuture<Void> waitNextProcessedInput;
private @NonNull Runnable nextEventListener = () -> {};

// Java Flow and message handling
private final MessageDecoder messageDecoder = new MessageDecoder();
Expand Down Expand Up @@ -72,22 +73,22 @@ public CompletableFuture<Void> waitForReady() {
}

@Override
public CompletableFuture<Void> waitNextInputSignal() {
if (this.stateContext.isInputClosed()) {
return CompletableFuture.completedFuture(null);
public void onNextEvent(Runnable runnable, boolean triggerNowIfInputClosed) {
this.nextEventListener =
() -> {
this.nextEventListener.run();
runnable.run();
};
// Trigger this now
if (triggerNowIfInputClosed && this.stateContext.isInputClosed()) {
this.triggerNextEventSignal();
}
if (waitNextProcessedInput == null) {
this.waitNextProcessedInput = new CompletableFuture<>();
}
return this.waitNextProcessedInput;
}

private void triggerWaitNextInputSignal() {
if (this.waitNextProcessedInput != null) {
CompletableFuture<Void> fut = this.waitNextProcessedInput;
this.waitNextProcessedInput = null;
fut.complete(null);
}
private void triggerNextEventSignal() {
Runnable listener = this.nextEventListener;
this.nextEventListener = () -> {};
listener.run();
}

// -- IO
Expand Down Expand Up @@ -142,7 +143,7 @@ public void onNext(Slice slice) {
}

if (shouldTriggerInputListener) {
this.triggerWaitNextInputSignal();
this.triggerNextEventSignal();
}

} catch (Throwable e) {
Expand All @@ -152,8 +153,8 @@ public void onNext(Slice slice) {

@Override
public void onError(Throwable throwable) {
LOG.trace("Got failure", throwable);
this.stateContext.getCurrentState().hitError(throwable, null, null, this.stateContext);
this.triggerNextEventSignal();
cancelInputSubscription();
}

Expand All @@ -164,8 +165,9 @@ public void onComplete() {
this.stateContext.getCurrentState().onInputClosed(this.stateContext);
} catch (Throwable e) {
this.onError(e);
return;
}
this.triggerWaitNextInputSignal();
this.triggerNextEventSignal();
this.cancelInputSubscription();
}

Expand Down Expand Up @@ -549,7 +551,13 @@ public int run(String name) {
@Override
public void proposeRunCompletion(int handle, Slice value) {
LOG.debug("Executing 'Run completed with success'");
this.stateContext.getCurrentState().proposeRunCompletion(handle, value, this.stateContext);
try {
this.stateContext.getCurrentState().proposeRunCompletion(handle, value, this.stateContext);
} catch (Throwable e) {
this.onError(e);
return;
}
this.triggerNextEventSignal();
}

@Override
Expand All @@ -559,9 +567,15 @@ public void proposeRunCompletion(
Duration attemptDuration,
@Nullable RetryPolicy retryPolicy) {
LOG.debug("Executing 'Run completed with failure'");
this.stateContext
.getCurrentState()
.proposeRunCompletion(handle, exception, attemptDuration, retryPolicy, this.stateContext);
try {
this.stateContext
.getCurrentState()
.proposeRunCompletion(handle, exception, attemptDuration, retryPolicy, this.stateContext);
} catch (Throwable e) {
this.onError(e);
return;
}
this.triggerNextEventSignal();
}

@Override
Expand Down