@@ -60,6 +60,7 @@ def __init__(
6060 [str , temporalio .bridge .proto .workflow_activation .RemoveFromCache ], None
6161 ]
6262 ],
63+ disable_safe_eviction : bool ,
6364 ) -> None :
6465 self ._bridge_worker = bridge_worker
6566 self ._namespace = namespace
@@ -91,6 +92,7 @@ def __init__(
9192 self ._running_workflows : Dict [str , WorkflowInstance ] = {}
9293 self ._disable_eager_activity_execution = disable_eager_activity_execution
9394 self ._on_eviction_hook = on_eviction_hook
95+ self ._disable_safe_eviction = disable_safe_eviction
9496 self ._throw_after_activation : Optional [Exception ] = None
9597
9698 # If there's a debug mode or a truthy TEMPORAL_DEBUG env var, disable
@@ -99,6 +101,9 @@ def __init__(
99101 None if debug_mode or os .environ .get ("TEMPORAL_DEBUG" ) else 2
100102 )
101103
104+ # Keep track of workflows that could not be evicted
105+ self ._could_not_evict_count = 0
106+
102107 # Validate and build workflow dict
103108 self ._workflows : Dict [str , temporalio .workflow ._Definition ] = {}
104109 self ._dynamic_workflow : Optional [temporalio .workflow ._Definition ] = None
@@ -155,6 +160,13 @@ async def run(self) -> None:
155160 if self ._throw_after_activation :
156161 raise self ._throw_after_activation
157162
def notify_shutdown(self) -> None:
    """Log a warning at shutdown if any workflow eviction failed earlier.

    Reads ``self._could_not_evict_count``; when it is non-zero, a single
    warning is logged stating how many workflows could not be evicted and
    that the shutdown will hang as a result. Nothing is logged when the
    count is zero, and no state is modified.
    """
    if self._could_not_evict_count:
        # ``Logger.warn`` is a deprecated alias of ``Logger.warning``; use
        # the supported name and pass the count as a lazy logging argument.
        logger.warning(
            "Shutting down workflow worker, but %d "
            "workflow(s) could not be evicted previously, so the shutdown will hang",
            self._could_not_evict_count,
        )
169+
158170 # Only call this if run() raised an error
159171 async def drain_poll_queue (self ) -> None :
160172 while True :
@@ -182,42 +194,43 @@ async def _handle_activation(
182194 cache_remove_job = job .remove_from_cache
183195 elif job .HasField ("start_workflow" ):
184196 start_job = job .start_workflow
185- cache_remove_only_activation = len (act .jobs ) == 1 and cache_remove_job
186197
187198 # Build default success completion (e.g. remove-job-only activations)
188199 completion = (
189200 temporalio .bridge .proto .workflow_completion .WorkflowActivationCompletion ()
190201 )
191202 completion .successful .SetInParent ()
192203 try :
193- # Decode the activation if there's a codec and it's not a
194- # cache-remove-only activation
195- if self ._data_converter .payload_codec and not cache_remove_only_activation :
204+ # Decode the activation if there's a codec and not cache remove job
205+ if self ._data_converter .payload_codec and not cache_remove_job :
196206 await temporalio .bridge .worker .decode_activation (
197207 act , self ._data_converter .payload_codec
198208 )
199209
200210 if LOG_PROTOS :
201211 logger .debug ("Received workflow activation:\n %s" , act )
202212
203- # We only have to run if there are any non-remove-from-cache jobs
204- if not cache_remove_only_activation :
205- # If the workflow is not running yet, create it
213+ # If the workflow is not running yet and this isn't a cache remove
214+ # job, create it. We do not even fetch a workflow if it's a cache
216+ # remove job and safe evictions are disabled
216+ workflow = None
217+ if not cache_remove_job or not self ._disable_safe_eviction :
206218 workflow = self ._running_workflows .get (act .run_id )
207- if not workflow :
208- # Must have a start job to create instance
209- if not start_job :
210- raise RuntimeError (
211- "Missing start workflow, workflow could have unexpectedly been removed from cache"
212- )
213- workflow = self ._create_workflow_instance (act , start_job )
214- self ._running_workflows [act .run_id ] = workflow
215- elif start_job :
216- # This should never happen
217- logger .warn ("Cache already exists for activation with start job" )
218-
219- # Run activation in separate thread so we can check if it's
220- # deadlocked
219+ if not workflow and not cache_remove_job :
220+ # Must have a start job to create instance
221+ if not start_job :
222+ raise RuntimeError (
223+ "Missing start workflow, workflow could have unexpectedly been removed from cache"
224+ )
225+ workflow = self ._create_workflow_instance (act , start_job )
226+ self ._running_workflows [act .run_id ] = workflow
227+ elif start_job :
228+ # This should never happen
229+ logger .warn ("Cache already exists for activation with start job" )
230+
231+ # Run activation in separate thread so we can check if it's
232+ # deadlocked
233+ if workflow :
221234 activate_task = asyncio .get_running_loop ().run_in_executor (
222235 self ._workflow_task_executor ,
223236 workflow .activate ,
@@ -234,6 +247,17 @@ async def _handle_activation(
234247 f"[TMPRL1101] Potential deadlock detected, workflow didn't yield within { self ._deadlock_timeout_seconds } second(s)"
235248 )
236249 except Exception as err :
250+ # We cannot fail a cache eviction, we must just log and not complete
251+ # the activation (failed or otherwise). This should only happen in
252+ # cases of deadlock or tasks not properly completing, and yes this
253+ # means that a slot is forever taken.
254+ # TODO(cretz): Should we build a complex mechanism to continually
255+ # try the eviction until it succeeds?
256+ if cache_remove_job :
257+ logger .exception ("Failed running eviction job, not evicting" )
258+ self ._could_not_evict_count += 1
259+ return
260+
237261 logger .exception (
238262 "Failed handling activation on workflow with run ID %s" , act .run_id
239263 )
@@ -257,7 +281,9 @@ async def _handle_activation(
257281 # Always set the run ID on the completion
258282 completion .run_id = act .run_id
259283
260- # If there is a remove-from-cache job, do so
284+ # If there is a remove-from-cache job, do so. We don't need to log a
285+ # warning if there's not, because create workflow failing for
286+ # unregistered workflow still triggers cache remove job
261287 if cache_remove_job :
262288 if act .run_id in self ._running_workflows :
263289 logger .debug (
@@ -266,16 +292,9 @@ async def _handle_activation(
266292 cache_remove_job .message ,
267293 )
268294 del self ._running_workflows [act .run_id ]
269- else :
270- logger .warn (
271- "Eviction request on unknown workflow with run ID %s, message: %s" ,
272- act .run_id ,
273- cache_remove_job .message ,
274- )
275295
276- # Encode the completion if there's a codec and it's not a
277- # cache-remove-only activation
278- if self ._data_converter .payload_codec and not cache_remove_only_activation :
296+ # Encode the completion if there's a codec and not cache remove job
297+ if self ._data_converter .payload_codec and not cache_remove_job :
279298 try :
280299 await temporalio .bridge .worker .encode_completion (
281300 completion , self ._data_converter .payload_codec
0 commit comments