@@ -43,6 +43,7 @@ def __init__(
4343 interceptors : Sequence [Interceptor ] = [],
4444 build_id : Optional [str ] = None ,
4545 identity : Optional [str ] = None ,
46+ workflow_failure_exception_types : Sequence [Type [BaseException ]] = [],
4647 debug_mode : bool = False ,
4748 runtime : Optional [temporalio .runtime .Runtime ] = None ,
4849 disable_safe_workflow_eviction : bool = False ,
@@ -66,6 +67,7 @@ def __init__(
6667 interceptors = interceptors ,
6768 build_id = build_id ,
6869 identity = identity ,
70+ workflow_failure_exception_types = workflow_failure_exception_types ,
6971 debug_mode = debug_mode ,
7072 runtime = runtime ,
7173 disable_safe_workflow_eviction = disable_safe_workflow_eviction ,
@@ -153,35 +155,6 @@ async def workflow_replay_iterator(
153155 An async iterator that returns replayed workflow results as they are
154156 replayed.
155157 """
156- # Create bridge worker
157- task_queue = f"replay-{ self ._config ['build_id' ]} "
158- runtime = self ._config ["runtime" ] or temporalio .runtime .Runtime .default ()
159- bridge_worker , pusher = temporalio .bridge .worker .Worker .for_replay (
160- runtime ._core_runtime ,
161- temporalio .bridge .worker .WorkerConfig (
162- namespace = self ._config ["namespace" ],
163- task_queue = task_queue ,
164- build_id = self ._config ["build_id" ] or load_default_build_id (),
165- identity_override = self ._config ["identity" ],
166- # All values below are ignored but required by Core
167- max_cached_workflows = 2 ,
168- max_outstanding_workflow_tasks = 2 ,
169- max_outstanding_activities = 1 ,
170- max_outstanding_local_activities = 1 ,
171- max_concurrent_workflow_task_polls = 1 ,
172- nonsticky_to_sticky_poll_ratio = 1 ,
173- max_concurrent_activity_task_polls = 1 ,
174- no_remote_activities = True ,
175- sticky_queue_schedule_to_start_timeout_millis = 1000 ,
176- max_heartbeat_throttle_interval_millis = 1000 ,
177- default_heartbeat_throttle_interval_millis = 1000 ,
178- max_activities_per_second = None ,
179- max_task_queue_activities_per_second = None ,
180- graceful_shutdown_period_millis = 0 ,
181- use_worker_versioning = False ,
182- ),
183- )
184-
185158 try :
186159 last_replay_failure : Optional [Exception ]
187160 last_replay_complete = asyncio .Event ()
@@ -212,29 +185,62 @@ def on_eviction_hook(
212185 last_replay_failure = None
213186 last_replay_complete .set ()
214187
215- # Start the worker
216- workflow_worker_task = asyncio .create_task (
217- _WorkflowWorker (
218- bridge_worker = lambda : bridge_worker ,
188+ # Create worker referencing bridge worker
189+ bridge_worker : temporalio .bridge .worker .Worker
190+ task_queue = f"replay-{ self ._config ['build_id' ]} "
191+ runtime = self ._config ["runtime" ] or temporalio .runtime .Runtime .default ()
192+ workflow_worker = _WorkflowWorker (
193+ bridge_worker = lambda : bridge_worker ,
194+ namespace = self ._config ["namespace" ],
195+ task_queue = task_queue ,
196+ workflows = self ._config ["workflows" ],
197+ workflow_task_executor = self ._config ["workflow_task_executor" ],
198+ workflow_runner = self ._config ["workflow_runner" ],
199+ unsandboxed_workflow_runner = self ._config ["unsandboxed_workflow_runner" ],
200+ data_converter = self ._config ["data_converter" ],
201+ interceptors = self ._config ["interceptors" ],
202+ workflow_failure_exception_types = self ._config [
203+ "workflow_failure_exception_types"
204+ ],
205+ debug_mode = self ._config ["debug_mode" ],
206+ metric_meter = runtime .metric_meter ,
207+ on_eviction_hook = on_eviction_hook ,
208+ disable_eager_activity_execution = False ,
209+ disable_safe_eviction = self ._config ["disable_safe_workflow_eviction" ],
210+ )
211+ # Create bridge worker
212+ bridge_worker , pusher = temporalio .bridge .worker .Worker .for_replay (
213+ runtime ._core_runtime ,
214+ temporalio .bridge .worker .WorkerConfig (
219215 namespace = self ._config ["namespace" ],
220216 task_queue = task_queue ,
221- workflows = self ._config ["workflows" ],
222- workflow_task_executor = self ._config ["workflow_task_executor" ],
223- workflow_runner = self ._config ["workflow_runner" ],
224- unsandboxed_workflow_runner = self ._config [
225- "unsandboxed_workflow_runner"
226- ],
227- data_converter = self ._config ["data_converter" ],
228- interceptors = self ._config ["interceptors" ],
229- debug_mode = self ._config ["debug_mode" ],
230- metric_meter = runtime .metric_meter ,
231- on_eviction_hook = on_eviction_hook ,
232- disable_eager_activity_execution = False ,
233- disable_safe_eviction = self ._config [
234- "disable_safe_workflow_eviction"
235- ],
236- ).run ()
217+ build_id = self ._config ["build_id" ] or load_default_build_id (),
218+ identity_override = self ._config ["identity" ],
219+ # Need to tell core whether we want to consider all
220+ # non-determinism exceptions as workflow fail, and whether we do
221+ # per workflow type
222+ nondeterminism_as_workflow_fail = workflow_worker .nondeterminism_as_workflow_fail (),
223+ nondeterminism_as_workflow_fail_for_types = workflow_worker .nondeterminism_as_workflow_fail_for_types (),
224+ # All values below are ignored but required by Core
225+ max_cached_workflows = 2 ,
226+ max_outstanding_workflow_tasks = 2 ,
227+ max_outstanding_activities = 1 ,
228+ max_outstanding_local_activities = 1 ,
229+ max_concurrent_workflow_task_polls = 1 ,
230+ nonsticky_to_sticky_poll_ratio = 1 ,
231+ max_concurrent_activity_task_polls = 1 ,
232+ no_remote_activities = True ,
233+ sticky_queue_schedule_to_start_timeout_millis = 1000 ,
234+ max_heartbeat_throttle_interval_millis = 1000 ,
235+ default_heartbeat_throttle_interval_millis = 1000 ,
236+ max_activities_per_second = None ,
237+ max_task_queue_activities_per_second = None ,
238+ graceful_shutdown_period_millis = 0 ,
239+ use_worker_versioning = False ,
240+ ),
237241 )
242+ # Start worker
243+ workflow_worker_task = asyncio .create_task (workflow_worker .run ())
238244
239245 # Yield iterator
240246 async def replay_iterator () -> AsyncIterator [WorkflowReplayResult ]:
@@ -301,6 +307,7 @@ class ReplayerConfig(TypedDict, total=False):
301307 interceptors : Sequence [Interceptor ]
302308 build_id : Optional [str ]
303309 identity : Optional [str ]
310+ workflow_failure_exception_types : Sequence [Type [BaseException ]]
304311 debug_mode : bool
305312 runtime : Optional [temporalio .runtime .Runtime ]
306313 disable_safe_workflow_eviction : bool
0 commit comments