Skip to content

Commit bf6dfdc

Browse files
committed
Adds implicit dependency resolution on Python Workflows steps
1 parent 6bacc75 commit bf6dfdc

File tree

10 files changed

+209
-48
lines changed

10 files changed

+209
-48
lines changed

src/pyodide/internal/metadata.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,8 @@ const FORCE_NEW_VENDOR_PATH: boolean =
6262
export const IS_DEDICATED_SNAPSHOT_ENABLED: boolean =
6363
!!COMPATIBILITY_FLAGS.python_dedicated_snapshot;
6464
const EXTERNAL_SDK = !!COMPATIBILITY_FLAGS.enable_python_external_sdk;
65+
export const WORKFLOWS_IMPLICIT_DEPS =
66+
!!COMPATIBILITY_FLAGS.python_workflows_implicit_dependencies;
6567

6668
export const LEGACY_GLOBAL_HANDLERS = !NO_GLOBAL_HANDLERS;
6769
export const LEGACY_VENDOR_PATH = !FORCE_NEW_VENDOR_PATH;

src/pyodide/internal/test_introspection.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,9 @@ def setup_gen(cls):
1818
stack.enter_context(
1919
patch.dict("sys.modules", _pyodide_entrypoint_helper=MagicMock())
2020
)
21+
stack.enter_context(
22+
patch.dict("sys.modules", _cloudflare_compat_flags=MagicMock())
23+
)
2124
stack.enter_context(
2225
patch.dict("sys.modules", {"pyodide": MagicMock(__version__=2)})
2326
)

src/pyodide/internal/workers-api/src/workers/_workers.py

Lines changed: 50 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020
from types import LambdaType
2121
from typing import Any, Never, Protocol, TypedDict, Unpack
2222

23+
import _cloudflare_compat_flags
24+
2325
# Get globals modules and import function from the entrypoint-helper
2426
import _pyodide_entrypoint_helper
2527
import js
@@ -1130,28 +1132,60 @@ def __init__(self, js_step):
11301132
self._js_step = js_step
11311133
self._memoized_dependencies = {}
11321134
self._in_flight = {}
1135+
self.step_closures = {}
11331136

1134-
def do(self, name, depends=None, concurrent=False, config=None):
1137+
def do(self, name=None, *, depends=None, concurrent=False, config=None):
11351138
def decorator(func):
11361139
async def wrapper():
1140+
# if implicit params are enabled, each param that is not context should be treated as a dependency and resolved
1141+
# In other words, we introspect the declaration and call a function (need to make sure it's a step) with the same name as the corresponding param
1142+
# This new code path should discard depends, as we encourage users to implicitly declare their invariant steps in the signature
1143+
# if the compat flag is disabled, then we just maintain the same legacy behavior
1144+
results_future_list = depends
1145+
1146+
if python_from_rpc(_cloudflare_compat_flags)[
1147+
"python_workflows_implicit_dependencies"
1148+
]:
1149+
if depends is not None:
1150+
TypeError(
1151+
"Received unexpected parameter depends. This was deprecated and dependencies can be declared using callable names"
1152+
)
1153+
sig = inspect.signature(func)
1154+
results_future_list = []
1155+
for p in sig.parameters.values():
1156+
if p.name in self.step_closures:
1157+
results_future_list.append(self.step_closures[p.name])
1158+
elif p.name == "ctx":
1159+
results_future_list.append(p)
1160+
11371161
if concurrent:
11381162
results = await gather(
1139-
*[self._resolve_dependency(dep) for dep in depends or []]
1163+
*[
1164+
self._resolve_dependency(dep)
1165+
for dep in results_future_list or []
1166+
]
11401167
)
11411168
else:
11421169
results = [
1143-
await self._resolve_dependency(dep) for dep in depends or []
1170+
await self._resolve_dependency(dep)
1171+
for dep in results_future_list or []
11441172
]
1145-
python_results = [python_from_rpc(result) for result in results]
1146-
return await _do_call(self, name, config, func, *python_results)
1173+
python_results = [
1174+
result
1175+
if (hasattr(result, "name") and result.name == "ctx")
1176+
else python_from_rpc(result)
1177+
for result in results
1178+
]
1179+
step_name = func.__name__ if name is None else name
1180+
return await _do_call(self, step_name, config, func, *python_results)
11471181

