Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/pyodide/internal/metadata.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ const FORCE_NEW_VENDOR_PATH: boolean =
export const IS_DEDICATED_SNAPSHOT_ENABLED: boolean =
!!COMPATIBILITY_FLAGS.python_dedicated_snapshot;
const EXTERNAL_SDK = !!COMPATIBILITY_FLAGS.enable_python_external_sdk;
export const WORKFLOWS_IMPLICIT_DEPS =
!!COMPATIBILITY_FLAGS.python_workflows_implicit_dependencies;

export const LEGACY_GLOBAL_HANDLERS = !NO_GLOBAL_HANDLERS;
export const LEGACY_VENDOR_PATH = !FORCE_NEW_VENDOR_PATH;
Expand Down
3 changes: 3 additions & 0 deletions src/pyodide/internal/test_introspection.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ def setup_gen(cls):
stack.enter_context(
patch.dict("sys.modules", _pyodide_entrypoint_helper=MagicMock())
)
stack.enter_context(
patch.dict("sys.modules", _cloudflare_compat_flags=MagicMock())
)
stack.enter_context(
patch.dict("sys.modules", {"pyodide": MagicMock(__version__=2)})
)
Expand Down
59 changes: 50 additions & 9 deletions src/pyodide/internal/workers-api/src/workers/_workers.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
from types import LambdaType
from typing import Any, Never, Protocol, TypedDict, Unpack

import _cloudflare_compat_flags

# Get globals modules and import function from the entrypoint-helper
import _pyodide_entrypoint_helper
import js
Expand Down Expand Up @@ -1130,28 +1132,60 @@ def __init__(self, js_step):
self._js_step = js_step
self._memoized_dependencies = {}
self._in_flight = {}
self.step_closures = {}

def do(self, name, depends=None, concurrent=False, config=None):
def do(self, name=None, *, depends=None, concurrent=False, config=None):
def decorator(func):
async def wrapper():
# if implicit params are enabled, each param that is not context should be treated as a dependency and resolved
# In other words, we introspect the declaration and call a function (need to make sure it's a step) with the same name as the corresponding param
# This new code path should discard depends, as we encourage users to implicitly declare their invariant steps in the signature
# if the compat flag is disabled, then we just maintain the same legacy behavior
results_future_list = depends

if python_from_rpc(_cloudflare_compat_flags)[
"python_workflows_implicit_dependencies"
]:
Comment on lines +1146 to +1148
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if python_from_rpc(_cloudflare_compat_flags)[
"python_workflows_implicit_dependencies"
]:
if _cloudflare_compat_flags.python_workflows_implicit_dependencies:

if depends is not None:
TypeError(
"Received unexpected parameter depends. This was deprecated and dependencies can be declared using callable names"
)
Comment on lines +1150 to +1152
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Forgot the raise.

Suggested change
TypeError(
"Received unexpected parameter depends. This was deprecated and dependencies can be declared using callable names"
)
raise TypeError(
"Received unexpected parameter depends. This was deprecated and dependencies can be declared using callable names"
)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But I think you might try defining different an entirely different method depending on whether this flag is on or not:
9757c5c

sig = inspect.signature(func)
results_future_list = []
for p in sig.parameters.values():
if p.name in self.step_closures:
results_future_list.append(self.step_closures[p.name])
elif p.name == "ctx":
results_future_list.append(p)

if concurrent:
results = await gather(
*[self._resolve_dependency(dep) for dep in depends or []]
*[
self._resolve_dependency(dep)
for dep in results_future_list or []
]
)
else:
results = [
await self._resolve_dependency(dep) for dep in depends or []
await self._resolve_dependency(dep)
for dep in results_future_list or []
]
python_results = [python_from_rpc(result) for result in results]
return await _do_call(self, name, config, func, *python_results)
python_results = [
result
if (hasattr(result, "name") and result.name == "ctx")
Comment on lines +1174 to +1175
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you add a comment explaining why we do this?

else python_from_rpc(result)
for result in results
]
step_name = func.__name__ if name is None else name
return await _do_call(self, step_name, config, func, *python_results)

wrapper._step_name = name
self.step_closures[name] = wrapper
return wrapper

return decorator

def sleep(self, *args, **kwargs):
# all types should be primitives - no need for explicit translation
return self._js_step.sleep(*args, **kwargs)

def sleep_until(self, name, timestamp):
Expand All @@ -1170,7 +1204,9 @@ def wait_for_event(self, name, event_type, /, timeout="24 hours"):
)

async def _resolve_dependency(self, dep):
if dep._step_name in self._memoized_dependencies:
if hasattr(dep, "name") and dep.name == "ctx":
return dep
elif dep._step_name in self._memoized_dependencies:
return self._memoized_dependencies[dep._step_name]
elif dep._step_name in self._in_flight:
return await self._in_flight[dep._step_name]
Expand All @@ -1179,8 +1215,13 @@ async def _resolve_dependency(self, dep):


