@@ -255,49 +255,43 @@ async def _handle_activation(
255255 )
256256 completion .successful .SetInParent ()
257257 try :
258- # Decode the activation if there's a codec and not cache remove job
259-
260258 if LOG_PROTOS :
261259 logger .debug ("Received workflow activation:\n %s" , act )
262260
263- # If the workflow is not running yet, create it
264261 workflow = self ._running_workflows .get (act .run_id )
265-
266- if data_converter .payload_codec :
267- await temporalio .bridge .worker .decode_activation (
268- act ,
269- data_converter .payload_codec ,
270- decode_headers = self ._encode_headers ,
271- )
272262 if not workflow :
273- # Must have a initialize job to create instance
274263 if not init_job :
275264 raise RuntimeError (
276265 "Missing initialize workflow, workflow could have unexpectedly been removed from cache"
277266 )
278- data_converter = self ._data_converter ._with_context (
279- temporalio .converter .WorkflowSerializationContext (
280- namespace = self ._namespace ,
281- workflow_id = init_job .workflow_id ,
267+ workflow_id = init_job .workflow_id
268+ else :
269+ workflow_id = workflow .workflow_id
270+ if init_job :
271+ # Should never happen
272+ logger .warning (
273+ "Cache already exists for activation with initialize job"
282274 )
283- )
284- workflow = _RunningWorkflow (
285- self ._create_workflow_instance (act , init_job ),
286- init_job .workflow_id ,
287- )
288- self ._running_workflows [act .run_id ] = workflow
289- elif init_job :
290- # This should never happen
291- logger .warning (
292- "Cache already exists for activation with initialize job"
293- )
294275
295276 data_converter = self ._data_converter ._with_context (
296277 temporalio .converter .WorkflowSerializationContext (
297278 namespace = self ._namespace ,
298- workflow_id = workflow . workflow_id ,
279+ workflow_id = workflow_id ,
299280 )
300281 )
282+ if data_converter .payload_codec :
283+ await temporalio .bridge .worker .decode_activation (
284+ act ,
285+ data_converter .payload_codec ,
286+ decode_headers = self ._encode_headers ,
287+ )
288+ if not workflow :
289+ assert init_job
290+ workflow = _RunningWorkflow (
291+ self ._create_workflow_instance (act , init_job ),
292+ workflow_id ,
293+ )
294+ self ._running_workflows [act .run_id ] = workflow
301295
302296 # Run activation in separate thread so we can check if it's
303297 # deadlocked
@@ -337,7 +331,6 @@ async def _handle_activation(
337331 "Failed handling activation on workflow with run ID %s" , act .run_id
338332 )
339333
340- # Set completion failure
341334 completion .failed .failure .SetInParent ()
342335 try :
343336 data_converter .failure_converter .to_failure (
@@ -354,10 +347,9 @@ async def _handle_activation(
354347 f"Failed converting activation exception: { inner_err } "
355348 )
356349
357- # Always set the run ID on the completion
358350 completion .run_id = act .run_id
359351
360- # Encode the completion if there's a codec and not cache remove job
352+ # Encode completion
361353 if data_converter .payload_codec :
362354 try :
363355 await temporalio .bridge .worker .encode_completion (
0 commit comments