79 changes: 79 additions & 0 deletions src/content/docs/workflows/python/bindings.mdx
@@ -0,0 +1,79 @@
---
title: Binding to a Workflow
pcx_content_type: concept
sidebar:
order: 3
group:
hideIndex: true

---
import { WranglerConfig } from "~/components"

The Python Workers platform leverages FFI to access bindings to Cloudflare resources. Refer to the [bindings](/workers/languages/python/ffi/#using-bindings-from-python-workers) documentation for more information.

From the configuration perspective, there is no difference between a Python Workflow and a JavaScript one.

<WranglerConfig>

```toml title="wrangler.toml"
#:schema node_modules/wrangler/config-schema.json
name = "workflows-starter"
main = "src/index.ts"
compatibility_date = "2024-10-22"

[[workflows]]
# name of your Workflow
name = "workflows-starter"
# the binding name, accessed as env.MY_WORKFLOW
binding = "MY_WORKFLOW"
# the class that extends the WorkflowEntrypoint class in src/index.ts
class_name = "MyWorkflow"
```

</WranglerConfig>

### Creating an instance via binding

Note that `env` is a JavaScript object exposed to the Python script via [JsProxy](https://pyodide.org/en/stable/usage/api/python-api/ffi.html#pyodide.ffi.JsProxy). You can
access the binding just as you would in a JavaScript Worker. Refer to the [Workflow binding documentation](/workflows/build/workers-api/#workflow) to learn more about the available methods.

Consider the binding `MY_WORKFLOW` defined above. Here is how you would create a new Workflow instance:

```python
from workers import Response

async def on_fetch(request, env):
instance = await env.MY_WORKFLOW.create()
return Response.json({"status": "success"})
```

### Passing a payload to a workflow instance

```python
from js import Object
from pyodide.ffi import to_js
from workers import Response

async def on_fetch(request, env, ctx):
event = {"foo": "bar"}
    # to_js is required because the binding call crosses the FFI boundary;
    # the runtime cannot wrap or convert the dictionary automatically
await env.MY_WORKFLOW.create(to_js({"params": event}, dict_converter=Object.fromEntries))
return Response.json({"status": "success"})
```
:::note

Values that cross the FFI boundary, including the payload passed to `create` and values returned from steps, must be converted into JavaScript objects using `to_js`. Passing `dict_converter=Object.fromEntries` turns Python dictionaries into plain JavaScript objects rather than JavaScript `Map`s.

:::


This is how you access the payload in your Workflow:

```python
from workers import WorkflowEntrypoint

class DemoWorkflowClass(WorkflowEntrypoint):
async def on_run(self, event, step):
@step.do('step-name')
async def first_step():
payload = event["payload"]
            return payload

        return await first_step()
```
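
Beyond `create`, the same FFI layer exposes the other methods on the [Workflow binding](/workflows/build/workers-api/#workflow). As a sketch (the instance ID is a placeholder, and the shape of the returned status object is an assumption based on the JavaScript API), you could look up an existing instance and report its status:

```python
from workers import Response

async def on_fetch(request, env):
    # Look up a previously created instance by its ID
    instance = await env.MY_WORKFLOW.get("my-instance-id")
    # status() mirrors the JavaScript API and returns an object
    # with fields such as `status`, `error`, and `output`
    status = await instance.status()
    return Response.json({"status": status.status})
```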


36 changes: 36 additions & 0 deletions src/content/docs/workflows/python/dag.mdx
@@ -0,0 +1,36 @@
---
title: DAG Workflows
pcx_content_type: concept
sidebar:
order: 4

---

The Python SDK supports DAG workflows in a declarative way, using the `step.do` decorator with the `depends` parameter to define dependencies (other steps that must complete before this step can run).

```python
from workers import WorkflowEntrypoint

class MyWorkflow(WorkflowEntrypoint):
async def on_run(self, event, step):
@step.do("dependency a")
async def step_a():
# do some work
return 10

@step.do("dependency b")
async def step_b():
# do some work
return 20

@step.do("my final step", depends=[step_a, step_b], concurrent=True)
async def my_final_step(result_a=0, result_b=0):
# should return 30
return result_a + result_b

await my_final_step()
```

Setting `concurrent=True` allows the dependencies to be resolved concurrently. If one of the callables passed to `depends` has already completed, it is skipped and its return value is reused.

This pattern is useful for diamond-shaped workflows, where a step depends on two or more other steps that can run concurrently, as in the sketch below.
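
For instance, a minimal diamond-shaped graph might look like the following sketch, which follows the conventions of the example above (dependency results are passed to the dependent step as arguments):

```python
from workers import WorkflowEntrypoint

class DiamondWorkflow(WorkflowEntrypoint):
    async def on_run(self, event, step):
        @step.do("fetch input")
        async def root():
            return 5

        @step.do("double", depends=[root])
        async def double(value=0):
            return value * 2

        @step.do("triple", depends=[root])
        async def triple(value=0):
            return value * 3

        # `double` and `triple` resolve concurrently; the completed
        # `root` result is reused rather than run again
        @step.do("combine", depends=[double, triple], concurrent=True)
        async def combine(doubled=0, tripled=0):
            # should return 25
            return doubled + tripled

        await combine()
```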
13 changes: 13 additions & 0 deletions src/content/docs/workflows/python/index.mdx
@@ -0,0 +1,13 @@
---
title: Python SDK
pcx_content_type: navigation
sidebar:
order: 5
group:
hideIndex: true

---

import { DirectoryListing } from "~/components"

<DirectoryListing />
42 changes: 42 additions & 0 deletions src/content/docs/workflows/python/overview.mdx
@@ -0,0 +1,42 @@
---
title: Overview
pcx_content_type: get-started
sidebar:
order: 1
badge:
text: Beta

---

Workflow entrypoints can be declared in Python. To do so, export a class that extends `WorkflowEntrypoint` and runs on the Cloudflare Workers platform.
Refer to [Python Workers](/workers/languages/python) for more information about Python on the Workers runtime.

:::caution[Python Workflows, and the underlying platform, are in beta.]

You must add both the `python_workflows` and `python_workers` compatibility flags to your `wrangler.toml` file.

Join the #python-workflows channel in the [Cloudflare Developers Discord](https://discord.cloudflare.com/) and let us know what you'd like to see next.
:::
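
As a minimal sketch, a `wrangler.toml` enabling both flags might look like this (the `name` and `main` values are placeholders):

```toml title="wrangler.toml"
name = "python-workflows-starter"
main = "src/entry.py"
compatibility_date = "2024-10-22"
compatibility_flags = ["python_workers", "python_workflows"]
```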

## Get Started
As in TypeScript, the main entrypoint for a Python Workflow is the [`WorkflowEntrypoint`](/workflows/build/workers-api/#workflowentrypoint) class. Your Workflow logic should live inside the [`run`](/workflows/build/workers-api/#run) handler; in a Python Workflow, this handler is named `on_run`.

```python
from workers import WorkflowEntrypoint

class MyWorkflow(WorkflowEntrypoint):
    async def on_run(self, event, step):
        # steps here
        pass
```

To run a Python Workflow locally, you use [Wrangler](/workers/wrangler/), the CLI for Cloudflare Workers:

```bash
npx wrangler@latest dev
```

To deploy a Python Workflow to Cloudflare, run [`wrangler deploy`](/workers/wrangler/commands/#deploy):

```bash
npx wrangler@latest deploy
```
81 changes: 81 additions & 0 deletions src/content/docs/workflows/python/run.mdx
@@ -0,0 +1,81 @@
---
title: Run method
pcx_content_type: concept
sidebar:
order: 2

---

The main difference between the [TypeScript SDK](/workflows/build/workers-api/#run) and the Python SDK lies in the `run` method (named `on_run` in Python) and the parameters it receives.

## WorkflowStep

* <code>step.do(name, depends=[], concurrent=False, config=None)</code> is a decorator that allows you to define a step in a workflow.
* `name` - the name of the step.
* `depends` - an optional list of steps that must complete before this step can run.
* `concurrent` - an optional boolean that indicates whether this step can run concurrently with other steps.
  * `config` - an optional [`WorkflowStepConfig`](/workflows/build/workers-api/#workflowstepconfig) for configuring [step-specific retry behaviour](/workflows/build/sleeping-and-retrying/). This is passed as a Python dictionary and then type-translated into a `WorkflowStepConfig` object.

```python
from workers import WorkflowEntrypoint

class MyWorkflow(WorkflowEntrypoint):
async def on_run(self, event, step):
@step.do("my first step")
async def my_first_step():
# do some work
return "Hello World!"

await my_first_step()
```

When returning state from a step, you must make sure that the returned value is serializable. Since steps run through an FFI layer, the returned value is type-translated via [`to_js`](https://pyodide.org/en/stable/usage/api/python-api/ffi.html#pyodide.ffi.to_js).
Refer to [Pyodide's documentation](https://pyodide.org/en/stable/usage/type-conversions.html#type-translations-pyproxy-to-js) on type conversions for more information.
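
As a sketch of what this means in practice, following the `to_js` convention shown in the binding examples, a step returning a dictionary can convert it explicitly so it arrives as a plain JavaScript object:

```python
from js import Object
from pyodide.ffi import to_js
from workers import WorkflowEntrypoint

class MyWorkflow(WorkflowEntrypoint):
    async def on_run(self, event, step):
        @step.do("produce state")
        async def produce_state():
            # Convert the dict explicitly so it is stored as a plain
            # JavaScript object rather than a JavaScript Map
            return to_js({"count": 3, "label": "done"}, dict_converter=Object.fromEntries)

        await produce_state()
```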

* <code>step.sleep(name, duration)</code>

* `name` - the name of the step.
  * `duration` - the duration to sleep for, either as a number of seconds or as a `WorkflowDuration`-compatible string.

* <code>step.sleep_until(name, timestamp)</code>

* `name` - the name of the step.
* `timestamp` - a `datetime.date` object or seconds from the Unix epoch to sleep the Workflow instance until.

* <code>step.wait_for_event(name, event_type, timeout="24 hours")</code>

* `name` - the name of the step.
* `event_type` - the type of event to wait for.
  * `timeout` - the timeout for the `wait_for_event` call. The default timeout is 24 hours.
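
As a sketch combining the sleep and event primitives (the step names, event type, and timeout here are arbitrary examples):

```python
from workers import WorkflowEntrypoint

class MyWorkflow(WorkflowEntrypoint):
    async def on_run(self, event, step):
        # Pause this instance for a fixed duration
        await step.sleep("cool-down", "10 seconds")

        # Block until an event of the given type is delivered to this
        # instance, or fail the step once the timeout elapses
        approval = await step.wait_for_event(
            "wait for approval",
            "approval-received",
            timeout="1 hour",
        )
```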

## Error Handling

Workflows semantics allow you to catch exceptions that propagate to the top level.

:::note
Catching specific exceptions within an `except` block may not work, as some Python errors will not be re-instantiated into the same type of error when they are passed through the RPC layer.
:::
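
As a sketch, catching the base `Exception` around an awaited step keeps this caveat in mind:

```python
from workers import WorkflowEntrypoint

class MyWorkflow(WorkflowEntrypoint):
    async def on_run(self, event, step):
        @step.do("might fail")
        async def flaky_step():
            raise ValueError("something went wrong")

        try:
            await flaky_step()
        except Exception:
            # Catch broadly: the original error type may not be
            # re-instantiated after crossing the RPC layer
            pass
```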

### NonRetryableError

Similarly to the [Typescript SDK](/workflows/build/workers-api/#nonretryableerror), the Python SDK provides a `NonRetryableError` class that can be used to signal that a step should not be retried.

```python
from workers.workflows import NonRetryableError

raise NonRetryableError(message)
```
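
For example, a step can raise it when retrying would never succeed. This sketch assumes a hypothetical `required_field` key in the payload:

```python
from workers import WorkflowEntrypoint
from workers.workflows import NonRetryableError

class MyWorkflow(WorkflowEntrypoint):
    async def on_run(self, event, step):
        @step.do("validate input")
        async def validate():
            if "required_field" not in event["payload"]:
                # Fail the step permanently; no retries will be attempted
                raise NonRetryableError("missing required_field in payload")

        await validate()
```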

## Configuring a workflow instance

You can bind a step to a specific retry policy by passing a `WorkflowStepConfig` object to the `config` parameter of the `step.do` decorator.
In Python, you need to make sure that your `dict` respects the [`WorkflowStepConfig`](/workflows/build/workers-api/#workflowstepconfig) type.

```python
class DemoWorkflowClass(WorkflowEntrypoint):
async def on_run(self, event, step):
@step.do('step-name', config={"retries": {"limit": 1, "delay": "10 seconds"}})
async def first_step():
# do some work
            pass

        await first_step()
```