async def _do_call(entrypoint, name, config, callback, *results):
async def _callback():
result = callback(*results)
async def _callback(ctx=None):
# deconstruct the actual ctx object
resolved_results = tuple(
python_from_rpc(ctx) if hasattr(r, "name") and r.name == "ctx" else r
for r in results
)
result = callback(*resolved_results)

if inspect.iscoroutine(result):
result = await result
Expand Down
34 changes: 18 additions & 16 deletions src/pyodide/python-entrypoint-helper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,34 +2,37 @@
// This file is a BUILTIN module that provides the actual implementation for the
// python-entrypoint.js USER module.

import {
beforeRequest,
loadPyodide,
clearSignals,
} from 'pyodide-internal:python';
import { patch_env_helper } from 'pyodide-internal:envHelpers';
import { enterJaegerSpan } from 'pyodide-internal:jaeger';
import { patchLoadPackage } from 'pyodide-internal:setupPackages';
import { default as Limiter } from 'pyodide-internal:limiter';
import {
COMPATIBILITY_FLAGS,
IS_WORKERD,
LEGACY_GLOBAL_HANDLERS,
LEGACY_INCLUDE_SDK,
LOCKFILE,
TRANSITIVE_REQUIREMENTS,
MAIN_MODULE_NAME,
WORKERD_INDEX_URL,
SHOULD_SNAPSHOT_TO_DISK,
TRANSITIVE_REQUIREMENTS,
WORKERD_INDEX_URL,
WORKFLOWS_ENABLED,
LEGACY_GLOBAL_HANDLERS,
LEGACY_INCLUDE_SDK,
COMPATIBILITY_FLAGS,
} from 'pyodide-internal:metadata';
import { default as Limiter } from 'pyodide-internal:limiter';
import {
PythonWorkersInternalError,
beforeRequest,
clearSignals,
loadPyodide,
} from 'pyodide-internal:python';
import { patchLoadPackage } from 'pyodide-internal:setupPackages';
import {
LOADED_SNAPSHOT_TYPE,
maybeCollectDedicatedSnapshot,
} from 'pyodide-internal:snapshot';
import {
PythonUserError,
PythonWorkersInternalError,
reportError,
} from 'pyodide-internal:util';
import { LOADED_SNAPSHOT_TYPE } from 'pyodide-internal:snapshot';
export { createImportProxy } from 'pyodide-internal:serializeJsModule';
import { patch_env_helper } from 'pyodide-internal:envHelpers';

type PyFuture<T> = Promise<T> & { copy(): PyFuture<T>; destroy(): void };

Expand Down Expand Up @@ -74,7 +77,6 @@ export type PyodideEntrypointHelper = {
patchWaitUntil: typeof patchWaitUntil;
patch_env_helper: (patch: unknown) => Generator<void>;
};
import { maybeCollectDedicatedSnapshot } from 'pyodide-internal:snapshot';

// Function to import JavaScript modules from Python
let _pyodide_entrypoint_helper: PyodideEntrypointHelper | null = null;
Expand Down
1 change: 1 addition & 0 deletions src/pyodide/types/runtime-generated/metadata.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ declare namespace MetadataReader {
python_dedicated_snapshot?: boolean;
enable_python_external_sdk?: boolean;
python_check_rng_state?: boolean;
python_workflows_implicit_dependencies?: boolean;
}

const isWorkerd: () => boolean;
Expand Down
7 changes: 7 additions & 0 deletions src/workerd/io/compatibility-date.capnp
Original file line number Diff line number Diff line change
Expand Up @@ -1316,4 +1316,11 @@ struct CompatibilityFlags @0x8f8c1b68151b6cef {
# Node.js-compatible versions from node:timers. setTimeout and setInterval return
# Timeout objects with methods like refresh(), ref(), unref(), and hasRef().
# This flag requires nodejs_compat or nodejs_compat_v2 to be enabled.

pythonWorkflowsImplicitDeps @154 :Bool
$compatEnableFlag("python_workflows_implicit_dependencies")
$compatDisableFlag("no_python_workflows_implicit_dependencies")
$impliedByAfterDate(name = "pythonWorkers", date = "2026-01-15");
# replaces depends param on steps to an implicit approach with step callables passed as params
# these steps are called internally and act as dependencies
}
39 changes: 35 additions & 4 deletions src/workerd/server/tests/python/workflow-entrypoint/worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,15 @@ import { RpcTarget } from 'cloudflare:workers';
import * as assert from 'node:assert';

