Skip to content

Commit ab56e59

Browse files
WIP trying to figure out where the RST stream issue even is...
1 parent a810ec1 commit ab56e59

File tree

4 files changed

+42
-44
lines changed

4 files changed

+42
-44
lines changed

gradle/libs.versions.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -230,5 +230,5 @@
230230
restate = '2.5.0-SNAPSHOT'
231231
schema-kenerator = '2.1.2'
232232
spring-boot = '3.4.9'
233-
vertx = '4.5.18'
233+
vertx = '4.5.22'
234234
victools-json-schema = '4.38.0'

sdk-core/src/main/java/dev/restate/sdk/core/HandlerContextImpl.java

Lines changed: 10 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -349,6 +349,10 @@ public void pollAsyncResult(AsyncResultInternal<?> asyncResult) {
349349

350350
private void pollAsyncResultInner(AsyncResultInternal<?> asyncResult) {
351351
while (true) {
352+
if (this.stateMachine.state() == InvocationState.CLOSED) {
353+
asyncResult.publicFuture().completeExceptionally(AbortedExecutionException.INSTANCE);
354+
return;
355+
}
352356
if (asyncResult.isDone()) {
353357
return;
354358
}
@@ -412,9 +416,12 @@ private void pollAsyncResultInner(AsyncResultInternal<?> asyncResult) {
412416
// Let it loop now
413417
} else if (response instanceof StateMachine.DoProgressResponse.ReadFromInput
414418
|| response instanceof StateMachine.DoProgressResponse.WaitingPendingRun) {
415-
CompletableFuture.anyOf(
416-
this.waitNextProcessedRun(), this.stateMachine.waitNextInputSignal())
417-
.thenAccept(v -> this.pollAsyncResultInner(asyncResult));
419+
// LOG.info("Gonna need to wait here {}", response);
420+
this.stateMachine.onNextEvent(
421+
() -> {
422+
LOG.info("Triggered after wait {}", response);
423+
this.pollAsyncResultInner(asyncResult);
424+
});
418425
return;
419426
} else if (response instanceof StateMachine.DoProgressResponse.ExecuteRun) {
420427
triggerScheduledRun(((StateMachine.DoProgressResponse.ExecuteRun) response).handle());
@@ -430,7 +437,6 @@ public void proposeRunSuccess(int runHandle, Slice toWrite) {
430437
} catch (Exception e) {
431438
this.failWithoutContextSwitch(e);
432439
}
433-
triggerNextProcessedRun();
434440
}
435441

436442
@Override
@@ -444,15 +450,6 @@ public void proposeRunFailure(
444450
} catch (Exception e) {
445451
this.failWithoutContextSwitch(e);
446452
}
447-
triggerNextProcessedRun();
448-
}
449-
450-
private void triggerNextProcessedRun() {
451-
if (this.nextProcessedRun != null) {
452-
var fut = this.nextProcessedRun;
453-
this.nextProcessedRun = null;
454-
fut.complete(null);
455-
}
456453
}
457454

458455
private void triggerScheduledRun(int handle) {
@@ -475,13 +472,6 @@ public void proposeFailure(Throwable toWrite, @Nullable RetryPolicy retryPolicy)
475472
});
476473
}
477474