11481182
wrapper._step_name = name
1183+
self.step_closures[name] = wrapper
11491184
return wrapper
11501185

11511186
return decorator
11521187

11531188
def sleep(self, *args, **kwargs):
1154-
# all types should be primitives - no need for explicit translation
11551189
return self._js_step.sleep(*args, **kwargs)
11561190

11571191
def sleep_until(self, name, timestamp):
@@ -1170,7 +1204,9 @@ def wait_for_event(self, name, event_type, /, timeout="24 hours"):
11701204
)
11711205

11721206
async def _resolve_dependency(self, dep):
1173-
if dep._step_name in self._memoized_dependencies:
1207+
if hasattr(dep, "name") and dep.name == "ctx":
1208+
return dep
1209+
elif dep._step_name in self._memoized_dependencies:
11741210
return self._memoized_dependencies[dep._step_name]
11751211
elif dep._step_name in self._in_flight:
11761212
return await self._in_flight[dep._step_name]
@@ -1179,8 +1215,13 @@ async def _resolve_dependency(self, dep):
11791215

11801216

11811217
async def _do_call(entrypoint, name, config, callback, *results):
1182-
async def _callback():
1183-
result = callback(*results)
1218+
async def _callback(ctx=None):
1219+
# deconstruct the actual ctx object
1220+
resolved_results = tuple(
1221+
python_from_rpc(ctx) if hasattr(r, "name") and r.name == "ctx" else r
1222+
for r in results
1223+
)
1224+
result = callback(*resolved_results)
11841225

11851226
if inspect.iscoroutine(result):
11861227
result = await result

src/pyodide/python-entrypoint-helper.ts

Lines changed: 18 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -2,34 +2,37 @@
22
// This file is a BUILTIN module that provides the actual implementation for the
33
// python-entrypoint.js USER module.
44

5-
import {
6-
beforeRequest,
7-
loadPyodide,
8-
clearSignals,
9-
} from 'pyodide-internal:python';
5+
import { patch_env_helper } from 'pyodide-internal:envHelpers';
106
import { enterJaegerSpan } from 'pyodide-internal:jaeger';
11-
import { patchLoadPackage } from 'pyodide-internal:setupPackages';
7+
import { default as Limiter } from 'pyodide-internal:limiter';
128
import {
9+
COMPATIBILITY_FLAGS,
1310
IS_WORKERD,
11+
LEGACY_GLOBAL_HANDLERS,
12+
LEGACY_INCLUDE_SDK,
1413
LOCKFILE,
15-
TRANSITIVE_REQUIREMENTS,
1614
MAIN_MODULE_NAME,
17-
WORKERD_INDEX_URL,
1815
SHOULD_SNAPSHOT_TO_DISK,
16+
TRANSITIVE_REQUIREMENTS,
17+
WORKERD_INDEX_URL,
1918
WORKFLOWS_ENABLED,
20-
LEGACY_GLOBAL_HANDLERS,
21-
LEGACY_INCLUDE_SDK,
22-
COMPATIBILITY_FLAGS,
2319
} from 'pyodide-internal:metadata';
24-
import { default as Limiter } from 'pyodide-internal:limiter';
2520
import {
26-
PythonWorkersInternalError,
21+
beforeRequest,
22+
clearSignals,
23+
loadPyodide,
24+
} from 'pyodide-internal:python';
25+
import { patchLoadPackage } from 'pyodide-internal:setupPackages';
26+
import {
27+
LOADED_SNAPSHOT_TYPE,
28+
maybeCollectDedicatedSnapshot,
29+
} from 'pyodide-internal:snapshot';
30+
import {
2731
PythonUserError,
32+
PythonWorkersInternalError,
2833
reportError,
2934
} from 'pyodide-internal:util';
30-
import { LOADED_SNAPSHOT_TYPE } from 'pyodide-internal:snapshot';
3135
export { createImportProxy } from 'pyodide-internal:serializeJsModule';
32-
import { patch_env_helper } from 'pyodide-internal:envHelpers';
3336

