Skip to content

Commit a295339

Browse files
committed
Add test and error case
1 parent c4cade5 commit a295339

File tree

4 files changed

+83
-2
lines changed

4 files changed

+83
-2
lines changed

temporalio/worker/_activity.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -334,6 +334,24 @@ async def _handle_start_activity_task(
334334
),
335335
completion.result.failed.failure,
336336
)
337+
elif (
338+
isinstance(
339+
err,
340+
(asyncio.CancelledError, temporalio.exceptions.CancelledError),
341+
)
342+
and running_activity.cancellation_details.details
343+
and running_activity.cancellation_details.details.reset
344+
):
345+
temporalio.activity.logger.warning(
346+
"Completing as failure due to unhandled cancel error produced by activity reset",
347+
)
348+
await self._data_converter.encode_failure(
349+
temporalio.exceptions.ApplicationError(
350+
type="ActivityReset",
351+
message="Unhandled activity cancel error produced by activity reset",
352+
),
353+
completion.result.failed.failure,
354+
)
337355
elif (
338356
isinstance(
339357
err,

temporalio/worker/_workflow_instance.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2791,6 +2791,7 @@ def _apply_schedule_command(
27912791
command.user_metadata.summary.CopyFrom(
27922792
self._instance._payload_converter.to_payload(self._input.summary)
27932793
)
2794+
print("Activity summary: ", command.user_metadata.summary)
27942795
if self._input.priority:
27952796
command.schedule_activity.priority.CopyFrom(
27962797
self._input.priority._to_proto()

tests/worker/test_activity.py

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,12 @@
99
import threading
1010
import time
1111
import uuid
12+
from concurrent.futures import ThreadPoolExecutor
1213
from concurrent.futures.process import BrokenProcessPool
1314
from contextvars import ContextVar
1415
from dataclasses import dataclass
1516
from datetime import datetime, timedelta, timezone
17+
from time import sleep
1618
from typing import Any, Callable, List, NoReturn, Optional, Sequence, Type
1719

1820
import temporalio.api.workflowservice.v1
@@ -1500,7 +1502,6 @@ async def wait_cancel() -> str:
15001502
),
15011503
id=activity.info().activity_id,
15021504
)
1503-
activity.logger.info(f"Sending reset request: {req}")
15041505
await client.workflow_service.reset_activity(req)
15051506
try:
15061507
while True:
@@ -1511,9 +1512,42 @@ async def wait_cancel() -> str:
15111512
assert details is not None
15121513
return "Got cancelled error, reset? " + str(details.reset)
15131514

1515+
@activity.defn
1516+
def sync_wait_cancel() -> str:
1517+
req = temporalio.api.workflowservice.v1.ResetActivityRequest(
1518+
namespace=client.namespace,
1519+
execution=temporalio.api.common.v1.WorkflowExecution(
1520+
workflow_id=activity.info().workflow_id,
1521+
run_id=activity.info().workflow_run_id,
1522+
),
1523+
id=activity.info().activity_id,
1524+
)
1525+
asyncio.run(client.workflow_service.reset_activity(req))
1526+
try:
1527+
while True:
1528+
sleep(0.3)
1529+
activity.heartbeat()
1530+
except temporalio.exceptions.CancelledError:
1531+
details = activity.cancellation_details()
1532+
assert details is not None
1533+
return "Got cancelled error, reset? " + str(details.reset)
1534+
except Exception as e:
1535+
return str(type(e)) + str(e)
1536+
15141537
result = await _execute_workflow_with_activity(
15151538
client,
15161539
worker,
15171540
wait_cancel,
15181541
)
15191542
assert result.result == "Got cancelled error, reset? True"
1543+
1544+
config = WorkerConfig(
1545+
activity_executor=ThreadPoolExecutor(max_workers=1),
1546+
)
1547+
result = await _execute_workflow_with_activity(
1548+
client,
1549+
worker,
1550+
sync_wait_cancel,
1551+
worker_config=config,
1552+
)
1553+
assert result.result == "Got cancelled error, reset? True"

tests/worker/test_workflow.py

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -814,7 +814,10 @@ class SimpleActivityWorkflow:
814814
@workflow.run
815815
async def run(self, name: str) -> str:
816816
return await workflow.execute_activity(
817-
say_hello, name, schedule_to_close_timeout=timedelta(seconds=5)
817+
say_hello,
818+
name,
819+
schedule_to_close_timeout=timedelta(seconds=5),
820+
summary="Do a thing",
818821
)
819822

820823

@@ -8327,3 +8330,28 @@ async def test_workflow_headers_with_codec(
83278330
assert headers["foo"].data == b"bar"
83288331
else:
83298332
assert headers["foo"].data != b"bar"
8333+
8334+
8335+
async def test_summary_with_codec(client: Client, env: WorkflowEnvironment):
8336+
if env.supports_time_skipping:
8337+
pytest.skip("Time skipping server doesn't persist headers.")
8338+
8339+
# Make client with this codec and run a couple of existing tests
8340+
config = client.config()
8341+
config["data_converter"] = DataConverter(payload_codec=SimpleCodec())
8342+
client = Client(**config)
8343+
8344+
async with new_worker(
8345+
client,
8346+
SimpleActivityWorkflow,
8347+
SignalAndQueryWorkflow,
8348+
activities=[say_hello],
8349+
) as worker:
8350+
workflow_handle = await client.start_workflow(
8351+
SimpleActivityWorkflow.run,
8352+
"Temporal",
8353+
id=f"workflow-{uuid.uuid4()}",
8354+
task_queue=worker.task_queue,
8355+
)
8356+
assert await workflow_handle.result() == "Hello, Temporal!"
8357+
assert False

0 commit comments

Comments
 (0)