Skip to content

Commit 02e449c

Browse files
committed
support step context from Python Workflows SDK
1 parent 6067b11 commit 02e449c

File tree

4 files changed

+56
-8
lines changed

4 files changed

+56
-8
lines changed

src/pyodide/internal/workers-api/src/workers/_workers.py

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1163,8 +1163,20 @@ async def _resolve_dependency(self, dep):
11631163

11641164

11651165
async def _do_call(entrypoint, name, config, callback, *results):
1166-
async def _callback():
1167-
result = callback(*results)
1166+
async def _callback(ctx=None):
1167+
sig = inspect.signature(callback)
1168+
params = [
1169+
p
1170+
for p in sig.parameters.values()
1171+
if p.kind
1172+
not in (inspect.Parameter.VAR_POSITIONAL, inspect.Parameter.VAR_KEYWORD)
1173+
]
1174+
wants_ctx = len(params) > len(results)
1175+
1176+
if wants_ctx and ctx is not None:
1177+
result = callback(python_from_rpc(ctx), *results)
1178+
else:
1179+
result = callback(*results)
11681180

11691181
if inspect.iscoroutine(result):
11701182
result = await result

src/workerd/server/tests/python/workflow-entrypoint/worker.js

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,8 @@ import * as assert from 'node:assert';
44
class Context extends RpcTarget {
55
async do(name, fn) {
66
try {
7-
const result = await fn();
7+
const ctx = { attempt: '1', metadata: 'expected_return_metadata' };
8+
const result = await fn(ctx);
89
return result;
910
} catch (e) {
1011
console.log(`Error received: ${e.name} Message: ${e.message}`);
@@ -17,14 +18,22 @@ class Context extends RpcTarget {
1718
export default {
1819
async test(ctrl, env, ctx) {
1920
// JS types
20-
const stubStep = new Context();
21+
const stubStep= new Context();
2122

22-
const resp = await env.PythonWorkflow.run(
23+
let resp = await env.PythonWorkflow.run(
2324
{
2425
foo: 'bar',
2526
},
2627
stubStep
2728
);
2829
assert.deepStrictEqual(resp, 'foobar');
30+
31+
resp = await env.PythonWorkflowWithCtx.run(
32+
{
33+
foo: 'bar',
34+
},
35+
stubStep
36+
);
37+
assert.deepStrictEqual(resp, 'expected_return_metadata');
2938
},
3039
};

src/workerd/server/tests/python/workflow-entrypoint/workflow-entrypoint.wd-test

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ const pyWorker :Workerd.Worker = (
1818

1919
bindings = [
2020
(name = "PythonWorkflow", service = (name = "py", entrypoint = "WorkflowEntrypointExample")),
21+
(name = "PythonWorkflowWithCtx", service = (name = "py", entrypoint = "WorkflowEntrypointWithCtx")),
2122
],
2223
);
2324

@@ -32,5 +33,6 @@ const jsWorker :Workerd.Worker = (
3233

3334
bindings = [
3435
(name = "PythonWorkflow", service = (name = "py", entrypoint = "WorkflowEntrypointExample")),
36+
(name = "PythonWorkflowWithCtx", service = (name = "py", entrypoint = "WorkflowEntrypointWithCtx")),
3537
],
3638
);

src/workerd/server/tests/python/workflow-entrypoint/workflow.py

Lines changed: 28 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -44,23 +44,48 @@ async def step_3(_result1, _result2):
4444
await await_step(step_3)
4545

4646
# `step_1` and `step_2` run serially
47-
@step.do("step_4", depends=[step_1, step_2], concurrent=False)
47+
@step.do("step_4", depends=[step_1, step_2], concurrent=True)
4848
async def step_4(result1, result2):
4949
print("Executing step 4 (depends on step 1 and step 2)")
5050
assert result1["foo"] == "foo"
5151
assert result2["bar"] == "bar"
5252

5353
await await_step(step_4)
5454

55+
# tests step memoization - steps 1 and 2 are already resolved
5556
@step.do("step_5", depends=[step_1, step_2], concurrent=False)
56-
async def step_5(result1, result2):
57+
async def step_5(result1=(), result2=()):
5758
print("Executing step 5 (depends on step 1 and step 2)")
5859
assert result1["foo"] == "foo"
5960
assert result2["bar"] == "bar"
6061

6162
return result1["foo"] + result2["bar"]
6263

63-
return await await_step(step_5)
64+
return await step_5()
65+
66+
class WorkflowEntrypointWithCtx(WorkflowEntrypoint):
67+
async def run(self, event, step):
68+
@step.do("step_1")
69+
async def step_1():
70+
# tests backwards compat with workflows that don't have ctx in the step callback
71+
print("Executing step 1")
72+
return {"foo": "foo"}
73+
74+
@step.do("step_2")
75+
async def step_2(ctx):
76+
print("Executing step 2")
77+
return {"bar": "bar"}
78+
79+
@step.do("step_3", depends=[step_1, step_2], concurrent=False)
80+
async def step_3(context, result1=(), result2=()):
81+
print(f"context exists and is {context}")
82+
print("Executing step 5 (depends on step 1 and step 2)")
83+
assert result1["foo"] == "foo"
84+
assert result2["bar"] == "bar"
85+
assert context["attempt"] == "1"
86+
return context["metadata"]
87+
88+
return await step_3()
6489

6590

6691
async def test(ctrl, env, ctx):

0 commit comments

Comments
 (0)