2525from  aiida .common .log  import  LOG_LEVEL_REPORT 
2626from  aiida .engine  import  Process , ProcessState 
2727from  aiida .engine .processes  import  control  as  process_control 
28- from  aiida .orm  import  CalcJobNode , Group , WorkChainNode , WorkflowNode , WorkFunctionNode 
28+ from  aiida .engine .utils  import  exponential_backoff_retry 
29+ from  aiida .orm  import  CalcJobNode , Group , Int , WorkChainNode , WorkflowNode , WorkFunctionNode 
2930from  tests .utils .processes  import  WaitProcess 
3031
3132FuncArgs  =  tuple [t .Any , ...]
@@ -53,6 +54,7 @@ def start_daemon_worker_in_foreground_and_redirect_streams(
5354
5455    try :
5556        pid  =  os .getpid ()
57+         # For easier debugging you can change these to stdout 
5658        sys .stdout  =  open (log_dir  /  f'worker-{ pid }  , 'w' )
5759        sys .stderr  =  open (log_dir  /  f'worker-{ pid }  , 'w' )
5860        start_daemon_worker (False , aiida_profile_name )
@@ -72,10 +74,22 @@ def mock_open(_):
7274        raise  Exception ('Mock open exception' )
7375
7476    @staticmethod  
75-     async  def  mock_exponential_backoff_retry ( * _ , ** __ ):
77+     async  def  exponential_backoff_retry_fail_upload ( fct :  t . Callable [...,  t . Any ],  * args , ** kwargs ):
7678        from  aiida .common .exceptions  import  TransportTaskException 
7779
78-         raise  TransportTaskException 
80+         if  'do_upload'  in  fct .__name__ :
81+             raise  TransportTaskException 
82+         else :
83+             return  await  exponential_backoff_retry (fct , * args , ** kwargs )
84+ 
85+     @staticmethod  
86+     async  def  exponential_backoff_retry_fail_kill (fct : t .Callable [..., t .Any ], * args , ** kwargs ):
87+         from  aiida .common .exceptions  import  TransportTaskException 
88+ 
89+         if  'do_kill'  in  fct .__name__ :
90+             raise  TransportTaskException 
91+         else :
92+             return  await  exponential_backoff_retry (fct , * args , ** kwargs )
7993
8094
8195@pytest .fixture (scope = 'function' ) 
@@ -138,7 +152,6 @@ def test_process_kill_failing_transport(
138152    A failure in opening a transport connection results in the EBM to be fired blocking a regular kill command. 
139153    The force kill command will ignore the EBM and kill the process in any case.""" 
140154    from  aiida .cmdline .utils .common  import  get_process_function_report 
141-     from  aiida .orm  import  Int 
142155
143156    code  =  aiida_code_installed (default_calc_job_plugin = 'core.arithmetic.add' , filepath_executable = '/bin/bash' )
144157
@@ -179,7 +192,6 @@ def test_process_kill_failing_transport_failed_kill(
179192    """ 
180193
181194    from  aiida .cmdline .utils .common  import  get_process_function_report 
182-     from  aiida .orm  import  Int 
183195
184196    code  =  aiida_code_installed (default_calc_job_plugin = 'core.arithmetic.add' , filepath_executable = '/bin/bash' )
185197
@@ -213,14 +225,14 @@ def make_a_builder(sleep_seconds=0):
213225
214226@pytest .mark .requires_rmq  
215227@pytest .mark .usefixtures ('started_daemon_client' ) 
216- def  test_process_kill_failng_ebm (
228+ def  test_process_kill_failing_ebm_transport (
217229    fork_worker_context , submit_and_await , aiida_code_installed , run_cli_command , monkeypatch 
218230):
219-     """9) Kill a process that is paused after EBM (5 times failed). It should be possible to kill it normally. 
220-     # (e.g. in scenarios that transport is working again) 
221-     """ 
222-     from  aiida .orm  import  Int 
231+     """Kill a process that is waiting after failed EBM during a transport task. 
223232
233+     It should be possible to kill it normally. A process that failed upload (e.g. in scenarios that transport is working 
234+     again) and is then killed 
235+     """ 
224236    code  =  aiida_code_installed (default_calc_job_plugin = 'core.arithmetic.add' , filepath_executable = '/bin/bash' )
225237
226238    def  make_a_builder (sleep_seconds = 0 ):
@@ -232,7 +244,10 @@ def make_a_builder(sleep_seconds=0):
232244
233245    kill_timeout  =  10 
234246
235-     monkeypatch_args  =  ('aiida.engine.utils.exponential_backoff_retry' , MockFunctions .mock_exponential_backoff_retry )
247+     monkeypatch_args  =  (
248+         'aiida.engine.utils.exponential_backoff_retry' ,
249+         MockFunctions .exponential_backoff_retry_fail_upload ,
250+     )
236251    with  fork_worker_context (monkeypatch .setattr , monkeypatch_args ):
237252        node  =  submit_and_await (make_a_builder (), ProcessState .WAITING )
238253        await_condition (
@@ -241,7 +256,60 @@ def make_a_builder(sleep_seconds=0):
241256            timeout = kill_timeout ,
242257        )
243258
259+         # kill should start EBM and should successfully kill 
260+         run_cli_command (cmd_process .process_kill , [str (node .pk ), '--wait' ])
261+         await_condition (lambda : node .is_killed , timeout = kill_timeout )
262+ 
263+ 
264+ @pytest .mark .requires_rmq  
265+ @pytest .mark .usefixtures ('started_daemon_client' ) 
266+ def  test_process_kill_failing_ebm_kill (
267+     fork_worker_context , submit_and_await , aiida_code_installed , run_cli_command , monkeypatch 
268+ ):
269+     """Kill a process that had previously failed with an EBM. 
270+ 
271+     Killing a process tries to gracefully cancel the job on the remote node. If there are connection problems it retries 
272+     it in using the EBM. If this fails another kill command can be send to restart the cancelation of the job scheduler. 
273+     """ 
274+     from  aiida .cmdline .utils .common  import  get_process_function_report 
275+ 
276+     code  =  aiida_code_installed (default_calc_job_plugin = 'core.arithmetic.add' , filepath_executable = '/bin/bash' )
277+ 
278+     def  make_a_builder (sleep_seconds = 0 ):
279+         builder  =  code .get_builder ()
280+         builder .x  =  Int (1 )
281+         builder .y  =  Int (1 )
282+         builder .metadata .options .sleep  =  sleep_seconds 
283+         return  builder 
284+ 
285+     kill_timeout  =  10 
286+ 
287+     monkeypatch_args  =  (
288+         'aiida.engine.utils.exponential_backoff_retry' ,
289+         MockFunctions .exponential_backoff_retry_fail_kill ,
290+     )
291+     with  fork_worker_context (monkeypatch .setattr , monkeypatch_args ):
292+         node  =  submit_and_await (make_a_builder (kill_timeout  +  10 ), ProcessState .WAITING , timeout = kill_timeout )
293+         await_condition (
294+             lambda : node .process_status  ==  'Monitoring scheduler: job state RUNNING' ,
295+             timeout = kill_timeout ,
296+         )
297+ 
298+         # kill should start EBM and be not successful in EBM 
244299        run_cli_command (cmd_process .process_kill , [str (node .pk ), '--wait' ])
300+         await_condition (lambda : not  node .is_killed , timeout = kill_timeout )
301+ 
302+         # kill should restart EBM and be not successful in EBM 
303+         # this tests if the old task is cancelled and restarted successfully 
304+         run_cli_command (cmd_process .process_kill , [str (node .pk ), '--wait' ])
305+         await_condition (
306+             lambda : 'Found active scheduler job cancelation that will be rescheduled.' 
307+             in  get_process_function_report (node ),
308+             timeout = kill_timeout ,
309+         )
310+ 
311+         # force kill should skip EBM and successfully kill the process 
312+         run_cli_command (cmd_process .process_kill , [str (node .pk ), '-F' , '--wait' ])
245313        await_condition (lambda : node .is_killed , timeout = kill_timeout )
246314
247315
@@ -758,8 +826,6 @@ def test_process_kill(submit_and_await, run_cli_command, aiida_code_installed):
758826    assert  result .exit_code  ==  ExitCode .USAGE_ERROR 
759827    assert  len (result .output_lines ) >  0 
760828
761-     from  aiida .orm  import  Int 
762- 
763829    code  =  aiida_code_installed (default_calc_job_plugin = 'core.arithmetic.add' , filepath_executable = '/bin/bash' )
764830    builder  =  code .get_builder ()
765831    builder .x  =  Int (2 )
0 commit comments