@@ -53,6 +53,7 @@ def start_daemon_worker_in_foreground_and_redirect_streams(
5353
5454    try :
5555        pid  =  os .getpid ()
56+         # For easier debugging, you can point both streams at stdout instead of the log files
5657        sys .stdout  =  open (log_dir  /  f'worker-{ pid }  , 'w' )
5758        sys .stderr  =  open (log_dir  /  f'worker-{ pid }  , 'w' )
5859        start_daemon_worker (False , aiida_profile_name )
@@ -65,17 +66,32 @@ def start_daemon_worker_in_foreground_and_redirect_streams(
6566            sys .stderr  =  original_stderr 
6667
6768
69+ from  aiida .engine .utils  import  exponential_backoff_retry 
70+ 
71+ 
6872# We have to define the mock functions globally as we cannot pass a local function to a spawned process
6973class  MockFunctions :
7074    @staticmethod  
7175    def  mock_open (_ ):
7276        raise  Exception ('Mock open exception' )
7377
7478    @staticmethod  
75-     async  def  mock_exponential_backoff_retry (* _ , ** __ ):
79+     async  def  exponential_backoff_retry_fail_upload (fct : t .Callable [..., t .Any ], * args , ** kwargs ):
80+         from  aiida .common .exceptions  import  TransportTaskException 
81+ 
82+         if  'do_upload'  in  fct .__name__ :
83+             raise  TransportTaskException 
84+         else :
85+             return  await  exponential_backoff_retry (fct , * args , ** kwargs )
86+ 
87+     @staticmethod  
88+     async  def  exponential_backoff_retry_fail_kill (fct : t .Callable [..., t .Any ], * args , ** kwargs ):
7689        from  aiida .common .exceptions  import  TransportTaskException 
7790
78-         raise  TransportTaskException 
91+         if  'do_kill'  in  fct .__name__ :
92+             raise  TransportTaskException 
93+         else :
94+             return  await  exponential_backoff_retry (fct , * args , ** kwargs )
7995
8096
8197@pytest .fixture (scope = 'function' ) 
@@ -213,11 +229,12 @@ def make_a_builder(sleep_seconds=0):
213229
214230@pytest .mark .requires_rmq  
215231@pytest .mark .usefixtures ('started_daemon_client' ) 
216- def  test_process_kill_failng_ebm (
232+ def  test_process_kill_failing_ebm_upload (
217233    fork_worker_context , submit_and_await , aiida_code_installed , run_cli_command , monkeypatch 
218234):
219-     """9) Kill a process that is paused after EBM (5 times failed). It should be possible to kill it normally. 
220-     # (e.g. in scenarios that transport is working again) 
235+     """Kill a process that is waiting after failed EBM during upload. It should be possible to kill it normally. 
236+ 
237+     A process whose upload failed (e.g. in scenarios where the transport is working again) and that is then killed with a regular, non-forced kill should end up killed.
221238    """ 
222239    from  aiida .orm  import  Int 
223240
@@ -232,7 +249,10 @@ def make_a_builder(sleep_seconds=0):
232249
233250    kill_timeout  =  10 
234251
235-     monkeypatch_args  =  ('aiida.engine.utils.exponential_backoff_retry' , MockFunctions .mock_exponential_backoff_retry )
252+     monkeypatch_args  =  (
253+         'aiida.engine.utils.exponential_backoff_retry' ,
254+         MockFunctions .exponential_backoff_retry_fail_upload ,
255+     )
236256    with  fork_worker_context (monkeypatch .setattr , monkeypatch_args ):
237257        node  =  submit_and_await (make_a_builder (), ProcessState .WAITING )
238258        await_condition (
@@ -241,11 +261,56 @@ def make_a_builder(sleep_seconds=0):
241261            timeout = kill_timeout ,
242262        )
243263
244-         # should restart EBM and be again not successful 
264+         # the kill should start the EBM and successfully kill the process
265+         run_cli_command (cmd_process .process_kill , [str (node .pk ), '--wait' ])
266+         await_condition (lambda : node .is_killed , timeout = kill_timeout )
267+ 
268+ 
269+ @pytest .mark .requires_rmq  
270+ @pytest .mark .usefixtures ('started_daemon_client' ) 
271+ def  test_process_kill_failing_ebm_kill (
272+     fork_worker_context , submit_and_await , aiida_code_installed , run_cli_command , monkeypatch 
273+ ):
274+     """Kill a process with a failing EBM during the kill.
275+ 
276+     Killing a process tries to gracefully cancel the job on the remote node. If there are connection problems, it retries
277+     the cancellation using the EBM. If this fails, another kill command can be sent to restart the cancellation of the scheduler job.
278+     """ 
279+     from  aiida .orm  import  Int 
280+ 
281+     code  =  aiida_code_installed (default_calc_job_plugin = 'core.arithmetic.add' , filepath_executable = '/bin/bash' )
282+ 
283+     def  make_a_builder (sleep_seconds = 0 ):
284+         builder  =  code .get_builder ()
285+         builder .x  =  Int (1 )
286+         builder .y  =  Int (1 )
287+         builder .metadata .options .sleep  =  sleep_seconds 
288+         return  builder 
289+ 
290+     kill_timeout  =  10 
291+ 
292+     monkeypatch_args  =  (
293+         'aiida.engine.utils.exponential_backoff_retry' ,
294+         MockFunctions .exponential_backoff_retry_fail_kill ,
295+     )
296+     # from aiida.engine.utils import exponential_backoff_retry 
297+     # monkeypatch_args = ('aiida.engine.utils.exponential_backoff_retry', exponential_backoff_retry) 
298+     with  fork_worker_context (monkeypatch .setattr , monkeypatch_args ):
299+         node  =  submit_and_await (make_a_builder (kill_timeout  +  10 ), ProcessState .WAITING , timeout = kill_timeout )
300+         await_condition (
301+             lambda : node .process_status  ==  'Monitoring scheduler: job state RUNNING' ,
302+             timeout = kill_timeout ,
303+         )
304+ 
305+         # the kill should start the EBM and not succeed, because the EBM fails
306+         run_cli_command (cmd_process .process_kill , [str (node .pk ), '--wait' ])
307+         await_condition (lambda : not  node .is_killed , timeout = kill_timeout )
308+ 
309+         # a second kill should restart the EBM and again not succeed, because the EBM fails
245310        run_cli_command (cmd_process .process_kill , [str (node .pk ), '--wait' ])
246311        await_condition (lambda : not  node .is_killed , timeout = kill_timeout )
247312
248-         # should skip EBM and successfully kill the process 
313+         # a force kill should skip the EBM and successfully kill the process
249314        run_cli_command (cmd_process .process_kill , [str (node .pk ), '-F' , '--wait' ])
250315        await_condition (lambda : node .is_killed , timeout = kill_timeout )
251316
0 commit comments