1717import temporalio .client
1818import temporalio .converter
1919import temporalio .runtime
20+ import temporalio .worker
2021import temporalio .workflow
2122
2223from ..common import HeaderCodecBehavior
@@ -89,7 +90,7 @@ def __init__(
8990 self ._config = plugin .configure_replayer (self ._config )
9091
9192 # Validate workflows after plugin configuration
92- if not self ._config [ "workflows" ] :
93+ if not self ._config . get ( "workflows" ) :
9394 raise ValueError ("At least one workflow must be specified" )
9495
9596 def config (self , * , active_config : bool = False ) -> ReplayerConfig :
@@ -103,7 +104,7 @@ def config(self, *, active_config: bool = False) -> ReplayerConfig:
103104 Configuration, shallow-copied.
104105 """
105106 config = self ._config .copy () if active_config else self ._initial_config .copy ()
106- config ["workflows" ] = list (config [ "workflows" ] )
107+ config ["workflows" ] = list (config . get ( "workflows" , []) )
107108 return config
108109
109110 async def replay_workflow (
@@ -191,6 +192,11 @@ def make_lambda(plugin, next):
191192 async def _workflow_replay_iterator (
192193 self , histories : AsyncIterator [temporalio .client .WorkflowHistory ]
193194 ) -> AsyncIterator [AsyncIterator [WorkflowReplayResult ]]:
195+ # Initialize variables to avoid unbound variable errors
196+ pusher = None
197+ workflow_worker_task = None
198+ bridge_worker_scope = None
199+
194200 try :
195201 last_replay_failure : Optional [Exception ]
196202 last_replay_complete = asyncio .Event ()
@@ -223,39 +229,50 @@ def on_eviction_hook(
223229
224230 # Create worker referencing bridge worker
225231 bridge_worker : temporalio .bridge .worker .Worker
226- task_queue = f"replay-{ self ._config ['build_id' ]} "
227- runtime = self ._config ["runtime" ] or temporalio .runtime .Runtime .default ()
232+ task_queue = f"replay-{ self ._config .get ('build_id' )} "
233+ runtime = (
234+ self ._config .get ("runtime" ) or temporalio .runtime .Runtime .default ()
235+ )
228236 workflow_worker = _WorkflowWorker (
229237 bridge_worker = lambda : bridge_worker ,
230- namespace = self ._config [ "namespace" ] ,
238+ namespace = self ._config . get ( "namespace" , "ReplayNamespace" ) ,
231239 task_queue = task_queue ,
232- workflows = self ._config [ "workflows" ] ,
233- workflow_task_executor = self ._config [ "workflow_task_executor" ] ,
240+ workflows = self ._config . get ( "workflows" , []) ,
241+ workflow_task_executor = self ._config . get ( "workflow_task_executor" ) ,
234242 max_concurrent_workflow_tasks = 5 ,
235- workflow_runner = self ._config ["workflow_runner" ],
236- unsandboxed_workflow_runner = self ._config ["unsandboxed_workflow_runner" ],
237- data_converter = self ._config ["data_converter" ],
238- interceptors = self ._config ["interceptors" ],
239- workflow_failure_exception_types = self ._config [
240- "workflow_failure_exception_types"
241- ],
242- debug_mode = self ._config ["debug_mode" ],
243+ workflow_runner = self ._config .get ("workflow_runner" )
244+ or SandboxedWorkflowRunner (),
245+ unsandboxed_workflow_runner = self ._config .get (
246+ "unsandboxed_workflow_runner"
247+ )
248+ or UnsandboxedWorkflowRunner (),
249+ data_converter = self ._config .get ("data_converter" )
250+ or temporalio .converter .DataConverter .default ,
251+ interceptors = self ._config .get ("interceptors" , []),
252+ workflow_failure_exception_types = self ._config .get (
253+ "workflow_failure_exception_types" , []
254+ ),
255+ debug_mode = self ._config .get ("debug_mode" , False ),
243256 metric_meter = runtime .metric_meter ,
244257 on_eviction_hook = on_eviction_hook ,
245258 disable_eager_activity_execution = False ,
246- disable_safe_eviction = self ._config ["disable_safe_workflow_eviction" ],
259+ disable_safe_eviction = self ._config .get (
260+ "disable_safe_workflow_eviction" , False
261+ ),
247262 should_enforce_versioning_behavior = False ,
248263 assert_local_activity_valid = lambda a : None ,
249- encode_headers = self ._config ["header_codec_behavior" ]
264+ encode_headers = self ._config .get (
265+ "header_codec_behavior" , HeaderCodecBehavior .NO_CODEC
266+ )
250267 != HeaderCodecBehavior .NO_CODEC ,
251268 )
252269 # Create bridge worker
253270 bridge_worker , pusher = temporalio .bridge .worker .Worker .for_replay (
254271 runtime ._core_runtime ,
255272 temporalio .bridge .worker .WorkerConfig (
256- namespace = self ._config [ "namespace" ] ,
273+ namespace = self ._config . get ( "namespace" , "ReplayNamespace" ) ,
257274 task_queue = task_queue ,
258- identity_override = self ._config [ "identity" ] ,
275+ identity_override = self ._config . get ( "identity" ) ,
259276 # Need to tell core whether we want to consider all
260277 # non-determinism exceptions as workflow fail, and whether we do
261278 # per workflow type
@@ -292,7 +309,7 @@ def on_eviction_hook(
292309 max_task_queue_activities_per_second = None ,
293310 graceful_shutdown_period_millis = 0 ,
294311 versioning_strategy = temporalio .bridge .worker .WorkerVersioningStrategyNone (
295- build_id_no_versioning = self ._config [ "build_id" ]
312+ build_id_no_versioning = self ._config . get ( "build_id" )
296313 or load_default_build_id (),
297314 ),
298315 workflow_task_poller_behavior = temporalio .bridge .worker .PollerBehaviorSimpleMaximum (
@@ -307,6 +324,8 @@ def on_eviction_hook(
307324 plugins = [plugin .name () for plugin in self .plugins ],
308325 ),
309326 )
327+ bridge_worker_scope = bridge_worker
328+
310329 # Start worker
311330 workflow_worker_task = asyncio .create_task (workflow_worker .run ())
312331
@@ -347,18 +366,20 @@ async def replay_iterator() -> AsyncIterator[WorkflowReplayResult]:
347366 yield replay_iterator ()
348367 finally :
349368 # Close the pusher
350- pusher .close ()
369+ if pusher is not None :
370+ pusher .close ()
351371 # If the workflow worker task is not done, wait for it
352372 try :
353- if not workflow_worker_task .done ():
373+ if workflow_worker_task is not None and not workflow_worker_task .done ():
354374 await workflow_worker_task
355375 except Exception :
356376 logger .warning ("Failed to shutdown worker" , exc_info = True )
357377 finally :
358378 # We must shutdown here
359379 try :
360- bridge_worker .initiate_shutdown ()
361- await bridge_worker .finalize_shutdown ()
380+ if bridge_worker_scope is not None :
381+ bridge_worker_scope .initiate_shutdown ()
382+ await bridge_worker_scope .finalize_shutdown ()
362383 except Exception :
363384 logger .warning ("Failed to finalize shutdown" , exc_info = True )
364385
0 commit comments