diff --git a/src/content/docs/workflows/python/bindings.mdx b/src/content/docs/workflows/python/bindings.mdx
new file mode 100644
index 00000000000000..9e2404d9e70fdf
--- /dev/null
+++ b/src/content/docs/workflows/python/bindings.mdx
@@ -0,0 +1,79 @@
+---
+title: Binding to a Workflow
+pcx_content_type: concept
+sidebar:
+  order: 3
+  group:
+    hideIndex: true
+
+---
+import { WranglerConfig } from "~/components"
+
+The Python Workers platform uses FFI to access bindings to Cloudflare resources. Refer to the [bindings](/workers/languages/python/ffi/#using-bindings-from-python-workers) documentation for more information.
+
+From a configuration perspective, there is no difference between configuring a Python Workflow and a JavaScript one.
+
+<WranglerConfig>
+
+```toml title="wrangler.toml"
+#:schema node_modules/wrangler/config-schema.json
+name = "workflows-starter"
+main = "src/index.ts"
+compatibility_date = "2024-10-22"
+
+[[workflows]]
+# name of your workflow
+name = "workflows-starter"
+# binding name env.MY_WORKFLOW
+binding = "MY_WORKFLOW"
+# this is the class that extends the Workflow class in src/index.ts
+class_name = "MyWorkflow"
+```
+
+</WranglerConfig>
+
+### Creating an instance via binding
+
+Note that `env` is a JavaScript object exposed to the Python script via [JsProxy](https://pyodide.org/en/stable/usage/api/python-api/ffi.html#pyodide.ffi.JsProxy). You can access the binding like you would on a JavaScript Worker. Refer to the [Workflow binding documentation](/workflows/build/workers-api/#workflow) to learn more about the methods available.
+
+Let's consider the previous binding called `MY_WORKFLOW`. Here's how you would create a new instance:
+
+```python
+from workers import Response
+
+async def on_fetch(request, env):
+    instance = await env.MY_WORKFLOW.create()
+    return Response.json({"status": "success"})
+```
+
+### Passing a payload to a workflow instance
+
+```python
+from js import Object
+from pyodide.ffi import to_js
+from workers import Response
+
+async def on_fetch(request, env, ctx):
+    event = {"foo": "bar"}
+    # to_js is required here because the binding goes through FFI.
+    # It is not something we can wrap or override on the runtime.
+    await env.MY_WORKFLOW.create(to_js({"params": event}, dict_converter=Object.fromEntries))
+    return Response.json({"status": "success"})
+```
+
+:::note
+
+Values passed through the binding need to be converted into JavaScript objects using `to_js`. This is why we explicitly construct the payload with `Object.fromEntries` as the dictionary converter.
+
+:::
+
+And this is how you use the payload in your workflow:
+
+```python
+from workers import WorkflowEntrypoint
+
+class DemoWorkflowClass(WorkflowEntrypoint):
+    async def on_run(self, event, step):
+        @step.do('step-name')
+        async def first_step():
+            payload = event["payload"]
+            return payload
+
+        await first_step()
+```
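+
+You can also look up an existing instance through the same binding and inspect its state. A minimal sketch, assuming the instance ID is already known (for example, captured from an earlier `create()` call) and using the `get()` and `status()` methods described in the [Workflow binding documentation](/workflows/build/workers-api/#workflow):
+
+```python
+from workers import Response
+
+async def on_fetch(request, env):
+    # "some-instance-id" is a placeholder for a real instance ID
+    instance = await env.MY_WORKFLOW.get("some-instance-id")
+    # status() resolves to an object describing the current instance state
+    status = await instance.status()
+    return Response.json({"status": status.status})
+```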
diff --git a/src/content/docs/workflows/python/dag.mdx b/src/content/docs/workflows/python/dag.mdx
new file mode 100644
index 00000000000000..0c0a183a8ae683
--- /dev/null
+++ b/src/content/docs/workflows/python/dag.mdx
@@ -0,0 +1,36 @@
+---
+title: DAG Workflows
+pcx_content_type: concept
+sidebar:
+  order: 4
+
+---
+
+The Python SDK supports DAG workflows in a declarative way, using the `step.do` decorator with the `depends` parameter to define dependencies (other steps that must complete before this step can run).
+
+```python
+from workers import WorkflowEntrypoint
+
+class MyWorkflow(WorkflowEntrypoint):
+    async def on_run(self, event, step):
+        @step.do("dependency a")
+        async def step_a():
+            # do some work
+            return 10
+
+        @step.do("dependency b")
+        async def step_b():
+            # do some work
+            return 20
+
+        @step.do("my final step", depends=[step_a, step_b], concurrent=True)
+        async def my_final_step(result_a=0, result_b=0):
+            # should return 30
+            return result_a + result_b
+
+        await my_final_step()
+```
+
+Setting `concurrent=True` allows the dependencies to be resolved concurrently. If one of the callables passed to `depends` has already completed, it is skipped and its return value is reused.
+
+This pattern is useful for diamond-shaped workflows, where a step depends on two or more other steps that can run concurrently.
\ No newline at end of file
diff --git a/src/content/docs/workflows/python/index.mdx b/src/content/docs/workflows/python/index.mdx
new file mode 100644
index 00000000000000..853e08006f643e
--- /dev/null
+++ b/src/content/docs/workflows/python/index.mdx
@@ -0,0 +1,13 @@
+---
+title: Python SDK
+pcx_content_type: navigation
+sidebar:
+  order: 5
+  group:
+    hideIndex: true
+
+---
+
+import { DirectoryListing } from "~/components"
+
+<DirectoryListing />
diff --git a/src/content/docs/workflows/python/overview.mdx b/src/content/docs/workflows/python/overview.mdx
new file mode 100644
index 00000000000000..1575e5c49b5630
--- /dev/null
+++ b/src/content/docs/workflows/python/overview.mdx
@@ -0,0 +1,42 @@
+---
+title: Overview
+pcx_content_type: get-started
+sidebar:
+  order: 1
+  badge:
+    text: Beta
+
+---
+
+Workflow entrypoints can be declared using Python. To achieve this, you export a `WorkflowEntrypoint` that runs on the Cloudflare Workers platform.
+Refer to [Python Workers](/workers/languages/python) for more information about Python on the Workers runtime.
+
+:::caution[Python Workflows are in beta, as is the underlying platform.]
+
+You must add the `python_workflows` compatibility flag to your `wrangler.toml` file, as well as `python_workers`.
+
+Join the #python-workflows channel in the [Cloudflare Developers Discord](https://discord.cloudflare.com/) and let us know what you'd like to see next.
+:::
+
+## Get Started
+
+Similarly to TypeScript, the main entrypoint for a Python Workflow is the [`WorkflowEntrypoint`](/workflows/build/workers-api/#workflowentrypoint) class. Your workflow logic should live inside the [`run`](/workflows/build/workers-api/#run) handler. In a Python Workflow, this handler is named `on_run`.
+
+```python
+from workers import WorkflowEntrypoint
+
+class MyWorkflow(WorkflowEntrypoint):
+    async def on_run(self, event, step):
+        # steps here
+        pass
+```
+
+To run a Python Workflow locally, use [Wrangler](/workers/wrangler/), the CLI for Cloudflare Workers:
+
+```bash
+npx wrangler@latest dev
+```
+
+To deploy a Python Workflow to Cloudflare, run [`wrangler deploy`](/workers/wrangler/commands/#deploy):
+
+```bash
+npx wrangler@latest deploy
+```
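+
+Before running or deploying, make sure your configuration includes both compatibility flags mentioned above. A minimal sketch of the relevant `wrangler.toml` entries, where the `name` and `main` values are placeholders for your own project:
+
+```toml
+name = "python-workflows-starter"
+main = "src/entry.py"
+compatibility_date = "2024-10-22"
+compatibility_flags = ["python_workers", "python_workflows"]
+```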
diff --git a/src/content/docs/workflows/python/run.mdx b/src/content/docs/workflows/python/run.mdx
new file mode 100644
index 00000000000000..73a693c9c0c555
--- /dev/null
+++ b/src/content/docs/workflows/python/run.mdx
@@ -0,0 +1,81 @@
+---
+title: Run method
+pcx_content_type: concept
+sidebar:
+  order: 2
+
+---
+
+The main difference between the [TypeScript SDK](/workflows/build/workers-api/#run) and the Python SDK lies in the `run` method and the parameters it receives.
+
+## WorkflowStep
+
+* `step.do(name, depends=[], concurrent=False, config=None)` is a decorator that allows you to define a step in a workflow.
+  * `name` - the name of the step.
+  * `depends` - an optional list of steps that must complete before this step can run.
+  * `concurrent` - an optional boolean that indicates whether this step can run concurrently with other steps.
+  * `config` - an optional [`WorkflowStepConfig`](/workflows/build/workers-api/#workflowstepconfig) for configuring [step-specific retry behaviour](/workflows/build/sleeping-and-retrying/). This is passed as a Python dictionary and then type-translated into a `WorkflowStepConfig` object.
+
+```python
+from workers import WorkflowEntrypoint
+
+class MyWorkflow(WorkflowEntrypoint):
+    async def on_run(self, event, step):
+        @step.do("my first step")
+        async def my_first_step():
+            # do some work
+            return "Hello World!"
+
+        await my_first_step()
+```
+
+When returning state from a step, you must make sure that the returned value is serializable. Since steps run through an FFI layer, the returned value gets type-translated via [FFI](https://pyodide.org/en/stable/usage/api/python-api/ffi.html#pyodide.ffi.to_js).
+Refer to [Pyodide's documentation](https://pyodide.org/en/stable/usage/type-conversions.html#type-translations-pyproxy-to-js) on type conversions for more information.
+
+* `step.sleep(name, duration)`
+
+  * `name` - the name of the step.
+  * `duration` - the duration to sleep for, either in seconds or as a `WorkflowDuration`-compatible string.
+
+* `step.sleep_until(name, timestamp)`
+
+  * `name` - the name of the step.
+  * `timestamp` - a `datetime.date` object, or seconds from the Unix epoch, to sleep the Workflow instance until.
+
+* `step.wait_for_event(name, event_type, timeout="24 hours")`
+
+  * `name` - the name of the step.
+  * `event_type` - the type of event to wait for.
+  * `timeout` - the timeout for the `waitForEvent` call. The default timeout is 24 hours.
+
+## Error Handling
+
+Workflows semantics allow you to catch exceptions that are thrown to the top level.
+
+:::note
+Catching specific exception types within an `except` block may not work, as some Python errors are not re-instantiated into the same type of error when they pass through the RPC layer.
+:::
+
+### NonRetryableError
+
+Similarly to the [TypeScript SDK](/workflows/build/workers-api/#nonretryableerror), the Python SDK provides a `NonRetryableError` class that can be used to signal that a step should not be retried.
+
+```python
+from workers.workflows import NonRetryableError
+
+raise NonRetryableError(message)
+```
+
+## Configuring a workflow instance
+
+You can bind a step to a specific retry policy by passing a `WorkflowStepConfig` object to the `config` parameter of the `step.do` decorator.
+In Python, you need to make sure that your `dict` respects the [`WorkflowStepConfig`](/workflows/build/workers-api/#workflowstepconfig) type.
+
+```python
+from workers import WorkflowEntrypoint
+
+class DemoWorkflowClass(WorkflowEntrypoint):
+    async def on_run(self, event, step):
+        @step.do('step-name', config={"retries": {"limit": 1, "delay": "10 seconds"}})
+        async def first_step():
+            # do some work
+            pass
+
+        await first_step()
+```
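+
+Putting these pieces together, here is a short sketch of a workflow that combines a retry-configured step, `step.sleep`, and `step.wait_for_event`. The step names and the `"approval"` event type are illustrative, not part of the SDK:
+
+```python
+from workers import WorkflowEntrypoint
+
+class ApprovalWorkflow(WorkflowEntrypoint):
+    async def on_run(self, event, step):
+        @step.do("fetch data", config={"retries": {"limit": 3, "delay": "5 seconds"}})
+        async def fetch_data():
+            # do some work; retried up to 3 times on failure
+            return {"ready": True}
+
+        data = await fetch_data()
+
+        # pause the instance for a fixed duration
+        await step.sleep("cool off", "30 seconds")
+
+        # block until a matching event arrives, or fail after the timeout
+        approval = await step.wait_for_event("wait for approval", "approval", timeout="1 hour")
+        return approval
+```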