3437
type PyFuture<T> = Promise<T> & { copy(): PyFuture<T>; destroy(): void };
3538

@@ -74,7 +77,6 @@ export type PyodideEntrypointHelper = {
7477
patchWaitUntil: typeof patchWaitUntil;
7578
patch_env_helper: (patch: unknown) => Generator<void>;
7679
};
77-
import { maybeCollectDedicatedSnapshot } from 'pyodide-internal:snapshot';
7880

7981
// Function to import JavaScript modules from Python
8082
let _pyodide_entrypoint_helper: PyodideEntrypointHelper | null = null;

src/pyodide/types/runtime-generated/metadata.d.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ declare namespace MetadataReader {
66
python_dedicated_snapshot?: boolean;
77
enable_python_external_sdk?: boolean;
88
python_check_rng_state?: boolean;
9+
python_workflows_implicit_dependencies?: boolean;
910
}
1011

1112
const isWorkerd: () => boolean;

src/workerd/io/compatibility-date.capnp

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1316,4 +1316,11 @@ struct CompatibilityFlags @0x8f8c1b68151b6cef {
13161316
# Node.js-compatible versions from node:timers. setTimeout and setInterval return
13171317
# Timeout objects with methods like refresh(), ref(), unref(), and hasRef().
13181318
# This flag requires nodejs_compat or nodejs_compat_v2 to be enabled.
1319+
1320+
pythonWorkflowsImplicitDeps @154 :Bool
1321+
$compatEnableFlag("python_workflows_implicit_dependencies")
1322+
$compatDisableFlag("no_python_workflows_implicit_dependencies")
1323+
$impliedByAfterDate(name = "pythonWorkers", date = "2026-01-15");
1324+
# replaces depends param on steps to an implicit approach with step callables passed as params
1325+
# these steps are called internally and act as dependencies
13191326
}

src/workerd/server/tests/python/workflow-entrypoint/worker.js

Lines changed: 35 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,15 @@ import { RpcTarget } from 'cloudflare:workers';
22
import * as assert from 'node:assert';
33

