Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions temporalio/activity.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ class ActivityCancellationDetails:
not_found: bool = False
cancel_requested: bool = False
paused: bool = False
reset: bool = False
timed_out: bool = False
worker_shutdown: bool = False

Expand All @@ -167,6 +168,7 @@ def _from_proto(
paused=proto.is_paused,
timed_out=proto.is_timed_out,
worker_shutdown=proto.is_worker_shutdown,
reset=proto.is_reset,
)


Expand Down
3 changes: 3 additions & 0 deletions temporalio/bridge/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,9 @@ impl ClientRef {
"request_cancel_workflow_execution" => {
rpc_call!(retry_client, call, request_cancel_workflow_execution)
}
"reset_activity" => {
rpc_call!(retry_client, call, reset_activity)
}
"reset_sticky_task_queue" => {
rpc_call!(retry_client, call, reset_sticky_task_queue)
}
Expand Down
8 changes: 7 additions & 1 deletion temporalio/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -6363,11 +6363,16 @@ async def heartbeat_async_activity(
metadata=input.rpc_metadata,
timeout=input.rpc_timeout,
)
if resp_by_id.cancel_requested or resp_by_id.activity_paused:
if (
resp_by_id.cancel_requested
or resp_by_id.activity_paused
or resp_by_id.activity_reset
):
raise AsyncActivityCancelledError(
details=ActivityCancellationDetails(
cancel_requested=resp_by_id.cancel_requested,
paused=resp_by_id.activity_paused,
reset=resp_by_id.activity_reset,
)
)

Expand All @@ -6388,6 +6393,7 @@ async def heartbeat_async_activity(
details=ActivityCancellationDetails(
cancel_requested=resp.cancel_requested,
paused=resp.activity_paused,
reset=resp.activity_reset,
)
)

Expand Down
31 changes: 31 additions & 0 deletions tests/worker/test_activity.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from datetime import datetime, timedelta, timezone
from typing import Any, Callable, List, NoReturn, Optional, Sequence, Type

import temporalio.api.workflowservice.v1
from temporalio import activity, workflow
from temporalio.client import (
AsyncActivityHandle,
Expand Down Expand Up @@ -1486,3 +1487,33 @@ async def h():
client, worker, heartbeat, retry_max_attempts=2
)
assert result.result == "details: Some detail"


async def test_activity_reset_catch(client: Client, worker: ExternalWorker):
@activity.defn
async def wait_cancel() -> str:
req = temporalio.api.workflowservice.v1.ResetActivityRequest(
namespace=client.namespace,
execution=temporalio.api.common.v1.WorkflowExecution(
workflow_id=activity.info().workflow_id,
run_id=activity.info().workflow_run_id,
),
id=activity.info().activity_id,
)
activity.logger.info(f"Sending reset request: {req}")
await client.workflow_service.reset_activity(req)
try:
while True:
await asyncio.sleep(0.3)
activity.heartbeat()
except asyncio.CancelledError:
details = activity.cancellation_details()
assert details is not None
return "Got cancelled error, reset? " + str(details.reset)

result = await _execute_workflow_with_activity(
client,
worker,
wait_cancel,
)
assert result.result == "Got cancelled error, reset? True"
Loading