Skip to content

Commit 3e53f35

Browse files
committed
WIP: wire async activity completion context
1 parent 9d5ec58 commit 3e53f35

File tree

2 files changed

+32
-8
lines changed

2 files changed

+32
-8
lines changed

temporalio/client.py

Lines changed: 31 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,11 @@
6262
import temporalio.service
6363
import temporalio.workflow
6464
from temporalio.activity import ActivityCancellationDetails
65-
from temporalio.converter import WorkflowSerializationContext
65+
from temporalio.converter import (
66+
ActivitySerializationContext,
67+
DataConverter,
68+
WorkflowSerializationContext,
69+
)
6670
from temporalio.service import (
6771
HttpConnectProxyConfig,
6872
KeepAliveConfig,
@@ -6391,6 +6395,7 @@ async def _start_workflow_update_with_start(
63916395
async def heartbeat_async_activity(
63926396
self, input: HeartbeatAsyncActivityInput
63936397
) -> None:
6398+
# TODO
63946399
details = (
63956400
None
63966401
if not input.details
@@ -6444,11 +6449,29 @@ async def heartbeat_async_activity(
64446449
)
64456450
)
64466451

6452+
def _async_activity_data_converter(
6453+
self, id_or_token: Union[AsyncActivityIDReference, bytes]
6454+
) -> DataConverter:
6455+
context = ActivitySerializationContext(
6456+
namespace=self._client.namespace,
6457+
workflow_id=(
6458+
id_or_token.workflow_id
6459+
if isinstance(id_or_token, AsyncActivityIDReference)
6460+
else ""
6461+
),
6462+
workflow_type="",
6463+
activity_type="",
6464+
activity_task_queue="",
6465+
is_local=False,
6466+
)
6467+
return self._client.data_converter._with_context(context)
6468+
64476469
async def complete_async_activity(self, input: CompleteAsyncActivityInput) -> None:
6470+
data_converter = self._async_activity_data_converter(input.id_or_token)
64486471
result = (
64496472
None
64506473
if input.result is temporalio.common._arg_unset
6451-
else await self._client.data_converter.encode_wrapper([input.result])
6474+
else await data_converter.encode_wrapper([input.result])
64526475
)
64536476
if isinstance(input.id_or_token, AsyncActivityIDReference):
64546477
await self._client.workflow_service.respond_activity_task_completed_by_id(
@@ -6478,14 +6501,14 @@ async def complete_async_activity(self, input: CompleteAsyncActivityInput) -> No
64786501
)
64796502

64806503
async def fail_async_activity(self, input: FailAsyncActivityInput) -> None:
6504+
data_converter = self._async_activity_data_converter(input.id_or_token)
6505+
64816506
failure = temporalio.api.failure.v1.Failure()
6482-
await self._client.data_converter.encode_failure(input.error, failure)
6507+
await data_converter.encode_failure(input.error, failure)
64836508
last_heartbeat_details = (
64846509
None
64856510
if not input.last_heartbeat_details
6486-
else await self._client.data_converter.encode_wrapper(
6487-
input.last_heartbeat_details
6488-
)
6511+
else await data_converter.encode_wrapper(input.last_heartbeat_details)
64896512
)
64906513
if isinstance(input.id_or_token, AsyncActivityIDReference):
64916514
await self._client.workflow_service.respond_activity_task_failed_by_id(
@@ -6519,10 +6542,11 @@ async def fail_async_activity(self, input: FailAsyncActivityInput) -> None:
65196542
async def report_cancellation_async_activity(
65206543
self, input: ReportCancellationAsyncActivityInput
65216544
) -> None:
6545+
data_converter = self._async_activity_data_converter(input.id_or_token)
65226546
details = (
65236547
None
65246548
if not input.details
6525-
else await self._client.data_converter.encode_wrapper(input.details)
6549+
else await data_converter.encode_wrapper(input.details)
65266550
)
65276551
if isinstance(input.id_or_token, AsyncActivityIDReference):
65286552
await self._client.workflow_service.respond_activity_task_canceled_by_id(

temporalio/worker/_workflow_instance.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3187,7 +3187,7 @@ def _resolve_failure(self, err: BaseException) -> None:
31873187
self._result_fut.set_result(None)
31883188

31893189
def _apply_schedule_command(self) -> None:
3190-
payload = self._payload_converter.to_payload(self._input.input)
3190+
payload = self._payload_converter.to_payload(self._input.input) # type: ignore TODO
31913191
command = self._instance._add_command()
31923192
v = command.schedule_nexus_operation
31933193
v.seq = self._seq

0 commit comments

Comments
 (0)