44
class Context extends RpcTarget {
5+
constructor(shouldSendCtx) {
6+
super();
7+
this.shouldSendCtx = shouldSendCtx;
8+
}
9+
510
async do(name, fn) {
611
try {
7-
const result = await fn();
12+
const ctx = { attempt: '1', metadata: 'expected_return_metadata' };
13+
const result = this.shouldSendCtx ? await fn(ctx) : await fn();
814
return result;
915
} catch (e) {
1016
console.log(`Error received: ${e.name} Message: ${e.message}`);
@@ -16,10 +22,35 @@ class Context extends RpcTarget {
1622

1723
export default {
1824
async test(ctrl, env, ctx) {
19-
// JS types
20-
const stubStep = new Context();
25+
let stubStep = new Context(true);
26+
27+
// Tests forward compat - i.e.: python workflows should be compatible with steps that pass a ctx argument
28+
// this param is optional and searched by name. Meaning it's not positional
29+
let resp = await env.PythonWorkflow.run(
30+
{
31+
foo: 'bar',
32+
},
33+
stubStep
34+
);
35+
assert.deepStrictEqual(resp, 'foobar');
36+
37+
// Tests backwards compat - i.e.: new logic shouldn't break workflows until a release is done with
38+
// ctx being passed as a step argument - this is not controlled via compat flag
39+
// Moreover, this workflow also tests that dependencies are no longer resolved through the old code path
40+
stubStep = new Context(false);
41+
resp = await env.PythonWorkflowBackwardsCompat.run(
42+
{
43+
foo: 'bar',
44+
},
45+
stubStep
46+
);
47+
assert.deepStrictEqual(resp, 'foo');
2148

22-
const resp = await env.PythonWorkflow.run(
49+
// Tests backwards compat for dependency resolution - previously deps were not following a name based
50+
// approach. Instead, they were passed in the same order as they were declared in the depends param
51+
// This test also doesn't pass any ctx to steps
52+
stubStep = new Context(false);
53+
resp = await env.PythonWorkflowDepends.run(
2354
{
2455
foo: 'bar',
2556
},

src/workerd/server/tests/python/workflow-entrypoint/workflow-entrypoint.wd-test

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,32 +3,46 @@ using Workerd = import "/workerd/workerd.capnp";
33
const config :Workerd.Config = (
44
services = [
55
(name = "py", worker = .pyWorker),
6+
(name = "pyOld", worker = .pyWorkerOld),
67
(name = "js", worker = .jsWorker),
78
],
89
);
910

1011
const pyWorker :Workerd.Worker = (
11-
12-
compatibilityFlags = [%PYTHON_FEATURE_FLAGS, "python_workflows", "rpc", "disable_python_no_global_handlers"],
12+
compatibilityFlags = [%PYTHON_FEATURE_FLAGS, "python_workflows", "python_workflows_implicit_dependencies"],
1313

1414
modules = [
1515
(name = "workflow.py", pythonModule = embed "workflow.py"),
1616
],
1717

1818
bindings = [
1919
(name = "PythonWorkflow", service = (name = "py", entrypoint = "WorkflowEntrypointExample")),
20+
(name = "PythonWorkflowBackwardsCompat", service = (name = "py", entrypoint = "WorkflowEntrypointBackwardsCompat")),
2021
],
2122
);
2223

23-
const jsWorker :Workerd.Worker = (
24+
const pyWorkerOld :Workerd.Worker = (
25+
compatibilityFlags = [%PYTHON_FEATURE_FLAGS, "python_workflows", "no_python_workflows_implicit_dependencies"],
26+
27+
modules = [
28+
(name = "workflow-old.py", pythonModule = embed "workflow-old.py"),
29+
],
2430

25-
compatibilityFlags = ["nodejs_compat", "rpc", "disable_python_no_global_handlers"],
31+
bindings = [
32+
(name = "PythonWorkflowDepends", service = (name = "pyOld", entrypoint = "PythonWorkflowDepends")),
33+
],
34+
);
35+
36+
const jsWorker :Workerd.Worker = (
37+
compatibilityFlags = ["nodejs_compat", "rpc"],
2638

2739
modules = [
2840
(name = "worker", esModule = embed "worker.js"),
2941
],
3042

3143
bindings = [
3244
(name = "PythonWorkflow", service = (name = "py", entrypoint = "WorkflowEntrypointExample")),
45+
(name = "PythonWorkflowBackwardsCompat", service = (name = "py", entrypoint = "WorkflowEntrypointBackwardsCompat")),
46+
(name = "PythonWorkflowDepends", service = (name = "pyOld", entrypoint = "PythonWorkflowDepends")),
3347
],
3448
);
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
# Copyright (c) 2025 Cloudflare, Inc.
2+
# Licensed under the Apache 2.0 license found in the LICENSE file or at:
3+
# https://opensource.org/licenses/Apache-2.0
4+
5+
from workers import WorkflowEntrypoint
6+
7+
8+
class PythonWorkflowDepends(WorkflowEntrypoint):
9+
# The purpose of this workflow is testing that depends are no longer resolved
10+
#
11+
async def run(self, event, step):
12+
@step.do("step_1")
13+
async def step_1():
14+
# tests backwards compat with workflows that don't have ctx in the step callback
15+
print("Executing step 1")
16+
return "foo"
17+
18+
@step.do("step_2")
19+
async def step_2():
20+
print("Executing step 2")
21+
return "bar"
22+
23+
@step.do("step_3", depends=[step_1, step_2], concurrent=True)
24+
async def step_3(result1=(), result2=()):
25+
assert result1 == "foo"
26+
assert result2 == "bar"
27+
return result1 + result2
28+
29+
return await step_3()
30+
31+
32+
async def test(ctrl, env, ctx):
33+
pass

0 commit comments

Comments
 (0)