Skip to content

Commit b00306f

Browse files
services: validate-reinstall
* Closes #526 * Add basic support for the `cylc vr` command.
1 parent d7d8b04 commit b00306f

File tree

3 files changed

+145
-33
lines changed

3 files changed

+145
-33
lines changed

cylc/uiserver/resolvers.py

Lines changed: 95 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -216,31 +216,34 @@ class Services:
216216
# log file stream lag
217217
CAT_LOG_SLEEP = 1
218218

219+
# command timeout for commands which start schedulers
220+
START_TIMEOUT = 120
221+
219222
@staticmethod
220223
def _error(message: Union[Exception, str]):
221224
"""Format error case response."""
222-
return [
225+
return (
223226
False,
224227
str(message)
225-
]
228+
)
226229

227230
@staticmethod
228231
def _return(message: str):
229232
"""Format success case response."""
230-
return [
233+
return (
231234
True,
232235
message
233-
]
236+
)
234237

235238
@classmethod
236239
async def clean(
237240
cls,
241+
workflows_mgr: 'WorkflowsManager',
238242
workflows: Iterable['Tokens'],
239243
args: dict,
240-
workflows_mgr: 'WorkflowsManager',
241244
executor: 'Executor',
242245
log: 'Logger'
243-
):
246+
) -> tuple[bool, str]:
244247
"""Calls `cylc clean`"""
245248
# Convert Schema options → cylc.flow.workflow_files.init_clean opts:
246249
opts = _schema_opts_to_api_opts(args, schema=CleanOptions)
@@ -273,25 +276,50 @@ async def scan(
273276
cls,
274277
args: dict,
275278
workflows_mgr: 'WorkflowsManager',
276-
):
279+
) -> tuple[bool, str]:
277280
await workflows_mgr.scan()
278281
return cls._return("Scan requested")
279282

280283
@classmethod
281-
async def play(
284+
async def run_command(
282285
cls,
286+
command: Iterable[str],
283287
workflows: Iterable[Tokens],
284288
args: Dict[str, Any],
285-
workflows_mgr: 'WorkflowsManager',
286289
log: 'Logger',
287-
) -> List[Union[bool, str]]:
288-
"""Calls `cylc play`."""
290+
timeout: int,
291+
success_msg: str = 'Command succeeded',
292+
fail_msg: str = 'Command failed',
293+
) -> tuple[bool, str]:
294+
"""Calls the specified Cylc command.
295+
296+
Args:
297+
command:
298+
The Cylc subcommand to run.
299+
e.g ["play"] or ["cat-log", "-m", "p"].
300+
workflows:
301+
The workflows to run this command against.
302+
args:
303+
CLI arguments to be provided to this command.
304+
e.g {'color': 'never'} would result in "--color=never".
305+
log:
306+
The application log, used to record this command invocation.
307+
timeout:
308+
Length of time to wait for the command to complete.
309+
success_msg:
310+
Message to be used in the response if the command succeeds.
311+
fail_msg:
312+
Message to be used in the response if the command fails.
313+
314+
Returns:
315+
316+
"""
289317
cylc_version = args.pop('cylc_version', None)
290318
results: Dict[str, str] = {}
291319
failed = False
292320
for tokens in workflows:
293321
try:
294-
cmd = _build_cmd(['cylc', 'play', '--color=never'], args)
322+
cmd = _build_cmd(['cylc', *command, '--color=never'], args)
295323

