Skip to content

Commit 56db57e

Browse files
committed
WIP: wire async activity completion context
1 parent da08ddc commit 56db57e

File tree

1 file changed

+31
-7
lines changed

1 file changed

+31
-7
lines changed

temporalio/client.py

Lines changed: 31 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,11 @@
6262
import temporalio.service
6363
import temporalio.workflow
6464
from temporalio.activity import ActivityCancellationDetails
65-
from temporalio.converter import WorkflowSerializationContext
65+
from temporalio.converter import (
66+
ActivitySerializationContext,
67+
DataConverter,
68+
WorkflowSerializationContext,
69+
)
6670
from temporalio.service import (
6771
HttpConnectProxyConfig,
6872
KeepAliveConfig,
@@ -6382,6 +6386,7 @@ async def _start_workflow_update_with_start(
63826386
async def heartbeat_async_activity(
63836387
self, input: HeartbeatAsyncActivityInput
63846388
) -> None:
6389+
# TODO
63856390
details = (
63866391
None
63876392
if not input.details
@@ -6435,11 +6440,29 @@ async def heartbeat_async_activity(
64356440
)
64366441
)
64376442

6443+
def _async_activity_data_converter(
6444+
self, id_or_token: Union[AsyncActivityIDReference, bytes]
6445+
) -> DataConverter:
6446+
context = ActivitySerializationContext(
6447+
namespace=self._client.namespace,
6448+
workflow_id=(
6449+
id_or_token.workflow_id
6450+
if isinstance(id_or_token, AsyncActivityIDReference)
6451+
else ""
6452+
),
6453+
workflow_type="",
6454+
activity_type="",
6455+
activity_task_queue="",
6456+
is_local=False,
6457+
)
6458+
return self._client.data_converter._with_context(context)
6459+
64386460
async def complete_async_activity(self, input: CompleteAsyncActivityInput) -> None:
6461+
data_converter = self._async_activity_data_converter(input.id_or_token)
64396462
result = (
64406463
None
64416464
if input.result is temporalio.common._arg_unset
6442-
else await self._client.data_converter.encode_wrapper([input.result])
6465+
else await data_converter.encode_wrapper([input.result])
64436466
)
64446467
if isinstance(input.id_or_token, AsyncActivityIDReference):
64456468
await self._client.workflow_service.respond_activity_task_completed_by_id(
@@ -6469,14 +6492,14 @@ async def complete_async_activity(self, input: CompleteAsyncActivityInput) -> No
64696492
)
64706493

64716494
async def fail_async_activity(self, input: FailAsyncActivityInput) -> None:
6495+
data_converter = self._async_activity_data_converter(input.id_or_token)
6496+
64726497
failure = temporalio.api.failure.v1.Failure()
6473-
await self._client.data_converter.encode_failure(input.error, failure)
6498+
await data_converter.encode_failure(input.error, failure)
64746499
last_heartbeat_details = (
64756500
None
64766501
if not input.last_heartbeat_details
6477-
else await self._client.data_converter.encode_wrapper(
6478-
input.last_heartbeat_details
6479-
)
6502+
else await data_converter.encode_wrapper(input.last_heartbeat_details)
64806503
)
64816504
if isinstance(input.id_or_token, AsyncActivityIDReference):
64826505
await self._client.workflow_service.respond_activity_task_failed_by_id(
@@ -6510,10 +6533,11 @@ async def fail_async_activity(self, input: FailAsyncActivityInput) -> None:
65106533
async def report_cancellation_async_activity(
65116534
self, input: ReportCancellationAsyncActivityInput
65126535
) -> None:
6536+
data_converter = self._async_activity_data_converter(input.id_or_token)
65136537
details = (
65146538
None
65156539
if not input.details
6516-
else await self._client.data_converter.encode_wrapper(input.details)
6540+
else await data_converter.encode_wrapper(input.details)
65176541
)
65186542
if isinstance(input.id_or_token, AsyncActivityIDReference):
65196543
await self._client.workflow_service.respond_activity_task_canceled_by_id(

0 commit comments

Comments
 (0)