Skip to content

Commit 9a950f0

Browse files
committed
Adds implicit dependency resolution on Python Workflows steps
1 parent b8755c7 commit 9a950f0

File tree

9 files changed

+198
-42
lines changed

9 files changed

+198
-42
lines changed

src/pyodide/internal/metadata.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,8 @@ const FORCE_NEW_VENDOR_PATH: boolean =
5858
export const IS_DEDICATED_SNAPSHOT_ENABLED: boolean =
5959
!!COMPATIBILITY_FLAGS.python_dedicated_snapshot;
6060
const EXTERNAL_SDK = !!COMPATIBILITY_FLAGS.enable_python_external_sdk;
61+
export const WORKFLOWS_IMPLICIT_DEPS =
62+
!!COMPATIBILITY_FLAGS.python_workflows_implicit_dependencies;
6163

6264
export const LEGACY_GLOBAL_HANDLERS = !NO_GLOBAL_HANDLERS;
6365
export const LEGACY_VENDOR_PATH = !FORCE_NEW_VENDOR_PATH;

src/pyodide/internal/workers-api/src/workers/_workers.py

Lines changed: 39 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1114,28 +1114,53 @@ def __init__(self, js_step):
11141114
self._js_step = js_step
11151115
self._memoized_dependencies = {}
11161116
self._in_flight = {}
1117+
self.step_closures = {}
11171118

11181119
def do(self, name, depends=None, concurrent=False, config=None):
11191120
def decorator(func):
11201121
async def wrapper():
1122+
# if implicit params are enabled, each param that is not context should be treated as a dependency and resolved
1123+
# In other words, we introspect the declaration and call a function (need to make sure it's a step) with the same name as the corresponding param
1124+
# This new code path should discard depends, as we encourage users to implicitly declare their invariant steps in the signature
1125+
# if the compat flag is disabled, then we just maintain the same legacy behavior
1126+
results_future_list = depends
1127+
1128+
if _pyodide_entrypoint_helper.workflowsImplicitDeps:
1129+
sig = inspect.signature(func)
1130+
results_future_list = []
1131+
for p in sig.parameters.values():
1132+
if p.name in self.step_closures:
1133+
results_future_list.append(self.step_closures[p.name])
1134+
elif p.name == "ctx":
1135+
results_future_list.append(p)
1136+
11211137
if concurrent:
11221138
results = await gather(
1123-
*[self._resolve_dependency(dep) for dep in depends or []]
1139+
*[
1140+
self._resolve_dependency(dep)
1141+
for dep in results_future_list or []
1142+
]
11241143
)
11251144
else:
11261145
results = [
1127-
await self._resolve_dependency(dep) for dep in depends or []
1146+
await self._resolve_dependency(dep)
1147+
for dep in results_future_list or []
11281148
]
1129-
python_results = [python_from_rpc(result) for result in results]
1149+
python_results = [
1150+
result
1151+
if (hasattr(result, "name") and result.name == "ctx")
1152+
else python_from_rpc(result)
1153+
for result in results
1154+
]
11301155
return await _do_call(self, name, config, func, *python_results)
11311156

11321157
wrapper._step_name = name
1158+
self.step_closures[name] = wrapper
11331159
return wrapper
11341160

11351161
return decorator
11361162

11371163
def sleep(self, *args, **kwargs):
1138-
# all types should be primitives - no need for explicit translation
11391164
return self._js_step.sleep(*args, **kwargs)
11401165

11411166
def sleep_until(self, name, timestamp):
@@ -1154,7 +1179,9 @@ def wait_for_event(self, name, event_type, /, timeout="24 hours"):
11541179
)
11551180

11561181
async def _resolve_dependency(self, dep):
1157-
if dep._step_name in self._memoized_dependencies:
1182+
if hasattr(dep, "name") and dep.name == "ctx":
1183+
return dep
1184+
elif dep._step_name in self._memoized_dependencies:
11581185
return self._memoized_dependencies[dep._step_name]
11591186
elif dep._step_name in self._in_flight:
11601187
return await self._in_flight[dep._step_name]
@@ -1163,8 +1190,13 @@ async def _resolve_dependency(self, dep):
11631190

11641191

11651192
async def _do_call(entrypoint, name, config, callback, *results):
1166-
async def _callback():
1167-
result = callback(*results)
1193+
async def _callback(ctx=None):
1194+
# deconstruct the actual ctx object
1195+
resolved_results = tuple(
1196+
python_from_rpc(ctx) if hasattr(r, "name") and r.name == "ctx" else r
1197+
for r in results
1198+
)
1199+
result = callback(*resolved_results)
11681200

11691201
if inspect.iscoroutine(result):
11701202
result = await result

src/pyodide/python-entrypoint-helper.ts

Lines changed: 19 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -2,31 +2,35 @@
22
// This file is a BUILTIN module that provides the actual implementation for the
33
// python-entrypoint.js USER module.
44

