Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
164 changes: 164 additions & 0 deletions src/pyodide/internal/workers-api/src/workers/_workers.py
Original file line number Diff line number Diff line change
Expand Up @@ -1125,6 +1125,77 @@ def wrapper(*args, **kwargs):
return wrapper


class _RollbackStepWrapper:
"""
Wrapper returned by step.with_rollback() that allows attaching an .undo decorator.

Delegates to the engine's withRollback for durable undo stack management.

Usage:
@step.with_rollback("save to db")
async def save():
return await db.insert(data)

@save.undo
async def undo_save(error, record_id):
await db.delete(record_id)

record_id = await save()
"""

def __init__(
self,
step_wrapper: "_WorkflowStepWrapper",
name: str,
do_fn,
config: dict | None = None,
):
self._step_wrapper = step_wrapper
self._name = name
self._do_fn = do_fn
self._config = config
self._undo_fn = None
self._undo_config = None
self._step_name = name # For compatibility with dependency resolution

def undo(self, config: dict | None = None):
"""
Decorator to register an undo/compensation function for this step.

The undo function receives (error, value) where value is the result
of the do function.

Args:
config: Optional WorkflowStepConfig for the undo step's retry behavior.
If not provided, inherits from the do step's config.
"""

def decorator(fn):
self._undo_fn = fn
self._undo_config = config
return fn

# Support both @save.undo and @save.undo(config={...})
if callable(config):
fn = config
self._undo_fn = fn
self._undo_config = None
return fn

return decorator

async def __call__(self):
"""Execute the step via engine's withRollback for durable undo stack."""
return await _withRollback_call(
self._step_wrapper,
self._name,
self._config,
self._undo_config,
self._do_fn,
self._undo_fn,
)


class _WorkflowStepWrapper:
def __init__(self, js_step):
self._js_step = js_step
Expand Down Expand Up @@ -1169,6 +1240,39 @@ def wait_for_event(self, name, event_type, /, timeout="24 hours"):
),
)

def with_rollback(self, name: str, config: dict | None = None):
"""
Decorator to define a step with rollback/compensation support (saga pattern).

Returns a callable wrapper that allows attaching an .undo decorator for
compensation logic. Undo functions execute automatically in LIFO order
when the workflow throws an uncaught error (if rollback config is enabled
at instance creation).

Args:
name: The name of the step.
config: Optional WorkflowStepConfig for configuring retry behavior.

Usage:
@step.with_rollback("save to db")
async def save():
return await db.insert(data)

@save.undo
async def undo_save(error, record_id):
await db.delete(record_id)

record_id = await save()

# If any step throws, undo functions run automatically
# (when instance created with rollback config enabled)
"""

def decorator(func):
return _RollbackStepWrapper(self, name, func, config)

return decorator

async def _resolve_dependency(self, dep):
if dep._step_name in self._memoized_dependencies:
return self._memoized_dependencies[dep._step_name]
Expand Down Expand Up @@ -1211,6 +1315,66 @@ async def _closure():
return result


async def _withRollback_call(entrypoint, name, config, undo_config, do_fn, undo_fn):
"""Call the engine's withRollback with Python callbacks wrapped for JS."""

async def _closure():
async def _do_callback():
result = do_fn()
if inspect.iscoroutine(result):
result = await result
return to_js(result, dict_converter=Object.fromEntries)

async def _undo_callback(js_err, js_value):
py_err = None
if js_err is not None:
py_err = (
_from_js_error(js_err) if hasattr(js_err, "message") else js_err
)

py_value = python_from_rpc(js_value)

result = undo_fn(py_err, py_value)
if inspect.iscoroutine(result):
await result

handler = {"do": _do_callback}
if undo_fn is not None:
handler["undo"] = _undo_callback

js_handler = to_js(handler, dict_converter=Object.fromEntries)

js_config = None
if config is not None or undo_config is not None:
config_dict = dict(config) if config else {}
if undo_config is not None:
config_dict["undoConfig"] = undo_config
js_config = to_js(config_dict, dict_converter=Object.fromEntries)

