diff --git a/polling/infrequent/activities.py b/polling/infrequent/activities.py index 2bd71587..b21223b8 100644 --- a/polling/infrequent/activities.py +++ b/polling/infrequent/activities.py @@ -11,9 +11,12 @@ class ComposeGreetingInput: name: str -@activity.defn -async def compose_greeting(input: ComposeGreetingInput) -> str: - test_service = TestService() - # If this raises an exception because it's not done yet, the activity will - # continually be scheduled for retry - return await test_service.get_service_result(input) +class ComposeGreeting: + def __init__(self): + self.test_service = TestService() + + @activity.defn + async def compose_greeting(self, input: ComposeGreetingInput) -> str: + # If this raises an exception because it's not done yet, the activity will + # continually be scheduled for retry + return await self.test_service.get_service_result(input) diff --git a/polling/infrequent/run_worker.py b/polling/infrequent/run_worker.py index f600b949..020c04a9 100644 --- a/polling/infrequent/run_worker.py +++ b/polling/infrequent/run_worker.py @@ -3,18 +3,18 @@ from temporalio.client import Client from temporalio.worker import Worker -from polling.infrequent.activities import compose_greeting +from polling.infrequent.activities import ComposeGreeting from polling.infrequent.workflows import GreetingWorkflow async def main(): client = await Client.connect("localhost:7233") - + activities = ComposeGreeting() worker = Worker( client, task_queue="infrequent-activity-retry-task-queue", workflows=[GreetingWorkflow], - activities=[compose_greeting], + activities=[activities.compose_greeting], ) await worker.run() diff --git a/polling/infrequent/workflows.py b/polling/infrequent/workflows.py index 35769573..05abe943 100644 --- a/polling/infrequent/workflows.py +++ b/polling/infrequent/workflows.py @@ -4,15 +4,15 @@ from temporalio.common import RetryPolicy with workflow.unsafe.imports_passed_through(): - from polling.infrequent.activities import ComposeGreetingInput, compose_greeting + from polling.infrequent.activities import ComposeGreeting, ComposeGreetingInput @workflow.defn class GreetingWorkflow: @workflow.run async def run(self, name: str) -> str: - return await workflow.execute_activity( - compose_greeting, + return await workflow.execute_activity_method( + ComposeGreeting.compose_greeting, ComposeGreetingInput("Hello", name), start_to_close_timeout=timedelta(seconds=2), retry_policy=RetryPolicy( diff --git a/polling/periodic_sequence/activities.py b/polling/periodic_sequence/activities.py index 1a1196c6..d858ac19 100644 --- a/polling/periodic_sequence/activities.py +++ b/polling/periodic_sequence/activities.py @@ -2,6 +2,8 @@ from temporalio import activity +from polling.test_service import TestService + @dataclass class ComposeGreetingInput: @@ -9,6 +11,12 @@ class ComposeGreetingInput: name: str -@activity.defn -async def compose_greeting(input: ComposeGreetingInput) -> str: - raise RuntimeError("Service is down") +class ComposeGreeting: + def __init__(self): + self.test_service = TestService(error_attempts=23) + + @activity.defn + async def compose_greeting(self, input: ComposeGreetingInput) -> str: + # If this raises an exception because it's not done yet, the activity will + # continually be scheduled for retry + return await self.test_service.get_service_result(input) diff --git a/polling/periodic_sequence/run_worker.py b/polling/periodic_sequence/run_worker.py index e04ac4dc..64772b54 100644 --- a/polling/periodic_sequence/run_worker.py +++ b/polling/periodic_sequence/run_worker.py @@ -3,18 +3,18 @@ from temporalio.client import Client from temporalio.worker import Worker -from polling.periodic_sequence.activities import compose_greeting +from polling.periodic_sequence.activities import ComposeGreeting from polling.periodic_sequence.workflows import ChildWorkflow, GreetingWorkflow async def main(): client = await Client.connect("localhost:7233") - + activities = ComposeGreeting() worker = Worker( client, task_queue="periodic-retry-task-queue", workflows=[GreetingWorkflow, ChildWorkflow], - activities=[compose_greeting], + activities=[activities.compose_greeting], ) await worker.run() diff --git a/polling/periodic_sequence/workflows.py b/polling/periodic_sequence/workflows.py index d38d41ce..c07c971f 100644 --- a/polling/periodic_sequence/workflows.py +++ b/polling/periodic_sequence/workflows.py @@ -7,10 +7,12 @@ with workflow.unsafe.imports_passed_through(): from polling.periodic_sequence.activities import ( + ComposeGreeting, ComposeGreetingInput, - compose_greeting, ) +MAX_RETRY_PER_CHILD_FLOW = 10 + @workflow.defn class GreetingWorkflow: @@ -26,10 +28,10 @@ async def run(self, name: str) -> str: class ChildWorkflow: @workflow.run async def run(self, name: str) -> str: - for i in range(10): + for i in range(MAX_RETRY_PER_CHILD_FLOW): try: - return await workflow.execute_activity( - compose_greeting, + return await workflow.execute_activity_method( + ComposeGreeting.compose_greeting, ComposeGreetingInput("Hello", name), start_to_close_timeout=timedelta(seconds=4), retry_policy=RetryPolicy( @@ -38,8 +40,10 @@ async def run(self, name: str) -> str: ) except ActivityError: - workflow.logger.error("Activity failed, retrying in 1 seconds") + workflow.logger.error( + f"Activity failed ({i}/{MAX_RETRY_PER_CHILD_FLOW}), retrying in 1 seconds" + ) await asyncio.sleep(1) - workflow.continue_as_new(name) + workflow.continue_as_new(name) raise Exception("Polling failed after all attempts") diff --git a/polling/test_service.py b/polling/test_service.py index 3744998a..005150d5 100644 --- a/polling/test_service.py +++ b/polling/test_service.py @@ -1,7 +1,7 @@ class TestService: - def __init__(self): + def __init__(self, error_attempts: int = 5): self.try_attempts = 0 - self.error_attempts = 5 + self.error_attempts = error_attempts async def get_service_result(self, input): print(