@Sxnan Sxnan commented Jan 9, 2026

Linked issue: #423

Purpose of change

  • Introduce CallRecord into ActionState and persist/restore call records across durable execution recovery.
  • Expose execute API on runner_context.py and wire it through local/flink runner contexts.
  • Extend Python/Java runtimes to emit and replay call records, aligning durable execution semantics and ensuring operator context restoration.

Tests

  • Added/updated unit tests for ActionState/CallRecord serde and store, runner context execute path, and durable execution flows.
  • Added e2e integration tests for runner context execute (basic, multiple calls, async) with ground-truth fixtures.

API

  • Added execute to Python runner_context (new public API surface); no breaking changes identified.

Documentation

  • doc-needed
  • doc-not-needed
  • doc-included

github-actions bot added the priority/major, fixVersion/0.2.0, and doc-needed labels Jan 9, 2026
Sxnan force-pushed the finegrain-durable branch from 7224f9e to ff5984c January 9, 2026 02:24
Sxnan changed the title from "[runtime] Introduce CallRecord to ActionState" to "[runtime] Support Fine-grain Durable Execution" Jan 9, 2026
Sxnan marked this pull request as ready for review January 9, 2026 02:52
Sxnan requested a review from xintongsong January 9, 2026 02:59
Sxnan commented Jan 9, 2026

@xintongsong Could you review this PR?

* <p>During recovery, the success or failure of the original call is determined by checking whether
* {@code exceptionPayload} is null.
*/
public class CallRecord {

I'd suggest naming this CallResult, which makes it explicit that the class represents the result of a call execution.

Comment on lines +108 to +122
public byte[] getResultPayload() {
    return resultPayload;
}

public void setResultPayload(byte[] resultPayload) {
    this.resultPayload = resultPayload;
}

public byte[] getExceptionPayload() {
    return exceptionPayload;
}

public void setExceptionPayload(byte[] exceptionPayload) {
    this.exceptionPayload = exceptionPayload;
}

Why do we need to set the payloads after construction? Can these fields be final?
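
A minimal sketch of what an immutable variant could look like, assuming both payloads are known when the record is created. The class name `ImmutableCallRecord` and the `success`/`failure` factory methods are hypothetical and not part of this PR; the field names follow the snippet and log message under review.

```java
import java.util.Objects;

/** Sketch only: an immutable variant of the call record under review. */
final class ImmutableCallRecord {
    private final String functionId;
    private final String argsDigest;
    private final byte[] resultPayload;    // null when the call failed
    private final byte[] exceptionPayload; // null when the call succeeded

    private ImmutableCallRecord(
            String functionId, String argsDigest, byte[] resultPayload, byte[] exceptionPayload) {
        this.functionId = Objects.requireNonNull(functionId, "functionId");
        this.argsDigest = Objects.requireNonNull(argsDigest, "argsDigest");
        // Defensive copies keep later changes to the caller's arrays out of the record.
        this.resultPayload = copyOrNull(resultPayload);
        this.exceptionPayload = copyOrNull(exceptionPayload);
    }

    /** Hypothetical factory for a call that returned normally. */
    static ImmutableCallRecord success(String functionId, String argsDigest, byte[] resultPayload) {
        return new ImmutableCallRecord(functionId, argsDigest, resultPayload, null);
    }

    /** Hypothetical factory for a call that threw. */
    static ImmutableCallRecord failure(String functionId, String argsDigest, byte[] exceptionPayload) {
        Objects.requireNonNull(exceptionPayload, "exceptionPayload");
        return new ImmutableCallRecord(functionId, argsDigest, null, exceptionPayload);
    }

    /** Mirrors the Javadoc above: success means exceptionPayload is null. */
    boolean isSuccess() {
        return exceptionPayload == null;
    }

    String getFunctionId() {
        return functionId;
    }

    String getArgsDigest() {
        return argsDigest;
    }

    byte[] getResultPayload() {
        return copyOrNull(resultPayload);
    }

    byte[] getExceptionPayload() {
        return copyOrNull(exceptionPayload);
    }

    private static byte[] copyOrNull(byte[] bytes) {
        return bytes == null ? null : bytes.clone();
    }
}
```

With final fields and no setters, a record restored during recovery cannot be altered between being loaded and being replayed. Whether the serde layer can construct the class this way is something the PR author would need to confirm.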

"""

@abstractmethod
def execute(

I'd suggest the name durable_execute()

The action that calls this API should be deterministic, meaning that it
will always make the execute_async call with the same arguments and in
the same order during job recovery. Otherwise, the behavior is undefined.

I'd suggest naming this method durable_execute_async.


I don't see the necessity right now, but we might still support non-durable async execution in the future: the call simply executes in a separate thread/coroutine, and its result should not be reused during replay.

E.g., fetching a token from a remote service, which might already have expired by the time of the replay.

Note: Local runner does not support durable execution, so recovery
is not available.
"""
return func(*args, **kwargs)

If we are renaming this to durable_execute, we also need to log a warning here. There's no need to fail the call, as we don't want users to have to change their code when switching between local and remote execution.

Comment on lines +166 to +174
if (currentActionState != null && actionStatePersister != null) {
    currentActionState.addCallRecord(callRecord);
    actionStatePersister.run();
    LOG.debug(
            "Recorded and persisted CallRecord at index {}: functionId={}, argsDigest={}",
            currentCallIndex,
            functionId,
            argsDigest);
}

Why would currentActionState and actionStatePersister be null when this method is called? Should we use an assertion here?
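
One way to read this suggestion is to fail fast instead of silently skipping the record. The sketch below assumes the state and the persister can be supplied up front; `CallRecorderSketch` is a hypothetical stand-in, with a plain list in place of ActionState and a `Runnable` in place of actionStatePersister. It uses `Objects.requireNonNull` rather than the `assert` keyword because Java assertions are disabled unless the JVM is started with `-ea`.

```java
import java.util.List;
import java.util.Objects;

/** Sketch only: rejects missing collaborators instead of skipping the record. */
final class CallRecorderSketch {
    private final List<String> callRecords; // stand-in for the current ActionState
    private final Runnable persister;       // stand-in for actionStatePersister

    CallRecorderSketch(List<String> callRecords, Runnable persister) {
        this.callRecords = Objects.requireNonNull(callRecords, "callRecords must be set");
        this.persister = Objects.requireNonNull(persister, "persister must be set");
    }

    /** Appends the record and persists immediately, with no null guard needed. */
    void record(String callRecord) {
        callRecords.add(callRecord);
        persister.run();
    }
}
```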

* @return array containing [isHit (boolean), resultPayload (byte[]), exceptionPayload
* (byte[])], or null if miss
*/
public Object[] tryGetCachedCallRecord(String functionId, String argsDigest) {

I'd suggest the name matchNextOrClearSubsequentCallRecord. Otherwise, the clearing is super implicit.
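
To make the implicit clearing concrete, here is a rough sketch of the behavior the suggested name describes. It assumes that a mismatch at the current index discards that record and every later one; that semantics is inferred from this comment, not confirmed by the PR. `CallRecordReplayer` and `Entry` are hypothetical, and the sketch returns an `Optional` rather than the `Object[]` in the snippet above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

/** Sketch only: replays recorded calls in order and drops the tail on divergence. */
final class CallRecordReplayer {

    /** Hypothetical minimal record: just the two keys used for matching. */
    static final class Entry {
        final String functionId;
        final String argsDigest;

        Entry(String functionId, String argsDigest) {
            this.functionId = functionId;
            this.argsDigest = argsDigest;
        }
    }

    private final List<Entry> recorded;
    private int nextIndex = 0;

    CallRecordReplayer(List<Entry> recorded) {
        this.recorded = new ArrayList<>(recorded);
    }

    /**
     * Returns the next recorded entry if it matches the call being made.
     * On a mismatch, the entry at the current index and all later entries
     * are removed, so the remaining calls execute for real.
     */
    Optional<Entry> matchNextOrClearSubsequent(String functionId, String argsDigest) {
        if (nextIndex < recorded.size()) {
            Entry candidate = recorded.get(nextIndex);
            if (candidate.functionId.equals(functionId)
                    && candidate.argsDigest.equals(argsDigest)) {
                nextIndex++;
                return Optional.of(candidate);
            }
            // Divergence from the recorded history: drop this entry and the rest.
            recorded.subList(nextIndex, recorded.size()).clear();
        }
        return Optional.empty();
    }

    int recordedCount() {
        return recorded.size();
    }
}
```

For a recorded history of calls `a`, `b`, `c`, replaying `a` is a hit; a following call to some other function is a miss that leaves only `a` in the history, so a later call to `b` is also a miss.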

() -> {
    try {
        actionStateStore.put(
                key, sequenceNumberKState.value(), action, event, actionState);

The value of sequenceNumberKState might have changed by the time the callback is called.
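
A small illustration of the concern, using an `AtomicLong` as a hypothetical stand-in for the keyed state: a callback that reads the state when it runs sees the later value, while one that takes a snapshot when it is created keeps the value from that moment. `PersisterSketch` and both method names are made up for this example.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

/** Sketch only: contrasts a late read with an eager snapshot of mutable state. */
final class PersisterSketch {

    /** Reads the state when the callback runs, so later updates leak in. */
    static Runnable lazyPersister(AtomicLong sequenceNumber, List<Long> sink) {
        return () -> sink.add(sequenceNumber.get());
    }

    /** Reads the state when the callback is created and captures that value. */
    static Runnable eagerPersister(AtomicLong sequenceNumber, List<Long> sink) {
        final long snapshot = sequenceNumber.get();
        return () -> sink.add(snapshot);
    }
}
```

Applied to the snippet under review, this would mean reading `sequenceNumberKState.value()` into a local variable before building the lambda, assuming the value at creation time is the one that should be persisted.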

Comment on lines +51 to +60
private int currentCallIndex = 0;

/** List of existing CallRecords loaded during recovery. */
private List<CallRecord> recoveryCallRecords;

/** The current ActionState being built during action execution. */
@Nullable private ActionState currentActionState;

/** Callback to persist ActionState after each code block completion. */
@Nullable private Runnable actionStatePersister;

It's kind of weird that these are maintained in PythonRunnerContextImpl.

  1. These are not needed only by Python; we will also support this for Java.
  2. These are actually ActionTask-specific. I think we should handle this in RunnerContextImpl.switchActionContext(). We might want to actually introduce an ActionTaskContext.