try:
if js_config is None:
result = await entrypoint._js_step.withRollback(name, js_handler)
else:
result = await entrypoint._js_step.withRollback(
name, js_handler, js_config
)

return python_from_rpc(result)
except Exception as exc:
raise _from_js_error(exc) from exc

task = create_task(_closure())
entrypoint._in_flight[name] = task

try:
result = await task
entrypoint._memoized_dependencies[name] = result
finally:
del entrypoint._in_flight[name]

return result


def _wrap_subclass(cls):
# Override the class __init__ so that we can wrap the `env` in the constructor.
original_init = cls.__init__
Expand Down
2 changes: 2 additions & 0 deletions src/workerd/server/tests/python/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ py_wd_test("python-rpc")

py_wd_test("workflow-entrypoint")

py_wd_test("workflow-rollback")

py_wd_test("vendor_dir_compat_flag")

py_wd_test("multiprocessing")
Expand Down
75 changes: 75 additions & 0 deletions src/workerd/server/tests/python/workflow-rollback/worker.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
import { RpcTarget } from 'cloudflare:workers';
import * as assert from 'node:assert';

class Context extends RpcTarget {
async do(name, configOrFn, maybeFn) {
const fn = maybeFn ?? configOrFn;
try {
const result = await fn();
return result;
} catch (e) {
console.log(`Error received: ${e.name} Message: ${e.message}`);
throw e;
}
}
}

export default {
async test(ctrl, env, ctx) {
const stubStep = new Context();

// Test 1: Basic with_rollback - executes do and returns value
{
const resp = await env.PythonWorkflow.run(
{ test: 'with_rollback_basic' },
stubStep
);
assert.deepStrictEqual(resp, ['do_1', 'returned:value_1']);
console.log('✓ with_rollback_basic');
}

// Test 2: Undo decorator properly registers handler
{
const resp = await env.PythonWorkflow.run(
{ test: 'with_rollback_undo_decorator' },
stubStep
);
assert.strictEqual(resp.has_undo, true);
console.log('✓ with_rollback_undo_decorator');
}

// Test 3: Handler structure captures return value
{
const resp = await env.PythonWorkflow.run(
{ test: 'with_rollback_undo_receives_value' },
stubStep
);
assert.deepStrictEqual(resp.do_result, { id: 123, data: 'important' });
assert.strictEqual(resp.undo_registered, true);
console.log('✓ with_rollback_undo_receives_value');
}

// Test 4: Config and undoConfig are stored
{
const resp = await env.PythonWorkflow.run(
{ test: 'with_rollback_config' },
stubStep
);
assert.deepStrictEqual(resp.step_config, { retries: { limit: 1 } });
assert.deepStrictEqual(resp.undo_config, { retries: { limit: 3 } });
console.log('✓ with_rollback_config');
}

// Test 5: with_rollback works without undo handler
{
const resp = await env.PythonWorkflow.run(
{ test: 'with_rollback_no_undo' },
stubStep
);
assert.deepStrictEqual(resp, ['do_without_undo', 'returned:value']);
console.log('✓ with_rollback_no_undo');
}

console.log('All with_rollback tests passed!');
},
};
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
using Workerd = import "/workerd/workerd.capnp";

const config :Workerd.Config = (
services = [
(name = "py", worker = .pyWorker),
(name = "js", worker = .jsWorker),
],
);

const pyWorker :Workerd.Worker = (

compatibilityFlags = [%PYTHON_FEATURE_FLAGS, "python_workflows", "rpc", "disable_python_no_global_handlers"],

modules = [
(name = "workflow.py", pythonModule = embed "workflow.py"),
],

bindings = [
(name = "PythonWorkflow", service = (name = "py", entrypoint = "WorkflowRollbackExample")),
],
);

const jsWorker :Workerd.Worker = (

compatibilityFlags = ["nodejs_compat", "rpc", "disable_python_no_global_handlers"],

modules = [
(name = "worker", esModule = embed "worker.js"),
],

bindings = [
(name = "PythonWorkflow", service = (name = "py", entrypoint = "WorkflowRollbackExample")),
],
);
Loading
Loading