@@ -216,31 +216,34 @@ class Services:
216216 # log file stream lag
217217 CAT_LOG_SLEEP = 1
218218
219+ # command timeout for commands which start schedulers
220+ START_TIMEOUT = 120
221+
219222 @staticmethod
220223 def _error (message : Union [Exception , str ]):
221224 """Format error case response."""
222- return [
225+ return (
223226 False ,
224227 str (message )
225- ]
228+ )
226229
227230 @staticmethod
228231 def _return (message : str ):
229232 """Format success case response."""
230- return [
233+ return (
231234 True ,
232235 message
233- ]
236+ )
234237
235238 @classmethod
236239 async def clean (
237240 cls ,
241+ workflows_mgr : 'WorkflowsManager' ,
238242 workflows : Iterable ['Tokens' ],
239243 args : dict ,
240- workflows_mgr : 'WorkflowsManager' ,
241244 executor : 'Executor' ,
242245 log : 'Logger'
243- ):
246+ ) -> tuple [ bool , str ] :
244247 """Calls `cylc clean`"""
245248 # Convert Schema options → cylc.flow.workflow_files.init_clean opts:
246249 opts = _schema_opts_to_api_opts (args , schema = CleanOptions )
@@ -273,25 +276,50 @@ async def scan(
273276 cls ,
274277 args : dict ,
275278 workflows_mgr : 'WorkflowsManager' ,
276- ):
279+ ) -> tuple [ bool , str ] :
277280 await workflows_mgr .scan ()
278281 return cls ._return ("Scan requested" )
279282
280283 @classmethod
281- async def play (
284+ async def run_command (
282285 cls ,
286+ command : Iterable [str ],
283287 workflows : Iterable [Tokens ],
284288 args : Dict [str , Any ],
285- workflows_mgr : 'WorkflowsManager' ,
286289 log : 'Logger' ,
287- ) -> List [Union [bool , str ]]:
288- """Calls `cylc play`."""
290+ timeout : int ,
291+ success_msg : str = 'Command succeeded' ,
292+ fail_msg : str = 'Command failed' ,
293+ ) -> tuple [bool , str ]:
294+ """Calls the specified Cylc command.
295+
296+ Args:
297+ command:
298+ The Cylc subcommand to run.
299+ e.g ["play"] or ["cat-log", "-m", "p"].
300+ workflows:
301+ The workflows to run this command against.
302+ args:
303+ CLI arguments to be provided to this command.
304+ e.g {'color': 'never'} would result in "--color=never".
305+ log:
306+ The application log, used to record this command invocation.
307+ timeout:
308+ Length of time to wait for the command to complete.
309+ success_msg:
310+ Message to be used in the response if the command succeeds.
311+ fail_msg:
312+ Message to be used in the response if the command fails.
313+
314+ Returns:
315+
316+ """
289317 cylc_version = args .pop ('cylc_version' , None )
290318 results : Dict [str , str ] = {}
291319 failed = False
292320 for tokens in workflows :
293321 try :
294- cmd = _build_cmd (['cylc' , 'play' , '--color=never' ], args )
322+ cmd = _build_cmd (['cylc' , * command , '--color=never' ], args )
295323
296324 if tokens ['user' ] and tokens ['user' ] != getuser ():
297325 return cls ._error (
@@ -322,10 +350,10 @@ async def play(
322350 stderr = PIPE ,
323351 text = True
324352 )
325- ret_code = proc .wait (timeout = 120 )
353+ ret_code = proc .wait (timeout = timeout )
326354
327355 if ret_code :
328- msg = f"Command failed ({ ret_code } ): { cmd_repr } "
356+ msg = f"{ fail_msg } ({ ret_code } ): { cmd_repr } "
329357 out , err = proc .communicate ()
330358 results [wflow ] = err .strip () or out .strip () or msg
331359 log .error (
@@ -335,26 +363,79 @@ async def play(
335363 )
336364 failed = True
337365 else :
338- results [wflow ] = 'started'
366+ results [wflow ] = success_msg
339367
340368 except Exception as exc : # unexpected error
341369 log .exception (exc )
342370 return cls ._error (exc )
343371
344372 if failed :
345373 if len (results ) == 1 :
374+ # all commands failed
346375 return cls ._error (results .popitem ()[1 ])
347- # else log each workflow result on separate lines
376+
377+ # some commands failed
348378 return cls ._error (
379+ # log each workflow result on separate lines
349380 "\n \n " + "\n \n " .join (
350381 f"{ wflow } : { msg } " for wflow , msg in results .items ()
351382 )
352383 )
353384
385+ # all commands succeeded
386+ return cls ._return (f'Workflow(s) { success_msg } ' )
387+
388+ @classmethod
389+ async def play (
390+ cls ,
391+ workflows_mgr : 'WorkflowsManager' ,
392+ workflows : Iterable [Tokens ],
393+ args : dict ,
394+ log ,
395+ ** kwargs ,
396+ ) -> tuple [bool , str ]:
397+ """Calls `cylc play`."""
398+ ret = await cls .run_command (
399+ ('play' ,),
400+ workflows ,
401+ args ,
402+ log ,
403+ cls .START_TIMEOUT ,
404+ ** kwargs ,
405+ success_msg = 'started' ,
406+ )
407+
408+ # trigger a re-scan
409+ await workflows_mgr .scan ()
410+
411+ # return results
412+ return ret
413+
414+ @classmethod
415+ async def validate_reinstall (
416+ cls ,
417+ workflows_mgr : 'WorkflowsManager' ,
418+ workflows : Iterable [Tokens ],
419+ args : dict ,
420+ log ,
421+ ** kwargs ,
422+ ) -> tuple [bool , str ]:
423+ """Calls `cylc validate-reinstall`."""
424+ ret = await cls .run_command (
425+ ('validate-reinstall' , '--yes' ),
426+ workflows ,
427+ args ,
428+ log ,
429+ cls .START_TIMEOUT ,
430+ ** kwargs ,
431+ success_msg = 'reinstalled' ,
432+ )
433+
354434 # trigger a re-scan
355435 await workflows_mgr .scan ()
356- # send a success message
357- return cls ._return ('Workflow(s) started' )
436+
437+ # return results
438+ return ret
358439
359440 @staticmethod
360441 async def enqueue (stream , queue ):
@@ -581,8 +662,7 @@ async def service(
581662 command : str ,
582663 workflows : Iterable ['Tokens' ],
583664 kwargs : Dict [str , Any ],
584- ) -> List [Union [bool , str ]]:
585-
665+ ) -> tuple [bool , str ]:
586666 # GraphQL v3 includes all variables that are set, even if set to null.
587667 kwargs = {
588668 k : v
@@ -592,19 +672,26 @@ async def service(
592672
593673 if command == 'clean' : # noqa: SIM116
594674 return await Services .clean (
675+ self .workflows_mgr ,
595676 workflows ,
596677 kwargs ,
597- self .workflows_mgr ,
598678 log = self .log ,
599679 executor = self .executor
600680 )
601- elif command == 'play' :
681+ elif command == 'play' : # noqa: SIM116
602682 return await Services .play (
683+ self .workflows_mgr ,
603684 workflows ,
604685 kwargs ,
605- self .workflows_mgr ,
606686 log = self .log
607687 )
688+ elif command == 'validate_reinstall' :
689+ return await Services .validate_reinstall (
690+ self .workflows_mgr ,
691+ workflows ,
692+ kwargs ,
693+ log = self .log ,
694+ )
608695 elif command == 'scan' :
609696 return await Services .scan (
610697 kwargs ,
0 commit comments