Skip to content

Commit 12f4828

Browse files
committed
Fix test for activity reset
1 parent 67e4f29 commit 12f4828

File tree

3 files changed

+44
-11
lines changed

3 files changed

+44
-11
lines changed

temporalio/activity.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,7 @@ class ActivityCancellationDetails:
162162
def _from_proto(
163163
proto: temporalio.bridge.proto.activity_task.ActivityCancellationDetails,
164164
) -> ActivityCancellationDetails:
165+
print(f"Building activity cancellation details, reset: {proto.is_reset}")
165166
return ActivityCancellationDetails(
166167
not_found=proto.is_not_found,
167168
cancel_requested=proto.is_cancelled,

temporalio/client.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6363,12 +6363,16 @@ async def heartbeat_async_activity(
63636363
metadata=input.rpc_metadata,
63646364
timeout=input.rpc_timeout,
63656365
)
6366-
if resp_by_id.cancel_requested or resp_by_id.activity_paused or resp_by_id.activity_reset:
6366+
if (
6367+
resp_by_id.cancel_requested
6368+
or resp_by_id.activity_paused
6369+
or resp_by_id.activity_reset
6370+
):
63676371
raise AsyncActivityCancelledError(
63686372
details=ActivityCancellationDetails(
63696373
cancel_requested=resp_by_id.cancel_requested,
63706374
paused=resp_by_id.activity_paused,
6371-
reset=resp_by_id.activity_reset
6375+
reset=resp_by_id.activity_reset,
63726376
)
63736377
)
63746378

@@ -6389,6 +6393,7 @@ async def heartbeat_async_activity(
63896393
details=ActivityCancellationDetails(
63906394
cancel_requested=resp.cancel_requested,
63916395
paused=resp.activity_paused,
6396+
reset=resp.activity_reset,
63926397
)
63936398
)
63946399

tests/worker/test_activity.py

Lines changed: 36 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717

1818
import temporalio.api.workflowservice.v1
1919
from temporalio import activity, workflow
20-
from temporalio.api.workflowservice.v1.request_response_pb2 import ResetActivityRequest
2120
from temporalio.client import (
2221
AsyncActivityHandle,
2322
Client,
@@ -1489,34 +1488,62 @@ async def h():
14891488
)
14901489
assert result.result == "details: Some detail"
14911490

1492-
async def test_activity_reset(client: Client, worker: ExternalWorker):
14931491

1492+
async def test_activity_reset(client: Client, worker: ExternalWorker):
14941493
@activity.defn
14951494
async def reset_activity() -> None:
1496-
1497-
await client.workflow_service.reset_activity(temporalio.api.workflowservice.v1.ResetActivityRequest(
1495+
req = temporalio.api.workflowservice.v1.ResetActivityRequest(
14981496
namespace=client.namespace,
14991497
execution=temporalio.api.common.v1.WorkflowExecution(
15001498
workflow_id=activity.info().workflow_id,
15011499
run_id=activity.info().workflow_run_id,
15021500
),
15031501
id=activity.info().activity_id,
1504-
))
1502+
)
1503+
activity.logger.info(f"Sending reset request: {req}")
1504+
await client.workflow_service.reset_activity(req)
15051505
reset = False
15061506
for _ in range(5):
1507+
activity.heartbeat()
15071508
try:
15081509
if reset:
15091510
return None
1510-
await asyncio.sleep(1)
1511+
await asyncio.sleep(0.3)
15111512
except Exception as e:
15121513
activity.logger.warning("Exception: ", e)
15131514
reset = True
15141515
raise
15151516

15161517
assert False
15171518

1518-
await _execute_workflow_with_activity(
1519-
client, worker, reset_activity
1520-
)
1519+
await _execute_workflow_with_activity(client, worker, reset_activity)
1520+
15211521

1522+
async def test_activity_reset_catch(client: Client, worker: ExternalWorker):
1523+
@activity.defn
1524+
async def wait_cancel() -> str:
1525+
req = temporalio.api.workflowservice.v1.ResetActivityRequest(
1526+
namespace=client.namespace,
1527+
execution=temporalio.api.common.v1.WorkflowExecution(
1528+
workflow_id=activity.info().workflow_id,
1529+
run_id=activity.info().workflow_run_id,
1530+
),
1531+
id=activity.info().activity_id,
1532+
)
1533+
activity.logger.info(f"Sending reset request: {req}")
1534+
await client.workflow_service.reset_activity(req)
1535+
try:
1536+
while True:
1537+
await asyncio.sleep(0.3)
1538+
activity.heartbeat()
1539+
except asyncio.CancelledError:
1540+
details = activity.cancellation_details()
1541+
assert details is not None
1542+
return "Got cancelled error, reset? " + str(details.reset)
15221543

1544+
result = await _execute_workflow_with_activity(
1545+
client,
1546+
worker,
1547+
wait_cancel,
1548+
)
1549+
assert result.result == "Got cancelled error, reset? True"

0 commit comments

Comments
 (0)