Skip to content

Commit 73c7f47

Browse files
committed
Wire codec context in worker activation processing
1 parent ef5981c commit 73c7f47

File tree

1 file changed

+25
-19
lines changed

1 file changed

+25
-19
lines changed

temporalio/worker/_workflow.py

Lines changed: 25 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -248,20 +248,13 @@ async def _handle_activation(
248248
await self._handle_cache_eviction(act, cache_remove_job)
249249
return
250250

251+
data_converter = self._data_converter
251252
# Build default success completion (e.g. remove-job-only activations)
252253
completion = (
253254
temporalio.bridge.proto.workflow_completion.WorkflowActivationCompletion()
254255
)
255256
completion.successful.SetInParent()
256257
try:
257-
# Decode the activation if there's a codec and not cache remove job
258-
if self._data_converter.payload_codec:
259-
await temporalio.bridge.worker.decode_activation(
260-
act,
261-
self._data_converter.payload_codec,
262-
decode_headers=self._encode_headers,
263-
)
264-
265258
if LOG_PROTOS:
266259
logger.debug("Received workflow activation:\n%s", act)
267260

@@ -273,16 +266,28 @@ async def _handle_activation(
273266
raise RuntimeError(
274267
"Missing initialize workflow, workflow could have unexpectedly been removed from cache"
275268
)
276-
workflow = _RunningWorkflow(
277-
self._create_workflow_instance(act, init_job)
278-
)
269+
workflow_instance, det = self._create_workflow_instance(act, init_job)
270+
workflow = _RunningWorkflow(workflow_instance, det.info.workflow_id)
279271
self._running_workflows[act.run_id] = workflow
280272
elif init_job:
281273
# This should never happen
282274
logger.warning(
283275
"Cache already exists for activation with initialize job"
284276
)
285277

278+
data_converter = self._data_converter._with_context(
279+
temporalio.converter.WorkflowSerializationContext(
280+
namespace=self._namespace,
281+
workflow_id=workflow.workflow_id,
282+
)
283+
)
284+
if data_converter.payload_codec:
285+
await temporalio.bridge.worker.decode_activation(
286+
act,
287+
data_converter.payload_codec,
288+
decode_headers=self._encode_headers,
289+
)
290+
286291
# Run activation in separate thread so we can check if it's
287292
# deadlocked
288293
if workflow:
@@ -325,9 +330,9 @@ async def _handle_activation(
325330
# Set completion failure
326331
completion.failed.failure.SetInParent()
327332
try:
328-
self._data_converter.failure_converter.to_failure(
333+
data_converter.failure_converter.to_failure(
329334
err,
330-
self._data_converter.payload_converter,
335+
data_converter.payload_converter,
331336
completion.failed.failure,
332337
)
333338
except Exception as inner_err:
@@ -343,11 +348,11 @@ async def _handle_activation(
343348
completion.run_id = act.run_id
344349

345350
# Encode the completion if there's a codec and not cache remove job
346-
if self._data_converter.payload_codec:
351+
if data_converter.payload_codec:
347352
try:
348353
await temporalio.bridge.worker.encode_completion(
349354
completion,
350-
self._data_converter.payload_codec,
355+
data_converter.payload_codec,
351356
encode_headers=self._encode_headers,
352357
)
353358
except Exception as err:
@@ -491,7 +496,7 @@ def _create_workflow_instance(
491496
self,
492497
act: temporalio.bridge.proto.workflow_activation.WorkflowActivation,
493498
init: temporalio.bridge.proto.workflow_activation.InitializeWorkflow,
494-
) -> WorkflowInstance:
499+
) -> tuple[WorkflowInstance, WorkflowInstanceDetails]:
495500
# Get the definition
496501
defn = self._workflows.get(init.workflow_type, self._dynamic_workflow)
497502
if not defn:
@@ -571,9 +576,9 @@ def _create_workflow_instance(
571576
last_failure=last_failure,
572577
)
573578
if defn.sandboxed:
574-
return self._workflow_runner.create_instance(det)
579+
return self._workflow_runner.create_instance(det), det
575580
else:
576-
return self._unsandboxed_workflow_runner.create_instance(det)
581+
return self._unsandboxed_workflow_runner.create_instance(det), det
577582

578583
def nondeterminism_as_workflow_fail(self) -> bool:
579584
return any(
@@ -667,8 +672,9 @@ def _gen_tb_helper(
667672

668673

669674
class _RunningWorkflow:
670-
def __init__(self, instance: WorkflowInstance):
675+
def __init__(self, instance: WorkflowInstance, workflow_id: str):
671676
self.instance = instance
677+
self.workflow_id = workflow_id
672678
self.deadlocked_activation_task: Optional[Awaitable] = None
673679
self._deadlock_can_be_interrupted_lock = threading.Lock()
674680
self._deadlock_can_be_interrupted = False

0 commit comments

Comments
 (0)