5-
import {
6-
beforeRequest,
7-
loadPyodide,
8-
clearSignals,
9-
} from 'pyodide-internal:python';
105
import { enterJaegerSpan } from 'pyodide-internal:jaeger';
11-
import { patchLoadPackage } from 'pyodide-internal:setupPackages';
6+
import { default as Limiter } from 'pyodide-internal:limiter';
127
import {
138
IS_WORKERD,
9+
LEGACY_GLOBAL_HANDLERS,
10+
LEGACY_INCLUDE_SDK,
1411
LOCKFILE,
15-
TRANSITIVE_REQUIREMENTS,
1612
MAIN_MODULE_NAME,
17-
WORKERD_INDEX_URL,
1813
SHOULD_SNAPSHOT_TO_DISK,
14+
TRANSITIVE_REQUIREMENTS,
15+
WORKERD_INDEX_URL,
1916
WORKFLOWS_ENABLED,
20-
LEGACY_GLOBAL_HANDLERS,
21-
LEGACY_INCLUDE_SDK,
17+
WORKFLOWS_IMPLICIT_DEPS,
2218
} from 'pyodide-internal:metadata';
23-
import { default as Limiter } from 'pyodide-internal:limiter';
2419
import {
25-
PythonWorkersInternalError,
20+
beforeRequest,
21+
clearSignals,
22+
loadPyodide,
23+
} from 'pyodide-internal:python';
24+
import { patchLoadPackage } from 'pyodide-internal:setupPackages';
25+
import {
26+
LOADED_SNAPSHOT_TYPE,
27+
maybeCollectDedicatedSnapshot,
28+
} from 'pyodide-internal:snapshot';
29+
import {
2630
PythonUserError,
31+
PythonWorkersInternalError,
2732
reportError,
2833
} from 'pyodide-internal:util';
29-
import { LOADED_SNAPSHOT_TYPE } from 'pyodide-internal:snapshot';
3034

3135
type PyFuture<T> = Promise<T> & { copy(): PyFuture<T>; destroy(): void };
3236

@@ -69,8 +73,8 @@ export type PyodideEntrypointHelper = {
6973
cloudflareSocketsModule: any;
7074
workerEntrypoint: any;
7175
patchWaitUntil: typeof patchWaitUntil;
76+
workflowsImplicitDeps: boolean;
7277
};
73-
import { maybeCollectDedicatedSnapshot } from 'pyodide-internal:snapshot';
7478

7579
// Function to import JavaScript modules from Python
7680
let _pyodide_entrypoint_helper: PyodideEntrypointHelper | null = null;
@@ -96,6 +100,7 @@ export function setDoAnImport(
96100
cloudflareSocketsModule,
97101
workerEntrypoint,
98102
patchWaitUntil,
103+
workflowsImplicitDeps: WORKFLOWS_IMPLICIT_DEPS,
99104
};
100105
}
101106

src/pyodide/types/runtime-generated/metadata.d.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ declare namespace MetadataReader {
66
python_dedicated_snapshot?: boolean;
77
enable_python_external_sdk?: boolean;
88
python_check_rng_state?: boolean;
9+
python_workflows_implicit_dependencies?: boolean;
910
}
1011

1112
const isWorkerd: () => boolean;

src/workerd/io/compatibility-date.capnp

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1256,4 +1256,11 @@ struct CompatibilityFlags @0x8f8c1b68151b6cef {
12561256
$experimental;
12571257
# Enables precise timers with 3ms granularity. This provides more accurate timing for performance
12581258
# measurements and time-sensitive operations.
1259+
1260+
pythonWorkflowsImplicitDeps @149 :Bool
1261+
$compatEnableFlag("python_workflows_implicit_dependencies")
1262+
$compatDisableFlag("no_python_workflows_implicit_dependencies")
1263+
$impliedByAfterDate(name = "pythonWorkers", date = "2026-01-15");
1264+
# replaces depends param on steps to an implicit approach with step callables passed as params
1265+
# these steps are called internally and act as dependencies
12591266
}

src/workerd/server/tests/python/workflow-entrypoint/worker.js

Lines changed: 35 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,15 @@ import { RpcTarget } from 'cloudflare:workers';
22
import * as assert from 'node:assert';
33

