Skip to content

Commit e51537e

Browse files
post_configure: ensure asyncio tasks created by plugins are awaited
* Await any background tasks started by a plugin before continuing. * Addresses cylc/cylc-rose#274 * Centralise the plugin loading/running/reporting logic. * Fix the installation test for back-compat-mode.
1 parent 24caa24 commit e51537e

21 files changed

+475
-201
lines changed

cylc/flow/async_util.py

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
"""Utilities for use with asynchronous code."""
1717

1818
import asyncio
19+
from contextlib import asynccontextmanager
1920
from functools import partial, wraps
2021
from inspect import signature
2122
import os
@@ -478,3 +479,48 @@ async def _fcn(*args, executor=None, **kwargs):
478479

479480

480481
async_listdir = make_async(os.listdir)
482+
483+
484+
@asynccontextmanager
485+
async def async_block():
486+
"""Ensure all tasks started within the context are awaited when it closes.
487+
488+
Normally, you would await a task e.g:
489+
490+
await three()
491+
492+
If it's possible to await the task, do that, however, this isn't always an
493+
option. This interface exists is to help patch over issues where async code
494+
(one) calls sync code (two) which calls async code (three) e.g:
495+
496+
async def one():
497+
two()
498+
499+
def two():
500+
# this breaks - event loop is already running
501+
asyncio.get_event_loop().run_until_complete(three())
502+
503+
async def three():
504+
await asyncio.sleep(1)
505+
506+
This code will error because you can't nest asyncio (without nest-asyncio)
507+
which means you can schedule tasks the tasks in "two", but you can't await
508+
them.
509+
510+
def two():
511+
# this works, but it doesn't wait for three() to complete
512+
asyncio.create_task(three())
513+
514+
This interface allows you to await the tasks
515+
516+
async def one()
517+
async with async_block():
518+
two()
519+
# any tasks two() started will have been awaited by now
520+
"""
521+
# make a list of all tasks running before we enter the context manager
522+
tasks_before = asyncio.all_tasks()
523+
# run the user code
524+
yield
525+
# await any new tasks
526+
await asyncio.gather(*(asyncio.all_tasks() - tasks_before))

cylc/flow/parsec/fileparse.py

Lines changed: 7 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -37,14 +37,14 @@
3737
import sys
3838
import typing as t
3939