class Context extends RpcTarget {
constructor(shouldSendCtx) {
super();
this.shouldSendCtx = shouldSendCtx;
}

async do(name, fn) {
try {
const result = await fn();
const ctx = { attempt: '1', metadata: 'expected_return_metadata' };
const result = this.shouldSendCtx ? await fn(ctx) : await fn();
return result;
} catch (e) {
console.log(`Error received: ${e.name} Message: ${e.message}`);
Expand All @@ -16,10 +22,35 @@ class Context extends RpcTarget {

export default {
async test(ctrl, env, ctx) {
// JS types
const stubStep = new Context();
let stubStep = new Context(true);

// Tests forward compat - i.e.: python workflows should be compatible with steps that pass a ctx argument
// this param is optional and searched by name. Meaning it's not positional
let resp = await env.PythonWorkflow.run(
{
foo: 'bar',
},
stubStep
);
assert.deepStrictEqual(resp, 'foobar');

// Tests backwards compat - i.e.: new logic shouldn't break workflows until a release is done with
// ctx being passed as a step argument - this is not controlled via compat flag
// Moreover, this workflow also tests that dependencies are no longer resolved through the old code path
stubStep = new Context(false);
resp = await env.PythonWorkflowBackwardsCompat.run(
{
foo: 'bar',
},
stubStep
);
assert.deepStrictEqual(resp, 'foo');

const resp = await env.PythonWorkflow.run(
// Tests backwards compat for dependency resolution - previously deps were not following a name based
// approach. Instead, they were passed in the same order as they were declared in the depends param
// This test also doesn't pass any ctx to steps
stubStep = new Context(false);
resp = await env.PythonWorkflowDepends.run(
{
foo: 'bar',
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,32 +3,46 @@ using Workerd = import "/workerd/workerd.capnp";
const config :Workerd.Config = (
services = [
(name = "py", worker = .pyWorker),
(name = "pyOld", worker = .pyWorkerOld),
(name = "js", worker = .jsWorker),
],
);

const pyWorker :Workerd.Worker = (

compatibilityFlags = [%PYTHON_FEATURE_FLAGS, "python_workflows", "rpc", "disable_python_no_global_handlers"],
compatibilityFlags = [%PYTHON_FEATURE_FLAGS, "python_workflows", "python_workflows_implicit_dependencies"],

modules = [
(name = "workflow.py", pythonModule = embed "workflow.py"),
],

bindings = [
(name = "PythonWorkflow", service = (name = "py", entrypoint = "WorkflowEntrypointExample")),
(name = "PythonWorkflowBackwardsCompat", service = (name = "py", entrypoint = "WorkflowEntrypointBackwardsCompat")),
],
);

const jsWorker :Workerd.Worker = (
const pyWorkerOld :Workerd.Worker = (
compatibilityFlags = [%PYTHON_FEATURE_FLAGS, "python_workflows", "no_python_workflows_implicit_dependencies"],

modules = [
(name = "workflow-old.py", pythonModule = embed "workflow-old.py"),
],

compatibilityFlags = ["nodejs_compat", "rpc", "disable_python_no_global_handlers"],
bindings = [
(name = "PythonWorkflowDepends", service = (name = "pyOld", entrypoint = "PythonWorkflowDepends")),
],
);

const jsWorker :Workerd.Worker = (
compatibilityFlags = ["nodejs_compat", "rpc"],

modules = [
(name = "worker", esModule = embed "worker.js"),
],

bindings = [
(name = "PythonWorkflow", service = (name = "py", entrypoint = "WorkflowEntrypointExample")),
(name = "PythonWorkflowBackwardsCompat", service = (name = "py", entrypoint = "WorkflowEntrypointBackwardsCompat")),
(name = "PythonWorkflowDepends", service = (name = "pyOld", entrypoint = "PythonWorkflowDepends")),
],
);
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
# Copyright (c) 2025 Cloudflare, Inc.
# Licensed under the Apache 2.0 license found in the LICENSE file or at:
# https://opensource.org/licenses/Apache-2.0

from workers import WorkflowEntrypoint


class PythonWorkflowDepends(WorkflowEntrypoint):
# The purpose of this workflow is testing that depends are no longer resolved
#
async def run(self, event, step):
@step.do("step_1")
async def step_1():
# tests backwards compat with workflows that don't have ctx in the step callback
print("Executing step 1")
return "foo"

@step.do("step_2")
async def step_2():
print("Executing step 2")
return "bar"

@step.do("step_3", depends=[step_1, step_2], concurrent=True)
async def step_3(result1=(), result2=()):
assert result1 == "foo"
assert result2 == "bar"
return result1 + result2

return await step_3()


async def test(ctrl, env, ctx):
pass
Loading
Loading