Commit 3e49cb0

Adds snippets for all step methods and event param
1 parent 3aff634 commit 3e49cb0

File tree

6 files changed: +260 -110 lines changed

src/content/docs/workflows/build/workers-api.mdx

Lines changed: 1 addition & 1 deletion

@@ -35,7 +35,7 @@ The `run` method can optionally return data, which is available when querying th
 export class MyWorkflow extends WorkflowEntrypoint<Env, Params> {
 	async run(event: WorkflowEvent<Params>, step: WorkflowStep) {
 		// Steps here
-		let someComputedState = step.do("my step", async () => { })
+		let someComputedState = await step.do("my step", async () => { })
 
 		// Optional: return state from our run() method
 		return someComputedState
Lines changed: 97 additions & 21 deletions

@@ -1,5 +1,5 @@
 ---
-title: Bind to a Workflow
+title: Interact with a Workflow
 pcx_content_type: concept
 sidebar:
   order: 3

@@ -11,7 +11,7 @@ import { WranglerConfig } from "~/components"
 
 The Python Workers platform leverages FFI to access bindings to Cloudflare resources. Refer to the [bindings](/workers/languages/python/ffi/#using-bindings-from-python-workers) documentation for more information.
 
-From the configuration perspective, there is no difference between configuring a Python workflow or a Javascript one.
+From the configuration perspective, enabling Python Workflows requires adding the `python_workflows` compatibility flag to your `wrangler.toml` file.
 
 <WranglerConfig>
 

@@ -20,6 +20,7 @@ From the configuration perspective, there is no difference between configuring a
 name = "workflows-starter"
 main = "src/index.ts"
 compatibility_date = "2024-10-22"
+compatibility_flags = ["python_workflows", "python_workers"]
 
 [[workflows]]
 # name of your workflow
@@ -32,28 +33,45 @@ class_name = "MyWorkflow"
 
 </WranglerConfig>
 
-### Create an instance via binding
 
-Note that `env` is a Javascript object exposed to the Python script via [JsProxy](https://pyodide.org/en/stable/usage/api/python-api/ffi.html#pyodide.ffi.JsProxy). You can
-access the binding like you would on a Javascript worker. Refer to the [Workflow binding documentation](/workflows/build/workers-api/#workflow) to learn more about the methods available.
-
-Let's consider the previous binding called `MY_WORKFLOW`. Here's how you would create a new instance:
+And this is how you use the payload in your workflow:
 
 ```python
-async def on_fetch(request, env):
-    instance = await env.MY_WORKFLOW.create()
-    return Response.json({"status": "success"})
+from pyodide.ffi import to_js
+
+class DemoWorkflowClass(WorkflowEntrypoint):
+    async def run(self, event, step):
+        @step.do('step-name')
+        async def first_step():
+            payload = event["payload"]
+            return payload
 ```
 
-### Pass a payload to a workflow instance
+
+## Workflow
+
+The `Workflow` binding gives you access to the [Workflow](/workflows/build/workers-api/#workflow) class. All its methods are available
+on the binding.
+
+Under the hood, the `Workflow` binding is a Javascript object that is exposed to the Python script via [JsProxy](https://pyodide.org/en/stable/usage/api/python-api/ffi.html#pyodide.ffi.JsProxy).
+This means that the values returned by its methods are also `JsProxy` objects, and need to be converted back into Python objects using `python_from_rpc`.
+
+### `create`
+
+Create (trigger) a new instance of a given Workflow.
+
+* <code>create(options=None)</code>
+  * `options` - an **optional** dictionary of options to pass to the workflow instance. Should contain the same keys
+    as the [WorkflowInstanceCreateOptions](/workflows/build/workers-api/#workflowinstancecreateoptions) type.
 
 ```python
 from pyodide.ffi import to_js
 
 async def on_fetch(request, env, ctx):
     event = {"foo": "bar"}
-    # to_js here is required because the binding goes through ffi. Not something we can wrap or override on the runtime
-    await env.MY_WORKFLOW.create(to_js({"params": event}, dict_converter=Object.fromEntries))
+    options = to_js({"params": event}, dict_converter=Object.fromEntries)
+    await env.MY_WORKFLOW.create(options)
     return Response.json({"status": "success"})
 ```
 :::note
@@ -62,16 +80,74 @@ Values returned from steps need to be converted into Javascript objects using `t
 
 :::
 
+The `create` method returns a [`WorkflowInstance`](/workflows/build/workers-api/#workflowinstance) object, which can be used to query the status of the workflow instance. Note that this is a Javascript object, and not a Python object.
 
-And this is how you use the payload in your workflow:
+### `create_batch`
+
+Create (trigger) a batch of new workflow instances, up to 100 instances at a time. This is useful if you need to create multiple instances at once within the [instance creation limit](/workflows/reference/limits/).
+
+* <code>create_batch(batch)</code>
+  * `batch` - a list of `WorkflowInstanceCreateOptions` to pass when creating each instance, including a user-provided ID and payload parameters.
+
+Each element of the `batch` list is expected to include both `id` and `params` properties:
 
 ```python
 from pyodide.ffi import to_js
 
-class DemoWorkflowClass(WorkflowEntrypoint):
-    async def on_run(self, event, step):
-        @step.do('step-name')
-        async def first_step():
-            payload = event["payload"]
-            return payload
-```
+# Create a new batch of 3 Workflow instances, each with its own ID and params
+listOfInstances = [
+    to_js({"id": "id-abc123", "params": {"hello": "world-0"}}, dict_converter=Object.fromEntries),
+    to_js({"id": "id-def456", "params": {"hello": "world-1"}}, dict_converter=Object.fromEntries),
+    to_js({"id": "id-ghi789", "params": {"hello": "world-2"}}, dict_converter=Object.fromEntries),
+]
+
+await env.MY_WORKFLOW.create_batch(listOfInstances)
+```
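Before conversion with `to_js`, the elements of the batch are plain Python dictionaries. A minimal sketch of assembling the same batch shape without the FFI layer (IDs and params are illustrative):

```python
# Plain Python dictionaries in the shape create_batch expects,
# prior to to_js conversion (IDs and params are illustrative).
batch = [
    {"id": f"id-{i}", "params": {"hello": f"world-{i}"}}
    for i in range(3)
]
```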
 
+### `get`
+
+Get a workflow instance by ID.
+
+* <code>get(id)</code>
+  * `id` - the ID of the workflow instance to get.
+
+Returns a [`WorkflowInstance`](/workflows/build/workers-api/#workflowinstance) object, which can be used to query the status of the workflow instance.
+
+```python
+instance = await env.MY_WORKFLOW.get("abc-123")
+
+# FFI methods available for WorkflowInstance
+await instance.status()
+await instance.pause()
+await instance.resume()
+await instance.restart()
+await instance.terminate()
+```
+
+### `send_event`
+
+Send an event to a workflow instance.
+
+* <code>send_event(options)</code>
+  * `type` - the type of event to send to the workflow instance.
+  * `payload` - the payload to send to the workflow instance.
+
+```python
+from pyodide.ffi import to_js
+
+await env.MY_WORKFLOW.send_event(to_js({"type": "my-event-type", "payload": {"foo": "bar"}}, dict_converter=Object.fromEntries))
+```
+
+:::note
+
+Values passed to `send_event` require explicit type translation into JS objects.
+
+:::
+
+## REST API (HTTP)
+
+Refer to the [Workflows REST API documentation](/api/resources/workflows/subresources/instances/methods/create/).
+
+## Command line (CLI)
+
+Refer to the [CLI quick start](/workflows/get-started/cli-quick-start/) to learn more about how to manage and trigger Workflows via the command line.

src/content/docs/workflows/python/dag.mdx

Lines changed: 5 additions & 3 deletions

@@ -6,13 +6,13 @@ sidebar:
 
 ---
 
-The Python SDK supports DAG workflows in a declarative way, using the `step.do` decorator with the `depends` parameter to define dependencies (other steps that must complete before this step can run).
+The Python Workflows SDK supports DAG workflows in a declarative way, using the `step.do` decorator with the `depends` parameter to define dependencies (other steps that must complete before this step can run).
 
 ```python
 from workers import WorkflowEntrypoint
 
 class MyWorkflow(WorkflowEntrypoint):
-    async def on_run(self, event, step):
+    async def run(self, event, step):
         @step.do("dependency a")
         async def step_a():
             # do some work

@@ -31,6 +31,8 @@ class MyWorkflow(WorkflowEntrypoint):
     await my_final_step()
 ```
 
+In this example, `step_a` and `step_b` run concurrently before the execution of `my_final_step`, which depends on both of them.
+
 Having `concurrent=True` allows the dependencies to be resolved concurrently. If one of the callables passed to `depends` has already completed, it will be skipped and its return value will be reused.
 
-This pattern is usefull for diamond shaped workflows, where a step depends on two or more other steps that can run concurrently.
+This pattern is useful for diamond-shaped workflows, where a step depends on two or more other steps that can run concurrently.
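The diamond dependency pattern described in this file can be sketched with plain asyncio: not the Workflows API, just the shape the `depends` parameter expresses (all names here are illustrative):

```python
import asyncio

# Two independent "dependency" steps.
async def step_a():
    return "a"

async def step_b():
    return "b"

# The final step consumes both results.
async def my_final_step(results):
    return results

async def main():
    # Dependencies resolved concurrently, then the final step runs.
    results = await asyncio.gather(step_a(), step_b())
    return await my_final_step(list(results))
```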

src/content/docs/workflows/python/index.mdx

Lines changed: 4 additions & 4 deletions

@@ -1,5 +1,5 @@
 ---
-title: Python SDK
+title: Python Workflows SDK
 pcx_content_type: navigation
 sidebar:
   order: 5

@@ -13,20 +13,20 @@ Refer to [Python Workers](/workers/languages/python) for more information about
 
 :::caution[Python Workflows are in beta, as well as the underlying platform.]
 
-You must add the `python_workflows` compatibility flag to your `wrangler.toml` file, as well as `python_workers`.
+You must add both the `python_workflows` and `python_workers` compatibility flags to your `wrangler.toml` file.
 
 Join the #python-workflows channel in the [Cloudflare Developers Discord](https://discord.cloudflare.com/) and let us know what you'd like to see next.
 :::
 
 ## Get Started
 
-Similarly to Typescript, the main entrypoint for a Python workflow is the [`WorkflowEntrypoint`](/workflows/build/workers-api/#workflowentrypoint) class. Your workflow logic should exist inside the [`run`](/workflows/build/workers-api/#run) handler. In a Python workflow, this handler is named `on_run`.
+The main entrypoint for a Python workflow is the [`WorkflowEntrypoint`](/workflows/build/workers-api/#workflowentrypoint) class. Your workflow logic should exist inside the [`run`](/workflows/build/workers-api/#run) handler.
 
 ```python
 from workers import WorkflowEntrypoint
 
 class MyWorkflow(WorkflowEntrypoint):
-    def on_run(self, event, step):
+    async def run(self, event, step):
         # steps here
 ```

Lines changed: 153 additions & 0 deletions

@@ -0,0 +1,153 @@
+---
+title: Python Workers API
+pcx_content_type: concept
+sidebar:
+  order: 2
+
+---
+
+This guide covers the Python Workflows SDK, with instructions on how to build and create workflows using Python.
+
+## WorkflowEntrypoint
+
+The `WorkflowEntrypoint` class is the main entrypoint for a Python workflow. Your workflow class extends it and implements the `run` method.
+
+```python
+from workers import WorkflowEntrypoint
+
+class MyWorkflow(WorkflowEntrypoint):
+    async def run(self, event, step):
+        # steps here
+```
+
+## WorkflowStep
+
+* <code>step.do(name, depends=[], concurrent=False, config=None)</code> is a decorator that allows you to define a step in a workflow.
+  * `name` - the name of the step.
+  * `depends` - an optional list of steps that must complete before this step can run. See [DAG Workflows](/workflows/python/dag).
+  * `concurrent` - an optional boolean that indicates whether this step can run concurrently with other steps.
+  * `config` - an optional [`WorkflowStepConfig`](/workflows/build/workers-api/#workflowstepconfig) for configuring [step-specific retry behaviour](/workflows/build/sleeping-and-retrying/). This is passed as a Python dictionary and then type translated into a `WorkflowStepConfig` object.
+
+```python
+from workers import WorkflowEntrypoint
+
+class MyWorkflow(WorkflowEntrypoint):
+    async def run(self, event, step):
+        @step.do("my first step")
+        async def my_first_step():
+            # do some work
+            return "Hello World!"
+
+        await my_first_step()
+```
+
+Note that the decorator doesn't call the step; it returns a callable that you must invoke to make the step run.
+
+When returning state from a step, you must make sure that the returned value is serializable. Since steps run through an FFI layer, the returned value gets type translated via [FFI](https://pyodide.org/en/stable/usage/api/python-api/ffi.html#pyodide.ffi.to_js).
+Refer to [Pyodide's documentation](https://pyodide.org/en/stable/usage/type-conversions.html#type-translations-pyproxy-to-js) regarding type conversions for more information.
+
+* <code>step.sleep(name, duration)</code>
+  * `name` - the name of the step.
+  * `duration` - the duration to sleep for, in either seconds or as a `WorkflowDuration` compatible string.
+
+```python
+async def run(self, event, step):
+    await step.sleep("my-sleep-step", "10 seconds")
+```
+
+* <code>step.sleep_until(name, timestamp)</code>
+  * `name` - the name of the step.
+  * `timestamp` - a `datetime.datetime` object or seconds from the Unix epoch to sleep the workflow instance until.
+
+```python
+import datetime
+
+async def run(self, event, step):
+    await step.sleep_until("my-sleep-step", datetime.datetime.now() + datetime.timedelta(seconds=10))
+```
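Since `sleep_until` also accepts seconds from the Unix epoch, a plain integer timestamp works too. A standard-library sketch of computing one (the two-hour offset is illustrative):

```python
import datetime

# A wake-up time two hours out, expressed as seconds since the Unix epoch.
wake_at = int(
    (datetime.datetime.now(datetime.timezone.utc)
     + datetime.timedelta(hours=2)).timestamp()
)
```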
+
+* <code>step.wait_for_event(name, event_type, timeout="24 hours")</code>
+  * `name` - the name of the step.
+  * `event_type` - the type of event to wait for.
+  * `timeout` - the timeout for the `wait_for_event` call. The default timeout is 24 hours.
+
+```python
+async def run(self, event, step):
+    await step.wait_for_event("my-wait-for-event-step", "my-event-type")
+```
+
+### `event` parameter
+
+The `event` parameter is a dictionary that contains the payload passed to the workflow instance, along with other metadata:
+
+* <code>payload</code> - the payload passed to the workflow instance.
+* <code>timestamp</code> - the timestamp at which the workflow was triggered.
+* <code>instanceId</code> - the ID of the current workflow instance.
+* <code>workflowName</code> - the name of the workflow.
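A plain-dictionary stand-in with the keys listed above shows how a step might read them. In a real workflow the runtime supplies `event`; the values here are illustrative:

```python
# Hypothetical event dictionary; the Workflows runtime supplies the real one.
event = {
    "payload": {"foo": "bar"},
    "timestamp": "2024-10-22T00:00:00Z",
    "instanceId": "abc-123",
    "workflowName": "workflows-starter",
}

# Steps typically read the caller-supplied parameters from the payload.
params = event["payload"]
```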
+
+## Error Handling
+
+Workflow semantics allow you to catch exceptions that get thrown to the top level.
+
+Catching specific exceptions within an `except` block may not work, as some Python errors are not re-instantiated into the same type of error when they pass through the RPC layer.
+
+:::note
+
+Some built-in Python errors (e.g. `ValueError`, `TypeError`) will work correctly. User-defined exceptions, as well as other built-in Python errors, will not, and should be caught with the `Exception` base class.
+
+:::
+
+```python
+async def run(self, event, step):
+    async def try_step(fn):
+        try:
+            return await fn()
+        except Exception as e:
+            print(f"Successfully caught {type(e).__name__}: {e}")
+
+    @step.do("my_failing")
+    async def my_failing():
+        print("Executing my_failing")
+        raise TypeError("Intentional error in my_failing")
+
+    await try_step(my_failing)
+```
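The catch-all pattern above is plain asyncio; a self-contained version runs outside the Workers runtime (the step decorator is omitted, and the caught type is returned rather than printed):

```python
import asyncio

# Wrap a step callable and catch any exception via the Exception base class.
async def try_step(fn):
    try:
        return await fn()
    except Exception as e:
        return f"caught {type(e).__name__}"

async def my_failing():
    raise TypeError("Intentional error in my_failing")

result = asyncio.run(try_step(my_failing))
```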
+
+### NonRetryableError
+
+The Python Workflows SDK provides a `NonRetryableError` class that can be used to signal that a step should not be retried.
+
+```python
+from workers.workflows import NonRetryableError
+
+raise NonRetryableError("something went permanently wrong")
+```
+
+## Configure a workflow instance
+
+You can bind a step to a specific retry policy by passing a `WorkflowStepConfig` object to the `config` parameter of the `step.do` decorator.
+With Python Workflows, you need to make sure that your `dict` respects the [`WorkflowStepConfig`](/workflows/build/workers-api/#workflowstepconfig) type.
+
+```python
+class DemoWorkflowClass(WorkflowEntrypoint):
+    async def run(self, event, step):
+        @step.do('step-name', config={"retries": {"limit": 1, "delay": "10 seconds"}})
+        async def first_step():
+            # do some work
+            pass
+```
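The `config` dictionary mirrors the linked `WorkflowStepConfig` type. A sketch of a fuller policy follows; the field names track that type, but the specific values and the `backoff` and `timeout` fields are assumptions to verify against the linked reference:

```python
# Illustrative retry policy in the dict shape passed to step.do's config.
step_config = {
    "retries": {
        "limit": 5,                # maximum retry attempts
        "delay": "10 seconds",     # delay between attempts
        "backoff": "exponential",  # assumed backoff strategy field
    },
    "timeout": "10 minutes",       # assumed per-step timeout field
}
```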
+
+### Create an instance via binding
+
+Note that `env` is a Javascript object exposed to the Python script via [JsProxy](https://pyodide.org/en/stable/usage/api/python-api/ffi.html#pyodide.ffi.JsProxy). You can
+access the binding like you would on a Javascript worker. Refer to the [Workflow binding documentation](/workflows/build/workers-api/#workflow) to learn more about the methods available.
+
+Let's consider the previous binding called `MY_WORKFLOW`. Here's how you would create a new instance:
+
+```python
+async def on_fetch(request, env):
+    instance = await env.MY_WORKFLOW.create()
+    return Response.json({"status": "success"})
+```
