Skip to content

Commit 7cfd1ee

Browse files
Merge pull request #6564 from MetRonnie/tidy
Fix potentially unbound variables
2 parents 6bab70e + a27f1fd commit 7cfd1ee

File tree

2 files changed

+105
-110
lines changed

2 files changed

+105
-110
lines changed

cylc/flow/platforms.py

Lines changed: 19 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,8 @@ def get_platform(
125125
Returns:
126126
platform: A platform definition dictionary. Uses either
127127
get_platform() or platform_name_from_job_info(), but to the
128-
user these look the same.
128+
user these look the same. This will be None if the platform
129+
definition uses a subshell.
129130
130131
Raises:
131132
NoPlatformsError:
@@ -142,7 +143,7 @@ def get_platform(
142143

143144
# NOTE: Do NOT use .get() on OrderedDictWithDefaults -
144145
# https://github.com/cylc/cylc-flow/pull/4975
145-
elif 'platform' in task_conf and task_conf['platform']:
146+
if 'platform' in task_conf and task_conf['platform']:
146147
# Check whether task has conflicting Cylc7 items.
147148
fail_if_platform_and_host_conflict(task_conf, task_name)
148149

@@ -155,24 +156,22 @@ def get_platform(
155156
# If platform name exists and doesn't clash with Cylc7 Config items.
156157
return platform_from_name(task_conf['platform'], bad_hosts=bad_hosts)
157158

158-
else:
159-
if get_platform_deprecated_settings(task_conf) == []:
160-
# No deprecated items; platform is localhost
161-
return platform_from_name()
162-
else:
163-
# Need to calculate platform
164-
# NOTE: Do NOT use .get() on OrderedDictWithDefaults - see above
165-
task_job_section = task_conf['job'] if 'job' in task_conf else {}
166-
task_remote_section = (
167-
task_conf['remote'] if 'remote' in task_conf else {})
168-
return platform_from_name(
169-
platform_name_from_job_info(
170-
glbl_cfg().get(['platforms']),
171-
task_job_section,
172-
task_remote_section
173-
),
174-
bad_hosts=bad_hosts
175-
)
159+
if get_platform_deprecated_settings(task_conf) == []:
160+
# No deprecated items; platform is localhost
161+
return platform_from_name()
162+
163+
# Need to calculate platform
164+
# NOTE: Do NOT use .get() on OrderedDictWithDefaults - see above
165+
task_job_section = task_conf['job'] if 'job' in task_conf else {}
166+
task_remote_section = task_conf['remote'] if 'remote' in task_conf else {}
167+
return platform_from_name(
168+
platform_name_from_job_info(
169+
glbl_cfg().get(['platforms']),
170+
task_job_section,
171+
task_remote_section,
172+
),
173+
bad_hosts=bad_hosts,
174+
)
176175

177176

178177
def platform_from_name(

cylc/flow/task_job_mgr.py

Lines changed: 86 additions & 90 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
Optional,
4545
Tuple,
4646
Union,
47+
cast,
4748
)
4849

4950
from cylc.flow import LOG
@@ -123,6 +124,7 @@
123124

124125

125126
if TYPE_CHECKING:
127+
from cylc.flow.data_store_mgr import DataStoreMgr
126128
from cylc.flow.task_proxy import TaskProxy
127129
from cylc.flow.workflow_db_mgr import WorkflowDatabaseManager
128130

@@ -159,8 +161,8 @@ def __init__(self, workflow, proc_pool, workflow_db_mgr,
159161
self.workflow = workflow
160162
self.proc_pool = proc_pool
161163
self.workflow_db_mgr: WorkflowDatabaseManager = workflow_db_mgr
162-
self.task_events_mgr = task_events_mgr
163-
self.data_store_mgr = data_store_mgr
164+
self.task_events_mgr: TaskEventsManager = task_events_mgr
165+
self.data_store_mgr: DataStoreMgr = data_store_mgr
164166
self.job_file_writer = JobFileWriter()
165167
self.job_runner_mgr = self.job_file_writer.job_runner_mgr
166168
self.bad_hosts = bad_hosts
@@ -306,76 +308,10 @@ def submit_livelike_task_jobs(
306308
done_tasks = bad_tasks
307309

308310
for _, itasks in sorted(auth_itasks.items()):
309-
# Find the first platform where >1 host has not been tried and
310-
# found to be unreachable.
311-
# If there are no good hosts for a task then the task submit-fails.
312-
for itask in itasks:
313-
# If there are any hosts left for this platform which we
314-
# have not previously failed to contact with a 255 error.
315-
if any(
316-
host not in self.task_remote_mgr.bad_hosts
317-
for host in itask.platform['hosts']
318-
):
319-
platform = itask.platform
320-
out_of_hosts = False
321-
break
322-
else:
323-
# If there are no hosts left for this platform.
324-
# See if you can get another platform from the group or
325-
# else set task to submit failed.
326-
327-
# Get another platform, if task config platform is a group
328-
use_next_platform_in_group = False
329-
bc_mgr = self.task_events_mgr.broadcast_mgr
330-
rtconf = bc_mgr.get_updated_rtconfig(itask)
331-
try:
332-
platform = get_platform(
333-
rtconf,
334-
bad_hosts=self.bad_hosts
335-
)
336-
except PlatformLookupError:
337-
pass
338-
else:
339-
# If were able to select a new platform;
340-
if platform and platform != itask.platform:
341-
use_next_platform_in_group = True
342-
343-
if use_next_platform_in_group:
344-
# store the previous platform's hosts so that when
345-
# we record a submit fail we can clear all hosts
346-
# from all platforms from bad_hosts.
347-
for host_ in itask.platform['hosts']:
348-
self.bad_hosts_to_clear.add(host_)
349-
itask.platform = platform
350-
out_of_hosts = False
351-
break
352-
else:
353-
itask.waiting_on_job_prep = False
354-
itask.local_job_file_path = None
355-
self._prep_submit_task_job_error(
356-
itask, '(remote init)', ''
357-
)
358-
# Now that all hosts on all platforms in platform
359-
# group selected in task config are exhausted we clear
360-
# bad_hosts for all the hosts we have
361-
# tried for this platform or group.
362-
self.bad_hosts -= set(itask.platform['hosts'])
363-
self.bad_hosts -= self.bad_hosts_to_clear
364-
self.bad_hosts_to_clear.clear()
365-
LOG.critical(
366-
PlatformError(
367-
(
368-
f'{PlatformError.MSG_INIT}'
369-
' (no hosts were reachable)'
370-
),
371-
itask.platform['name'],
372-
)
373-
)
374-
out_of_hosts = True
375-
done_tasks.append(itask)
376-
377-
if out_of_hosts is True:
311+
platform = self._get_platform_with_good_host(itasks, done_tasks)
312+
if not platform:
378313
continue
314+
379315
install_target = get_install_target_from_platform(platform)
380316
ri_map = self.task_remote_mgr.remote_init_map
381317

@@ -451,12 +387,9 @@ def submit_livelike_task_jobs(
451387
)
452388
continue
453389

454-
if (
455-
self.job_runner_mgr.is_job_local_to_host(
456-
itask.summary['job_runner_name']
457-
) and
458-
not is_remote_platform(platform)
459-
):
390+
if self.job_runner_mgr.is_job_local_to_host(
391+
itask.summary['job_runner_name']
392+
) and not is_remote_platform(platform):
460393
host = get_host()
461394

462395
done_tasks.extend(itasks)
@@ -608,6 +541,66 @@ def submit_livelike_task_jobs(
608541
)
609542
return done_tasks
610543

544+
def _get_platform_with_good_host(
545+
self, itasks: 'Iterable[TaskProxy]', done_tasks: 'List[TaskProxy]'
546+
) -> Optional[dict]:
547+
"""Find the first platform with at least one host that has not been
548+
tried and found to be unreachable.
549+
550+
If there are no good hosts for a task then the task submit-fails.
551+
552+
Returns:
553+
The platform with a good host, or None if no such platform is found
554+
"""
555+
for itask in itasks:
556+
# If there are any hosts left for this platform which we
557+
# have not previously failed to contact with a 255 error.
558+
if any(
559+
host not in self.task_remote_mgr.bad_hosts
560+
for host in itask.platform['hosts']
561+
):
562+
return itask.platform
563+
564+
# If there are no hosts left for this platform.
565+
# See if you can get another platform from the group or
566+
# else set task to submit failed.
567+
platform: Optional[dict] = None
568+
rtconf = self.task_events_mgr.broadcast_mgr.get_updated_rtconfig(
569+
itask
570+
)
571+
with suppress(PlatformLookupError):
572+
platform = get_platform(rtconf, bad_hosts=self.bad_hosts)
573+
574+
# If were able to select a new platform;
575+
if platform and platform != itask.platform:
576+
# store the previous platform's hosts so that when
577+
# we record a submit fail we can clear all hosts
578+
# from all platforms from bad_hosts.
579+
for host_ in itask.platform['hosts']:
580+
self.bad_hosts_to_clear.add(host_)
581+
itask.platform = platform
582+
return platform
583+
584+
itask.waiting_on_job_prep = False
585+
itask.local_job_file_path = None
586+
self._prep_submit_task_job_error(itask, '(remote init)', '')
587+
# Now that all hosts on all platforms in platform
588+
# group selected in task config are exhausted we
589+
# clear bad_hosts for all the hosts we have
590+
# tried for this platform or group.
591+
self.bad_hosts -= set(itask.platform['hosts'])
592+
self.bad_hosts -= self.bad_hosts_to_clear
593+
self.bad_hosts_to_clear.clear()
594+
LOG.critical(
595+
PlatformError(
596+
f"{PlatformError.MSG_INIT} (no hosts were reachable)",
597+
itask.platform['name'],
598+
)
599+
)
600+
done_tasks.append(itask)
601+
602+
return None
603+
611604
def _create_job_log_path(self, itask):
612605
"""Create job log directory for a task job, etc.
613606
@@ -954,6 +947,7 @@ def _run_job_cmd(
954947
f'Unable to run command {cmd_key}: Unable to find'
955948
f' platform {platform_name} with accessible hosts.'
956949
)
950+
continue
957951
except PlatformLookupError:
958952
LOG.error(
959953
f'Unable to run command {cmd_key}: Unable to find'
@@ -1238,31 +1232,33 @@ def _prep_submit_task_job(
12381232
rtconfig['remote']['host'] = host_n
12391233

12401234
try:
1241-
platform = get_platform(
1242-
rtconfig, itask.tdef.name, bad_hosts=self.bad_hosts
1235+
platform = cast(
1236+
# We know this is not None because eval_platform() or
1237+
# eval_host() called above ensure it is set or else we
1238+
# return early if the subshell is still evaluating.
1239+
'dict',
1240+
get_platform(
1241+
rtconfig, itask.tdef.name, bad_hosts=self.bad_hosts
1242+
),
12431243
)
12441244
except PlatformLookupError as exc:
12451245
itask.waiting_on_job_prep = False
12461246
itask.summary['platforms_used'][itask.submit_num] = ''
12471247
# Retry delays, needed for the try_num
12481248
self._create_job_log_path(itask)
1249+
msg = '(platform not defined)'
12491250
if isinstance(exc, NoPlatformsError):
1251+
msg = '(no platforms available)'
12501252
# Clear all hosts from all platforms in group from
12511253
# bad_hosts:
12521254
self.bad_hosts -= exc.hosts_consumed
12531255
self._set_retry_timers(itask, rtconfig)
1254-
self._prep_submit_task_job_error(
1255-
itask, '(no platforms available)', exc
1256-
)
1257-
return False
1258-
self._prep_submit_task_job_error(
1259-
itask, '(platform not defined)', exc
1260-
)
1256+
self._prep_submit_task_job_error(itask, msg, exc)
12611257
return False
1262-
else:
1263-
itask.platform = platform
1264-
# Retry delays, needed for the try_num
1265-
self._set_retry_timers(itask, rtconfig)
1258+
1259+
itask.platform = platform
1260+
# Retry delays, needed for the try_num
1261+
self._set_retry_timers(itask, rtconfig)
12661262

12671263
try:
12681264
job_conf = self._prep_submit_task_job_impl(

0 commit comments

Comments
 (0)