From 67e4f29d9b1e7ff4d2a805bf602ad9d128765cfb Mon Sep 17 00:00:00 2001 From: Tim Conley Date: Mon, 9 Jun 2025 11:27:27 -0700 Subject: [PATCH 1/7] WIP activity reset --- temporalio/activity.py | 2 ++ temporalio/bridge/src/client.rs | 3 +++ temporalio/client.py | 3 ++- temporalio/service.py | 2 +- temporalio/worker/_activity.py | 2 ++ tests/worker/test_activity.py | 34 +++++++++++++++++++++++++++++++++ 6 files changed, 44 insertions(+), 2 deletions(-) diff --git a/temporalio/activity.py b/temporalio/activity.py index f259a675d..e08081f19 100644 --- a/temporalio/activity.py +++ b/temporalio/activity.py @@ -154,6 +154,7 @@ class ActivityCancellationDetails: not_found: bool = False cancel_requested: bool = False paused: bool = False + reset: bool = False timed_out: bool = False worker_shutdown: bool = False @@ -167,6 +168,7 @@ def _from_proto( paused=proto.is_paused, timed_out=proto.is_timed_out, worker_shutdown=proto.is_worker_shutdown, + reset=proto.is_reset, ) diff --git a/temporalio/bridge/src/client.rs b/temporalio/bridge/src/client.rs index 1ae1967ce..2f4ab867e 100644 --- a/temporalio/bridge/src/client.rs +++ b/temporalio/bridge/src/client.rs @@ -262,6 +262,9 @@ impl ClientRef { "request_cancel_workflow_execution" => { rpc_call!(retry_client, call, request_cancel_workflow_execution) } + "reset_activity" => { + rpc_call!(retry_client, call, reset_activity) + } "reset_sticky_task_queue" => { rpc_call!(retry_client, call, reset_sticky_task_queue) } diff --git a/temporalio/client.py b/temporalio/client.py index 50fab46ba..93029a334 100644 --- a/temporalio/client.py +++ b/temporalio/client.py @@ -6363,11 +6363,12 @@ async def heartbeat_async_activity( metadata=input.rpc_metadata, timeout=input.rpc_timeout, ) - if resp_by_id.cancel_requested or resp_by_id.activity_paused: + if resp_by_id.cancel_requested or resp_by_id.activity_paused or resp_by_id.activity_reset: raise AsyncActivityCancelledError( details=ActivityCancellationDetails( cancel_requested=resp_by_id.cancel_requested, paused=resp_by_id.activity_paused, + reset=resp_by_id.activity_reset ) ) diff --git a/temporalio/service.py b/temporalio/service.py index b22906b9b..48db175f0 100644 --- a/temporalio/service.py +++ b/temporalio/service.py @@ -34,7 +34,7 @@ logger = logging.getLogger(__name__) # Set to true to log all requests and responses -LOG_PROTOS = False +LOG_PROTOS = True @dataclass diff --git a/temporalio/worker/_activity.py b/temporalio/worker/_activity.py index c76c8f005..d6a65b677 100644 --- a/temporalio/worker/_activity.py +++ b/temporalio/worker/_activity.py @@ -152,6 +152,8 @@ async def raise_from_exception_queue() -> NoReturn: await exception_task task = await poll_task + logger.info(f"Activity task {task}") + if task.HasField("start"): # Cancelled event and sync field will be updated inside # _run_activity when the activity function is obtained. Max diff --git a/tests/worker/test_activity.py b/tests/worker/test_activity.py index 8d39069a7..b1e7bcb3d 100644 --- a/tests/worker/test_activity.py +++ b/tests/worker/test_activity.py @@ -15,7 +15,9 @@ from datetime import datetime, timedelta, timezone from typing import Any, Callable, List, NoReturn, Optional, Sequence, Type +import temporalio.api.workflowservice.v1 from temporalio import activity, workflow +from temporalio.api.workflowservice.v1.request_response_pb2 import ResetActivityRequest from temporalio.client import ( AsyncActivityHandle, Client, @@ -1486,3 +1488,35 @@ async def h(): client, worker, heartbeat, retry_max_attempts=2 ) assert result.result == "details: Some detail" + +async def test_activity_reset(client: Client, worker: ExternalWorker): + + @activity.defn + async def reset_activity() -> None: + + await client.workflow_service.reset_activity(temporalio.api.workflowservice.v1.ResetActivityRequest( + namespace=client.namespace, + execution=temporalio.api.common.v1.WorkflowExecution( + workflow_id=activity.info().workflow_id, + run_id=activity.info().workflow_run_id, + ), + id=activity.info().activity_id, + )) + reset = False + for _ in range(5): + try: + if reset: + return None + await asyncio.sleep(1) + except Exception as e: + activity.logger.warning("Exception: ", e) + reset = True + raise + + assert False + + await _execute_workflow_with_activity( + client, worker, reset_activity + ) + + From 12f4828621c897969fe14a29e73d3b5f01e3e66b Mon Sep 17 00:00:00 2001 From: Tim Conley Date: Thu, 28 Aug 2025 11:06:50 -0700 Subject: [PATCH 2/7] Fix test for activity reset --- temporalio/activity.py | 1 + temporalio/client.py | 9 +++++-- tests/worker/test_activity.py | 45 ++++++++++++++++++++++++++++------- 3 files changed, 44 insertions(+), 11 deletions(-) diff --git a/temporalio/activity.py b/temporalio/activity.py index e08081f19..934585cb5 100644 --- a/temporalio/activity.py +++ b/temporalio/activity.py @@ -162,6 +162,7 @@ class ActivityCancellationDetails: def _from_proto( proto: temporalio.bridge.proto.activity_task.ActivityCancellationDetails, ) -> ActivityCancellationDetails: + print(f"Building activity cancellation details, reset: {proto.is_reset}") return ActivityCancellationDetails( not_found=proto.is_not_found, cancel_requested=proto.is_cancelled, diff --git a/temporalio/client.py b/temporalio/client.py index 93029a334..4eb1dc868 100644 --- a/temporalio/client.py +++ b/temporalio/client.py @@ -6363,12 +6363,16 @@ async def heartbeat_async_activity( metadata=input.rpc_metadata, timeout=input.rpc_timeout, ) - if resp_by_id.cancel_requested or resp_by_id.activity_paused or resp_by_id.activity_reset: + if ( + resp_by_id.cancel_requested + or resp_by_id.activity_paused + or resp_by_id.activity_reset + ): raise AsyncActivityCancelledError( details=ActivityCancellationDetails( cancel_requested=resp_by_id.cancel_requested, paused=resp_by_id.activity_paused, - reset=resp_by_id.activity_reset + reset=resp_by_id.activity_reset, ) ) @@ -6389,6 +6393,7 @@ async def heartbeat_async_activity( details=ActivityCancellationDetails( cancel_requested=resp.cancel_requested, paused=resp.activity_paused, + reset=resp.activity_reset, ) ) diff --git a/tests/worker/test_activity.py b/tests/worker/test_activity.py index b1e7bcb3d..3f7d92860 100644 --- a/tests/worker/test_activity.py +++ b/tests/worker/test_activity.py @@ -17,7 +17,6 @@ import temporalio.api.workflowservice.v1 from temporalio import activity, workflow -from temporalio.api.workflowservice.v1.request_response_pb2 import ResetActivityRequest from temporalio.client import ( AsyncActivityHandle, Client, @@ -1489,25 +1488,27 @@ async def h(): ) assert result.result == "details: Some detail" -async def test_activity_reset(client: Client, worker: ExternalWorker): +async def test_activity_reset(client: Client, worker: ExternalWorker): @activity.defn async def reset_activity() -> None: - - await client.workflow_service.reset_activity(temporalio.api.workflowservice.v1.ResetActivityRequest( + req = temporalio.api.workflowservice.v1.ResetActivityRequest( namespace=client.namespace, execution=temporalio.api.common.v1.WorkflowExecution( workflow_id=activity.info().workflow_id, run_id=activity.info().workflow_run_id, ), id=activity.info().activity_id, - )) + ) + activity.logger.info(f"Sending reset request: {req}") + await client.workflow_service.reset_activity(req) reset = False for _ in range(5): + activity.heartbeat() try: if reset: return None - await asyncio.sleep(1) + await asyncio.sleep(0.3) except Exception as e: activity.logger.warning("Exception: ", e) reset = True @@ -1515,8 +1516,34 @@ async def reset_activity() -> None: assert False - await _execute_workflow_with_activity( - client, worker, reset_activity - ) + await _execute_workflow_with_activity(client, worker, reset_activity) + +async def test_activity_reset_catch(client: Client, worker: ExternalWorker): + @activity.defn + async def wait_cancel() -> str: + req = temporalio.api.workflowservice.v1.ResetActivityRequest( + namespace=client.namespace, + execution=temporalio.api.common.v1.WorkflowExecution( + workflow_id=activity.info().workflow_id, + run_id=activity.info().workflow_run_id, + ), + id=activity.info().activity_id, + ) + activity.logger.info(f"Sending reset request: {req}") + await client.workflow_service.reset_activity(req) + try: + while True: + await asyncio.sleep(0.3) + activity.heartbeat() + except asyncio.CancelledError: + details = activity.cancellation_details() + assert details is not None + return "Got cancelled error, reset? " + str(details.reset) + result = await _execute_workflow_with_activity( + client, + worker, + wait_cancel, + ) + assert result.result == "Got cancelled error, reset? True" From c4cade54e4abecd3466ef5b0ce1bfbff75eabd1b Mon Sep 17 00:00:00 2001 From: Tim Conley Date: Thu, 28 Aug 2025 11:12:17 -0700 Subject: [PATCH 3/7] Cleanup, remove bad test --- temporalio/activity.py | 1 - temporalio/service.py | 2 +- temporalio/worker/_activity.py | 2 -- tests/worker/test_activity.py | 30 ------------------------------ 4 files changed, 1 insertion(+), 34 deletions(-) diff --git a/temporalio/activity.py b/temporalio/activity.py index 934585cb5..e08081f19 100644 --- a/temporalio/activity.py +++ b/temporalio/activity.py @@ -162,7 +162,6 @@ class ActivityCancellationDetails: def _from_proto( proto: temporalio.bridge.proto.activity_task.ActivityCancellationDetails, ) -> ActivityCancellationDetails: - print(f"Building activity cancellation details, reset: {proto.is_reset}") return ActivityCancellationDetails( not_found=proto.is_not_found, cancel_requested=proto.is_cancelled, diff --git a/temporalio/service.py b/temporalio/service.py index 48db175f0..b22906b9b 100644 --- a/temporalio/service.py +++ b/temporalio/service.py @@ -34,7 +34,7 @@ logger = logging.getLogger(__name__) # Set to true to log all requests and responses -LOG_PROTOS = True +LOG_PROTOS = False @dataclass diff --git a/temporalio/worker/_activity.py b/temporalio/worker/_activity.py index d6a65b677..c76c8f005 100644 --- a/temporalio/worker/_activity.py +++ b/temporalio/worker/_activity.py @@ -152,8 +152,6 @@ async def raise_from_exception_queue() -> NoReturn: await exception_task task = await poll_task - logger.info(f"Activity task {task}") - if task.HasField("start"): # Cancelled event and sync field will be updated inside # _run_activity when the activity function is obtained. Max diff --git a/tests/worker/test_activity.py b/tests/worker/test_activity.py index 3f7d92860..72db685df 100644 --- a/tests/worker/test_activity.py +++ b/tests/worker/test_activity.py @@ -1489,36 +1489,6 @@ async def h(): assert result.result == "details: Some detail" -async def test_activity_reset(client: Client, worker: ExternalWorker): - @activity.defn - async def reset_activity() -> None: - req = temporalio.api.workflowservice.v1.ResetActivityRequest( - namespace=client.namespace, - execution=temporalio.api.common.v1.WorkflowExecution( - workflow_id=activity.info().workflow_id, - run_id=activity.info().workflow_run_id, - ), - id=activity.info().activity_id, - ) - activity.logger.info(f"Sending reset request: {req}") - await client.workflow_service.reset_activity(req) - reset = False - for _ in range(5): - activity.heartbeat() - try: - if reset: - return None - await asyncio.sleep(0.3) - except Exception as e: - activity.logger.warning("Exception: ", e) - reset = True - raise - - assert False - - await _execute_workflow_with_activity(client, worker, reset_activity) - - async def test_activity_reset_catch(client: Client, worker: ExternalWorker): @activity.defn async def wait_cancel() -> str: From a2953390f56acb037bae0338ca47faec6cdb75ae Mon Sep 17 00:00:00 2001 From: Tim Conley Date: Tue, 2 Sep 2025 09:15:05 -0700 Subject: [PATCH 4/7] Add test and error case --- temporalio/worker/_activity.py | 18 +++++++++++++ temporalio/worker/_workflow_instance.py | 1 + tests/worker/test_activity.py | 36 ++++++++++++++++++++++++- tests/worker/test_workflow.py | 30 ++++++++++++++++++++- 4 files changed, 83 insertions(+), 2 deletions(-) diff --git a/temporalio/worker/_activity.py b/temporalio/worker/_activity.py index c76c8f005..19e406e6f 100644 --- a/temporalio/worker/_activity.py +++ b/temporalio/worker/_activity.py @@ -334,6 +334,24 @@ async def _handle_start_activity_task( ), completion.result.failed.failure, ) + elif ( + isinstance( + err, + (asyncio.CancelledError, temporalio.exceptions.CancelledError), + ) + and running_activity.cancellation_details.details + and running_activity.cancellation_details.details.reset + ): + temporalio.activity.logger.warning( + "Completing as failure due to unhandled cancel error produced by activity reset", + ) + await self._data_converter.encode_failure( + temporalio.exceptions.ApplicationError( + type="ActivityReset", + message="Unhandled activity cancel error produced by activity reset", + ), + completion.result.failed.failure, + ) elif ( isinstance( err, diff --git a/temporalio/worker/_workflow_instance.py b/temporalio/worker/_workflow_instance.py index c93155672..795f1226d 100644 --- a/temporalio/worker/_workflow_instance.py +++ b/temporalio/worker/_workflow_instance.py @@ -2791,6 +2791,7 @@ def _apply_schedule_command( command.user_metadata.summary.CopyFrom( self._instance._payload_converter.to_payload(self._input.summary) ) + print("Activity summary: ", command.user_metadata.summary) if self._input.priority: command.schedule_activity.priority.CopyFrom( self._input.priority._to_proto() diff --git a/tests/worker/test_activity.py b/tests/worker/test_activity.py index 72db685df..050955f51 100644 --- a/tests/worker/test_activity.py +++ b/tests/worker/test_activity.py @@ -9,10 +9,12 @@ import threading import time import uuid +from concurrent.futures import ThreadPoolExecutor from concurrent.futures.process import BrokenProcessPool from contextvars import ContextVar from dataclasses import dataclass from datetime import datetime, timedelta, timezone +from time import sleep from typing import Any, Callable, List, NoReturn, Optional, Sequence, Type import temporalio.api.workflowservice.v1 @@ -1500,7 +1502,6 @@ async def wait_cancel() -> str: ), id=activity.info().activity_id, ) - activity.logger.info(f"Sending reset request: {req}") await client.workflow_service.reset_activity(req) try: while True: @@ -1511,9 +1512,42 @@ async def wait_cancel() -> str: assert details is not None return "Got cancelled error, reset? " + str(details.reset) + @activity.defn + def sync_wait_cancel() -> str: + req = temporalio.api.workflowservice.v1.ResetActivityRequest( + namespace=client.namespace, + execution=temporalio.api.common.v1.WorkflowExecution( + workflow_id=activity.info().workflow_id, + run_id=activity.info().workflow_run_id, + ), + id=activity.info().activity_id, + ) + asyncio.run(client.workflow_service.reset_activity(req)) + try: + while True: + sleep(0.3) + activity.heartbeat() + except temporalio.exceptions.CancelledError: + details = activity.cancellation_details() + assert details is not None + return "Got cancelled error, reset? " + str(details.reset) + except Exception as e: + return str(type(e)) + str(e) + result = await _execute_workflow_with_activity( client, worker, wait_cancel, ) assert result.result == "Got cancelled error, reset? True" + + config = WorkerConfig( + activity_executor=ThreadPoolExecutor(max_workers=1), + ) + result = await _execute_workflow_with_activity( + client, + worker, + sync_wait_cancel, + worker_config=config, + ) + assert result.result == "Got cancelled error, reset? True" diff --git a/tests/worker/test_workflow.py b/tests/worker/test_workflow.py index e97bf3e02..42bfedba4 100644 --- a/tests/worker/test_workflow.py +++ b/tests/worker/test_workflow.py @@ -814,7 +814,10 @@ class SimpleActivityWorkflow: @workflow.run async def run(self, name: str) -> str: return await workflow.execute_activity( - say_hello, name, schedule_to_close_timeout=timedelta(seconds=5) + say_hello, + name, + schedule_to_close_timeout=timedelta(seconds=5), + summary="Do a thing", ) @@ -8327,3 +8330,28 @@ async def test_workflow_headers_with_codec( assert headers["foo"].data == b"bar" else: assert headers["foo"].data != b"bar" + + +async def test_summary_with_codec(client: Client, env: WorkflowEnvironment): + if env.supports_time_skipping: + pytest.skip("Time skipping server doesn't persist headers.") + + # Make client with this codec and run a couple of existing tests + config = client.config() + config["data_converter"] = DataConverter(payload_codec=SimpleCodec()) + client = Client(**config) + + async with new_worker( + client, + SimpleActivityWorkflow, + SignalAndQueryWorkflow, + activities=[say_hello], + ) as worker: + workflow_handle = await client.start_workflow( + SimpleActivityWorkflow.run, + "Temporal", + id=f"workflow-{uuid.uuid4()}", + task_queue=worker.task_queue, + ) + assert await workflow_handle.result() == "Hello, Temporal!" + assert False From 70612e585f8f1e429dcfb00fb9a57b06dbca255f Mon Sep 17 00:00:00 2001 From: Tim Conley Date: Tue, 2 Sep 2025 10:29:42 -0700 Subject: [PATCH 5/7] Remove investigation code accidentally added --- tests/worker/test_workflow.py | 24 ------------------------ 1 file changed, 24 deletions(-) diff --git a/tests/worker/test_workflow.py b/tests/worker/test_workflow.py index 42bfedba4..07bbfe803 100644 --- a/tests/worker/test_workflow.py +++ b/tests/worker/test_workflow.py @@ -8331,27 +8331,3 @@ async def test_workflow_headers_with_codec( else: assert headers["foo"].data != b"bar" - -async def test_summary_with_codec(client: Client, env: WorkflowEnvironment): - if env.supports_time_skipping: - pytest.skip("Time skipping server doesn't persist headers.") - - # Make client with this codec and run a couple of existing tests - config = client.config() - config["data_converter"] = DataConverter(payload_codec=SimpleCodec()) - client = Client(**config) - - async with new_worker( - client, - SimpleActivityWorkflow, - SignalAndQueryWorkflow, - activities=[say_hello], - ) as worker: - workflow_handle = await client.start_workflow( - SimpleActivityWorkflow.run, - "Temporal", - id=f"workflow-{uuid.uuid4()}", - task_queue=worker.task_queue, - ) - assert await workflow_handle.result() == "Hello, Temporal!" - assert False From afdb7579b551dd9710743df7f6cb68d835276dc1 Mon Sep 17 00:00:00 2001 From: Tim Conley Date: Wed, 3 Sep 2025 08:40:37 -0700 Subject: [PATCH 6/7] Add test --- tests/worker/test_activity.py | 30 ++++++++++++++++++++++++++++++ tests/worker/test_workflow.py | 1 - 2 files changed, 30 insertions(+), 1 deletion(-) diff --git a/tests/worker/test_activity.py b/tests/worker/test_activity.py index 050955f51..c31b855f5 100644 --- a/tests/worker/test_activity.py +++ b/tests/worker/test_activity.py @@ -1551,3 +1551,33 @@ def sync_wait_cancel() -> str: worker_config=config, ) assert result.result == "Got cancelled error, reset? True" + + +async def test_activity_reset_history(client: Client, worker: ExternalWorker): + @activity.defn + async def wait_cancel() -> str: + req = temporalio.api.workflowservice.v1.ResetActivityRequest( + namespace=client.namespace, + execution=temporalio.api.common.v1.WorkflowExecution( + workflow_id=activity.info().workflow_id, + run_id=activity.info().workflow_run_id, + ), + id=activity.info().activity_id, + ) + await client.workflow_service.reset_activity(req) + while True: + await asyncio.sleep(0.3) + activity.heartbeat() + + with pytest.raises(WorkflowFailureError) as e: + result = await _execute_workflow_with_activity( + client, + worker, + wait_cancel, + ) + assert isinstance(e.value.cause, ActivityError) + assert isinstance(e.value.cause.cause, ApplicationError) + assert ( + e.value.cause.cause.message + == "Unhandled activity cancel error produced by activity reset" + ) diff --git a/tests/worker/test_workflow.py b/tests/worker/test_workflow.py index 07bbfe803..862f9a456 100644 --- a/tests/worker/test_workflow.py +++ b/tests/worker/test_workflow.py @@ -8330,4 +8330,3 @@ async def test_workflow_headers_with_codec( assert headers["foo"].data == b"bar" else: assert headers["foo"].data != b"bar" - From 52c754583157dde4ff969fdef59ef578dfab96c6 Mon Sep 17 00:00:00 2001 From: Tim Conley Date: Wed, 3 Sep 2025 11:21:02 -0700 Subject: [PATCH 7/7] Skip on time skipping --- tests/worker/test_activity.py | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/tests/worker/test_activity.py b/tests/worker/test_activity.py index c31b855f5..7868fc281 100644 --- a/tests/worker/test_activity.py +++ b/tests/worker/test_activity.py @@ -1491,7 +1491,12 @@ async def h(): assert result.result == "details: Some detail" -async def test_activity_reset_catch(client: Client, worker: ExternalWorker): +async def test_activity_reset_catch( + client: Client, worker: ExternalWorker, env: WorkflowEnvironment +): + if env.supports_time_skipping: + pytest.skip("Time skipping server doesn't support activity reset") + @activity.defn async def wait_cancel() -> str: req = temporalio.api.workflowservice.v1.ResetActivityRequest( @@ -1553,7 +1558,12 @@ def sync_wait_cancel() -> str: assert result.result == "Got cancelled error, reset? True" -async def test_activity_reset_history(client: Client, worker: ExternalWorker): +async def test_activity_reset_history( + client: Client, worker: ExternalWorker, env: WorkflowEnvironment +): + if env.supports_time_skipping: + pytest.skip("Time skipping server doesn't support activity reset") + @activity.defn async def wait_cancel() -> str: req = temporalio.api.workflowservice.v1.ResetActivityRequest(