296324
if tokens['user'] and tokens['user'] != getuser():
297325
return cls._error(
@@ -322,10 +350,10 @@ async def play(
322350
stderr=PIPE,
323351
text=True
324352
)
325-
ret_code = proc.wait(timeout=120)
353+
ret_code = proc.wait(timeout=timeout)
326354

327355
if ret_code:
328-
msg = f"Command failed ({ret_code}): {cmd_repr}"
356+
msg = f"{fail_msg} ({ret_code}): {cmd_repr}"
329357
out, err = proc.communicate()
330358
results[wflow] = err.strip() or out.strip() or msg
331359
log.error(
@@ -335,26 +363,65 @@ async def play(
335363
)
336364
failed = True
337365
else:
338-
results[wflow] = 'started'
366+
results[wflow] = success_msg
339367

340368
except Exception as exc: # unexpected error
341369
log.exception(exc)
342370
return cls._error(exc)
343371

344372
if failed:
345373
if len(results) == 1:
374+
# all commands failed
346375
return cls._error(results.popitem()[1])
347-
# else log each workflow result on separate lines
376+
377+
# some commands failed
348378
return cls._error(
379+
# log each workflow result on separate lines
349380
"\n\n" + "\n\n".join(
350381
f"{wflow}: {msg}" for wflow, msg in results.items()
351382
)
352383
)
353384

385+
# all commands succeeded
386+
return cls._return(f'Workflow(s) {success_msg}')
387+
388+
@classmethod
389+
async def play(
390+
cls, workflows_mgr: 'WorkflowsManager', *args, **kwargs
391+
) -> tuple[bool, str]:
392+
"""Calls `cylc play`."""
393+
ret = await cls.run_command(
394+
('play',),
395+
*args,
396+
**kwargs,
397+
timeout=cls.START_TIMEOUT,
398+
success_msg='started',
399+
)
400+
401+
# trigger a re-scan
402+
await workflows_mgr.scan()
403+
404+
# return results
405+
return ret
406+
407+
@classmethod
408+
async def validate_reinstall(
409+
cls, workflows_mgr: 'WorkflowsManager', *args, **kwargs
410+
) -> tuple[bool, str]:
411+
"""Calls `cylc validate-reinstall`."""
412+
ret = await cls.run_command(
413+
('validate-reinstall', '--yes'),
414+
*args,
415+
**kwargs,
416+
timeout=cls.START_TIMEOUT,
417+
success_msg='reinstalled',
418+
)
419+
354420
# trigger a re-scan
355421
await workflows_mgr.scan()
356-
# send a success message
357-
return cls._return('Workflow(s) started')
422+
423+
# return results
424+
return ret
358425

359426
@staticmethod
360427
async def enqueue(stream, queue):
@@ -581,8 +648,7 @@ async def service(
581648
command: str,
582649
workflows: Iterable['Tokens'],
583650
kwargs: Dict[str, Any],
584-
) -> List[Union[bool, str]]:
585-
651+
) -> tuple[bool, str]:
586652
# GraphQL v3 includes all variables that are set, even if set to null.
587653
kwargs = {
588654
k: v
@@ -592,19 +658,26 @@ async def service(
592658

593659
if command == 'clean': # noqa: SIM116
594660
return await Services.clean(
661+
self.workflows_mgr,
595662
workflows,
596663
kwargs,
597-
self.workflows_mgr,
598664
log=self.log,
599665
executor=self.executor
600666
)
601667
elif command == 'play':
602668
return await Services.play(
669+
self.workflows_mgr,
603670
workflows,
604671
kwargs,
605-
self.workflows_mgr,
606672
log=self.log
607673
)
674+
elif command == 'validate_reinstall':
675+
return await Services.validate_reinstall(
676+
self.workflows_mgr,
677+
workflows,
678+
kwargs,
679+
log=self.log,
680+
)
608681
elif command == 'scan':
609682
return await Services.scan(
610683
kwargs,

cylc/uiserver/schema.py

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -241,6 +241,44 @@ class Arguments:
241241
result = GenericScalar()
242242

243243

244+
class ValidateReinstall(graphene.Mutation):
245+
class Meta:
246+
description = sstrip('''
247+
Validate, reinstall, then reload or restart as appropriate.
248+
249+
This command updates a workflow to reflect any new changes made in
250+
the workflow source directory since it was installed.
251+
252+
The workflow will be reinstalled, then either:
253+
* Reloaded (if the workflow is running),
254+
* or restarted (if it is stopped).
255+
''')
256+
# Note we have the "resume" mutation for paused workflows.
257+
resolver = partial(mutator, command='validate_reinstall')
258+
259+
class Arguments:
260+
workflows = graphene.List(WorkflowID, required=True)
261+
cylc_version = CylcVersion(
262+
description=sstrip('''
263+
Set the Cylc version that the workflow starts with.
264+
''')
265+
)
266+
set = graphene.List( # noqa: A003 (graphql field name)
267+
graphene.String,
268+
description=sstrip('''
269+
Set the value of a Jinja2 template variable in the workflow
270+
definition. Values should be valid Python literals so strings
271+
must be quoted e.g. `STR="string"`, `INT=43`, `BOOL=True`.
272+
This option can be used multiple times on the command line.
273+
NOTE: these settings persist across workflow restarts, but can
274+
be set again on the `cylc play` command line if they need to be
275+
overridden.
276+
''')
277+
)
278+
279+
result = GenericScalar()
280+
281+
244282
class Clean(graphene.Mutation):
245283
class Meta:
246284
description = sstrip('''
@@ -894,6 +932,7 @@ class Logs(graphene.ObjectType):
894932

895933
class UISMutations(Mutations):
896934
play = _mut_field(Play)
935+
validate_reinstall = _mut_field(ValidateReinstall)
897936
clean = _mut_field(Clean)
898937
scan = _mut_field(Scan)
899938

cylc/uiserver/tests/test_resolvers.py

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -64,8 +64,8 @@ def test__schema_opts_to_api_opts(schema_opts, schema, expect):
6464
@pytest.mark.parametrize(
6565
'func, message, expect',
6666
[
67-
(services._return, 'Hello.', [True, 'Hello.']),
68-
(services._error, 'Goodbye.', [False, 'Goodbye.'])
67+
(services._return, 'Hello.', (True, 'Hello.')),
68+
(services._error, 'Goodbye.', (False, 'Goodbye.'))
6969
]
7070
)
7171
def test_Services_anciliary_methods(func, message, expect):
@@ -81,31 +81,31 @@ def test_Services_anciliary_methods(func, message, expect):
8181
[Tokens('wflow1'), Tokens('~murray/wflow2')],
8282
{},
8383
{},
84-
[True, "Workflow(s) started"],
84+
(True, "Workflow(s) started"),
8585
{},
8686
id="multiple"
8787
),
8888
pytest.param(
8989
[Tokens('~feynman/wflow1')],
9090
{},
9191
{},
92-
[False, "Cannot start workflows for other users."],
92+
(False, "Cannot start workflows for other users."),
9393
{},
9494
id="other user's wflow"
9595
),
9696
pytest.param(
9797
[Tokens('wflow1')],
9898
{'cylc_version': 'top'},
9999
{'CYLC_VERSION': 'bottom', 'CYLC_ENV_NAME': 'quark'},
100-
[True, "Workflow(s) started"],
100+
(True, "Workflow(s) started"),
101101
{'CYLC_VERSION': 'top'},
102102
id="cylc version overrides env"
103103
),
104104
pytest.param(
105105
[Tokens('wflow1')],
106106
{},
107107
{'CYLC_VERSION': 'charm', 'CYLC_ENV_NAME': 'quark'},
108-
[True, "Workflow(s) started"],
108+
(True, "Workflow(s) started"),
109109
{'CYLC_VERSION': 'charm', 'CYLC_ENV_NAME': 'quark'},
110110
id="cylc env not overriden if no version specified"
111111
),
@@ -144,9 +144,9 @@ async def test_play(
144144
monkeypatch.setattr('cylc.uiserver.resolvers.Popen', mock_popen)
145145

146146
ret = await Services.play(
147+
Mock(spec=WorkflowsManager),
147148
workflows,
148149
{'some': 'opt', **args},
149-
workflows_mgr=Mock(spec=WorkflowsManager),
150150
log=Mock(),
151151
)
152152

@@ -228,9 +228,9 @@ async def test_play_fail(
228228
caplog.set_level(logging.ERROR)
229229

230230
status, message = await Services.play(
231+
Mock(spec=WorkflowsManager),
231232
workflows,
232233
{},
233-
workflows_mgr=Mock(spec=WorkflowsManager),
234234
log=logging.root,
235235
)
236236
assert status is False
@@ -252,9 +252,9 @@ def wait(timeout):
252252
monkeypatch.setattr('cylc.uiserver.resolvers.Popen', mock_popen)
253253

254254
ret = await Services.play(
255+
Mock(spec=WorkflowsManager),
255256
[Tokens('wflow1')],
256257
{},
257-
workflows_mgr=Mock(spec=WorkflowsManager),
258258
log=Mock(),
259259
)
260260
assert ret == [
@@ -401,12 +401,12 @@ def bad_clean(*a, **k):
401401
caplog.set_level(logging.ERROR)
402402

403403
ret = Services.clean(
404+
Mock(spec=WorkflowsManager),
404405
[Tokens('wflow1')],
405406
{},
406-
workflows_mgr=Mock(spec=WorkflowsManager),
407407
executor=ThreadPoolExecutor(1),
408408
log=logging.root,
409409
)
410410
err_msg = "CylcError: bad things!!"
411-
assert (await ret) == [False, err_msg]
411+
assert (await ret) == (False, err_msg)
412412
assert err_msg in caplog.text

0 commit comments

Comments
 (0)