diff --git a/.github/workflows/pr-validation.yml b/.github/workflows/pr-validation.yml index 70ff470..dddcc53 100644 --- a/.github/workflows/pr-validation.yml +++ b/.github/workflows/pr-validation.yml @@ -1,50 +1,58 @@ -# This workflow will install Python dependencies, run tests and lint with a variety of Python versions -# For more information see: https://docs.github.com/en/actions/automating-builds-and-tests/building-and-testing-python - -name: Build Validation - -on: - push: - branches: [ "main" ] - pull_request: - branches: [ "main" ] - -jobs: - build: - - runs-on: ubuntu-latest - strategy: - fail-fast: false - matrix: - python-version: ["3.9", "3.10", "3.11", "3.12", "3.13"] - - steps: - - uses: actions/checkout@v4 - - name: Set up Python ${{ matrix.python-version }} - uses: actions/setup-python@v5 - with: - python-version: ${{ matrix.python-version }} - - name: Install dependencies - run: | - python -m pip install --upgrade pip - pip install flake8 pytest - pip install -r requirements.txt - - name: Lint with flake8 - run: | - flake8 . --count --show-source --statistics --exit-zero - - name: Pytest unit tests - run: | - pytest -m "not e2e" --verbose - - # Sidecar for running e2e tests requires Go SDK - - name: Install Go SDK - uses: actions/setup-go@v5 - with: - go-version: 'stable' - - # Install and run the durabletask-go sidecar for running e2e tests - - name: Pytest e2e tests - run: | - go install github.com/microsoft/durabletask-go@main - durabletask-go --port 4001 & - pytest -m "e2e" --verbose +# This workflow will install Python dependencies, run tests and lint with a variety of Python versions +# For more information see: https://docs.github.com/en/actions/automating-builds-and-tests/building-and-testing-python + +name: Build Validation + +on: + push: + branches: [ "main" ] + pull_request: + branches: [ "main" ] + merge_group: + +jobs: + build: + + runs-on: ubuntu-latest + strategy: + fail-fast: false + matrix: + python-version: ["3.9", "3.10", "3.11", "3.12", "3.13"] + + steps: + - uses: actions/checkout@v4 + - name: Set up Python ${{ matrix.python-version }} + uses: actions/setup-python@v5 + with: + python-version: ${{ matrix.python-version }} + - name: Install durabletask dependencies + run: | + python -m pip install --upgrade pip + pip install flake8 pytest + pip install -r requirements.txt + - name: Install durabletask-azuremanaged dependencies + working-directory: examples/dts + run: | + python -m pip install --upgrade pip + pip install -r requirements.txt + - name: Lint with flake8 + run: | + flake8 . 
--count --show-source --statistics --exit-zero + - name: Pytest unit tests + working-directory: tests/durabletask + run: | + pytest -m "not e2e and not dts" --verbose + + # Sidecar for running e2e tests requires Go SDK + - name: Install Go SDK + uses: actions/setup-go@v5 + with: + go-version: 'stable' + + # Install and run the durabletask-go sidecar for running e2e tests + - name: Pytest e2e tests + working-directory: tests/durabletask + run: | + go install github.com/microsoft/durabletask-go@main + durabletask-go --port 4001 & + pytest -m "e2e and not dts" --verbose diff --git a/.github/workflows/publish-dts-sdk.yml b/.github/workflows/publish-dts-sdk.yml new file mode 100644 index 0000000..3b4e397 --- /dev/null +++ b/.github/workflows/publish-dts-sdk.yml @@ -0,0 +1,110 @@ +name: Publish Durable Task Scheduler to PyPI + +on: + push: + branches: + - "main" + tags: + - "azuremanaged-v*" # Only run for tags starting with "azuremanaged-v" + pull_request: + branches: + - "main" + +jobs: + lint: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - name: Set up Python 3.12 + uses: actions/setup-python@v5 + with: + python-version: 3.12 + - name: Install dependencies + working-directory: durabletask-azuremanaged + run: | + python -m pip install --upgrade pip + pip install setuptools wheel tox + pip install flake8 + - name: Run flake8 Linter + working-directory: durabletask-azuremanaged + run: flake8 . + + run-docker-tests: + env: + EMULATOR_VERSION: "v0.0.4" # Define the variable + needs: lint + runs-on: ubuntu-latest + steps: + - name: Checkout repository + uses: actions/checkout@v4 + + - name: Pull Docker image + run: docker pull mcr.microsoft.com/dts/dts-emulator:$EMULATOR_VERSION + + - name: Run Docker container + run: | + docker run --name dtsemulator -d -p 8080:8080 mcr.microsoft.com/dts/dts-emulator:$EMULATOR_VERSION + + - name: Wait for container to be ready + run: sleep 10 # Adjust if your service needs more time to start + + - name: Set environment variables + run: | + echo "TASKHUB=default" >> $GITHUB_ENV + echo "ENDPOINT=http://localhost:8080" >> $GITHUB_ENV + + - name: Install durabletask dependencies + run: | + python -m pip install --upgrade pip + pip install flake8 pytest + pip install -r requirements.txt + + - name: Install durabletask-azuremanaged dependencies + working-directory: examples/dts + run: | + python -m pip install --upgrade pip + pip install -r requirements.txt + + - name: Run the tests + working-directory: tests/durabletask-azuremanaged + run: | + pytest -m "dts" --verbose + + publish: + if: startsWith(github.ref, 'refs/tags/azuremanaged-v') # Only run if a matching tag is pushed + needs: run-docker-tests + runs-on: ubuntu-latest + steps: + - name: Checkout code + uses: actions/checkout@v4 + + - name: Extract version from tag + run: echo "VERSION=${GITHUB_REF#refs/tags/azuremanaged-v}" >> $GITHUB_ENV # Extract version from the tag + + - name: Set up Python + uses: actions/setup-python@v5 + with: + python-version: "3.12" # Adjust Python version as needed + + - name: Install dependencies + run: | + python -m pip install --upgrade pip + pip install build twine + + - name: Build package from directory durabletask-azuremanaged + working-directory: durabletask-azuremanaged + run: | + python -m build + + - name: Check package + working-directory: durabletask-azuremanaged + run: | + twine check dist/* + + - name: Publish package to PyPI + env: + TWINE_USERNAME: __token__ + TWINE_PASSWORD: ${{ secrets.PYPI_API_TOKEN_AZUREMANAGED }} # Store your PyPI API token 
in GitHub Secrets + working-directory: durabletask-azuremanaged + run: | + twine upload dist/* \ No newline at end of file diff --git a/durabletask-azuremanaged/durabletask/azuremanaged/client.py b/durabletask-azuremanaged/durabletask/azuremanaged/client.py index f641eae..1d8cecd 100644 --- a/durabletask-azuremanaged/durabletask/azuremanaged/client.py +++ b/durabletask-azuremanaged/durabletask/azuremanaged/client.py @@ -2,6 +2,7 @@ # Licensed under the MIT License. from azure.core.credentials import TokenCredential +from typing import Optional from durabletask.azuremanaged.internal.durabletask_grpc_interceptor import \ DTSDefaultClientInterceptorImpl @@ -13,7 +14,7 @@ class DurableTaskSchedulerClient(TaskHubGrpcClient): def __init__(self, *, host_address: str, taskhub: str, - token_credential: TokenCredential, + token_credential: Optional[TokenCredential], secure_channel: bool = True): if not taskhub: diff --git a/durabletask-azuremanaged/durabletask/azuremanaged/internal/durabletask_grpc_interceptor.py b/durabletask-azuremanaged/durabletask/azuremanaged/internal/durabletask_grpc_interceptor.py index a23cac9..077905e 100644 --- a/durabletask-azuremanaged/durabletask/azuremanaged/internal/durabletask_grpc_interceptor.py +++ b/durabletask-azuremanaged/durabletask/azuremanaged/internal/durabletask_grpc_interceptor.py @@ -2,6 +2,8 @@ # Licensed under the MIT License. import grpc +from typing import Optional + from azure.core.credentials import TokenCredential from durabletask.azuremanaged.internal.access_token_manager import \ @@ -15,7 +17,7 @@ class DTSDefaultClientInterceptorImpl (DefaultClientInterceptorImpl): StreamUnaryClientInterceptor and StreamStreamClientInterceptor from grpc to add an interceptor to add additional headers to all calls as needed.""" - def __init__(self, token_credential: TokenCredential, taskhub_name: str): + def __init__(self, token_credential: Optional[TokenCredential], taskhub_name: str): self._metadata = [("taskhub", taskhub_name)] super().__init__(self._metadata) diff --git a/durabletask-azuremanaged/durabletask/azuremanaged/worker.py b/durabletask-azuremanaged/durabletask/azuremanaged/worker.py index d10c2f7..8bdff3d 100644 --- a/durabletask-azuremanaged/durabletask/azuremanaged/worker.py +++ b/durabletask-azuremanaged/durabletask/azuremanaged/worker.py @@ -2,6 +2,7 @@ # Licensed under the MIT License. 
from azure.core.credentials import TokenCredential +from typing import Optional from durabletask.azuremanaged.internal.durabletask_grpc_interceptor import \ DTSDefaultClientInterceptorImpl @@ -13,7 +14,7 @@ class DurableTaskSchedulerWorker(TaskHubGrpcWorker): def __init__(self, *, host_address: str, taskhub: str, - token_credential: TokenCredential, + token_credential: Optional[TokenCredential], secure_channel: bool = True): if not taskhub: diff --git a/durabletask-azuremanaged/pyproject.toml b/durabletask-azuremanaged/pyproject.toml index ac6be6f..c4c8a96 100644 --- a/durabletask-azuremanaged/pyproject.toml +++ b/durabletask-azuremanaged/pyproject.toml @@ -9,7 +9,7 @@ build-backend = "setuptools.build_meta" [project] name = "durabletask.azuremanaged" -version = "0.1b1" +version = "0.1.2" description = "Extensions for the Durable Task Python SDK for integrating with the Durable Task Scheduler in Azure" keywords = [ "durable", diff --git a/examples/dts/README.md b/examples/dts/README.md index 9b4a3fd..8df2b75 100644 --- a/examples/dts/README.md +++ b/examples/dts/README.md @@ -4,8 +4,13 @@ This directory contains examples of how to author durable orchestrations using t ## Prerequisites -All the examples assume that you have a Durable Task Scheduler taskhub created. +There are two separate ways to run an example: +1. Using the emulator. +2. Using a real scheduler and taskhub. +All the examples by default assume that you have a Durable Task Scheduler taskhub created. + +## Running with a scheduler and taskhub resource The simplest way to create a taskhub is by using the az cli commands: 1. Create a scheduler: @@ -46,6 +51,29 @@ The simplest way to create a taskhub is by using the az cli commands: 1. Grant yourself the `Durable Task Data Contributor` role over your scheduler +## Running with the emulator +The emulator is a simulation of a scheduler and taskhub. It is the 'backend' of the durabletask-azuremanaged system packaged up into an easy-to-use Docker container. For these steps, it is assumed that you are using port 8080. + +In order to use the emulator for the examples, perform the following steps: +1. Install Docker if it is not already installed. + +2. Pull the Docker image for the emulator: + `docker pull mcr.microsoft.com/dts/dts-emulator:v0.0.4` + +3. Run the emulator and wait a few seconds for the container to be ready: + `docker run --name dtsemulator -d -p 8080:8080 mcr.microsoft.com/dts/dts-emulator:v0.0.4` + +4. Set the environment variables that are referenced and used in the examples: + 1. If you are using Windows PowerShell: + `$env:TASKHUB="default"` + `$env:ENDPOINT="http://localhost:8080"` + 2. If you are using bash: + `export TASKHUB=default` + `export ENDPOINT=http://localhost:8080` + +5. 
Finally, edit the examples to change the `token_credential` input of both the `DurableTaskSchedulerWorker` and `DurableTaskSchedulerClient` to a value of `None`. + + ## Running the examples Now, you can simply execute any of the examples in this directory using `python3`: diff --git a/examples/dts/requirements.txt b/examples/dts/requirements.txt new file mode 100644 index 0000000..b12d5a2 --- /dev/null +++ b/examples/dts/requirements.txt @@ -0,0 +1,6 @@ +autopep8 +grpcio>=1.60.0 # 1.60.0 is the version introducing protobuf 4.25.x support, newer versions are backwards compatible +protobuf +azure-identity +durabletask-azuremanaged +durabletask \ No newline at end of file diff --git a/pyproject.toml b/pyproject.toml index 577824b..d3d9429 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -9,7 +9,7 @@ build-backend = "setuptools.build_meta" [project] name = "durabletask" -version = "0.2b1" +version = "0.2.0" description = "A Durable Task Client SDK for Python" keywords = [ "durable", diff --git a/tests/durabletask-azuremanaged/__init__.py b/tests/durabletask-azuremanaged/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/durabletask-azuremanaged/test_dts_activity_sequence.py b/tests/durabletask-azuremanaged/test_dts_activity_sequence.py new file mode 100644 index 0000000..c875e49 --- /dev/null +++ b/tests/durabletask-azuremanaged/test_dts_activity_sequence.py @@ -0,0 +1,69 @@ +"""End-to-end sample that demonstrates how to configure an orchestrator +that calls an activity function in a sequence and prints the outputs.""" +import os + +from durabletask import client, task +from durabletask.azuremanaged.client import DurableTaskSchedulerClient +from durabletask.azuremanaged.worker import DurableTaskSchedulerWorker + +import pytest + + +pytestmark = pytest.mark.dts + +def hello(ctx: task.ActivityContext, name: str) -> str: + """Activity function that returns a greeting""" + return f'Hello {name}!' + + +def sequence(ctx: task.OrchestrationContext, _): + """Orchestrator function that calls the 'hello' activity function in a sequence""" + # call "hello" activity function in a sequence + result1 = yield ctx.call_activity(hello, input='Tokyo') + result2 = yield ctx.call_activity(hello, input='Seattle') + result3 = yield ctx.call_activity(hello, input='London') + + # return an array of results + return [result1, result2, result3] + + +# Read the environment variable +taskhub_name = os.getenv("TASKHUB") + +# Check if the variable exists +if taskhub_name: + print(f"The value of TASKHUB is: {taskhub_name}") +else: + print("TASKHUB is not set. Please set the TASKHUB environment variable to the name of the taskhub you wish to use") + print("If you are using Windows PowerShell, run the following: $env:TASKHUB=\"\"") + print("If you are using bash, run the following: export TASKHUB=\"\"") + exit() + +# Read the environment variable +endpoint = os.getenv("ENDPOINT") + +# Check if the variable exists +if endpoint: + print(f"The value of ENDPOINT is: {endpoint}") +else: + print("ENDPOINT is not set. 
Please set the ENDPOINT environment variable to the endpoint of the scheduler") + print("If you are using Windows PowerShell, run the following: $env:ENDPOINT=\"\"") + print("If you are using bash, run the following: export ENDPOINT=\"\"") + exit() + +# configure and start the worker +with DurableTaskSchedulerWorker(host_address=endpoint, secure_channel=True, + taskhub=taskhub_name, token_credential=None) as w: + w.add_orchestrator(sequence) + w.add_activity(hello) + w.start() + + # Construct the client and run the orchestrations + c = DurableTaskSchedulerClient(host_address=endpoint, secure_channel=True, + taskhub=taskhub_name, token_credential=None) + instance_id = c.schedule_new_orchestration(sequence) + state = c.wait_for_orchestration_completion(instance_id, timeout=60) + if state and state.runtime_status == client.OrchestrationStatus.COMPLETED: + print(f'Orchestration completed! Result: {state.serialized_output}') + elif state: + print(f'Orchestration failed: {state.failure_details}') diff --git a/tests/durabletask-azuremanaged/test_dts_orchestration_e2e.py b/tests/durabletask-azuremanaged/test_dts_orchestration_e2e.py new file mode 100644 index 0000000..f10e605 --- /dev/null +++ b/tests/durabletask-azuremanaged/test_dts_orchestration_e2e.py @@ -0,0 +1,503 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + +import json +import threading +import time +import os +from datetime import timedelta + +import pytest + +from durabletask import client, task +from durabletask.azuremanaged.client import DurableTaskSchedulerClient +from durabletask.azuremanaged.worker import DurableTaskSchedulerWorker + +# NOTE: These tests assume the Durable Task Scheduler emulator is running locally. Example command: +# docker run --name dtsemulator -d -p 8080:8080 mcr.microsoft.com/dts/dts-emulator:v0.0.4 +pytestmark = pytest.mark.dts + +# Read the environment variables +taskhub_name = os.getenv("TASKHUB", "default") +endpoint = os.getenv("ENDPOINT", "http://localhost:8080") + +def test_empty_orchestration(): + + invoked = False + + def empty_orchestrator(ctx: task.OrchestrationContext, _): + nonlocal invoked # don't do this in a real app! 
+ invoked = True + + # Start a worker, which will connect to the sidecar in a background thread + with DurableTaskSchedulerWorker(host_address=endpoint, secure_channel=True, + taskhub=taskhub_name, token_credential=None) as w: + w.add_orchestrator(empty_orchestrator) + w.start() + + c = DurableTaskSchedulerClient(host_address=endpoint, secure_channel=True, + taskhub=taskhub_name, token_credential=None) + id = c.schedule_new_orchestration(empty_orchestrator) + state = c.wait_for_orchestration_completion(id, timeout=30) + + assert invoked + assert state is not None + assert state.name == task.get_name(empty_orchestrator) + assert state.instance_id == id + assert state.failure_details is None + assert state.runtime_status == client.OrchestrationStatus.COMPLETED + assert state.serialized_input is None + assert state.serialized_output is None + assert state.serialized_custom_status is None + + +def test_activity_sequence(): + + def plus_one(_: task.ActivityContext, input: int) -> int: + return input + 1 + + def sequence(ctx: task.OrchestrationContext, start_val: int): + numbers = [start_val] + current = start_val + for _ in range(10): + current = yield ctx.call_activity(plus_one, input=current) + numbers.append(current) + return numbers + + # Start a worker, which will connect to the sidecar in a background thread + with DurableTaskSchedulerWorker(host_address=endpoint, secure_channel=True, + taskhub=taskhub_name, token_credential=None) as w: + w.add_orchestrator(sequence) + w.add_activity(plus_one) + w.start() + + task_hub_client = DurableTaskSchedulerClient(host_address=endpoint, secure_channel=True, + taskhub=taskhub_name, token_credential=None) + id = task_hub_client.schedule_new_orchestration(sequence, input=1) + state = task_hub_client.wait_for_orchestration_completion( + id, timeout=30) + + assert state is not None + assert state.name == task.get_name(sequence) + assert state.instance_id == id + assert state.runtime_status == client.OrchestrationStatus.COMPLETED + assert state.failure_details is None + assert state.serialized_input == json.dumps(1) + assert state.serialized_output == json.dumps([1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]) + assert state.serialized_custom_status is None + + +def test_activity_error_handling(): + + def throw(_: task.ActivityContext, input: int) -> int: + raise RuntimeError("Kah-BOOOOM!!!") + + compensation_counter = 0 + + def increment_counter(ctx, _): + nonlocal compensation_counter + compensation_counter += 1 + + def orchestrator(ctx: task.OrchestrationContext, input: int): + error_msg = "" + try: + yield ctx.call_activity(throw, input=input) + except task.TaskFailedError as e: + error_msg = e.details.message + + # compensating actions + yield ctx.call_activity(increment_counter) + yield ctx.call_activity(increment_counter) + + return error_msg + + # Start a worker, which will connect to the sidecar in a background thread + with DurableTaskSchedulerWorker(host_address=endpoint, secure_channel=True, + taskhub=taskhub_name, token_credential=None) as w: + w.add_orchestrator(orchestrator) + w.add_activity(throw) + w.add_activity(increment_counter) + w.start() + + task_hub_client = DurableTaskSchedulerClient(host_address=endpoint, secure_channel=True, + taskhub=taskhub_name, token_credential=None) + id = task_hub_client.schedule_new_orchestration(orchestrator, input=1) + state = task_hub_client.wait_for_orchestration_completion(id, timeout=30) + + assert state is not None + assert state.name == task.get_name(orchestrator) + assert state.instance_id == id + assert 
state.runtime_status == client.OrchestrationStatus.COMPLETED + assert state.serialized_output == json.dumps("Kah-BOOOOM!!!") + assert state.failure_details is None + assert state.serialized_custom_status is None + assert compensation_counter == 2 + + +def test_sub_orchestration_fan_out(): + threadLock = threading.Lock() + activity_counter = 0 + + def increment(ctx, _): + with threadLock: + nonlocal activity_counter + activity_counter += 1 + + def orchestrator_child(ctx: task.OrchestrationContext, activity_count: int): + for _ in range(activity_count): + yield ctx.call_activity(increment) + + def parent_orchestrator(ctx: task.OrchestrationContext, count: int): + # Fan out to multiple sub-orchestrations + tasks = [] + for _ in range(count): + tasks.append(ctx.call_sub_orchestrator( + orchestrator_child, input=3)) + # Wait for all sub-orchestrations to complete + yield task.when_all(tasks) + + # Start a worker, which will connect to the sidecar in a background thread + with DurableTaskSchedulerWorker(host_address=endpoint, secure_channel=True, + taskhub=taskhub_name, token_credential=None) as w: + w.add_activity(increment) + w.add_orchestrator(orchestrator_child) + w.add_orchestrator(parent_orchestrator) + w.start() + + task_hub_client = DurableTaskSchedulerClient(host_address=endpoint, secure_channel=True, + taskhub=taskhub_name, token_credential=None) + id = task_hub_client.schedule_new_orchestration(parent_orchestrator, input=10) + state = task_hub_client.wait_for_orchestration_completion(id, timeout=30) + + assert state is not None + assert state.runtime_status == client.OrchestrationStatus.COMPLETED + assert state.failure_details is None + assert activity_counter == 30 + + +def test_wait_for_multiple_external_events(): + def orchestrator(ctx: task.OrchestrationContext, _): + a = yield ctx.wait_for_external_event('A') + b = yield ctx.wait_for_external_event('B') + c = yield ctx.wait_for_external_event('C') + return [a, b, c] + + # Start a worker, which will connect to the sidecar in a background thread + with DurableTaskSchedulerWorker(host_address=endpoint, secure_channel=True, + taskhub=taskhub_name, token_credential=None) as w: + w.add_orchestrator(orchestrator) + w.start() + + # Start the orchestration and immediately raise events to it. 
+ task_hub_client = DurableTaskSchedulerClient(host_address=endpoint, secure_channel=True, + taskhub=taskhub_name, token_credential=None) + id = task_hub_client.schedule_new_orchestration(orchestrator) + task_hub_client.raise_orchestration_event(id, 'A', data='a') + task_hub_client.raise_orchestration_event(id, 'B', data='b') + task_hub_client.raise_orchestration_event(id, 'C', data='c') + state = task_hub_client.wait_for_orchestration_completion(id, timeout=30) + + assert state is not None + assert state.runtime_status == client.OrchestrationStatus.COMPLETED + assert state.serialized_output == json.dumps(['a', 'b', 'c']) + + +# @pytest.mark.parametrize("raise_event", [True, False]) +# def test_wait_for_external_event_timeout(raise_event: bool): +# def orchestrator(ctx: task.OrchestrationContext, _): +# approval: task.Task[bool] = ctx.wait_for_external_event('Approval') +# timeout = ctx.create_timer(timedelta(seconds=3)) +# winner = yield task.when_any([approval, timeout]) +# if winner == approval: +# return "approved" +# else: +# return "timed out" + +# # Start a worker, which will connect to the sidecar in a background thread +# with DurableTaskSchedulerWorker(host_address=endpoint, secure_channel=True, +# taskhub=taskhub_name, token_credential=None) as w: +# w.add_orchestrator(orchestrator) +# w.start() + +# # Start the orchestration and immediately raise events to it. +# task_hub_client = DurableTaskSchedulerClient(host_address=endpoint, secure_channel=True, +# taskhub=taskhub_name, token_credential=None) +# id = task_hub_client.schedule_new_orchestration(orchestrator) +# if raise_event: +# task_hub_client.raise_orchestration_event(id, 'Approval') +# state = task_hub_client.wait_for_orchestration_completion(id, timeout=30) + +# assert state is not None +# assert state.runtime_status == client.OrchestrationStatus.COMPLETED +# if raise_event: +# assert state.serialized_output == json.dumps("approved") +# else: +# assert state.serialized_output == json.dumps("timed out") + + +# def test_suspend_and_resume(): +# def orchestrator(ctx: task.OrchestrationContext, _): +# result = yield ctx.wait_for_external_event("my_event") +# return result + +# # Start a worker, which will connect to the sidecar in a background thread +# with DurableTaskSchedulerWorker(host_address=endpoint, secure_channel=True, +# taskhub=taskhub_name, token_credential=None) as w: +# w.add_orchestrator(orchestrator) +# w.start() + +# task_hub_client = DurableTaskSchedulerClient(host_address=endpoint, secure_channel=True, +# taskhub=taskhub_name, token_credential=None) +# id = task_hub_client.schedule_new_orchestration(orchestrator) +# state = task_hub_client.wait_for_orchestration_start(id, timeout=30) +# assert state is not None + +# # Suspend the orchestration and wait for it to go into the SUSPENDED state +# task_hub_client.suspend_orchestration(id) +# counter = 0 +# while state.runtime_status == client.OrchestrationStatus.RUNNING and counter < 1200: +# time.sleep(0.1) +# state = task_hub_client.get_orchestration_state(id) +# assert state is not None +# counter+=1 +# assert state.runtime_status == client.OrchestrationStatus.SUSPENDED + +# # Raise an event to the orchestration and confirm that it does NOT complete +# task_hub_client.raise_orchestration_event(id, "my_event", data=42) +# try: +# state = task_hub_client.wait_for_orchestration_completion(id, timeout=3) +# assert False, "Orchestration should not have completed" +# except TimeoutError: +# pass + +# # Resume the orchestration and wait for it to complete +# 
task_hub_client.resume_orchestration(id) +# state = task_hub_client.wait_for_orchestration_completion(id, timeout=30) +# assert state is not None +# assert state.runtime_status == client.OrchestrationStatus.COMPLETED +# assert state.serialized_output == json.dumps(42) + + +def test_terminate(): + def orchestrator(ctx: task.OrchestrationContext, _): + result = yield ctx.wait_for_external_event("my_event") + return result + + # Start a worker, which will connect to the sidecar in a background thread + with DurableTaskSchedulerWorker(host_address=endpoint, secure_channel=True, + taskhub=taskhub_name, token_credential=None) as w: + w.add_orchestrator(orchestrator) + w.start() + + task_hub_client = DurableTaskSchedulerClient(host_address=endpoint, secure_channel=True, + taskhub=taskhub_name, token_credential=None) + id = task_hub_client.schedule_new_orchestration(orchestrator) + state = task_hub_client.wait_for_orchestration_start(id, timeout=30) + assert state is not None + assert state.runtime_status == client.OrchestrationStatus.RUNNING + + task_hub_client.terminate_orchestration(id, output="some reason for termination") + state = task_hub_client.wait_for_orchestration_completion(id, timeout=30) + assert state is not None + assert state.runtime_status == client.OrchestrationStatus.TERMINATED + assert state.serialized_output == json.dumps("some reason for termination") + +def test_terminate_recursive(): + def root(ctx: task.OrchestrationContext, _): + result = yield ctx.call_sub_orchestrator(child) + return result + def child(ctx: task.OrchestrationContext, _): + result = yield ctx.wait_for_external_event("my_event") + return result + + # Start a worker, which will connect to the sidecar in a background thread + with DurableTaskSchedulerWorker(host_address=endpoint, secure_channel=True, + taskhub=taskhub_name, token_credential=None) as w: + w.add_orchestrator(root) + w.add_orchestrator(child) + w.start() + + task_hub_client = DurableTaskSchedulerClient(host_address=endpoint, secure_channel=True, + taskhub=taskhub_name, token_credential=None) + id = task_hub_client.schedule_new_orchestration(root) + state = task_hub_client.wait_for_orchestration_start(id, timeout=30) + assert state is not None + assert state.runtime_status == client.OrchestrationStatus.RUNNING + + # Terminate the root orchestration (recursive is set to True by default) + task_hub_client.terminate_orchestration(id, output="some reason for termination") + state = task_hub_client.wait_for_orchestration_completion(id, timeout=30) + assert state is not None + assert state.runtime_status == client.OrchestrationStatus.TERMINATED + + # Verify that child orchestration is also terminated + state = task_hub_client.wait_for_orchestration_completion(id, timeout=30) + assert state is not None + assert state.runtime_status == client.OrchestrationStatus.TERMINATED + + task_hub_client.purge_orchestration(id) + state = task_hub_client.get_orchestration_state(id) + assert state is None + + +# def test_continue_as_new(): +# all_results = [] + +# def orchestrator(ctx: task.OrchestrationContext, input: int): +# result = yield ctx.wait_for_external_event("my_event") +# if not ctx.is_replaying: +# # NOTE: Real orchestrations should never interact with nonlocal variables like this. 
+# nonlocal all_results +# all_results.append(result) + +# if len(all_results) <= 4: +# ctx.continue_as_new(max(all_results), save_events=True) +# else: +# return all_results + +# # Start a worker, which will connect to the sidecar in a background thread +# with DurableTaskSchedulerWorker(host_address=endpoint, secure_channel=True, +# taskhub=taskhub_name, token_credential=None) as w: +# w.add_orchestrator(orchestrator) +# w.start() + +# task_hub_client = DurableTaskSchedulerClient(host_address=endpoint, secure_channel=True, +# taskhub=taskhub_name, token_credential=None) +# id = task_hub_client.schedule_new_orchestration(orchestrator, input=0) +# task_hub_client.raise_orchestration_event(id, "my_event", data=1) +# task_hub_client.raise_orchestration_event(id, "my_event", data=2) +# task_hub_client.raise_orchestration_event(id, "my_event", data=3) +# task_hub_client.raise_orchestration_event(id, "my_event", data=4) +# task_hub_client.raise_orchestration_event(id, "my_event", data=5) + +# state = task_hub_client.wait_for_orchestration_completion(id, timeout=30) +# assert state is not None +# assert state.runtime_status == client.OrchestrationStatus.COMPLETED +# assert state.serialized_output == json.dumps(all_results) +# assert state.serialized_input == json.dumps(4) +# assert all_results == [1, 2, 3, 4, 5] + + +# NOTE: This test fails when running against durabletask-go with sqlite because the sqlite backend does not yet +# support orchestration ID reuse. This gap is being tracked here: +# https://github.com/microsoft/durabletask-go/issues/42 +def test_retry_policies(): + # This test verifies that the retry policies are working as expected. + # It does this by creating an orchestration that calls a sub-orchestrator, + # which in turn calls an activity that always fails. + # In this test, the retry policies are added, and the orchestration + # should still fail. But the number of times the sub-orchestrator and activity + # are called should increase as per the retry policies. + + child_orch_counter = 0 + throw_activity_counter = 0 + + # Retry policy shared by the sub-orchestration and activity calls below + retry_policy = task.RetryPolicy( + first_retry_interval=timedelta(seconds=1), + max_number_of_attempts=3, + backoff_coefficient=1, + max_retry_interval=timedelta(seconds=10), + retry_timeout=timedelta(seconds=30)) + + def parent_orchestrator_with_retry(ctx: task.OrchestrationContext, _): + yield ctx.call_sub_orchestrator(child_orchestrator_with_retry, retry_policy=retry_policy) + + def child_orchestrator_with_retry(ctx: task.OrchestrationContext, _): + nonlocal child_orch_counter + if not ctx.is_replaying: + # NOTE: Real orchestrations should never interact with nonlocal variables like this. + # This is done only for testing purposes. 
+ child_orch_counter += 1 + yield ctx.call_activity(throw_activity_with_retry, retry_policy=retry_policy) + + def throw_activity_with_retry(ctx: task.ActivityContext, _): + nonlocal throw_activity_counter + throw_activity_counter += 1 + raise RuntimeError("Kah-BOOOOM!!!") + + with DurableTaskSchedulerWorker(host_address=endpoint, secure_channel=True, + taskhub=taskhub_name, token_credential=None) as w: + w.add_orchestrator(parent_orchestrator_with_retry) + w.add_orchestrator(child_orchestrator_with_retry) + w.add_activity(throw_activity_with_retry) + w.start() + + task_hub_client = DurableTaskSchedulerClient(host_address=endpoint, secure_channel=True, + taskhub=taskhub_name, token_credential=None) + id = task_hub_client.schedule_new_orchestration(parent_orchestrator_with_retry) + state = task_hub_client.wait_for_orchestration_completion(id, timeout=30) + assert state is not None + assert state.runtime_status == client.OrchestrationStatus.FAILED + assert state.failure_details is not None + assert state.failure_details.error_type == "TaskFailedError" + assert state.failure_details.message.startswith("Sub-orchestration task #1 failed:") + assert state.failure_details.message.endswith("Activity task #1 failed: Kah-BOOOOM!!!") + assert state.failure_details.stack_trace is not None + assert throw_activity_counter == 9 + assert child_orch_counter == 3 + + +def test_retry_timeout(): + # This test verifies that the retry timeout is working as expected. + # Max number of attempts is 5 and retry timeout is 14 seconds. + # Total seconds consumed till 4th attempt is 1 + 2 + 4 + 8 = 15 seconds. + # So, the 5th attempt should not be made and the orchestration should fail. + throw_activity_counter = 0 + retry_policy = task.RetryPolicy( + first_retry_interval=timedelta(seconds=1), + max_number_of_attempts=5, + backoff_coefficient=2, + max_retry_interval=timedelta(seconds=10), + retry_timeout=timedelta(seconds=14)) + + def mock_orchestrator(ctx: task.OrchestrationContext, _): + yield ctx.call_activity(throw_activity, retry_policy=retry_policy) + + def throw_activity(ctx: task.ActivityContext, _): + nonlocal throw_activity_counter + throw_activity_counter += 1 + raise RuntimeError("Kah-BOOOOM!!!") + + with DurableTaskSchedulerWorker(host_address=endpoint, secure_channel=True, + taskhub=taskhub_name, token_credential=None) as w: + w.add_orchestrator(mock_orchestrator) + w.add_activity(throw_activity) + w.start() + + task_hub_client = DurableTaskSchedulerClient(host_address=endpoint, secure_channel=True, + taskhub=taskhub_name, token_credential=None) + id = task_hub_client.schedule_new_orchestration(mock_orchestrator) + state = task_hub_client.wait_for_orchestration_completion(id, timeout=30) + assert state is not None + assert state.runtime_status == client.OrchestrationStatus.FAILED + assert state.failure_details is not None + assert state.failure_details.error_type == "TaskFailedError" + assert state.failure_details.message.endswith("Activity task #1 failed: Kah-BOOOOM!!!") + assert state.failure_details.stack_trace is not None + assert throw_activity_counter == 4 + +def test_custom_status(): + + def empty_orchestrator(ctx: task.OrchestrationContext, _): + ctx.set_custom_status("foobaz") + + # Start a worker, which will connect to the sidecar in a background thread + with DurableTaskSchedulerWorker(host_address=endpoint, secure_channel=True, + taskhub=taskhub_name, token_credential=None) as w: + w.add_orchestrator(empty_orchestrator) + w.start() + + c = DurableTaskSchedulerClient(host_address=endpoint, 
secure_channel=True, + taskhub=taskhub_name, token_credential=None) + id = c.schedule_new_orchestration(empty_orchestrator) + state = c.wait_for_orchestration_completion(id, timeout=30) + + assert state is not None + assert state.name == task.get_name(empty_orchestrator) + assert state.instance_id == id + assert state.failure_details is None + assert state.runtime_status == client.OrchestrationStatus.COMPLETED + assert state.serialized_input is None + assert state.serialized_output is None + assert state.serialized_custom_status == "\"foobaz\"" diff --git a/tests/test_activity_executor.py b/tests/durabletask/test_activity_executor.py similarity index 100% rename from tests/test_activity_executor.py rename to tests/durabletask/test_activity_executor.py diff --git a/tests/test_client.py b/tests/durabletask/test_client.py similarity index 100% rename from tests/test_client.py rename to tests/durabletask/test_client.py diff --git a/tests/test_orchestration_e2e.py b/tests/durabletask/test_orchestration_e2e.py similarity index 100% rename from tests/test_orchestration_e2e.py rename to tests/durabletask/test_orchestration_e2e.py diff --git a/tests/test_orchestration_executor.py b/tests/durabletask/test_orchestration_executor.py similarity index 100% rename from tests/test_orchestration_executor.py rename to tests/durabletask/test_orchestration_executor.py