@@ -255,19 +255,36 @@ async def _handle_activation(
255255 )
256256 completion .successful .SetInParent ()
257257 try :
258+ # Decode the activation if there's a codec and not cache remove job
259+
258260 if LOG_PROTOS :
259261 logger .debug ("Received workflow activation:\n %s" , act )
260262
261263 # If the workflow is not running yet, create it
262264 workflow = self ._running_workflows .get (act .run_id )
265+
266+ if data_converter .payload_codec :
267+ await temporalio .bridge .worker .decode_activation (
268+ act ,
269+ data_converter .payload_codec ,
270+ decode_headers = self ._encode_headers ,
271+ )
263272 if not workflow :
264273 # Must have a initialize job to create instance
265274 if not init_job :
266275 raise RuntimeError (
267276 "Missing initialize workflow, workflow could have unexpectedly been removed from cache"
268277 )
269- workflow_instance , det = self ._create_workflow_instance (act , init_job )
270- workflow = _RunningWorkflow (workflow_instance , det .info .workflow_id )
278+ data_converter = self ._data_converter ._with_context (
279+ temporalio .converter .WorkflowSerializationContext (
280+ namespace = self ._namespace ,
281+ workflow_id = init_job .workflow_id ,
282+ )
283+ )
284+ workflow = _RunningWorkflow (
285+ self ._create_workflow_instance (act , init_job ),
286+ init_job .workflow_id ,
287+ )
271288 self ._running_workflows [act .run_id ] = workflow
272289 elif init_job :
273290 # This should never happen
@@ -281,12 +298,6 @@ async def _handle_activation(
281298 workflow_id = workflow .workflow_id ,
282299 )
283300 )
284- if data_converter .payload_codec :
285- await temporalio .bridge .worker .decode_activation (
286- act ,
287- data_converter .payload_codec ,
288- decode_headers = self ._encode_headers ,
289- )
290301
291302 # Run activation in separate thread so we can check if it's
292303 # deadlocked
@@ -495,7 +506,7 @@ def _create_workflow_instance(
495506 self ,
496507 act : temporalio .bridge .proto .workflow_activation .WorkflowActivation ,
497508 init : temporalio .bridge .proto .workflow_activation .InitializeWorkflow ,
498- ) -> tuple [ WorkflowInstance , WorkflowInstanceDetails ] :
509+ ) -> WorkflowInstance :
499510 # Get the definition
500511 defn = self ._workflows .get (init .workflow_type , self ._dynamic_workflow )
501512 if not defn :
@@ -575,9 +586,9 @@ def _create_workflow_instance(
575586 last_failure = last_failure ,
576587 )
577588 if defn .sandboxed :
578- return self ._workflow_runner .create_instance (det ), det
589+ return self ._workflow_runner .create_instance (det )
579590 else :
580- return self ._unsandboxed_workflow_runner .create_instance (det ), det
591+ return self ._unsandboxed_workflow_runner .create_instance (det )
581592
582593 def nondeterminism_as_workflow_fail (self ) -> bool :
583594 return any (
0 commit comments