Skip to content

Commit a109d2a

Browse files
committed
Merge branch Master into 8.4.x-sync
2 parents 35909c0 + c984044 commit a109d2a

File tree

4 files changed

+115
-6
lines changed

4 files changed

+115
-6
lines changed

changes.d/6711.fix.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Stop broadcast allowing `[remote]host` if `platform` set, or vice-versa

cylc/flow/broadcast_mgr.py

Lines changed: 43 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,10 @@
3333
from cylc.flow.parsec.util import listjoin, pdeepcopy, poverride
3434
from cylc.flow.parsec.validate import BroadcastConfigValidator
3535
from cylc.flow.run_modes import WORKFLOW_ONLY_MODES
36+
from cylc.flow.platforms import (
37+
fail_if_platform_and_host_conflict,
38+
PlatformLookupError,
39+
)
3640

3741

3842
if TYPE_CHECKING:
@@ -64,10 +68,11 @@ class BroadcastMgr:
6468

6569
REC_SECTION = re.compile(r"\[([^\]]+)\]")
6670

67-
def __init__(self, workflow_db_mgr, data_store_mgr, run_mode):
68-
self.workflow_run_mode = run_mode
69-
self.workflow_db_mgr = workflow_db_mgr
70-
self.data_store_mgr = data_store_mgr
71+
def __init__(self, schd):
72+
self.schd = schd
73+
self.workflow_run_mode = schd.get_run_mode()
74+
self.workflow_db_mgr = schd.workflow_db_mgr
75+
self.data_store_mgr = schd.data_store_mgr
7176
self.linearized_ancestors = {}
7277
self.broadcasts = {}
7378
self.ext_triggers = {} # Can use collections.Counter in future
@@ -320,6 +325,23 @@ def put_broadcast(
320325
if namespace not in self.linearized_ancestors:
321326
bad_namespaces.append(namespace)
322327
elif not bad_point:
328+
# Check broadcast against config and against
329+
# existing broadcasts:
330+
newconfig = pdeepcopy(self.schd.config.get_config(
331+
['runtime', namespace]
332+
))
333+
poverride(
334+
newconfig,
335+
self.broadcasts.get(point_string, {})
336+
.get(namespace, {})
337+
)
338+
if self.bc_mixes_old_and_new_platform_settings(
339+
newconfig,
340+
namespace,
341+
coerced_setting,
342+
):
343+
continue
344+
323345
if namespace not in self.broadcasts[point_string]:
324346
self.broadcasts[point_string][namespace] = {}
325347

@@ -348,6 +370,23 @@ def put_broadcast(
348370
self.data_store_mgr.delta_broadcast()
349371
return modified_settings, bad_options
350372

373+
@staticmethod
374+
def bc_mixes_old_and_new_platform_settings(
375+
task_config, namespace, coerced_setting
376+
):
377+
"""Check for combination of old ([remote]host) and new (platform)
378+
settings in the task config as it will be after merger.
379+
"""
380+
task_config.update(coerced_setting)
381+
try:
382+
fail_if_platform_and_host_conflict(
383+
task_config, namespace
384+
)
385+
return False
386+
except PlatformLookupError as exc:
387+
LOG.error('Cannot apply broadcast:\n' + '\n '.join(exc.args))
388+
return True
389+
351390
@staticmethod
352391
def _cancel_keys_in_prunes(prunes, cancel_keys):
353392
"""Is cancel_keys pruned?"""

cylc/flow/scheduler.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -402,8 +402,7 @@ async def initialise(self):
402402
403403
"""
404404
self.data_store_mgr = DataStoreMgr(self)
405-
self.broadcast_mgr = BroadcastMgr(
406-
self.workflow_db_mgr, self.data_store_mgr, self.get_run_mode())
405+
self.broadcast_mgr = BroadcastMgr(self)
407406

408407
self.server = WorkflowRuntimeServer(self)
409408

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE.
2+
# Copyright (C) NIWA & British Crown (Met Office) & Contributors.
3+
#
4+
# This program is free software: you can redistribute it and/or modify
5+
# it under the terms of the GNU General Public License as published by
6+
# the Free Software Foundation, either version 3 of the License, or
7+
# (at your option) any later version.
8+
#
9+
# This program is distributed in the hope that it will be useful,
10+
# but WITHOUT ANY WARRANTY; without even the implied warranty of
11+
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12+
# GNU General Public License for more details.
13+
#
14+
# You should have received a copy of the GNU General Public License
15+
# along with this program. If not, see <http://www.gnu.org/licenses/>.
16+
17+
"""Tests for Broadcast Manager."""
18+
19+
20+
async def test_reject_valid_broadcast_is_remote_clash_with_config(
21+
one_conf, flow, start, scheduler, log_filter
22+
):
23+
"""`put_broadcast` gracefully rejects invalid broadcast:
24+
25+
Existing config = [task][remote]host = foo
26+
Broadcast = [task]platform = bar
27+
28+
https://github.com/cylc/cylc-flow/issues/6693
29+
"""
30+
one_conf.update({'runtime': {'root': {'platform': 'foo'}}})
31+
wid = flow(one_conf)
32+
schd = scheduler(wid)
33+
async with start(schd):
34+
bc_mgr = schd.broadcast_mgr
35+
bc_mgr.put_broadcast(
36+
point_strings=['1'],
37+
namespaces=['one'],
38+
settings=[{'remote': {'host': 'bar'}}]
39+
)
40+
assert log_filter(contains='Cannot apply broadcast')
41+
assert bc_mgr.broadcasts == {'1': {}}
42+
43+
44+
async def test_reject_valid_broadcast_is_remote_clash_with_broadcast(
45+
one_conf, flow, start, scheduler, log_filter
46+
):
47+
"""`put_broadcast` gracefully rejects invalid broadcast:
48+
49+
Existing Broadcast = [task][remote]host = foo
50+
New Broadcast = [task]platform = bar
51+
52+
https://github.com/cylc/cylc-flow/pull/6711/files#r2033457964
53+
"""
54+
schd = scheduler(flow(one_conf))
55+
async with start(schd):
56+
bc_mgr = schd.broadcast_mgr
57+
bc_mgr.put_broadcast(
58+
point_strings=['1'],
59+
namespaces=['one'],
60+
settings=[{'remote': {'host': 'bar'}}]
61+
)
62+
# this should not be allowed, if it is the scheduler will crash
63+
# when unpaused:
64+
bc_mgr.put_broadcast(
65+
point_strings=['1'],
66+
namespaces=['one'],
67+
settings=[{'platform': 'foo'}]
68+
)
69+
assert log_filter(contains='Cannot apply broadcast')
70+
assert bc_mgr.broadcasts == {'1': {'one': {'remote': {'host': 'bar'}}}}

0 commit comments

Comments
 (0)