Skip to content

Commit d93b32d

Browse files
authored
Merge pull request #6935 from cylc/8.5.x-sync
🤖 Merge 8.5.x-sync into master
2 parents 3706f18 + 9352f61 commit d93b32d

File tree

21 files changed

+260
-119
lines changed

21 files changed

+260
-119
lines changed

changes.d/6909.fix.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
Fix potential timeout of the play and vr commands for workflows with contact
2+
files, due to an unnecessary remote process check - now only done if the
3+
workflow fails to respond on the network.

changes.d/6911.fix.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Fix an issue where trigger could fail to run a task, removing it instead.

cylc/flow/cfgspec/globalcfg.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1044,7 +1044,23 @@ def default_for(
10441044
10451045
{REPLACES}``[suite servers][run host select]rank``.
10461046
''')
1047+
Conf('process check timeout', VDR.V_INTERVAL, DurationFloat(10),
1048+
desc='''
1049+
Maximum time for the `cylc play` and `cylc vr` commands to wait
1050+
for a remote process that checks if an unresponsive scheduler
1051+
is still alive (for workflows with existing contact files).
1052+
1053+
.. note::
10471054
1055+
This check involves running ``cylc psutil`` on the run host.
1056+
You may need to increase the timeout if shared filesystem
1057+
latency (for example) results in slow Python script startup.
1058+
Increasing the timeout unnecessarily, however, will just
1059+
cause these commands to hang for an unnecessarily long time
1060+
in this circumstance.
1061+
1062+
.. versionadded:: 8.5.2
1063+
''')
10481064
with Conf('host self-identification', desc=f'''
10491065
How Cylc determines and shares the identity of the workflow host.
10501066

cylc/flow/clean.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@
4848
from cylc.flow import LOG
4949
from cylc.flow.cfgspec.glbl_cfg import glbl_cfg
5050
from cylc.flow.exceptions import (
51-
ContactFileExists,
51+
SchedulerAlive,
5252
CylcError,
5353
InputError,
5454
PlatformError,
@@ -123,7 +123,7 @@ def _clean_check(opts: 'Values', id_: str, run_dir: Path) -> None:
123123
return
124124
try:
125125
detect_old_contact_file(id_)
126-
except ContactFileExists as exc:
126+
except SchedulerAlive as exc:
127127
raise ServiceFileError(
128128
f"Cannot clean running workflow {id_}.\n\n{exc}"
129129
) from None

cylc/flow/commands.py

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,6 @@
9292
from cylc.flow.task_id import TaskID
9393
from cylc.flow.task_state import (
9494
TASK_STATUS_PREPARING,
95-
TASK_STATUS_WAITING,
9695
TASK_STATUSES_ACTIVE,
9796
)
9897
from cylc.flow.taskdef import generate_graph_children
@@ -749,7 +748,7 @@ def _force_trigger_tasks(
749748
# Remove non group start and final-status group start tasks, and
750749
# trigger them from scratch (so only the TaskDef matters).
751750

752-
# Waiting group start tasks are not removed, but a reload would
751+
# Group start tasks are not removed, but a reload would
753752
# replace them, so using the TaskDef is fine.
754753

755754
if not any(
@@ -766,12 +765,13 @@ def _force_trigger_tasks(
766765
continue
767766

768767
if itask.state(TASK_STATUS_PREPARING, *TASK_STATUSES_ACTIVE):
768+
# This is a live active group start task
769769
warnings_has_job.append(str(itask))
770770
# Just merge the flows.
771771
schd.pool.merge_flows(itask, flow_nums)
772772

773-
elif itask.state(TASK_STATUS_WAITING):
774-
# This is a waiting active group start task...
773+
else:
774+
# This is a non-live active group start task...
775775
# ... satisfy off-group (i.e. all) prerequisites
776776
itask.state.set_all_task_prerequisites_satisfied()
777777
# ... and satisfy all xtrigger prerequisites.
@@ -783,8 +783,7 @@ def _force_trigger_tasks(
783783

784784
# Trigger group start task.
785785
schd.pool.queue_or_trigger(itask, on_resume)
786-
else:
787-
active_to_remove.append(itask)
786+
788787
else:
789788
active_to_remove.append(itask)
790789

cylc/flow/exceptions.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -136,8 +136,8 @@ class WorkflowFilesError(CylcError):
136136
bullet = "\n -"
137137

138138

139-
class ContactFileExists(CylcError):
140-
"""Workflow contact file exists."""
139+
class SchedulerAlive(CylcError):
140+
"""Workflow contact file exists and scheduler is alive."""
141141

142142

143143
class FileRemovalError(CylcError):

cylc/flow/network/client.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@
4141
)
4242
from cylc.flow.exceptions import (
4343
ClientTimeout,
44-
ContactFileExists,
44+
SchedulerAlive,
4545
CylcError,
4646
RequestError,
4747
WorkflowStopped,
@@ -158,6 +158,10 @@ def timeout_handler(self) -> None:
158158
WorkflowStopped: if the workflow has already stopped.
159159
CyclError: if the workflow has moved to different host/port.
160160
"""
161+
LOG.warning(
162+
f"{self.workflow} {self.host}:{self.port}:"
163+
f" Connection timed out ({self.timeout} ms)"
164+
)
161165
contact_host, contact_port, *_ = get_location(self.workflow)
162166
if (
163167
contact_host != get_fqdn_by_host(self._orig_host)
@@ -177,7 +181,7 @@ def timeout_handler(self) -> None:
177181
# behind a contact file?
178182
try:
179183
detect_old_contact_file(self.workflow)
180-
except ContactFileExists:
184+
except SchedulerAlive:
181185
# old contact file exists and the workflow process still alive
182186
return
183187
else:

cylc/flow/scheduler.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1144,7 +1144,7 @@ def _configure_contact(self) -> None:
11441144
"""Create contact file."""
11451145
# Make sure another workflow of the same name hasn't started while this
11461146
# one is starting
1147-
# NOTE: raises ContactFileExists if workflow is running
1147+
# NOTE: raises SchedulerAlive if workflow is running
11481148
workflow_files.detect_old_contact_file(self.workflow)
11491149

11501150
# Extract contact data.

cylc/flow/scheduler_cli.py

Lines changed: 21 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -30,10 +30,11 @@
3030

3131
from cylc.flow import LOG, __version__
3232
from cylc.flow.exceptions import (
33-
ContactFileExists,
3433
CylcError,
3534
ServiceFileError,
35+
WorkflowStopped,
3636
)
37+
from cylc.flow.scripts.ping import run as cylc_ping
3738
import cylc.flow.flags
3839
from cylc.flow.id import upgrade_legacy_ids
3940
from cylc.flow.host_select import select_workflow_host
@@ -60,7 +61,6 @@
6061
from cylc.flow.workflow_db_mgr import WorkflowDatabaseManager
6162
from cylc.flow.workflow_files import (
6263
SUITERC_DEPR_MSG,
63-
detect_old_contact_file,
6464
get_workflow_srv_dir,
6565
)
6666
from cylc.flow.terminal import (
@@ -474,13 +474,29 @@ async def _scheduler_cli_3(
474474
async def _resume(workflow_id, options):
475475
"""Resume the workflow if it is already running."""
476476
try:
477-
detect_old_contact_file(workflow_id)
478-
except ContactFileExists as exc:
479-
print(f"Resuming already-running workflow\n\n{exc}")
480477
pclient = WorkflowRuntimeClient(
481478
workflow_id,
482479
timeout=options.comms_timeout,
483480
)
481+
except WorkflowStopped:
482+
# Not running - don't resume.
483+
return
484+
485+
# Is it running? If yes, send resume command.
486+
try:
487+
await cylc_ping(options, workflow_id, pclient)
488+
except WorkflowStopped:
489+
# Not running, restart instead of resume.
490+
# (Orphaned contact file will be removed by cylc_ping client logic).
491+
return
492+
except CylcError as exc:
493+
# PID check failed - abort.
494+
LOG.error(exc)
495+
LOG.critical('Cannot tell if the workflow is running')
496+
sys.exit(1)
497+
else:
498+
# It's running: resume it and exit.
499+
print("Resuming already-running workflow")
484500
mutation_kwargs = {
485501
'request_string': RESUME_MUTATION,
486502
'variables': {
@@ -489,13 +505,6 @@ async def _resume(workflow_id, options):
489505
}
490506
await pclient.async_request('graphql', mutation_kwargs)
491507
sys.exit(0)
492-
except CylcError as exc:
493-
LOG.error(exc)
494-
LOG.critical(
495-
'Cannot tell if the workflow is running'
496-
'\nNote, Cylc 8 cannot restart Cylc 7 workflows.'
497-
)
498-
sys.exit(1)
499508

500509

501510
def _version_check(

cylc/flow/scripts/clean.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -220,7 +220,7 @@ async def run(*ids: str, opts: 'Values') -> None:
220220
workflows, multi_mode = await scan(workflows, multi_mode)
221221

222222
if not workflows:
223-
LOG.warning(f"No workflows matching {', '.join(ids)}")
223+
LOG.warning(f"No stopped workflows matching {', '.join(ids)}")
224224
return
225225

226226
workflows.sort()

0 commit comments

Comments
 (0)