478-
private CompletableFuture<Void> waitNextProcessedRun() {
479-
if (this.nextProcessedRun == null) {
480-
this.nextProcessedRun = new CompletableFuture<>();
481-
}
482-
return this.nextProcessedRun;
483-
}
484-
485475
@Override
486476
public void close() {
487477
this.stateMachine.end();

sdk-core/src/main/java/dev/restate/sdk/core/statemachine/StateMachine.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,9 +42,9 @@ static StateMachine init(
4242

4343
CompletableFuture<Void> waitForReady();
4444

45-
// --- Await next input
45+
// --- Await next event
4646

47-
CompletableFuture<Void> waitNextInputSignal();
47+
void onNextEvent(Runnable runnable);
4848

4949
// --- Async results
5050

sdk-core/src/main/java/dev/restate/sdk/core/statemachine/StateMachineImpl.java

Lines changed: 29 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import java.util.function.Consumer;
2929
import org.apache.logging.log4j.LogManager;
3030
import org.apache.logging.log4j.Logger;
31+
import org.jspecify.annotations.NonNull;
3132
import org.jspecify.annotations.Nullable;
3233

3334
class StateMachineImpl implements StateMachine {
@@ -38,7 +39,7 @@ class StateMachineImpl implements StateMachine {
3839

3940
// Callbacks
4041
private final CompletableFuture<Void> waitForReadyFuture = new CompletableFuture<>();
41-
private CompletableFuture<Void> waitNextProcessedInput;
42+
private @NonNull Runnable nextEventListener = () -> {};
4243

4344
// Java Flow and message handling
4445
private final MessageDecoder messageDecoder = new MessageDecoder();
@@ -72,22 +73,20 @@ public CompletableFuture<Void> waitForReady() {
7273
}
7374

7475
@Override
75-
public CompletableFuture<Void> waitNextInputSignal() {
76-
if (this.stateContext.isInputClosed()) {
77-
return CompletableFuture.completedFuture(null);
78-
}
79-
if (waitNextProcessedInput == null) {
80-
this.waitNextProcessedInput = new CompletableFuture<>();
81-
}
82-
return this.waitNextProcessedInput;
76+
public void onNextEvent(Runnable runnable) {
77+
// LOG.info("Registering next event signal");
78+
this.nextEventListener =
79+
() -> {
80+
this.nextEventListener.run();
81+
runnable.run();
82+
};
8383
}
8484

85-
private void triggerWaitNextInputSignal() {
86-
if (this.waitNextProcessedInput != null) {
87-
CompletableFuture<Void> fut = this.waitNextProcessedInput;
88-
this.waitNextProcessedInput = null;
89-
fut.complete(null);
90-
}
85+
private void triggerNextEventSignal() {
86+
// LOG.info("Triggering next event signal");
87+
Runnable listener = this.nextEventListener;
88+
this.nextEventListener = () -> {};
89+
listener.run();
9190
}
9291

9392
// -- IO
@@ -142,7 +141,7 @@ public void onNext(Slice slice) {
142141
}
143142

144143
if (shouldTriggerInputListener) {
145-
this.triggerWaitNextInputSignal();
144+
this.triggerNextEventSignal();
146145
}
147146

148147
} catch (Throwable e) {
@@ -153,6 +152,7 @@ public void onNext(Slice slice) {
153152
@Override
154153
public void onError(Throwable throwable) {
155154
this.stateContext.getCurrentState().hitError(throwable, null, null, this.stateContext);
155+
this.triggerNextEventSignal();
156156
cancelInputSubscription();
157157
}
158158

@@ -164,7 +164,7 @@ public void onComplete() {
164164
} catch (Throwable e) {
165165
this.onError(e);
166166
}
167-
this.triggerWaitNextInputSignal();
167+
this.triggerNextEventSignal();
168168
this.cancelInputSubscription();
169169
}
170170

@@ -548,7 +548,11 @@ public int run(String name) {
548548
@Override
549549
public void proposeRunCompletion(int handle, Slice value) {
550550
LOG.debug("Executing 'Run completed with success'");
551-
this.stateContext.getCurrentState().proposeRunCompletion(handle, value, this.stateContext);
551+
try {
552+
this.stateContext.getCurrentState().proposeRunCompletion(handle, value, this.stateContext);
553+
} finally {
554+
this.triggerNextEventSignal();
555+
}
552556
}
553557

554558
@Override
@@ -558,9 +562,13 @@ public void proposeRunCompletion(
558562
Duration attemptDuration,
559563
@Nullable RetryPolicy retryPolicy) {
560564
LOG.debug("Executing 'Run completed with failure'");
561-
this.stateContext
562-
.getCurrentState()
563-
.proposeRunCompletion(handle, exception, attemptDuration, retryPolicy, this.stateContext);
565+
try {
566+
this.stateContext
567+
.getCurrentState()
568+
.proposeRunCompletion(handle, exception, attemptDuration, retryPolicy, this.stateContext);
569+
} finally {
570+
this.triggerNextEventSignal();
571+
}
564572
}
565573

566574
@Override

0 commit comments

Comments
 (0)