diff --git a/.github/workflows/durabletask-azuremanaged.yml b/.github/workflows/durabletask-azuremanaged.yml index e2215a3..9de61e3 100644 --- a/.github/workflows/durabletask-azuremanaged.yml +++ b/.github/workflows/durabletask-azuremanaged.yml @@ -67,7 +67,7 @@ jobs: pip install -r requirements.txt - name: Install durabletask-azuremanaged dependencies - working-directory: examples/dts + working-directory: examples run: | python -m pip install --upgrade pip pip install -r requirements.txt diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md new file mode 100644 index 0000000..6c2596b --- /dev/null +++ b/CONTRIBUTING.md @@ -0,0 +1,13 @@ +# Contributing + +This project welcomes contributions and suggestions. Most contributions require you to agree to a +Contributor License Agreement (CLA) declaring that you have the right to, and actually do, grant us +the rights to use your contribution. For details, visit https://cla.opensource.microsoft.com. + +When you submit a pull request, a CLA bot will automatically determine whether you need to provide +a CLA and decorate the PR appropriately (e.g., status check, comment). Simply follow the instructions +provided by the bot. You will only need to do this once across all repos using our CLA. + +This project has adopted the [Microsoft Open Source Code of Conduct](https://opensource.microsoft.com/codeofconduct/). +For more information see the [Code of Conduct FAQ](https://opensource.microsoft.com/codeofconduct/faq/) or +contact [opencode@microsoft.com](mailto:opencode@microsoft.com) with any additional questions or comments. \ No newline at end of file diff --git a/README.md b/README.md index b9d829c..49d6e0d 100644 --- a/README.md +++ b/README.md @@ -4,208 +4,18 @@ [![Build Validation](https://github.com/microsoft/durabletask-python/actions/workflows/pr-validation.yml/badge.svg)](https://github.com/microsoft/durabletask-python/actions/workflows/pr-validation.yml) [![PyPI version](https://badge.fury.io/py/durabletask.svg)](https://badge.fury.io/py/durabletask) -This repo contains a Python SDK for use with the [Azure Durable Task Scheduler](https://techcommunity.microsoft.com/blog/appsonazureblog/announcing-limited-early-access-of-the-durable-task-scheduler-for-azure-durable-/4286526) and the [Durable Task Framework for Go](https://github.com/microsoft/durabletask-go). With this SDK, you can define, schedule, and manage durable orchestrations using ordinary Python code. +This repo contains a Python SDK for use with the [Azure Durable Task Scheduler](https://github.com/Azure/Durable-Task-Scheduler). With this SDK, you can define, schedule, and manage durable orchestrations using ordinary Python code. -⚠️ **This SDK is currently under active development and is not yet ready for production use.** ⚠️ +> Note that this SDK is **not** currently compatible with [Azure Durable Functions](https://learn.microsoft.com/azure/azure-functions/durable/durable-functions-overview). If you are looking for a Python SDK for Azure Durable Functions, please see [this repo](https://github.com/Azure/azure-functions-durable-python). -> Note that this SDK is **not** currently compatible with [Azure Durable Functions](https://docs.microsoft.com/azure/azure-functions/durable/durable-functions-overview). If you are looking for a Python SDK for Azure Durable Functions, please see [this repo](https://github.com/Azure/azure-functions-durable-python). - -## Supported patterns - -The following orchestration patterns are currently supported. 
- -### Function chaining - -An orchestration can chain a sequence of function calls using the following syntax: - -```python -# simple activity function that returns a greeting -def hello(ctx: task.ActivityContext, name: str) -> str: - return f'Hello {name}!' - -# orchestrator function that sequences the activity calls -def sequence(ctx: task.OrchestrationContext, _): - result1 = yield ctx.call_activity(hello, input='Tokyo') - result2 = yield ctx.call_activity(hello, input='Seattle') - result3 = yield ctx.call_activity(hello, input='London') - - return [result1, result2, result3] -``` - -You can find the full sample [here](./examples/activity_sequence.py). - -### Fan-out/fan-in - -An orchestration can fan-out a dynamic number of function calls in parallel and then fan-in the results using the following syntax: - -```python -# activity function for getting the list of work items -def get_work_items(ctx: task.ActivityContext, _) -> List[str]: - # ... - -# activity function for processing a single work item -def process_work_item(ctx: task.ActivityContext, item: str) -> int: - # ... - -# orchestrator function that fans-out the work items and then fans-in the results -def orchestrator(ctx: task.OrchestrationContext, _): - # the number of work-items is unknown in advance - work_items = yield ctx.call_activity(get_work_items) - - # fan-out: schedule the work items in parallel and wait for all of them to complete - tasks = [ctx.call_activity(process_work_item, input=item) for item in work_items] - results = yield task.when_all(tasks) - - # fan-in: summarize and return the results - return {'work_items': work_items, 'results': results, 'total': sum(results)} -``` - -You can find the full sample [here](./examples/fanout_fanin.py). - -### Human interaction and durable timers - -An orchestration can wait for a user-defined event, such as a human approval event, before proceding to the next step. In addition, the orchestration can create a timer with an arbitrary duration that triggers some alternate action if the external event hasn't been received: - -```python -def purchase_order_workflow(ctx: task.OrchestrationContext, order: Order): - """Orchestrator function that represents a purchase order workflow""" - # Orders under $1000 are auto-approved - if order.Cost < 1000: - return "Auto-approved" - - # Orders of $1000 or more require manager approval - yield ctx.call_activity(send_approval_request, input=order) - - # Approvals must be received within 24 hours or they will be canceled. - approval_event = ctx.wait_for_external_event("approval_received") - timeout_event = ctx.create_timer(timedelta(hours=24)) - winner = yield task.when_any([approval_event, timeout_event]) - if winner == timeout_event: - return "Canceled" - - # The order was approved - yield ctx.call_activity(place_order, input=order) - approval_details = approval_event.get_result() - return f"Approved by '{approval_details.approver}'" -``` - -As an aside, you'll also notice that the example orchestration above works with custom business objects. Support for custom business objects includes support for custom classes, custom data classes, and named tuples. Serialization and deserialization of these objects is handled automatically by the SDK. - -You can find the full sample [here](./examples/human_interaction.py). - -## Feature overview - -The following features are currently supported: - -### Orchestrations - -Orchestrations are implemented using ordinary Python functions that take an `OrchestrationContext` as their first parameter. 
The `OrchestrationContext` provides APIs for starting child orchestrations, scheduling activities, and waiting for external events, among other things. Orchestrations are fault-tolerant and durable, meaning that they can automatically recover from failures and rebuild their local execution state. Orchestrator functions must be deterministic, meaning that they must always produce the same output given the same input. - -### Activities - -Activities are implemented using ordinary Python functions that take an `ActivityContext` as their first parameter. Activity functions are scheduled by orchestrations and have at-least-once execution guarantees, meaning that they will be executed at least once but may be executed multiple times in the event of a transient failure. Activity functions are where the real "work" of any orchestration is done. - -### Durable timers - -Orchestrations can schedule durable timers using the `create_timer` API. These timers are durable, meaning that they will survive orchestrator restarts and will fire even if the orchestrator is not actively in memory. Durable timers can be of any duration, from milliseconds to months. - -### Sub-orchestrations - -Orchestrations can start child orchestrations using the `call_sub_orchestrator` API. Child orchestrations are useful for encapsulating complex logic and for breaking up large orchestrations into smaller, more manageable pieces. - -### External events - -Orchestrations can wait for external events using the `wait_for_external_event` API. External events are useful for implementing human interaction patterns, such as waiting for a user to approve an order before continuing. - -### Continue-as-new (TODO) - -Orchestrations can be continued as new using the `continue_as_new` API. This API allows an orchestration to restart itself from scratch, optionally with a new input. - -### Suspend, resume, and terminate - -Orchestrations can be suspended using the `suspend_orchestration` client API and will remain suspended until resumed using the `resume_orchestration` client API. A suspended orchestration will stop processing new events, but will continue to buffer any that happen to arrive until resumed, ensuring that no data is lost. An orchestration can also be terminated using the `terminate_orchestration` client API. Terminated orchestrations will stop processing new events and will discard any buffered events. - -### Retry policies (TODO) - -Orchestrations can specify retry policies for activities and sub-orchestrations. These policies control how many times and how frequently an activity or sub-orchestration will be retried in the event of a transient error. - -## Getting Started - -### Prerequisites - -- Python 3.9 -- A Durable Task-compatible sidecar, like [Dapr Workflow](https://docs.dapr.io/developing-applications/building-blocks/workflow/workflow-overview/) - -### Installing the Durable Task Python client SDK - -Installation is currently only supported from source. Ensure pip, setuptools, and wheel are up-to-date. - -```sh -python3 -m pip install --upgrade pip setuptools wheel -``` - -To install this package from source, clone this repository and run the following command from the project root: - -```sh -python3 -m pip install . -``` - -### Run the samples - -See the [examples](./examples) directory for a list of sample orchestrations and instructions on how to run them. - -## Development - -The following is more information about how to develop this project. 
Note that development commands require that `make` is installed on your local machine. If you're using Windows, you can install `make` using [Chocolatey](https://chocolatey.org/) or use WSL. - -### Generating protobufs - -```sh -pip3 install -r dev-requirements.txt -make gen-proto -``` - -This will download the `orchestrator_service.proto` from the `microsoft/durabletask-protobuf` repo and compile it using `grpcio-tools`. The version of the source proto file that was downloaded can be found in the file `durabletask/internal/PROTO_SOURCE_COMMIT_HASH`. - -### Running unit tests - -Unit tests can be run using the following command from the project root. Unit tests _don't_ require a sidecar process to be running. - -```sh -make test-unit -``` - -### Running E2E tests - -The E2E (end-to-end) tests require a sidecar process to be running. You can use the Dapr sidecar for this or run a Durable Task test sidecar using the following command: - -```sh -go install github.com/microsoft/durabletask-go@main -durabletask-go --port 4001 -``` - -To run the E2E tests, run the following command from the project root: - -```sh -make test-e2e -``` - -## Contributing - -This project welcomes contributions and suggestions. Most contributions require you to agree to a -Contributor License Agreement (CLA) declaring that you have the right to, and actually do, grant us -the rights to use your contribution. For details, visit https://cla.opensource.microsoft.com. - -When you submit a pull request, a CLA bot will automatically determine whether you need to provide -a CLA and decorate the PR appropriately (e.g., status check, comment). Simply follow the instructions -provided by the bot. You will only need to do this once across all repos using our CLA. - -This project has adopted the [Microsoft Open Source Code of Conduct](https://opensource.microsoft.com/codeofconduct/). -For more information see the [Code of Conduct FAQ](https://opensource.microsoft.com/codeofconduct/faq/) or -contact [opencode@microsoft.com](mailto:opencode@microsoft.com) with any additional questions or comments. +# References +- [Supported Patterns](./docs/supported-patterns.md) +- [Available Features](./docs/features.md) +- [Getting Started](./docs/getting-started.md) +- [Development Guide](./docs/development.md) +- [Contributing Guide](./CONTRIBUTING.md) ## Trademarks - This project may contain trademarks or logos for projects, products, or services. Authorized use of Microsoft trademarks or logos is subject to and must follow [Microsoft's Trademark & Brand Guidelines](https://www.microsoft.com/en-us/legal/intellectualproperty/trademarks/usage/general). diff --git a/docs/development.md b/docs/development.md new file mode 100644 index 0000000..3308316 --- /dev/null +++ b/docs/development.md @@ -0,0 +1,35 @@ +# Development + +The following is more information about how to develop this project. Note that development commands require that `make` is installed on your local machine. If you're using Windows, you can install `make` using [Chocolatey](https://chocolatey.org/) or use WSL. + +### Generating protobufs + +```sh +pip3 install -r dev-requirements.txt +make gen-proto +``` + +This will download the `orchestrator_service.proto` from the `microsoft/durabletask-protobuf` repo and compile it using `grpcio-tools`. The version of the source proto file that was downloaded can be found in the file `durabletask/internal/PROTO_SOURCE_COMMIT_HASH`. + +### Running unit tests + +Unit tests can be run using the following command from the project root. 
Unit tests _don't_ require a sidecar process to be running.
+
+```sh
+make test-unit
+```
+
+### Running E2E tests
+
+The E2E (end-to-end) tests require a sidecar process to be running. You can use the Durable Task test sidecar by running the following commands:
+
+```sh
+go install github.com/microsoft/durabletask-go@main
+durabletask-go --port 4001
+```
+
+To run the E2E tests, run the following command from the project root:
+
+```sh
+make test-e2e
+```
\ No newline at end of file
diff --git a/docs/features.md b/docs/features.md
new file mode 100644
index 0000000..d5c1b8c
--- /dev/null
+++ b/docs/features.md
@@ -0,0 +1,35 @@
+# Feature overview
+
+The following features are currently supported:
+
+### Orchestrations
+
+Orchestrations are implemented using ordinary Python functions that take an `OrchestrationContext` as their first parameter. The `OrchestrationContext` provides APIs for starting child orchestrations, scheduling activities, and waiting for external events, among other things. Orchestrations are fault-tolerant and durable, meaning that they can automatically recover from failures and rebuild their local execution state. Orchestrator functions must be deterministic, meaning that they must always produce the same output given the same input.
+
+### Activities
+
+Activities are implemented using ordinary Python functions that take an `ActivityContext` as their first parameter. Activity functions are scheduled by orchestrations and have at-least-once execution guarantees, meaning that they will be executed at least once but may be executed multiple times in the event of a transient failure. Activity functions are where the real "work" of any orchestration is done.
+
+### Durable timers
+
+Orchestrations can schedule durable timers using the `create_timer` API. These timers are durable, meaning that they will survive orchestrator restarts and will fire even if the orchestrator is not actively in memory. Durable timers can be of any duration, from milliseconds to months.
+
+### Sub-orchestrations
+
+Orchestrations can start child orchestrations using the `call_sub_orchestrator` API. Child orchestrations are useful for encapsulating complex logic and for breaking up large orchestrations into smaller, more manageable pieces.
+
+### External events
+
+Orchestrations can wait for external events using the `wait_for_external_event` API. External events are useful for implementing human interaction patterns, such as waiting for a user to approve an order before continuing.
+
+### Continue-as-new
+
+Orchestrations can be continued as new using the `continue_as_new` API. This API allows an orchestration to restart itself from scratch, optionally with a new input.
+
+### Suspend, resume, and terminate
+
+Orchestrations can be suspended using the `suspend_orchestration` client API and will remain suspended until resumed using the `resume_orchestration` client API. A suspended orchestration will stop processing new events, but will continue to buffer any that happen to arrive until resumed, ensuring that no data is lost. An orchestration can also be terminated using the `terminate_orchestration` client API. Terminated orchestrations will stop processing new events and will discard any buffered events.
+
+### Retry policies
+
+Orchestrations can specify retry policies for activities and sub-orchestrations. These policies control how many times and how frequently an activity or sub-orchestration will be retried in the event of a transient error.
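+
+As a minimal sketch of how such a policy might be applied in an orchestrator (this assumes the SDK exposes a `task.RetryPolicy` type and that `call_activity` accepts a `retry_policy` keyword argument; verify the exact names against the current API):
+
+```python
+from datetime import timedelta
+
+from durabletask import task
+
+
+def unreliable_activity(ctx: task.ActivityContext, _) -> str:
+    """Hypothetical activity that may fail transiently."""
+    return 'ok'
+
+
+def orchestrator(ctx: task.OrchestrationContext, _):
+    # Assumed policy: retry up to 5 times, starting at 5 seconds and doubling each attempt.
+    policy = task.RetryPolicy(
+        first_retry_interval=timedelta(seconds=5),
+        max_number_of_attempts=5,
+        backoff_coefficient=2.0,
+    )
+    result = yield ctx.call_activity(unreliable_activity, input=None, retry_policy=policy)
+    return result
+```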
\ No newline at end of file
diff --git a/docs/getting-started.md b/docs/getting-started.md
new file mode 100644
index 0000000..4f31c22
--- /dev/null
+++ b/docs/getting-started.md
@@ -0,0 +1,9 @@
+# Getting Started
+
+### Run the Order Processing Example
+- Check out the [Durable Task Scheduler example](../examples/sub-orchestrations-with-fan-out-fan-in/README.md)
+  for detailed instructions on running the order processing example.
+
+### Explore Other Samples
+- Visit the [examples](../examples/) directory to find a variety of sample orchestrations and learn how to run them.
+
diff --git a/docs/supported-patterns.md b/docs/supported-patterns.md
new file mode 100644
index 0000000..bbac4a7
--- /dev/null
+++ b/docs/supported-patterns.md
@@ -0,0 +1,82 @@
+# Supported patterns
+
+The following orchestration patterns are currently supported.
+
+### Function chaining
+
+An orchestration can chain a sequence of function calls using the following syntax:
+
+```python
+# simple activity function that returns a greeting
+def hello(ctx: task.ActivityContext, name: str) -> str:
+    return f'Hello {name}!'
+
+# orchestrator function that sequences the activity calls
+def sequence(ctx: task.OrchestrationContext, _):
+    result1 = yield ctx.call_activity(hello, input='Tokyo')
+    result2 = yield ctx.call_activity(hello, input='Seattle')
+    result3 = yield ctx.call_activity(hello, input='London')
+
+    return [result1, result2, result3]
+```
+
+You can find the full sample [here](../examples/activity_sequence.py).
+
+### Fan-out/fan-in
+
+An orchestration can fan-out a dynamic number of function calls in parallel and then fan-in the results using the following syntax:
+
+```python
+# activity function for getting the list of work items
+def get_work_items(ctx: task.ActivityContext, _) -> List[str]:
+    # ...
+
+# activity function for processing a single work item
+def process_work_item(ctx: task.ActivityContext, item: str) -> int:
+    # ...
+
+# orchestrator function that fans-out the work items and then fans-in the results
+def orchestrator(ctx: task.OrchestrationContext, _):
+    # the number of work-items is unknown in advance
+    work_items = yield ctx.call_activity(get_work_items)
+
+    # fan-out: schedule the work items in parallel and wait for all of them to complete
+    tasks = [ctx.call_activity(process_work_item, input=item) for item in work_items]
+    results = yield task.when_all(tasks)
+
+    # fan-in: summarize and return the results
+    return {'work_items': work_items, 'results': results, 'total': sum(results)}
+```
+
+You can find the full sample [here](../examples/fanout_fanin.py).
+
+### Human interaction and durable timers
+
+An orchestration can wait for a user-defined event, such as a human approval event, before proceeding to the next step. In addition, the orchestration can create a timer with an arbitrary duration that triggers some alternate action if the external event hasn't been received:
+
+```python
+def purchase_order_workflow(ctx: task.OrchestrationContext, order: Order):
+    """Orchestrator function that represents a purchase order workflow"""
+    # Orders under $1000 are auto-approved
+    if order.Cost < 1000:
+        return "Auto-approved"
+
+    # Orders of $1000 or more require manager approval
+    yield ctx.call_activity(send_approval_request, input=order)
+
+    # Approvals must be received within 24 hours or they will be canceled.
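+    # wait_for_external_event and create_timer each return a task; task.when_any below
+    # yields whichever of the two completes first, so the timer effectively acts as a
+    # timeout for the approval event.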
+ approval_event = ctx.wait_for_external_event("approval_received") + timeout_event = ctx.create_timer(timedelta(hours=24)) + winner = yield task.when_any([approval_event, timeout_event]) + if winner == timeout_event: + return "Canceled" + + # The order was approved + yield ctx.call_activity(place_order, input=order) + approval_details = approval_event.get_result() + return f"Approved by '{approval_details.approver}'" +``` + +As an aside, you'll also notice that the example orchestration above works with custom business objects. Support for custom business objects includes support for custom classes, custom data classes, and named tuples. Serialization and deserialization of these objects is handled automatically by the SDK. + +You can find the full sample [here](../examples/human_interaction.py). \ No newline at end of file diff --git a/examples/README.md b/examples/README.md index 404b127..0912a60 100644 --- a/examples/README.md +++ b/examples/README.md @@ -1,27 +1,86 @@ -# Examples - -This directory contains examples of how to author durable orchestrations using the Durable Task Python SDK. - -## Prerequisites - -All the examples assume that you have a Durable Task-compatible sidecar running locally. There are two options for this: - -1. Install the latest version of the [Dapr CLI](https://docs.dapr.io/getting-started/install-dapr-cli/), which contains and exposes an embedded version of the Durable Task engine. The setup process (which requires Docker) will configure the workflow engine to store state in a local Redis container. - -2. Clone and run the [Durable Task Sidecar](https://github.com/microsoft/durabletask-go) project locally (requires Go 1.18 or higher). Orchestration state will be stored in a local sqlite database. - -## Running the examples - -With one of the sidecars running, you can simply execute any of the examples in this directory using `python3`: - -```sh -python3 ./activity_sequence.py -``` - -In some cases, the sample may require command-line parameters or user inputs. In these cases, the sample will print out instructions on how to proceed. - -## List of examples - -- [Activity sequence](./activity_sequence.py): Orchestration that schedules three activity calls in a sequence. -- [Fan-out/fan-in](./fanout_fanin.py): Orchestration that schedules a dynamic number of activity calls in parallel, waits for all of them to complete, and then performs an aggregation on the results. -- [Human interaction](./human_interaction.py): Orchestration that waits for a human to approve an order before continuing. +# Examples + +This directory contains examples of how to author durable orchestrations using the Durable Task Python SDK in conjunction with the Durable Task Scheduler (DTS). + +## Prerequisites +If using a deployed Durable Task Scheduler: + - [Azure CLI](https://learn.microsoft.com/cli/azure/install-azure-cli) + - [`az durabletask` CLI extension](https://learn.microsoft.com/en-us/cli/azure/durabletask?view=azure-cli-latest) + +## Running the Examples +There are two separate ways to run an example: + +- Using the Emulator (recommended for learning and development) +- Using a deployed Scheduler and Taskhub in Azure + +### Running with the Emulator +We recommend using the emulator for learning and development as it's faster to set up and doesn't require any Azure resources. The emulator simulates a scheduler and taskhub, packaged into an easy-to-use Docker container. + +1. Install Docker: If it is not already installed. + +2. 
Pull the Docker Image for the Emulator: +```bash +docker pull mcr.microsoft.com/dts/dts-emulator:v0.0.6 +``` + +3. Run the Emulator: Wait a few seconds for the container to be ready. +```bash +docker run --name dtsemulator -d -p 8080:8080 mcr.microsoft.com/dts/dts-emulator:v0.0.6 +``` + +4. Install the Required Packages: +```bash +pip install -r requirements.txt +``` + +Note: The example code has been updated to use the default emulator settings automatically (endpoint: http://localhost:8080, taskhub: default). You don't need to set any environment variables. + +### Running with a Deployed Scheduler and Taskhub Resource in Azure +For production scenarios or when you're ready to deploy to Azure, you can create a taskhub using the Azure CLI: + +1. Create a Scheduler: +```bash +az durabletask scheduler create --resource-group --name --location --ip-allowlist "[0.0.0.0/0]" --sku-capacity 1 --sku-name "Dedicated" --tags "{'myattribute':'myvalue'}" +``` + +2. Create Your Taskhub: +```bash +az durabletask taskhub create --resource-group --scheduler-name --name +``` + +3. Retrieve the Endpoint for the Scheduler: Locate the taskhub in the Azure portal to find the endpoint. + +4. Set the Environment Variables: +Bash: +```bash +export TASKHUB= +export ENDPOINT= +``` +Powershell: +```powershell +$env:TASKHUB = "" +$env:ENDPOINT = "" +``` + +5. Install the Required Packages: +```bash +pip install -r requirements.txt +``` + +### Running the Examples +You can now execute any of the examples in this directory using Python: + +```bash +python3 example_file.py +``` + +### Review Orchestration History and Status in the Durable Task Scheduler Dashboard +To access the Durable Task Scheduler Dashboard, follow these steps: + +- **Using the Emulator**: By default, the dashboard runs on portal 8082. Navigate to http://localhost:8082 and click on the default task hub. + +- **Using a Deployed Scheduler**: Navigate to the Scheduler resource. Then, go to the Task Hub subresource that you are using and click on the dashboard URL in the top right corner. + +```sh +python3 activity_sequence.py +``` diff --git a/examples/activity_sequence.py b/examples/activity_sequence.py index 066a733..38c013d 100644 --- a/examples/activity_sequence.py +++ b/examples/activity_sequence.py @@ -1,35 +1,54 @@ -"""End-to-end sample that demonstrates how to configure an orchestrator -that calls an activity function in a sequence and prints the outputs.""" -from durabletask import client, task, worker - - -def hello(ctx: task.ActivityContext, name: str) -> str: - """Activity function that returns a greeting""" - return f'Hello {name}!' - - -def sequence(ctx: task.OrchestrationContext, _): - """Orchestrator function that calls the 'hello' activity function in a sequence""" - # call "hello" activity function in a sequence - result1 = yield ctx.call_activity(hello, input='Tokyo') - result2 = yield ctx.call_activity(hello, input='Seattle') - result3 = yield ctx.call_activity(hello, input='London') - - # return an array of results - return [result1, result2, result3] - - -# configure and start the worker -with worker.TaskHubGrpcWorker() as w: - w.add_orchestrator(sequence) - w.add_activity(hello) - w.start() - - # create a client, start an orchestration, and wait for it to finish - c = client.TaskHubGrpcClient() - instance_id = c.schedule_new_orchestration(sequence) - state = c.wait_for_orchestration_completion(instance_id, timeout=10) - if state and state.runtime_status == client.OrchestrationStatus.COMPLETED: - print(f'Orchestration completed! 
Result: {state.serialized_output}') - elif state: - print(f'Orchestration failed: {state.failure_details}') +"""End-to-end sample that demonstrates how to configure an orchestrator +that calls an activity function in a sequence and prints the outputs.""" +import os + +from azure.identity import DefaultAzureCredential + +from durabletask import client, task +from durabletask.azuremanaged.client import DurableTaskSchedulerClient +from durabletask.azuremanaged.worker import DurableTaskSchedulerWorker + + +def hello(ctx: task.ActivityContext, name: str) -> str: + """Activity function that returns a greeting""" + return f'Hello {name}!' + + +def sequence(ctx: task.OrchestrationContext, _): + """Orchestrator function that calls the 'hello' activity function in a sequence""" + # call "hello" activity function in a sequence + result1 = yield ctx.call_activity(hello, input='Tokyo') + result2 = yield ctx.call_activity(hello, input='Seattle') + result3 = yield ctx.call_activity(hello, input='London') + + # return an array of results + return [result1, result2, result3] + + +# Use environment variables if provided, otherwise use default emulator values +taskhub_name = os.getenv("TASKHUB", "default") +endpoint = os.getenv("ENDPOINT", "http://localhost:8080") + +print(f"Using taskhub: {taskhub_name}") +print(f"Using endpoint: {endpoint}") + +# Set credential to None for emulator, or DefaultAzureCredential for Azure +credential = None if endpoint == "http://localhost:8080" else DefaultAzureCredential() + +# configure and start the worker - use secure_channel=False for emulator +secure_channel = endpoint != "http://localhost:8080" +with DurableTaskSchedulerWorker(host_address=endpoint, secure_channel=secure_channel, + taskhub=taskhub_name, token_credential=credential) as w: + w.add_orchestrator(sequence) + w.add_activity(hello) + w.start() + + # Construct the client and run the orchestrations + c = DurableTaskSchedulerClient(host_address=endpoint, secure_channel=secure_channel, + taskhub=taskhub_name, token_credential=credential) + instance_id = c.schedule_new_orchestration(sequence) + state = c.wait_for_orchestration_completion(instance_id, timeout=60) + if state and state.runtime_status == client.OrchestrationStatus.COMPLETED: + print(f'Orchestration completed! Result: {state.serialized_output}') + elif state: + print(f'Orchestration failed: {state.failure_details}') diff --git a/examples/dts/README.md b/examples/dts/README.md deleted file mode 100644 index 8df2b75..0000000 --- a/examples/dts/README.md +++ /dev/null @@ -1,83 +0,0 @@ -# Examples - -This directory contains examples of how to author durable orchestrations using the Durable Task Python SDK in conjunction with the Durable Task Scheduler (DTS). Please note that the installation instructions provided below will use the version of DTS directly from the your branch rather than installing through PyPI. - -## Prerequisites - -There are 2 separate ways to run an example: -1. Using the emulator. -2. Using a real scheduler and taskhub. - -All the examples by defualt assume that you have a Durable Task Scheduler taskhub created. - -## Running with a scheduler and taskhub resource -The simplest way to create a taskhub is by using the az cli commands: - -1. Create a scheduler: - az durabletask scheduler create --resource-group --name --location --ip-allowlist "[0.0.0.0/0]" --sku-capacity 1 --sku-name "Dedicated" --tags "{}" - -1. Create your taskhub - - ```bash - az durabletask taskhub create --resource-group --scheduler-name --name - ``` - -1. 
Retrieve the endpoint for the scheduler. This can be done by locating the taskhub in the portal. - -1. Set the appropriate environment variables for the TASKHUB and ENDPOINT - - ```bash - export TASKHUB= - export ENDPOINT= - ``` - -1. Since the samples rely on azure identity, ensure the package is installed and up-to-date - - ```bash - python3 -m pip install azure-identity - ``` - -1. Install the correct packages from the top level of this repository, i.e. durabletask-python/ - - ```bash - python3 -m pip install . - ``` - -1. Install the DTS specific packages from the durabletask-python/durabletask-azuremanaged directory - - ```bash - pip3 install -e . - ``` - -1. Grant yourself the `Durable Task Data Contributor` role over your scheduler - -## Running with the emulator -The emulator is a simulation of a scheduler and taskhub. It is the 'backend' of the durabletask-azuremanaged system packaged up into an easy to use docker container. For these steps, it is assumed that you are using port 8080. - -In order to use the emulator for the examples, perform the following steps: -1. Install docker if it is not already installed. - -2. Pull down the docker image for the emulator: - `docker pull mcr.microsoft.com/dts/dts-emulator:v0.0.4` - -3. Run the emulator and wait a few seconds for the container to be ready: -`docker run --name dtsemulator -d -p 8080:8080 mcr.microsoft.com/dts/dts-emulator:v0.0.4` - -4. Set the environment variables that are referenced and used in the examples: - 1. If you are using windows powershell: - `$env:TASKHUB="default"` - `$env:ENDPOINT="http://localhost:8080"` - 2. If you are using bash: - `export TASKHUB=default` - `export ENDPOINT=http://localhost:8080` - -5. Finally, edit the examples to change the `token_credential` input of both the `DurableTaskSchedulerWorker` and `DurableTaskSchedulerClient` to a value of `None` - - -## Running the examples - -Now, you can simply execute any of the examples in this directory using `python3`: - -```sh -python3 dts_activity_sequence.py -``` diff --git a/examples/dts/dts_activity_sequence.py b/examples/dts/dts_activity_sequence.py deleted file mode 100644 index 2ff3c22..0000000 --- a/examples/dts/dts_activity_sequence.py +++ /dev/null @@ -1,71 +0,0 @@ -"""End-to-end sample that demonstrates how to configure an orchestrator -that calls an activity function in a sequence and prints the outputs.""" -import os - -from azure.identity import DefaultAzureCredential - -from durabletask import client, task -from durabletask.azuremanaged.client import DurableTaskSchedulerClient -from durabletask.azuremanaged.worker import DurableTaskSchedulerWorker - - -def hello(ctx: task.ActivityContext, name: str) -> str: - """Activity function that returns a greeting""" - return f'Hello {name}!' - - -def sequence(ctx: task.OrchestrationContext, _): - """Orchestrator function that calls the 'hello' activity function in a sequence""" - # call "hello" activity function in a sequence - result1 = yield ctx.call_activity(hello, input='Tokyo') - result2 = yield ctx.call_activity(hello, input='Seattle') - result3 = yield ctx.call_activity(hello, input='London') - - # return an array of results - return [result1, result2, result3] - - -# Read the environment variable -taskhub_name = os.getenv("TASKHUB") - -# Check if the variable exists -if taskhub_name: - print(f"The value of TASKHUB is: {taskhub_name}") -else: - print("TASKHUB is not set. 
Please set the TASKHUB environment variable to the name of the taskhub you wish to use") - print("If you are using windows powershell, run the following: $env:TASKHUB=\"\"") - print("If you are using bash, run the following: export TASKHUB=\"\"") - exit() - -# Read the environment variable -endpoint = os.getenv("ENDPOINT") - -# Check if the variable exists -if endpoint: - print(f"The value of ENDPOINT is: {endpoint}") -else: - print("ENDPOINT is not set. Please set the ENDPOINT environment variable to the endpoint of the scheduler") - print("If you are using windows powershell, run the following: $env:ENDPOINT=\"\"") - print("If you are using bash, run the following: export ENDPOINT=\"\"") - exit() - -# Note that any azure-identity credential type and configuration can be used here as DTS supports various credential -# types such as Managed Identities -credential = DefaultAzureCredential() - -# configure and start the worker -with DurableTaskSchedulerWorker(host_address=endpoint, secure_channel=True, - taskhub=taskhub_name, token_credential=credential) as w: - w.add_orchestrator(sequence) - w.add_activity(hello) - w.start() - - # Construct the client and run the orchestrations - c = DurableTaskSchedulerClient(host_address=endpoint, secure_channel=True, - taskhub=taskhub_name, token_credential=credential) - instance_id = c.schedule_new_orchestration(sequence) - state = c.wait_for_orchestration_completion(instance_id, timeout=60) - if state and state.runtime_status == client.OrchestrationStatus.COMPLETED: - print(f'Orchestration completed! Result: {state.serialized_output}') - elif state: - print(f'Orchestration failed: {state.failure_details}') diff --git a/examples/dts/dts_fanout_fanin.py b/examples/dts/dts_fanout_fanin.py deleted file mode 100644 index 8ab68df..0000000 --- a/examples/dts/dts_fanout_fanin.py +++ /dev/null @@ -1,96 +0,0 @@ -"""End-to-end sample that demonstrates how to configure an orchestrator -that a dynamic number activity functions in parallel, waits for them all -to complete, and prints an aggregate summary of the outputs.""" -import os -import random -import time - -from azure.identity import DefaultAzureCredential - -from durabletask import client, task -from durabletask.azuremanaged.client import DurableTaskSchedulerClient -from durabletask.azuremanaged.worker import DurableTaskSchedulerWorker - - -def get_work_items(ctx: task.ActivityContext, _) -> list[str]: - """Activity function that returns a list of work items""" - # return a random number of work items - count = random.randint(2, 10) - print(f'generating {count} work items...') - return [f'work item {i}' for i in range(count)] - - -def process_work_item(ctx: task.ActivityContext, item: str) -> int: - """Activity function that returns a result for a given work item""" - print(f'processing work item: {item}') - - # simulate some work that takes a variable amount of time - time.sleep(random.random() * 5) - - # return a result for the given work item, which is also a random number in this case - return random.randint(0, 10) - - -def orchestrator(ctx: task.OrchestrationContext, _): - """Orchestrator function that calls the 'get_work_items' and 'process_work_item' - activity functions in parallel, waits for them all to complete, and prints - an aggregate summary of the outputs""" - - work_items: list[str] = yield ctx.call_activity(get_work_items) - - # execute the work-items in parallel and wait for them all to return - tasks = [ctx.call_activity(process_work_item, input=item) for item in work_items] - results: 
list[int] = yield task.when_all(tasks) - - # return an aggregate summary of the results - return { - 'work_items': work_items, - 'results': results, - 'total': sum(results), - } - - -# Read the environment variable -taskhub_name = os.getenv("TASKHUB") - -# Check if the variable exists -if taskhub_name: - print(f"The value of TASKHUB is: {taskhub_name}") -else: - print("TASKHUB is not set. Please set the TASKHUB environment variable to the name of the taskhub you wish to use") - print("If you are using windows powershell, run the following: $env:TASKHUB=\"\"") - print("If you are using bash, run the following: export TASKHUB=\"\"") - exit() - -# Read the environment variable -endpoint = os.getenv("ENDPOINT") - -# Check if the variable exists -if endpoint: - print(f"The value of ENDPOINT is: {endpoint}") -else: - print("ENDPOINT is not set. Please set the ENDPOINT environment variable to the endpoint of the scheduler") - print("If you are using windows powershell, run the following: $env:ENDPOINT=\"\"") - print("If you are using bash, run the following: export ENDPOINT=\"\"") - exit() - -credential = DefaultAzureCredential() - -# configure and start the worker -with DurableTaskSchedulerWorker(host_address=endpoint, secure_channel=True, - taskhub=taskhub_name, token_credential=credential) as w: - w.add_orchestrator(orchestrator) - w.add_activity(process_work_item) - w.add_activity(get_work_items) - w.start() - - # create a client, start an orchestration, and wait for it to finish - c = DurableTaskSchedulerClient(host_address=endpoint, secure_channel=True, - taskhub=taskhub_name, token_credential=credential) - instance_id = c.schedule_new_orchestration(orchestrator) - state = c.wait_for_orchestration_completion(instance_id, timeout=30) - if state and state.runtime_status == client.OrchestrationStatus.COMPLETED: - print(f'Orchestration completed! 
Result: {state.serialized_output}') - elif state: - print(f'Orchestration failed: {state.failure_details}') - exit() diff --git a/examples/fanout_fanin.py b/examples/fanout_fanin.py index c53744f..a606731 100644 --- a/examples/fanout_fanin.py +++ b/examples/fanout_fanin.py @@ -1,62 +1,81 @@ -"""End-to-end sample that demonstrates how to configure an orchestrator -that a dynamic number activity functions in parallel, waits for them all -to complete, and prints an aggregate summary of the outputs.""" -import random -import time - -from durabletask import client, task, worker - - -def get_work_items(ctx: task.ActivityContext, _) -> list[str]: - """Activity function that returns a list of work items""" - # return a random number of work items - count = random.randint(2, 10) - print(f'generating {count} work items...') - return [f'work item {i}' for i in range(count)] - - -def process_work_item(ctx: task.ActivityContext, item: str) -> int: - """Activity function that returns a result for a given work item""" - print(f'processing work item: {item}') - - # simulate some work that takes a variable amount of time - time.sleep(random.random() * 5) - - # return a result for the given work item, which is also a random number in this case - return random.randint(0, 10) - - -def orchestrator(ctx: task.OrchestrationContext, _): - """Orchestrator function that calls the 'get_work_items' and 'process_work_item' - activity functions in parallel, waits for them all to complete, and prints - an aggregate summary of the outputs""" - - work_items: list[str] = yield ctx.call_activity(get_work_items) - - # execute the work-items in parallel and wait for them all to return - tasks = [ctx.call_activity(process_work_item, input=item) for item in work_items] - results: list[int] = yield task.when_all(tasks) - - # return an aggregate summary of the results - return { - 'work_items': work_items, - 'results': results, - 'total': sum(results), - } - - -# configure and start the worker -with worker.TaskHubGrpcWorker() as w: - w.add_orchestrator(orchestrator) - w.add_activity(process_work_item) - w.add_activity(get_work_items) - w.start() - - # create a client, start an orchestration, and wait for it to finish - c = client.TaskHubGrpcClient() - instance_id = c.schedule_new_orchestration(orchestrator) - state = c.wait_for_orchestration_completion(instance_id, timeout=30) - if state and state.runtime_status == client.OrchestrationStatus.COMPLETED: - print(f'Orchestration completed! 
Result: {state.serialized_output}') - elif state: - print(f'Orchestration failed: {state.failure_details}') +"""End-to-end sample that demonstrates how to configure an orchestrator +that a dynamic number activity functions in parallel, waits for them all +to complete, and prints an aggregate summary of the outputs.""" +import os +import random +import time + +from azure.identity import DefaultAzureCredential + +from durabletask import client, task +from durabletask.azuremanaged.client import DurableTaskSchedulerClient +from durabletask.azuremanaged.worker import DurableTaskSchedulerWorker + + +def get_work_items(ctx: task.ActivityContext, _) -> list[str]: + """Activity function that returns a list of work items""" + # return a random number of work items + count = random.randint(2, 10) + print(f'generating {count} work items...') + return [f'work item {i}' for i in range(count)] + + +def process_work_item(ctx: task.ActivityContext, item: str) -> int: + """Activity function that returns a result for a given work item""" + print(f'processing work item: {item}') + + # simulate some work that takes a variable amount of time + time.sleep(random.random() * 5) + + # return a result for the given work item, which is also a random number in this case + return random.randint(0, 10) + + +def orchestrator(ctx: task.OrchestrationContext, _): + """Orchestrator function that calls the 'get_work_items' and 'process_work_item' + activity functions in parallel, waits for them all to complete, and prints + an aggregate summary of the outputs""" + + work_items: list[str] = yield ctx.call_activity(get_work_items) + + # execute the work-items in parallel and wait for them all to return + tasks = [ctx.call_activity(process_work_item, input=item) for item in work_items] + results: list[int] = yield task.when_all(tasks) + + # return an aggregate summary of the results + return { + 'work_items': work_items, + 'results': results, + 'total': sum(results), + } + + +# Use environment variables if provided, otherwise use default emulator values +taskhub_name = os.getenv("TASKHUB", "default") +endpoint = os.getenv("ENDPOINT", "http://localhost:8080") + +print(f"Using taskhub: {taskhub_name}") +print(f"Using endpoint: {endpoint}") + +# Set credential to None for emulator, or DefaultAzureCredential for Azure +credential = None if endpoint == "http://localhost:8080" else DefaultAzureCredential() + +# configure and start the worker - use secure_channel=False for emulator +secure_channel = endpoint != "http://localhost:8080" +with DurableTaskSchedulerWorker(host_address=endpoint, secure_channel=secure_channel, + taskhub=taskhub_name, token_credential=credential) as w: + w.add_orchestrator(orchestrator) + w.add_activity(process_work_item) + w.add_activity(get_work_items) + w.start() + + # create a client, start an orchestration, and wait for it to finish + c = DurableTaskSchedulerClient(host_address=endpoint, secure_channel=secure_channel, + taskhub=taskhub_name, token_credential=credential) + instance_id = c.schedule_new_orchestration(orchestrator) + state = c.wait_for_orchestration_completion(instance_id, timeout=30) + if state and state.runtime_status == client.OrchestrationStatus.COMPLETED: + print(f'Orchestration completed! 
Result: {state.serialized_output}') + elif state: + print(f'Orchestration failed: {state.failure_details}') + exit() diff --git a/examples/human_interaction.py b/examples/human_interaction.py index 2a01897..ae93cd2 100644 --- a/examples/human_interaction.py +++ b/examples/human_interaction.py @@ -3,13 +3,18 @@ the approval isn't received within a specified timeout, the order that is represented by the orchestration is automatically cancelled.""" +import os import threading import time from collections import namedtuple from dataclasses import dataclass from datetime import timedelta +from azure.identity import DefaultAzureCredential + from durabletask import client, task, worker +from durabletask.azuremanaged.client import DurableTaskSchedulerClient +from durabletask.azuremanaged.worker import DurableTaskSchedulerWorker @dataclass @@ -63,37 +68,87 @@ def purchase_order_workflow(ctx: task.OrchestrationContext, order: Order): parser.add_argument("--cost", type=int, default=2000, help="Cost of the order") parser.add_argument("--approver", type=str, default="Me", help="Approver name") parser.add_argument("--timeout", type=int, default=60, help="Timeout in seconds") + parser.add_argument("--local", action="store_true", help="Use local worker instead of DurableTaskScheduler") args = parser.parse_args() - # configure and start the worker - with worker.TaskHubGrpcWorker() as w: - w.add_orchestrator(purchase_order_workflow) - w.add_activity(send_approval_request) - w.add_activity(place_order) - w.start() - - c = client.TaskHubGrpcClient() - - # Start a purchase order workflow using the user input - order = Order(args.cost, "MyProduct", 1) - instance_id = c.schedule_new_orchestration(purchase_order_workflow, input=order) - - def prompt_for_approval(): - input("Press [ENTER] to approve the order...\n") - approval_event = namedtuple("Approval", ["approver"])(args.approver) - c.raise_orchestration_event(instance_id, "approval_received", data=approval_event) - - # Prompt the user for approval on a background thread - threading.Thread(target=prompt_for_approval, daemon=True).start() - - # Wait for the orchestration to complete - try: - state = c.wait_for_orchestration_completion(instance_id, timeout=args.timeout + 2) - if not state: - print("Workflow not found!") # not expected - elif state.runtime_status == client.OrchestrationStatus.COMPLETED: - print(f'Orchestration completed! 
Result: {state.serialized_output}') - else: - state.raise_if_failed() # raises an exception - except TimeoutError: - print("*** Orchestration timed out!") + if args.local: + # Use local worker (original implementation) + with worker.TaskHubGrpcWorker() as w: + w.add_orchestrator(purchase_order_workflow) + w.add_activity(send_approval_request) + w.add_activity(place_order) + w.start() + + c = client.TaskHubGrpcClient() + + # Start a purchase order workflow using the user input + order = Order(args.cost, "MyProduct", 1) + instance_id = c.schedule_new_orchestration(purchase_order_workflow, input=order) + + def prompt_for_approval(): + input("Press [ENTER] to approve the order...\n") + approval_event = namedtuple("Approval", ["approver"])(args.approver) + c.raise_orchestration_event(instance_id, "approval_received", data=approval_event) + + # Prompt the user for approval on a background thread + threading.Thread(target=prompt_for_approval, daemon=True).start() + + # Wait for the orchestration to complete + try: + state = c.wait_for_orchestration_completion(instance_id, timeout=args.timeout + 2) + if not state: + print("Workflow not found!") # not expected + elif state.runtime_status == client.OrchestrationStatus.COMPLETED: + print(f'Orchestration completed! Result: {state.serialized_output}') + else: + state.raise_if_failed() # raises an exception + except TimeoutError: + print("*** Orchestration timed out!") + else: + # Use DurableTaskScheduler + # Use environment variables if provided, otherwise use default emulator values + taskhub_name = os.getenv("TASKHUB", "default") + endpoint = os.getenv("ENDPOINT", "http://localhost:8080") + + print(f"Using taskhub: {taskhub_name}") + print(f"Using endpoint: {endpoint}") + + # Set credential to None for emulator, or DefaultAzureCredential for Azure + credential = None if endpoint == "http://localhost:8080" else DefaultAzureCredential() + + # Configure and start the worker - use secure_channel=False for emulator + secure_channel = endpoint != "http://localhost:8080" + with DurableTaskSchedulerWorker(host_address=endpoint, secure_channel=secure_channel, + taskhub=taskhub_name, token_credential=credential) as w: + w.add_orchestrator(purchase_order_workflow) + w.add_activity(send_approval_request) + w.add_activity(place_order) + w.start() + + # Construct the client and run the orchestrations + c = DurableTaskSchedulerClient(host_address=endpoint, secure_channel=secure_channel, + taskhub=taskhub_name, token_credential=credential) + + # Start a purchase order workflow using the user input + order = Order(args.cost, "MyProduct", 1) + instance_id = c.schedule_new_orchestration(purchase_order_workflow, input=order) + + def prompt_for_approval(): + input("Press [ENTER] to approve the order...\n") + approval_event = namedtuple("Approval", ["approver"])(args.approver) + c.raise_orchestration_event(instance_id, "approval_received", data=approval_event) + + # Prompt the user for approval on a background thread + threading.Thread(target=prompt_for_approval, daemon=True).start() + + # Wait for the orchestration to complete + try: + state = c.wait_for_orchestration_completion(instance_id, timeout=args.timeout + 2) + if not state: + print("Workflow not found!") # not expected + elif state.runtime_status == client.OrchestrationStatus.COMPLETED: + print(f'Orchestration completed! 
Result: {state.serialized_output}') + else: + state.raise_if_failed() # raises an exception + except TimeoutError: + print("*** Orchestration timed out!") diff --git a/examples/dts/requirements.txt b/examples/requirements.txt similarity index 100% rename from examples/dts/requirements.txt rename to examples/requirements.txt diff --git a/examples/sub-orchestrations-with-fan-out-fan-in/README.md b/examples/sub-orchestrations-with-fan-out-fan-in/README.md new file mode 100644 index 0000000..8e73e78 --- /dev/null +++ b/examples/sub-orchestrations-with-fan-out-fan-in/README.md @@ -0,0 +1,94 @@ +# Portable SDK Sample for Sub Orchestrations and Fan-out / Fan-in + +This sample demonstrates how to use the Durable Task SDK, also known as the Portable SDK, with the Durable Task Scheduler to create orchestrations. These orchestrations not only spin off child orchestrations but also perform parallel processing by leveraging the fan-out/fan-in application pattern. + +The scenario showcases an order processing system where orders are processed in batches. + +> Note, for simplicity, this code is contained within a single source file. In real practice, you would have + +# Prerequisites +If using a deployed Durable Task Scheduler: + - [Azure CLI](https://docs.microsoft.com/cli/azure/install-azure-cli) + - [`az durabletask` CLI extension](https://learn.microsoft.com/en-us/cli/azure/durabletask?view=azure-cli-latest) + +## Running the Examples +There are two separate ways to run an example: + +- Using the Emulator (recommended for learning and development) +- Using a deployed Scheduler and Taskhub in Azure + +### Running with the Emulator +We recommend using the emulator for learning and development as it's faster to set up and doesn't require any Azure resources. The emulator simulates a scheduler and taskhub, packaged into an easy-to-use Docker container. + +1. Install Docker: If it is not already installed. + +2. Pull the Docker Image for the Emulator: +```bash +docker pull mcr.microsoft.com/dts/dts-emulator:v0.0.6 +``` + +3. Run the Emulator: Wait a few seconds for the container to be ready. +```bash +docker run --name dtsemulator -d -p 8080:8080 mcr.microsoft.com/dts/dts-emulator:v0.0.6 +``` + +4. Install the Required Packages: +```bash +pip install -r requirements.txt +``` + +Note: The example code has been updated to use the default emulator settings automatically (endpoint: http://localhost:8080, taskhub: default). You don't need to set any environment variables. + +### Running with a Deployed Scheduler and Taskhub Resource in Azure +For production scenarios or when you're ready to deploy to Azure, you can create a taskhub using the Azure CLI: + +1. Create a Scheduler: +```bash +az durabletask scheduler create --resource-group --name --location --ip-allowlist "[0.0.0.0/0]" --sku-capacity 1 --sku-name "Dedicated" --tags "{'myattribute':'myvalue'}" +``` + +2. Create Your Taskhub: +```bash +az durabletask taskhub create --resource-group --scheduler-name --name +``` + +3. Retrieve the Endpoint for the Scheduler: Locate the taskhub in the Azure portal to find the endpoint. + +4. Set the Environment Variables: +Bash: +```bash +export TASKHUB= +export ENDPOINT= +``` +Powershell: +```powershell +$env:TASKHUB = "" +$env:ENDPOINT = "" +``` + +5. 
Install the Required Packages: +```bash +pip install -r requirements.txt +``` + +### Running the Examples +You can now execute the sample using Python: + +Start the worker and ensure the TASKHUB and ENDPOINT environment variables are set in your shell: +```bash +python3 ./worker.py +``` + +Next, start the orchestrator and make sure the TASKHUB and ENDPOINT environment variables are set in your shell: +```bash +python3 ./orchestrator.py +``` + +You should start seeing logs for processing orders in both shell outputs. + +### Review Orchestration History and Status in the Durable Task Scheduler Dashboard +To access the Durable Task Scheduler Dashboard, follow these steps: + +- **Using the Emulator**: By default, the dashboard runs on portal 8082. Navigate to http://localhost:8082 and click on the default task hub. + +- **Using a Deployed Scheduler**: Navigate to the Scheduler resource. Then, go to the Task Hub subresource that you are using and click on the dashboard URL in the top right corner. diff --git a/examples/sub-orchestrations-with-fan-out-fan-in/orchestrator.py b/examples/sub-orchestrations-with-fan-out-fan-in/orchestrator.py new file mode 100644 index 0000000..a5e013b --- /dev/null +++ b/examples/sub-orchestrations-with-fan-out-fan-in/orchestrator.py @@ -0,0 +1,28 @@ +import os +from azure.identity import DefaultAzureCredential +from durabletask import client +from durabletask.azuremanaged.client import DurableTaskSchedulerClient + +# Use environment variables if provided, otherwise use default emulator values +taskhub_name = os.getenv("TASKHUB", "default") +endpoint = os.getenv("ENDPOINT", "http://localhost:8080") + +print(f"Using taskhub: {taskhub_name}") +print(f"Using endpoint: {endpoint}") + +# Set credential to None for emulator, or DefaultAzureCredential for Azure +credential = None if endpoint == "http://localhost:8080" else DefaultAzureCredential() + +# Create a client, start an orchestration, and wait for it to finish +c = DurableTaskSchedulerClient(host_address=endpoint, secure_channel=True, + taskhub=taskhub_name, token_credential=credential) + +instance_id = c.schedule_new_orchestration("orchestrator") + +state = c.wait_for_orchestration_completion(instance_id, timeout=30) + +if state and state.runtime_status == client.OrchestrationStatus.COMPLETED: + print(f'Orchestration completed! 
Result: {state.serialized_output}')
+elif state:
+    print(f'Orchestration failed: {state.failure_details}')
+exit()
diff --git a/examples/sub-orchestrations-with-fan-out-fan-in/requirements.txt b/examples/sub-orchestrations-with-fan-out-fan-in/requirements.txt
new file mode 100644
index 0000000..5339ebe
--- /dev/null
+++ b/examples/sub-orchestrations-with-fan-out-fan-in/requirements.txt
@@ -0,0 +1,2 @@
+durabletask-azuremanaged
+azure-identity
\ No newline at end of file
diff --git a/examples/sub-orchestrations-with-fan-out-fan-in/worker.py b/examples/sub-orchestrations-with-fan-out-fan-in/worker.py
new file mode 100644
index 0000000..8ca447d
--- /dev/null
+++ b/examples/sub-orchestrations-with-fan-out-fan-in/worker.py
@@ -0,0 +1,139 @@
+import os
+import random
+import time
+from azure.identity import DefaultAzureCredential
+from durabletask import task
+from durabletask.azuremanaged.worker import DurableTaskSchedulerWorker
+
+
+def get_orders(ctx, _) -> list[str]:
+    """Activity function that returns a list of work items"""
+    # return a random number of work items
+    count = random.randint(2, 10)
+    print(f'generating {count} orders...')
+    return [f'order {i}' for i in range(count)]
+
+
+def check_and_update_inventory(ctx, order: str) -> bool:
+    """Activity function that checks inventory for a given order"""
+    print(f'checking inventory for order: {order}')
+
+    # simulate inventory check
+    time.sleep(random.random() * 2)
+
+    # return a random boolean indicating if the item is in stock
+    return random.choices([True, False], weights=[9, 1])[0]
+
+
+def charge_payment(ctx, order: str) -> bool:
+    """Activity function that charges payment for a given order"""
+    print(f'charging payment for order: {order}')
+
+    # simulate payment processing
+    time.sleep(random.random() * 2)
+
+    # return a random boolean indicating if the payment was successful
+    return random.choices([True, False], weights=[9, 1])[0]
+
+
+def ship_order(ctx, order: str) -> bool:
+    """Activity function that ships a given order"""
+    print(f'shipping order: {order}')
+
+    # simulate shipping process
+    time.sleep(random.random() * 2)
+
+    # return a random boolean indicating if the shipping was successful
+    return random.choices([True, False], weights=[9, 1])[0]
+
+
+def notify_customer(ctx, order: str) -> bool:
+    """Activity function that notifies the customer about the order status"""
+    print(f'notifying customer about order: {order}')
+
+    # simulate customer notification
+    time.sleep(random.random() * 2)
+
+    # return a random boolean indicating if the notification was successful
+    return random.choices([True, False], weights=[9, 1])[0]
+
+
+def process_order(ctx, order: str) -> dict:
+    """Sub-orchestration function that processes a given order by performing all steps"""
+    print(f'processing order: {order}')
+
+    # Check inventory
+    inventory_checked = yield ctx.call_activity('check_and_update_inventory', input=order)
+
+    if not inventory_checked:
+        return {'order': order, 'status': 'failed', 'reason': 'out of stock'}
+
+    # Charge payment
+    payment_charged = yield ctx.call_activity('charge_payment', input=order)
+
+    if not payment_charged:
+        return {'order': order, 'status': 'failed', 'reason': 'payment failed'}
+
+    # Ship order
+    order_shipped = yield ctx.call_activity('ship_order', input=order)
+
+    if not order_shipped:
+        return {'order': order, 'status': 'failed', 'reason': 'shipping failed'}
+
+    # Notify customer
+    customer_notified = yield ctx.call_activity('notify_customer', input=order)
+
+    if not customer_notified:
+        return {'order': order, 
'status': 'failed', 'reason': 'customer notification failed'} + + # Return success status + return {'order': order, 'status': 'completed'} + + +def orchestrator(ctx, _): + """Orchestrator function that calls the 'get_orders' and 'process_order' + sub-orchestration functions in parallel, waits for them all to complete, and prints + an aggregate summary of the outputs""" + + orders: list[str] = yield ctx.call_activity('get_orders') + + # Execute the orders in parallel and wait for them all to return + tasks = [ctx.call_sub_orchestrator(process_order, input=order) for order in orders] + results: list[dict] = yield task.when_all(tasks) + + # Return an aggregate summary of the results + return { + 'orders': orders, + 'results': results, + 'total_completed': sum(1 for result in results if result['status'] == 'completed'), + 'total_failed': sum(1 for result in results if result['status'] == 'failed'), + 'details': results, + } + + +# Use environment variables if provided, otherwise use default emulator values +taskhub_name = os.getenv("TASKHUB", "default") +endpoint = os.getenv("ENDPOINT", "http://localhost:8080") + +print(f"Using taskhub: {taskhub_name}") +print(f"Using endpoint: {endpoint}") + +# Set credential to None for emulator, or DefaultAzureCredential for Azure +credential = None if endpoint == "http://localhost:8080" else DefaultAzureCredential() + +# Configure and start the worker +with DurableTaskSchedulerWorker(host_address=endpoint, secure_channel=True, + taskhub=taskhub_name, token_credential=credential) as w: + + w.add_orchestrator(orchestrator) + w.add_orchestrator(process_order) + w.add_activity(get_orders) + w.add_activity(check_and_update_inventory) + w.add_activity(charge_payment) + w.add_activity(ship_order) + w.add_activity(notify_customer) + + w.start() + + while True: + time.sleep(1)
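+
+    # The bare loop above simply keeps the process alive so the worker can keep
+    # pulling work from the task hub. A possible refinement (not part of this
+    # sample) is to catch KeyboardInterrupt so Ctrl+C lets the surrounding
+    # `with` block shut the worker down cleanly, e.g.:
+    #
+    #     try:
+    #         while True:
+    #             time.sleep(1)
+    #     except KeyboardInterrupt:
+    #         print('Stopping worker...')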