Commit 67b60b5

Oxyjun authored and Caio-Nogueira committed

[Workflows] Creating new Python SDK folder (#24009)

* Creating new Python SDK folder
* Adds snippets for all step methods and event param

Co-authored-by: Caio Nogueira <[email protected]>

1 parent c34b4a6

File tree

5 files changed: +388 −1 lines changed

src/content/docs/workflows/build/workers-api.mdx

Lines changed: 1 addition & 1 deletion

```diff
@@ -35,7 +35,7 @@ The `run` method can optionally return data, which is available when querying the Workflow instance.
 export class MyWorkflow extends WorkflowEntrypoint<Env, Params> {
   async run(event: WorkflowEvent<Params>, step: WorkflowStep) {
     // Steps here
-    let someComputedState = step.do("my step", async () => { })
+    let someComputedState = await step.do("my step", async () => { })

     // Optional: return state from our run() method
     return someComputedState
```
Lines changed: 153 additions & 0 deletions

---
title: Interact with a Workflow
pcx_content_type: concept
sidebar:
  order: 3
group:
  hideIndex: true
---

import { WranglerConfig } from "~/components"

The Python Workers platform leverages FFI to access bindings to Cloudflare resources. Refer to the [bindings](/workers/languages/python/ffi/#using-bindings-from-python-workers) documentation for more information.

From a configuration perspective, enabling Python Workflows requires adding the `python_workflows` compatibility flag to your `wrangler.toml` file.

<WranglerConfig>

```toml title="wrangler.toml"
#:schema node_modules/wrangler/config-schema.json
name = "workflows-starter"
main = "src/index.ts"
compatibility_date = "2024-10-22"
compatibility_flags = ["python_workflows", "python_workers"]

[[workflows]]
# name of your workflow
name = "workflows-starter"
# binding name env.MY_WORKFLOW
binding = "MY_WORKFLOW"
# this is the class that extends the Workflow class in src/index.ts
class_name = "MyWorkflow"
```

</WranglerConfig>

And this is how you use the payload in your workflow:

```python
from pyodide.ffi import to_js

class DemoWorkflowClass(WorkflowEntrypoint):
    async def run(self, event, step):
        @step.do('step-name')
        async def first_step():
            payload = event["payload"]
            return payload
```

## Workflow

The `Workflow` binding gives you access to the [Workflow](/workflows/build/workers-api/#workflow) class. All of its methods are available on the binding.

Under the hood, the `Workflow` binding is a JavaScript object that is exposed to the Python script via [JsProxy](https://pyodide.org/en/stable/usage/api/python-api/ffi.html#pyodide.ffi.JsProxy). This means that the values returned by its methods are also `JsProxy` objects, and need to be converted back into Python objects using `python_from_rpc`.

### `create`

Create (trigger) a new instance of a given Workflow.

* <code>create(options=None)</code>
  * `options` - an **optional** dictionary of options to pass to the workflow instance. It should contain the same keys as the [WorkflowInstanceCreateOptions](/workflows/build/workers-api/#workflowinstancecreateoptions) type.

```python
from pyodide.ffi import to_js

async def on_fetch(request, env, ctx):
    event = {"foo": "bar"}
    options = to_js({"params": event}, dict_converter=Object.fromEntries)
    await env.MY_WORKFLOW.create(options)
    return Response.json({"status": "success"})
```

:::note
Values passed across the FFI boundary need to be converted into JavaScript objects using `to_js`. This is why we explicitly construct the payload using `Object.fromEntries`.
:::

The `create` method returns a [`WorkflowInstance`](/workflows/build/workers-api/#workflowinstance) object, which can be used to query the status of the workflow instance. Note that this is a JavaScript object, not a Python object.

### `create_batch`

Create (trigger) a batch of new workflow instances, up to 100 instances at a time. This is useful if you need to create multiple instances at once within the [instance creation limit](/workflows/reference/limits/).

* <code>create_batch(batch)</code>
  * `batch` - a list of `WorkflowInstanceCreateOptions` to pass when creating the instances, including a user-provided ID and payload parameters.

Each element of the `batch` list is expected to include both `id` and `params` properties:

```python
from pyodide.ffi import to_js

# Create a new batch of 3 Workflow instances, each with its own ID and params
list_of_instances = [
    to_js({"id": "id-abc123", "params": {"hello": "world-0"}}, dict_converter=Object.fromEntries),
    to_js({"id": "id-def456", "params": {"hello": "world-1"}}, dict_converter=Object.fromEntries),
    to_js({"id": "id-ghi789", "params": {"hello": "world-2"}}, dict_converter=Object.fromEntries),
]

await env.MY_WORKFLOW.create_batch(list_of_instances)
```
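Since `create_batch` accepts at most 100 instances per call, larger workloads need to be split into multiple calls. A minimal sketch in plain Python (the `chunk` helper is illustrative, not part of the SDK):

```python
# Illustrative helper (not part of the SDK): split instance options into
# batches of at most 100, the per-call limit for create_batch.
def chunk(items, size=100):
    """Yield successive slices of at most `size` items."""
    for start in range(0, len(items), size):
        yield items[start:start + size]

# 250 instance definitions become batches of 100, 100, and 50.
instances = [{"id": f"id-{i}", "params": {"n": i}} for i in range(250)]
batches = list(chunk(instances))
```

Each batch would then be converted with `to_js` and passed to `env.MY_WORKFLOW.create_batch` in its own call.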
### `get`

Get a workflow instance by ID.

* <code>get(id)</code>
  * `id` - the ID of the workflow instance to get.

Returns a [`WorkflowInstance`](/workflows/build/workers-api/#workflowinstance) object, which can be used to query the status of the workflow instance.

```python
instance = await env.MY_WORKFLOW.get("abc-123")

# FFI methods available for WorkflowInstance
await instance.status()
await instance.pause()
await instance.resume()
await instance.restart()
await instance.terminate()
```
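A common pattern is to poll `status()` until the instance reaches a terminal state. A sketch under assumptions: `wait_for_completion` is a hypothetical helper, and the instance is assumed to expose an async `status()` returning a dictionary with a `"status"` key whose terminal values include `"complete"`, `"errored"`, and `"terminated"`:

```python
import asyncio

# Hypothetical polling helper (not part of the SDK): wait until the
# instance's status() reports a terminal state, or give up after
# max_polls attempts.
async def wait_for_completion(instance, poll_seconds=1.0, max_polls=60):
    terminal = {"complete", "errored", "terminated"}
    for _ in range(max_polls):
        state = await instance.status()
        if state["status"] in terminal:
            return state
        await asyncio.sleep(poll_seconds)
    raise TimeoutError("workflow instance did not reach a terminal state")
```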
### `send_event`

Send an event to a workflow instance.

* <code>send_event(options)</code>
  * `type` - the type of event to send to the workflow instance.
  * `payload` - the payload to send to the workflow instance.

```python
from pyodide.ffi import to_js

await env.MY_WORKFLOW.send_event(to_js({"type": "my-event-type", "payload": {"foo": "bar"}}, dict_converter=Object.fromEntries))
```

:::note
Values passed to `send_event` require explicit type translation into JavaScript objects.
:::

## REST API (HTTP)

Refer to the [Workflows REST API documentation](/api/resources/workflows/subresources/instances/methods/create/).

## Command line (CLI)

Refer to the [CLI quick start](/workflows/get-started/cli-quick-start/) to learn more about how to manage and trigger Workflows via the command line.
Lines changed: 38 additions & 0 deletions

---
title: DAG Workflows
pcx_content_type: concept
sidebar:
  order: 4
---

The Python Workflows SDK supports DAG workflows in a declarative way, using the `step.do` decorator with the `depends` parameter to define dependencies (other steps that must complete before this step can run).

```python
from workers import WorkflowEntrypoint

class MyWorkflow(WorkflowEntrypoint):
    async def run(self, event, step):
        @step.do("dependency a")
        async def step_a():
            # do some work
            return 10

        @step.do("dependency b")
        async def step_b():
            # do some work
            return 20

        @step.do("my final step", depends=[step_a, step_b], concurrent=True)
        async def my_final_step(result_a=0, result_b=0):
            # should return 30
            return result_a + result_b

        await my_final_step()
```

In this example, `step_a` and `step_b` run concurrently before `my_final_step`, which depends on both of them, executes.

Setting `concurrent=True` allows the dependencies to be resolved concurrently. If one of the callables passed to `depends` has already completed, it is skipped and its return value is reused.

This pattern is useful for diamond-shaped workflows, where a step depends on two or more other steps that can run concurrently.
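Conceptually, dependency resolution behaves like the following plain-Python model (illustrative only, not the SDK implementation; it resolves dependencies sequentially for clarity, whereas `concurrent=True` resolves them in parallel):

```python
import asyncio

# Illustrative model of `depends`: run each dependency once, cache its
# result by name, and pass the results positionally to the dependent step.
async def run_with_depends(step_fn, depends, cache):
    results = []
    for dep in depends:
        if dep.__name__ not in cache:  # completed dependencies are skipped
            cache[dep.__name__] = await dep()
        results.append(cache[dep.__name__])
    return await step_fn(*results)

async def step_a():
    return 10

async def step_b():
    return 20

async def my_final_step(result_a=0, result_b=0):
    return result_a + result_b

cache = {}
result = asyncio.run(run_with_depends(my_final_step, [step_a, step_b], cache))
```

Running `my_final_step` through this model yields `30`, and the cache is why a diamond-shaped graph evaluates each shared dependency only once.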
Lines changed: 43 additions & 0 deletions

---
title: Python Workflows SDK
pcx_content_type: navigation
sidebar:
  order: 5
  badge:
    text: Beta
---

Workflow entrypoints can be declared using Python. To achieve this, you export a `WorkflowEntrypoint` that runs on the Cloudflare Workers platform. Refer to [Python Workers](/workers/languages/python) for more information about Python on the Workers runtime.

:::caution[Python Workflows are in beta, as well as the underlying platform.]

You must add both the `python_workflows` and `python_workers` compatibility flags to your `wrangler.toml` file.

Join the #python-workers channel in the [Cloudflare Developers Discord](https://discord.cloudflare.com/) and let us know what you'd like to see next.
:::

## Get Started

The main entrypoint for a Python workflow is the [`WorkflowEntrypoint`](/workflows/build/workers-api/#workflowentrypoint) class. Your workflow logic should live inside the [`run`](/workflows/build/workers-api/#run) handler.

```python
from workers import WorkflowEntrypoint

class MyWorkflow(WorkflowEntrypoint):
    async def run(self, event, step):
        # steps here
        pass
```

To run a Python Workflow locally, use [Wrangler](/workers/wrangler/), the CLI for Cloudflare Workers:

```bash
npx wrangler@latest dev
```

To deploy a Python Workflow to Cloudflare, run [`wrangler deploy`](/workers/wrangler/commands/#deploy):

```bash
npx wrangler@latest deploy
```