44
class Context extends RpcTarget {
5+
constructor(shouldSendCtx) {
6+
super();
7+
this.shouldSendCtx = shouldSendCtx;
8+
}
9+
510
async do(name, fn) {
611
try {
7-
const result = await fn();
12+
const ctx = { attempt: '1', metadata: 'expected_return_metadata' };
13+
const result = this.shouldSendCtx ? await fn(ctx) : await fn();
814
return result;
915
} catch (e) {
1016
console.log(`Error received: ${e.name} Message: ${e.message}`);
@@ -16,10 +22,35 @@ class Context extends RpcTarget {
1622

1723
export default {
1824
async test(ctrl, env, ctx) {
19-
// JS types
20-
const stubStep = new Context();
25+
let stubStep = new Context(true);
26+
27+
// Tests forward compat - i.e.: python workflows should be compatible with steps that pass a ctx argument
28+
// this param is optional and searched by name. Meaning it's not positional
29+
let resp = await env.PythonWorkflow.run(
30+
{
31+
foo: 'bar',
32+
},
33+
stubStep
34+
);
35+
assert.deepStrictEqual(resp, 'foobar');
36+
37+
// Tests backwards compat - i.e.: new logic shouldn't break workflows until a release is done with
38+
// ctx being passed as a step argument - this is not controlled via compat flag
39+
// Moreover, this workflow also tests that dependencies are no longer resolved through the old code path
40+
stubStep = new Context(false);
41+
resp = await env.PythonWorkflowBackwardsCompat.run(
42+
{
43+
foo: 'bar',
44+
},
45+
stubStep
46+
);
47+
assert.deepStrictEqual(resp, 'foo');
2148

22-
const resp = await env.PythonWorkflow.run(
49+
// Tests backwards compat for dependency resolution - previously deps were not following a name based
50+
// approach. Instead, they were passed in the same order as they were declared in the depends param
51+
// This test also doesn't pass any ctx to steps
52+
stubStep = new Context(false);
53+
resp = await env.PythonWorkflowDepends.run(
2354
{
2455
foo: 'bar',
2556
},

src/workerd/server/tests/python/workflow-entrypoint/workflow-entrypoint.wd-test

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,21 +3,37 @@ using Workerd = import "/workerd/workerd.capnp";
33
const config :Workerd.Config = (
44
services = [
55
(name = "py", worker = .pyWorker),
6+
(name = "pyOld", worker = .pyWorkerOld),
67
(name = "js", worker = .jsWorker),
78
],
89
);
910

1011
const pyWorker :Workerd.Worker = (
11-
compatibilityDate = "2025-03-04",
12+
compatibilityDate = "2025-12-04",
1213

13-
compatibilityFlags = [%PYTHON_FEATURE_FLAGS, "python_workflows"],
14+
compatibilityFlags = [%PYTHON_FEATURE_FLAGS, "python_workflows", "python_workflows_implicit_dependencies"],
1415

1516
modules = [
1617
(name = "workflow.py", pythonModule = embed "workflow.py"),
1718
],
1819

1920
bindings = [
2021
(name = "PythonWorkflow", service = (name = "py", entrypoint = "WorkflowEntrypointExample")),
22+
(name = "PythonWorkflowBackwardsCompat", service = (name = "py", entrypoint = "WorkflowEntrypointBackwardsCompat")),
23+
],
24+
);
25+
26+
const pyWorkerOld :Workerd.Worker = (
27+
compatibilityDate = "2025-12-04",
28+
29+
compatibilityFlags = [%PYTHON_FEATURE_FLAGS, "python_workflows", "no_python_workflows_implicit_dependencies"],
30+
31+
modules = [
32+
(name = "workflow-old.py", pythonModule = embed "workflow-old.py"),
33+
],
34+
35+
bindings = [
36+
(name = "PythonWorkflowDepends", service = (name = "pyOld", entrypoint = "PythonWorkflowDepends")),
2137
],
2238
);
2339

@@ -32,5 +48,7 @@ const jsWorker :Workerd.Worker = (
3248

3349
bindings = [
3450
(name = "PythonWorkflow", service = (name = "py", entrypoint = "WorkflowEntrypointExample")),
51+
(name = "PythonWorkflowBackwardsCompat", service = (name = "py", entrypoint = "WorkflowEntrypointBackwardsCompat")),
52+
(name = "PythonWorkflowDepends", service = (name = "pyOld", entrypoint = "PythonWorkflowDepends")),
3553
],
3654
);
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
# Copyright (c) 2025 Cloudflare, Inc.
2+
# Licensed under the Apache 2.0 license found in the LICENSE file or at:
3+
# https://opensource.org/licenses/Apache-2.0
4+
5+
from workers import WorkflowEntrypoint
6+
7+
8+
class PythonWorkflowDepends(WorkflowEntrypoint):
9+
# The purpose of this workflow is testing that depends are no longer resolved
10+
#
11+
async def run(self, event, step):
12+
@step.do("step_1")
13+
async def step_1():
14+
# tests backwards compat with workflows that don't have ctx in the step callback
15+
print("Executing step 1")
16+
return "foo"
17+
18+
@step.do("step_2")
19+
async def step_2():
20+
print("Executing step 2")
21+
return "bar"
22+
23+
@step.do("step_3", depends=[step_1, step_2], concurrent=True)
24+
async def step_3(result1=(), result2=()):
25+
assert result1 == "foo"
26+
assert result2 == "bar"
27+
return result1 + result2
28+
29+
return await step_3()
30+
31+
32+
async def test(ctrl, env, ctx):
33+
pass

0 commit comments

Comments
 (0)