Merged
2 changes: 1 addition & 1 deletion src/content/docs/workflows/build/workers-api.mdx
@@ -35,7 +35,7 @@ The `run` method can optionally return data, which is available when querying the
export class MyWorkflow extends WorkflowEntrypoint<Env, Params> {
async run(event: WorkflowEvent<Params>, step: WorkflowStep) {
// Steps here
let someComputedState = step.do("my step", async () => { })
let someComputedState = await step.do("my step", async () => { })

// Optional: return state from our run() method
return someComputedState
153 changes: 153 additions & 0 deletions src/content/docs/workflows/python/bindings.mdx
@@ -0,0 +1,153 @@
---
title: Interact with a Workflow
pcx_content_type: concept
sidebar:
order: 3
group:
hideIndex: true

---
import { WranglerConfig } from "~/components"

The Python Workers platform leverages FFI to access bindings to Cloudflare resources. Refer to the [bindings](/workers/languages/python/ffi/#using-bindings-from-python-workers) documentation for more information.

From the configuration perspective, enabling Python Workflows requires adding both the `python_workflows` and `python_workers` compatibility flags to your `wrangler.toml` file.

<WranglerConfig>

```toml title="wrangler.toml"
#:schema node_modules/wrangler/config-schema.json
name = "workflows-starter"
main = "src/index.py"
compatibility_date = "2024-10-22"
compatibility_flags = ["python_workflows", "python_workers"]

[[workflows]]
# name of your workflow
name = "workflows-starter"
# binding name env.MY_WORKFLOW
binding = "MY_WORKFLOW"
# this is the class that extends the WorkflowEntrypoint class in your main module
class_name = "MyWorkflow"
```

</WranglerConfig>


With the configuration in place, you can access the event payload in your workflow:

```python
from workers import WorkflowEntrypoint

class DemoWorkflowClass(WorkflowEntrypoint):
    async def run(self, event, step):
        @step.do('step-name')
        async def first_step():
            payload = event["payload"]
            return payload

        await first_step()
```


## Workflow

The `Workflow` binding gives you access to the [Workflow](/workflows/build/workers-api/#workflow) class. All its methods are available
on the binding.

Under the hood, the `Workflow` binding is a JavaScript object that is exposed to the Python script via [JsProxy](https://pyodide.org/en/stable/usage/api/python-api/ffi.html#pyodide.ffi.JsProxy).
This means that the values returned by its methods are also `JsProxy` objects, and need to be converted back into Python objects using `python_from_rpc`.


### `create`

Create (trigger) a new instance of a given Workflow.

* <code>create(options=None)</code>
* `options` - an **optional** dictionary of options to pass to the workflow instance. Should contain the same keys
as the [WorkflowInstanceCreateOptions](/workflows/build/workers-api/#workflowinstancecreateoptions) type.

```python
from pyodide.ffi import to_js
from js import Object
from workers import Response

async def on_fetch(request, env, ctx):
    event = {"foo": "bar"}
    options = to_js({"params": event}, dict_converter=Object.fromEntries)
    await env.MY_WORKFLOW.create(options)
    return Response.json({"status": "success"})
```
:::note

Values passed to the binding need to be converted into JavaScript objects using `to_js`. This is why the options dictionary is explicitly constructed with `Object.fromEntries` as the dict converter.

:::

The `create` method returns a [`WorkflowInstance`](/workflows/build/workers-api/#workflowinstance) object, which can be used to query the status of the workflow instance. Note that this is a JavaScript object, and not a Python object.

### `create_batch`

Create (trigger) a batch of new workflow instances, up to 100 instances at a time. This is useful if you need to create multiple instances at once within the [instance creation limit](/workflows/reference/limits/).

* <code>create_batch(batch)</code>
* `batch` - list of `WorkflowInstanceCreateOptions` to pass when creating an instance, including a user-provided ID and payload parameters.

Each element of the `batch` list is expected to include both `id` and `params` properties:

```python
from pyodide.ffi import to_js
from js import Object

# Create a new batch of 3 Workflow instances, each with its own ID and params
list_of_instances = [
    to_js({"id": "id-abc123", "params": {"hello": "world-0"}}, dict_converter=Object.fromEntries),
    to_js({"id": "id-def456", "params": {"hello": "world-1"}}, dict_converter=Object.fromEntries),
    to_js({"id": "id-ghi789", "params": {"hello": "world-2"}}, dict_converter=Object.fromEntries),
]

await env.MY_WORKFLOW.create_batch(list_of_instances)
```

### `get`

Get a workflow instance by ID.

* <code>get(id)</code>
* `id` - the ID of the workflow instance to get.

Returns a [`WorkflowInstance`](/workflows/build/workers-api/#workflowinstance) object, which can be used to query the status of the workflow instance.

```python
instance = await env.MY_WORKFLOW.get("abc-123")

# FFI methods available for WorkflowInstance
await instance.status()
await instance.pause()
await instance.resume()
await instance.restart()
await instance.terminate()
```

### `send_event`

Send an event to a workflow instance.

* <code>send_event(options)</code>
  * `options` - a dictionary containing the following keys:
    * `type` - the type of event to send to the workflow instance.
    * `payload` - the payload to send to the workflow instance.

```python
from pyodide.ffi import to_js
from js import Object

await env.MY_WORKFLOW.send_event(to_js({"type": "my-event-type", "payload": {"foo": "bar"}}, dict_converter=Object.fromEntries))
```

:::note

Values passed to `send_event` must be explicitly converted into JavaScript objects using `to_js`.

:::

## REST API (HTTP)

Refer to the [Workflows REST API documentation](/api/resources/workflows/subresources/instances/methods/create/).

## Command line (CLI)

Refer to the [CLI quick start](/workflows/get-started/cli-quick-start/) to learn more about how to manage and trigger Workflows via the command-line.
38 changes: 38 additions & 0 deletions src/content/docs/workflows/python/dag.mdx
@@ -0,0 +1,38 @@
---
title: DAG Workflows
pcx_content_type: concept
sidebar:
order: 4

---

The Python Workflows SDK supports DAG workflows in a declarative way, using the `step.do` decorator with the `depends` parameter to define dependencies (other steps that must complete before this step can run).

```python
from workers import WorkflowEntrypoint

class MyWorkflow(WorkflowEntrypoint):
    async def run(self, event, step):
        @step.do("dependency a")
        async def step_a():
            # do some work
            return 10

        @step.do("dependency b")
        async def step_b():
            # do some work
            return 20

        @step.do("my final step", depends=[step_a, step_b], concurrent=True)
        async def my_final_step(result_a=0, result_b=0):
            # should return 30
            return result_a + result_b

        await my_final_step()
```

In this example, `step_a` and `step_b` run concurrently before `my_final_step`, which depends on both of them, executes.

Setting `concurrent=True` allows the dependencies to be resolved concurrently. If one of the callables passed to `depends` has already completed, it is skipped and its return value is reused.

This pattern is useful for diamond-shaped workflows, where a step depends on two or more other steps that can run concurrently.
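The dependency-resolution behavior described above can be illustrated outside the Workers runtime with a small pure-Python sketch. Note that `StubStep` below is a hypothetical stand-in written for this illustration, not the SDK's actual implementation: each step runs at most once, its result is cached, and the results of `depends` are passed as arguments to the dependent step.

```python
import asyncio

class StubStep:
    """Illustrative stand-in for WorkflowStep: caches each step's
    result and resolves `depends` before the step body runs."""

    def __init__(self):
        self._cache = {}

    def do(self, name, depends=None, concurrent=False):
        def decorator(fn):
            async def wrapper():
                if name in self._cache:
                    # Already-completed steps are skipped; reuse the result.
                    return self._cache[name]
                deps = depends or []
                if concurrent:
                    # Resolve all dependencies concurrently.
                    results = await asyncio.gather(*(dep() for dep in deps))
                else:
                    results = [await dep() for dep in deps]
                # Dependency results are injected as positional arguments.
                self._cache[name] = await fn(*results)
                return self._cache[name]
            return wrapper
        return decorator

async def main():
    step = StubStep()

    @step.do("dependency a")
    async def step_a():
        return 10

    @step.do("dependency b")
    async def step_b():
        return 20

    @step.do("my final step", depends=[step_a, step_b], concurrent=True)
    async def my_final_step(result_a=0, result_b=0):
        return result_a + result_b

    return await my_final_step()

result = asyncio.run(main())
print(result)  # 30
```

The real SDK additionally persists each step's result durably, so a retried workflow run reuses completed steps rather than re-executing them.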
43 changes: 43 additions & 0 deletions src/content/docs/workflows/python/index.mdx
@@ -0,0 +1,43 @@
---
title: Python Workflows SDK
pcx_content_type: navigation
sidebar:
order: 5
badge:
text: Beta
---


Workflow entrypoints can be declared using Python. To do so, export a class that extends `WorkflowEntrypoint` and runs on the Cloudflare Workers platform.
Refer to [Python Workers](/workers/languages/python) for more information about Python on the Workers runtime.

:::caution[Python Workflows are in beta, as well as the underlying platform.]

You must add both `python_workflows` and `python_workers` compatibility flags to your `wrangler.toml` file.

Join the #python-workers channel in the [Cloudflare Developers Discord](https://discord.cloudflare.com/) and let us know what you'd like to see next.
:::

## Get Started

The main entrypoint for a Python workflow is the [`WorkflowEntrypoint`](/workflows/build/workers-api/#workflowentrypoint) class. Your workflow logic should exist inside the [`run`](/workflows/build/workers-api/#run) handler.

```python
from workers import WorkflowEntrypoint

class MyWorkflow(WorkflowEntrypoint):
    async def run(self, event, step):
        # steps here
        pass
```
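Inside `run`, steps are defined with the `step.do` decorator and awaited. Since `WorkflowEntrypoint` and the real `WorkflowStep` are only available inside the Workers runtime, the following self-contained sketch mimics that flow with a hypothetical stub `step` object (illustrative only):

```python
import asyncio

class StubStep:
    # Illustrative stand-in for the runtime-provided WorkflowStep:
    # step.do(name) returns a decorator that runs the function once
    # and caches its result, as a durable step would.
    def __init__(self):
        self._cache = {}

    def do(self, name):
        def decorator(fn):
            async def wrapper():
                if name not in self._cache:
                    self._cache[name] = await fn()
                return self._cache[name]
            return wrapper
        return decorator

class MyWorkflow:
    async def run(self, event, step):
        @step.do("double the input")
        async def double():
            return event["payload"]["n"] * 2

        return await double()

result = asyncio.run(MyWorkflow().run({"payload": {"n": 21}}, StubStep()))
print(result)  # 42
```

In a deployed Workflow, the runtime supplies the `event` and `step` arguments and persists each step's return value.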

To run a Python Workflow locally, use [Wrangler](/workers/wrangler/), the CLI for Cloudflare Workers:

```bash
npx wrangler@latest dev
```

To deploy a Python Workflow to Cloudflare, run [`wrangler deploy`](/workers/wrangler/commands/#deploy):

```bash
npx wrangler@latest deploy
```