Skip to content

Commit c984044

Browse files
authored
Merge pull request #6711 from wxtim/fix.6693.prevent-illegal-platform-host-bc
Prevent illegal platform host broadcast
2 parents 53a268d + 91af276 commit c984044

File tree

4 files changed

+115
-5
lines changed

4 files changed

+115
-5
lines changed

changes.d/6711.fix.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Stop broadcast allowing `[remote]host` if `platform` set, or vice-versa

cylc/flow/broadcast_mgr.py

Lines changed: 43 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,11 @@
3232
from cylc.flow.exceptions import PointParsingError
3333
from cylc.flow.parsec.util import listjoin, pdeepcopy, poverride
3434
from cylc.flow.parsec.validate import BroadcastConfigValidator
35+
from cylc.flow.platforms import (
36+
fail_if_platform_and_host_conflict,
37+
PlatformLookupError,
38+
)
39+
3540

3641
if TYPE_CHECKING:
3742
from cylc.flow.id import Tokens
@@ -62,9 +67,10 @@ class BroadcastMgr:
6267

6368
REC_SECTION = re.compile(r"\[([^\]]+)\]")
6469

65-
def __init__(self, workflow_db_mgr, data_store_mgr):
66-
self.workflow_db_mgr = workflow_db_mgr
67-
self.data_store_mgr = data_store_mgr
70+
def __init__(self, schd):
71+
self.schd = schd
72+
self.workflow_db_mgr = schd.workflow_db_mgr
73+
self.data_store_mgr = schd.data_store_mgr
6874
self.linearized_ancestors = {}
6975
self.broadcasts = {}
7076
self.ext_triggers = {} # Can use collections.Counter in future
@@ -304,6 +310,23 @@ def put_broadcast(
304310
if namespace not in self.linearized_ancestors:
305311
bad_namespaces.append(namespace)
306312
elif not bad_point:
313+
# Check broadcast against config and against
314+
# existing broadcasts:
315+
newconfig = pdeepcopy(self.schd.config.get_config(
316+
['runtime', namespace]
317+
))
318+
poverride(
319+
newconfig,
320+
self.broadcasts.get(point_string, {})
321+
.get(namespace, {})
322+
)
323+
if self.bc_mixes_old_and_new_platform_settings(
324+
newconfig,
325+
namespace,
326+
coerced_setting,
327+
):
328+
continue
329+
307330
if namespace not in self.broadcasts[point_string]:
308331
self.broadcasts[point_string][namespace] = {}
309332

@@ -332,6 +355,23 @@ def put_broadcast(
332355
self.data_store_mgr.delta_broadcast()
333356
return modified_settings, bad_options
334357

358+
@staticmethod
359+
def bc_mixes_old_and_new_platform_settings(
360+
task_config, namespace, coerced_setting
361+
):
362+
"""Check for combination of old ([remote]host) and new (platform)
363+
settings in the task config as it will be after merger.
364+
"""
365+
task_config.update(coerced_setting)
366+
try:
367+
fail_if_platform_and_host_conflict(
368+
task_config, namespace
369+
)
370+
return False
371+
except PlatformLookupError as exc:
372+
LOG.error('Cannot apply broadcast:\n' + '\n '.join(exc.args))
373+
return True
374+
335375
@staticmethod
336376
def _cancel_keys_in_prunes(prunes, cancel_keys):
337377
"""Is cancel_keys pruned?"""

cylc/flow/scheduler.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -402,8 +402,7 @@ async def initialise(self):
402402
403403
"""
404404
self.data_store_mgr = DataStoreMgr(self)
405-
self.broadcast_mgr = BroadcastMgr(
406-
self.workflow_db_mgr, self.data_store_mgr)
405+
self.broadcast_mgr = BroadcastMgr(self)
407406

408407
self.server = WorkflowRuntimeServer(self)
409408

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE.
2+
# Copyright (C) NIWA & British Crown (Met Office) & Contributors.
3+
#
4+
# This program is free software: you can redistribute it and/or modify
5+
# it under the terms of the GNU General Public License as published by
6+
# the Free Software Foundation, either version 3 of the License, or
7+
# (at your option) any later version.
8+
#
9+
# This program is distributed in the hope that it will be useful,
10+
# but WITHOUT ANY WARRANTY; without even the implied warranty of
11+
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12+
# GNU General Public License for more details.
13+
#
14+
# You should have received a copy of the GNU General Public License
15+
# along with this program. If not, see <http://www.gnu.org/licenses/>.
16+
17+
"""Tests for Broadcast Manager."""
18+
19+
20+
async def test_reject_valid_broadcast_is_remote_clash_with_config(
21+
one_conf, flow, start, scheduler, log_filter
22+
):
23+
"""`put_broadcast` gracefully rejects invalid broadcast:
24+
25+
Existing config = [task][remote]host = foo
26+
Broadcast = [task]platform = bar
27+
28+
https://github.com/cylc/cylc-flow/issues/6693
29+
"""
30+
one_conf.update({'runtime': {'root': {'platform': 'foo'}}})
31+
wid = flow(one_conf)
32+
schd = scheduler(wid)
33+
async with start(schd):
34+
bc_mgr = schd.broadcast_mgr
35+
bc_mgr.put_broadcast(
36+
point_strings=['1'],
37+
namespaces=['one'],
38+
settings=[{'remote': {'host': 'bar'}}]
39+
)
40+
assert log_filter(contains='Cannot apply broadcast')
41+
assert bc_mgr.broadcasts == {'1': {}}
42+
43+
44+
async def test_reject_valid_broadcast_is_remote_clash_with_broadcast(
45+
one_conf, flow, start, scheduler, log_filter
46+
):
47+
"""`put_broadcast` gracefully rejects invalid broadcast:
48+
49+
Existing Broadcast = [task][remote]host = foo
50+
New Broadcast = [task]platform = bar
51+
52+
https://github.com/cylc/cylc-flow/pull/6711/files#r2033457964
53+
"""
54+
schd = scheduler(flow(one_conf))
55+
async with start(schd):
56+
bc_mgr = schd.broadcast_mgr
57+
bc_mgr.put_broadcast(
58+
point_strings=['1'],
59+
namespaces=['one'],
60+
settings=[{'remote': {'host': 'bar'}}]
61+
)
62+
# this should not be allowed, if it is the scheduler will crash
63+
# when unpaused:
64+
bc_mgr.put_broadcast(
65+
point_strings=['1'],
66+
namespaces=['one'],
67+
settings=[{'platform': 'foo'}]
68+
)
69+
assert log_filter(contains='Cannot apply broadcast')
70+
assert bc_mgr.broadcasts == {'1': {'one': {'remote': {'host': 'bar'}}}}

0 commit comments

Comments
 (0)