40-
from cylc.flow import __version__, iter_entry_points
40+
from cylc.flow import __version__
4141
from cylc.flow import LOG
42-
from cylc.flow.exceptions import PluginError
4342
from cylc.flow.parsec.exceptions import (
4443
FileParseError, ParsecError, TemplateVarLanguageClash
4544
)
46-
from cylc.flow.parsec.OrderedDict import OrderedDictWithDefaults
4745
from cylc.flow.parsec.include import inline
46+
from cylc.flow.parsec.OrderedDict import OrderedDictWithDefaults
47+
from cylc.flow.plugins import run_plugins
4848
from cylc.flow.parsec.util import itemstr
4949
from cylc.flow.templatevars import get_template_vars_from_db
5050
from cylc.flow.workflow_files import (
@@ -283,23 +283,11 @@ def process_plugins(fpath: 'Union[str, Path]', opts: 'Values'):
283283
return extra_vars
284284

285285
# Run entry point pre_configure items, trying to merge values with each.:
286-
for entry_point in iter_entry_points(
287-
'cylc.pre_configure'
286+
for entry_point, plugin_result in run_plugins(
287+
'cylc.pre_configure',
288+
srcdir=fpath.parent,
289+
opts=opts,
288290
):
289-
try:
290-
# If you want it to work on sourcedirs you need to get the options
291-
# to here.
292-
plugin_result = entry_point.load()(
293-
srcdir=fpath.parent, opts=opts
294-
)
295-
except Exception as exc:
296-
# NOTE: except Exception (purposefully vague)
297-
# this is to separate plugin from core Cylc errors
298-
raise PluginError(
299-
'cylc.pre_configure',
300-
entry_point.name,
301-
exc
302-
) from None
303291
for section in ['env', TEMPLATE_VARIABLES]:
304292
if section in plugin_result and plugin_result[section] is not None:
305293
# Raise error if multiple plugins try to update the same keys.

cylc/flow/plugins.py

Lines changed: 154 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,154 @@
1+
#!/usr/bin/env python3
2+
3+
# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE.
4+
# Copyright (C) NIWA & British Crown (Met Office) & Contributors.
5+
#
6+
# This program is free software: you can redistribute it and/or modify
7+
# it under the terms of the GNU General Public License as published by
8+
# the Free Software Foundation, either version 3 of the License, or
9+
# (at your option) any later version.
10+
#
11+
# This program is distributed in the hope that it will be useful,
12+
# but WITHOUT ANY WARRANTY; without even the implied warranty of
13+
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14+
# GNU General Public License for more details.
15+
#
16+
# You should have received a copy of the GNU General Public License
17+
# along with this program. If not, see <http://www.gnu.org/licenses/>.
18+
19+
"""Common functionality related to the loading and calling of plugins."""
20+
21+
import os
22+
from time import time
23+
24+
from cylc.flow import LOG, iter_entry_points
25+
from cylc.flow.async_util import async_block as _async_block
26+
from cylc.flow.exceptions import PluginError
27+
import cylc.flow.flags
28+
29+
30+
async def run_plugins_async(
31+
plugin_namespace,
32+
*args,
33+
async_block=False,
34+
**kwargs
35+
):
36+
"""Run all installed plugins for the given namespace.
37+
38+
This runs plugins in series, yielding the results one by one.
39+
40+
Args:
41+
plugin_namespace:
42+
The entry point namespace for the plugins to run,
43+
e.g. "cylc.post_install".
44+
args:
45+
Any arguments to call plugins with.
46+
async_block:
47+
If True, this will wait for any async tasks started by the plugin
48+
to complete before moving on to the next plugin.
49+
kwargs:
50+
Any kwargs to call plugins with.
51+
52+
Yields:
53+
(entry_point, plugin_result)
54+
55+
See https://github.com/cylc/cylc-rose/issues/274
56+
57+
"""
58+
startpoint = os.getcwd()
59+
for entry_point in iter_entry_points(plugin_namespace):
60+
try:
61+
# measure the import+run time for the plugin (debug mode)
62+
start_time = time()
63+
64+
# load the plugin
65+
meth = entry_point.load()
66+
67+
# run the plugin
68+
if async_block:
69+
# wait for any async tasks started by the plugin to complete
70+
async with _async_block():
71+
plugin_result = meth(*args, **kwargs)
72+
else:
73+
plugin_result = meth(*args, **kwargs)
74+
75+
# log the import+run time (debug mode)
76+
if cylc.flow.flags.verbosity > 1:
77+
LOG.debug(
78+
f'ran {entry_point.name} in {time() - start_time:0.05f}s'
79+
)
80+
81+
# yield the result to the caller
82+
yield entry_point, plugin_result
83+
84+
except Exception as exc: # NOTE: except Exception (purposefully vague)
85+
_raise_plugin_exception(exc, plugin_namespace, entry_point)
86+
87+
finally:
88+
# ensure the plugin does not change the CWD
89+
os.chdir(startpoint)
90+
91+
92+
def run_plugins(plugin_namespace, *args, **kwargs):
93+
"""Run all installed plugins for the given namespace.
94+
95+
This runs plugins in series, yielding the results one by one.
96+
97+
Warning:
98+
Use run_plugins_async for "cylc.post_install" plugins.
99+
See https://github.com/cylc/cylc-rose/issues/274
100+
101+
Args:
102+
plugin_namespace:
103+
The entry point namespace for the plugins to run,
104+
e.g. "cylc.post_install".
105+
args:
106+
Any arguments to call plugins with.
107+
kwargs:
108+
Any kwargs to call plugins with.
109+
110+
Yields:
111+
(entry_point, plugin_result)
112+
113+
"""
114+
startpoint = os.getcwd()
115+
for entry_point in iter_entry_points(plugin_namespace):
116+
try:
117+
# measure the import+run time for the plugin (debug mode)
118+
start_time = time()
119+
120+
# load the plugin
121+
meth = entry_point.load()
122+
123+
# run the plugin
124+
plugin_result = meth(*args, **kwargs)
125+
126+
# log the import+run time (debug mode)
127+
if cylc.flow.flags.verbosity > 1:
128+
LOG.debug(
129+
f'ran {entry_point.name} in {time() - start_time:0.05f}s'
130+
)
131+
132+
# yield the result to the caller
133+
yield entry_point, plugin_result
134+
135+
except Exception as exc: # NOTE: except Exception (purposefully vague)
136+
_raise_plugin_exception(exc, plugin_namespace, entry_point)
137+
138+
finally:
139+
# ensure the plugin does not change the CWD
140+
os.chdir(startpoint)
141+
142+
143+
def _raise_plugin_exception(exc, plugin_namespace, entry_point):
144+
"""Re-Raise an exception captured from a plugin."""
145+
if cylc.flow.flags.verbosity > 1:
146+
# raise the full exception in debug mode
147+
# (this helps plugin developers locate the error in their code)
148+
raise
149+
# raise a user-friendly exception
150+
raise PluginError(
151+
plugin_namespace,
152+
entry_point.name,
153+
exc
154+
) from None

cylc/flow/scheduler_cli.py

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -369,6 +369,12 @@ async def scheduler_cli(
369369
functionality.
370370
371371
"""
372+
if options.starttask:
373+
options.starttask = upgrade_legacy_ids(
374+
*options.starttask,
375+
relative=True,
376+
)
377+
372378
# Parse workflow name but delay Cylc 7 suite.rc deprecation warning
373379
# until after the start-up splash is printed.
374380
# TODO: singleton
@@ -651,14 +657,4 @@ async def _run(scheduler: Scheduler) -> int:
651657
@cli_function(get_option_parser)
652658
def play(parser: COP, options: 'Values', id_: str):
653659
"""Implement cylc play."""
654-
return _play(parser, options, id_)
655-
656-
657-
def _play(parser: COP, options: 'Values', id_: str):
658-
"""Allows compound scripts to import play, but supply their own COP."""
659-
if options.starttask:
660-
options.starttask = upgrade_legacy_ids(
661-
*options.starttask,
662-
relative=True,
663-
)
664660
return asyncio.run(scheduler_cli(options, id_))

0 commit comments

Comments
 (0)