
Commit 3aff634 (parent: 32f67d4)
Creating new Python SDK folder

File tree: 4 files changed, +237 −0 lines changed

4 files changed

+237
-0
lines changed
Lines changed: 77 additions & 0 deletions
---
title: Bind to a Workflow
pcx_content_type: concept
sidebar:
  order: 3
  group:
    hideIndex: true
---

import { WranglerConfig } from "~/components"

The Python Workers platform leverages FFI to access bindings to Cloudflare resources. Refer to the [bindings](/workers/languages/python/ffi/#using-bindings-from-python-workers) documentation for more information.
From a configuration standpoint, there is no difference between configuring a Python Workflow and a JavaScript one.

<WranglerConfig>

```toml title="wrangler.toml"
#:schema node_modules/wrangler/config-schema.json
name = "workflows-starter"
main = "src/index.ts"
compatibility_date = "2024-10-22"

[[workflows]]
# name of your workflow
name = "workflows-starter"
# binding name env.MY_WORKFLOW
binding = "MY_WORKFLOW"
# the class that extends the WorkflowEntrypoint class in src/index.ts
class_name = "MyWorkflow"
```

</WranglerConfig>

### Create an instance via binding

Note that `env` is a JavaScript object exposed to the Python script via [JsProxy](https://pyodide.org/en/stable/usage/api/python-api/ffi.html#pyodide.ffi.JsProxy). You can access the binding just as you would in a JavaScript Worker. Refer to the [Workflow binding documentation](/workflows/build/workers-api/#workflow) to learn more about the available methods.

Let's consider the previous binding called `MY_WORKFLOW`. Here's how you would create a new instance:

```python
from workers import Response

async def on_fetch(request, env):
    instance = await env.MY_WORKFLOW.create()
    return Response.json({"status": "success"})
```

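To see the shape of this call without deploying, here is a plain-Python sketch with a stand-in for the binding. `FakeWorkflowBinding` and the returned fields are illustrative only; the real binding object is injected by the Workers runtime over FFI:

```python
import asyncio

# Toy stand-in for env.MY_WORKFLOW, purely to illustrate the call shape;
# this is NOT the real binding API.
class FakeWorkflowBinding:
    async def create(self, options=None):
        # the real create() returns a handle for the new workflow instance
        return {"id": "instance-123", "params": (options or {}).get("params")}

async def handle(env):
    instance = await env["MY_WORKFLOW"].create()
    return {"status": "success", "id": instance["id"]}

env = {"MY_WORKFLOW": FakeWorkflowBinding()}
print(asyncio.run(handle(env)))  # {'status': 'success', 'id': 'instance-123'}
```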
### Pass a payload to a workflow instance

```python
from js import Object
from pyodide.ffi import to_js
from workers import Response

async def on_fetch(request, env, ctx):
    event = {"foo": "bar"}
    # to_js is required here because the binding call goes through FFI; this is
    # not something the runtime can wrap or override for you
    await env.MY_WORKFLOW.create(to_js({"params": event}, dict_converter=Object.fromEntries))
    return Response.json({"status": "success"})
```

:::note

Values crossing the FFI boundary, such as this payload, need to be converted into JavaScript objects using `to_js`. This is why we explicitly construct the payload with `Object.fromEntries` as the dictionary converter.

:::
64+
65+
66+
And this is how you use the payload in your workflow:
67+
68+
```python
69+
from pyodide.ffi import to_js
70+
71+
class DemoWorkflowClass(WorkflowEntrypoint):
72+
async def on_run(self, event, step):
73+
@step.do('step-name')
74+
async def first_step():
75+
payload = event["payload"]
76+
return payload
77+
```
Lines changed: 36 additions & 0 deletions
---
title: DAG Workflows
pcx_content_type: concept
sidebar:
  order: 4
---

The Python SDK supports DAG workflows in a declarative way, using the `step.do` decorator with the `depends` parameter to define dependencies (other steps that must complete before this step can run).
10+
11+
```python
12+
from workers import WorkflowEntrypoint
13+
14+
class MyWorkflow(WorkflowEntrypoint):
15+
async def on_run(self, event, step):
16+
@step.do("dependency a")
17+
async def step_a():
18+
# do some work
19+
return 10
20+
21+
@step.do("dependency b")
22+
async def step_b():
23+
# do some work
24+
return 20
25+
26+
@step.do("my final step", depends=[step_a, step_b], concurrent=True)
27+
async def my_final_step(result_a=0, result_b=0):
28+
# should return 30
29+
return result_a + result_b
30+
31+
await my_final_step()
32+
```

Setting `concurrent=True` allows the dependencies to be resolved concurrently. If one of the callables passed to `depends` has already completed, it is skipped and its return value is reused.

This pattern is useful for diamond-shaped workflows, where a step depends on two or more other steps that can run concurrently.
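The resolution semantics can be sketched in plain Python. This is a toy emulation with a hypothetical `make_step` helper, not the SDK's implementation: dependencies are awaited concurrently, and completed steps are memoized so they are not re-run.

```python
import asyncio

# Toy emulation of step.do's depends/concurrent semantics; make_step is a
# hypothetical helper, NOT part of the real SDK.
def make_step(fn, depends=(), cache=None):
    cache = {} if cache is None else cache

    async def run():
        if fn.__name__ in cache:                 # already completed: reuse value
            return cache[fn.__name__]
        # dependencies are resolved concurrently, as with concurrent=True
        dep_results = await asyncio.gather(*(dep() for dep in depends))
        cache[fn.__name__] = await fn(*dep_results)
        return cache[fn.__name__]

    return run

async def demo():
    cache = {}

    async def step_a():
        return 10

    async def step_b():
        return 20

    async def final(result_a, result_b):
        return result_a + result_b

    a = make_step(step_a, cache=cache)
    b = make_step(step_b, cache=cache)
    final_step = make_step(final, depends=[a, b], cache=cache)

    await a()                    # runs step_a once
    return await final_step()    # step_a's cached value is reused here

print(asyncio.run(demo()))  # 30
```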
Lines changed: 43 additions & 0 deletions
---
title: Python SDK
pcx_content_type: navigation
sidebar:
  order: 5
  badge:
    text: Beta
---

Workflow entrypoints can be written in Python by exporting a `WorkflowEntrypoint` that runs on the Cloudflare Workers platform.
Refer to [Python Workers](/workers/languages/python) for more information about Python on the Workers runtime.

:::caution[Python Workflows, and the underlying platform, are in beta.]

You must add the `python_workflows` compatibility flag to your `wrangler.toml` file, as well as `python_workers`.

Join the #python-workflows channel in the [Cloudflare Developers Discord](https://discord.cloudflare.com/) and let us know what you'd like to see next.

:::
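For instance, the relevant lines of a `wrangler.toml` for a Python Workflow might look like this (a minimal sketch; the `main` path is illustrative):

```toml
main = "src/entry.py"
compatibility_date = "2024-10-22"
compatibility_flags = ["python_workers", "python_workflows"]
```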

## Get Started

Similar to TypeScript, the main entrypoint for a Python Workflow is the [`WorkflowEntrypoint`](/workflows/build/workers-api/#workflowentrypoint) class. Your workflow logic should live inside the [`run`](/workflows/build/workers-api/#run) handler. In a Python Workflow, this handler is named `on_run`.

```python
from workers import WorkflowEntrypoint

class MyWorkflow(WorkflowEntrypoint):
    async def on_run(self, event, step):
        # steps go here
        pass
```
32+
33+
To run a Python Workflow locally, you use [Wrangler](/workers/wrangler/), the CLI for Cloudflare Workers:
34+
35+
```bash
36+
npx wrangler@latest dev
37+
```
38+
39+
To deploy a Python Workflow to Cloudflare, run [`wrangler deploy`](/workers/wrangler/commands/#deploy):
40+
41+
```bash
42+
npx wrangler@latest deploy
43+
```
Lines changed: 81 additions & 0 deletions
---
title: Run method
pcx_content_type: concept
sidebar:
  order: 2
---

The main difference between the [TypeScript SDK](/workflows/build/workers-api/#run) and the Python SDK lies in the `run` method and the parameters it receives.

## WorkflowStep

* <code>step.do(name, depends=[], concurrent=False, config=None)</code> is a decorator that allows you to define a step in a workflow.
  * `name` - the name of the step.
  * `depends` - an optional list of steps that must complete before this step can run.
  * `concurrent` - an optional boolean that indicates whether this step can run concurrently with other steps.
  * `config` - an optional [`WorkflowStepConfig`](/workflows/build/workers-api/#workflowstepconfig) for configuring [step-specific retry behaviour](/workflows/build/sleeping-and-retrying/). This is passed as a Python dictionary and then type translated into a `WorkflowStepConfig` object.

```python
from workers import WorkflowEntrypoint

class MyWorkflow(WorkflowEntrypoint):
    async def on_run(self, event, step):
        @step.do("my first step")
        async def my_first_step():
            # do some work
            return "Hello World!"

        await my_first_step()
```

When returning state from a step, you must make sure that the returned value is serializable. Since steps run through an FFI layer, the returned value gets type translated via [FFI](https://pyodide.org/en/stable/usage/api/python-api/ffi.html#pyodide.ffi.to_js).
Refer to [Pyodide's documentation on type conversions](https://pyodide.org/en/stable/usage/type-conversions.html#type-translations-pyproxy-to-js) for more information.
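A rough rule of thumb (an approximation, not the actual FFI rules) is that JSON-style values such as dicts, lists, strings, numbers, booleans, and `None` translate cleanly. A toy checker to illustrate:

```python
import json

def is_step_safe(value):
    """Rough proxy: JSON-serializable values survive the FFI translation."""
    try:
        json.dumps(value)
        return True
    except TypeError:
        return False

print(is_step_safe({"count": 3, "items": ["a", "b"]}))  # True
print(is_step_safe(open))                               # False: callables don't translate
```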
* <code>step.sleep(name, duration)</code>
  * `name` - the name of the step.
  * `duration` - the duration to sleep for, either in seconds or as a `WorkflowDuration`-compatible string.

* <code>step.sleep_until(name, timestamp)</code>
  * `name` - the name of the step.
  * `timestamp` - a `datetime.date` object, or seconds since the Unix epoch, to sleep the Workflow instance until.

* <code>step.wait_for_event(name, event_type, timeout="24 hours")</code>
  * `name` - the name of the step.
  * `event_type` - the type of event to wait for.
  * `timeout` - the timeout for the `wait_for_event` call. The default timeout is 24 hours.
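A `WorkflowDuration` string pairs a number with a time unit, as in `"10 seconds"` above. The hypothetical parser below only illustrates that shape; `to_seconds` is not part of the SDK, and the real parsing may accept other formats:

```python
import re

# Hypothetical helper illustrating duration strings like "10 seconds" or
# "1 hour"; the real SDK does its own parsing.
UNITS = {"second": 1, "minute": 60, "hour": 3600, "day": 86400}

def to_seconds(duration):
    if isinstance(duration, (int, float)):
        return duration
    m = re.fullmatch(r"(\d+)\s*(second|minute|hour|day)s?", duration.strip())
    if not m:
        raise ValueError(f"unrecognized duration: {duration!r}")
    return int(m.group(1)) * UNITS[m.group(2)]

print(to_seconds("10 seconds"))  # 10
print(to_seconds("1 hour"))      # 3600
```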

## Error Handling

Workflows semantics allow you to catch exceptions that propagate to the top level.

:::note
Catching specific exception types within an `except` block may not work, as some Python errors are not re-instantiated as the same type of error when they pass through the RPC layer.
:::

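Given that constraint, a defensive pattern is to catch broadly and branch on the message. A plain-Python sketch (the step and message are illustrative):

```python
import asyncio

# Sketch of the pattern: catch broadly, then inspect the message, since the
# original exception class may not survive the RPC layer.
async def failing_step():
    raise RuntimeError("upstream quota exceeded")

async def run():
    try:
        await failing_step()
        return "ok"
    except Exception as exc:
        if "quota" in str(exc):
            return "handled quota error"
        raise

print(asyncio.run(run()))  # handled quota error
```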
### NonRetryableError

As in the [TypeScript SDK](/workflows/build/workers-api/#nonretryableerror), the Python SDK provides a `NonRetryableError` class that can be used to signal that a step should not be retried.

```python
from workers.workflows import NonRetryableError

raise NonRetryableError(message)
```

## Configure a workflow instance
70+
71+
You can bind a step to a specific retry policy by passing a `WorkflowStepConfig` object to the `config` parameter of the `step.do` decorator.
72+
In Python , you need to make sure that your `dict` respects the [`WorkflowStepConfig`](/workflows/build/workers-api/#workflowstepconfig) type.
73+
74+
```python
75+
class DemoWorkflowClass(WorkflowEntrypoint):
76+
async def on_run(self, event, step):
77+
@step.do('step-name', config={"retries": {"limit": 1, "delay": "10 seconds"}})
78+
async def first_step():
79+
# do some work
80+
pass
81+
```
