Skip to content

Commit bb64e9f

Browse files
authored
Merge pull request #6200 from MetRonnie/stall-status
Fix stall status bug
2 parents 6ff547e + a0ee209 commit bb64e9f

File tree

8 files changed

+134
-93
lines changed

8 files changed

+134
-93
lines changed

changes.d/6200.fix.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Fixed a bug where a workflow that was both stalled and paused was incorrectly reported as running instead of paused.

cylc/flow/commands.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -189,16 +189,19 @@ async def stop(
189189
schd.workflow_db_mgr.put_workflow_stop_cycle_point(
190190
schd.options.stopcp
191191
)
192+
schd._update_workflow_state()
192193
elif clock_time is not None:
193194
# schedule shutdown after wallclock time passes provided time
194195
parser = TimePointParser()
195196
schd.set_stop_clock(
196197
int(parser.parse(clock_time).seconds_since_unix_epoch)
197198
)
199+
schd._update_workflow_state()
198200
elif task is not None:
199201
# schedule shutdown after task succeeds
200202
task_id = TaskID.get_standardised_taskid(task)
201203
schd.pool.set_stop_task(task_id)
204+
schd._update_workflow_state()
202205
else:
203206
# immediate shutdown
204207
with suppress(KeyError):
@@ -229,6 +232,7 @@ async def release_hold_point(schd: 'Scheduler'):
229232
yield
230233
LOG.info("Releasing all tasks and removing hold cycle point.")
231234
schd.pool.release_hold_point()
235+
schd._update_workflow_state()
232236

233237

234238
@_command('resume')
@@ -287,6 +291,7 @@ async def set_hold_point(schd: 'Scheduler', point: str):
287291
"All tasks after this point will be held."
288292
)
289293
schd.pool.set_hold_point(cycle_point)
294+
schd._update_workflow_state()
290295

291296

292297
@_command('pause')

cylc/flow/data_store_mgr.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,10 @@
8585
pdeepcopy,
8686
poverride
8787
)
88-
from cylc.flow.workflow_status import get_workflow_status
88+
from cylc.flow.workflow_status import (
89+
get_workflow_status,
90+
get_workflow_status_msg,
91+
)
8992
from cylc.flow.task_job_logs import JOB_LOG_OPTS, get_task_job_log
9093
from cylc.flow.task_proxy import TaskProxy
9194
from cylc.flow.task_state import (
@@ -2174,8 +2177,8 @@ def update_workflow(self, reloaded=False):
21742177
w_delta.latest_state_tasks[state].task_proxies[:] = tp_queue
21752178

21762179
# Set status & msg if changed.
2177-
status, status_msg = map(
2178-
str, get_workflow_status(self.schd))
2180+
status = get_workflow_status(self.schd).value
2181+
status_msg = get_workflow_status_msg(self.schd)
21792182
if w_data.status != status or w_data.status_msg != status_msg:
21802183
w_delta.status = status
21812184
w_delta.status_msg = status_msg

cylc/flow/scheduler.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2000,7 +2000,7 @@ def update_data_store(self):
20002000
20012001
Call this method whenever the Scheduler's state has changed in a way
20022002
that requires a data store update.
2003-
See cylc.flow.workflow_status.get_workflow_status() for a
2003+
See cylc.flow.workflow_status.get_workflow_status_msg() for a
20042004
(non-exhaustive?) list of properties that if changed will require
20052005
this update.
20062006

cylc/flow/tui/util.py

Lines changed: 0 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -384,19 +384,6 @@ def get_task_status_summary(flow):
384384
]
385385

386386

387-
def get_workflow_status_str(flow):
388-
"""Return a workflow status string for the header.
389-
390-
Arguments:
391-
flow (dict):
392-
GraphQL JSON response for this workflow.
393-
394-
Returns:
395-
list - Text list for the urwid.Text widget.
396-
397-
"""
398-
399-
400387
def _render_user(node, data):
401388
return f'~{ME}'
402389

cylc/flow/wallclock.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
"""Wall clock related utilities."""
1717

1818
from calendar import timegm
19-
from datetime import datetime, timedelta
19+
from datetime import datetime, timedelta, timezone
2020

2121
from metomi.isodatetime.timezone import (
2222
get_local_time_zone_format, get_local_time_zone, TimeZoneFormatMode)
@@ -209,7 +209,7 @@ def get_time_string_from_unix_time(unix_time, display_sub_seconds=False,
209209
to use as the time zone designator.
210210
211211
"""
212-
date_time = datetime.utcfromtimestamp(unix_time)
212+
date_time = datetime.fromtimestamp(unix_time, timezone.utc)
213213
return get_time_string(date_time,
214214
display_sub_seconds=display_sub_seconds,
215215
use_basic_format=use_basic_format,

cylc/flow/workflow_status.py

Lines changed: 57 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,18 @@
1616
"""Workflow status constants."""
1717

1818
from enum import Enum
19-
from typing import Tuple, TYPE_CHECKING
19+
from typing import TYPE_CHECKING, Optional, Union
2020

21+
from cylc.flow.cycling.loader import get_point
22+
from cylc.flow.id import tokenise
2123
from cylc.flow.wallclock import get_time_string_from_unix_time as time2str
2224

2325
if TYPE_CHECKING:
2426
from optparse import Values
27+
28+
from cylc.flow.cycling import PointBase
2529
from cylc.flow.scheduler import Scheduler
30+
from cylc.flow.task_pool import TaskPool
2631

2732
# Keys for identify API call
2833
KEY_GROUP = "group"
@@ -143,62 +148,60 @@ class AutoRestartMode(Enum):
143148
"""Workflow will stop immediately but *not* attempt to restart."""
144149

145150

146-
def get_workflow_status(schd: 'Scheduler') -> Tuple[str, str]:
147-
"""Return the status of the provided workflow.
148-
149-
This should be a short, concise description of the workflow state.
150-
151-
Args:
152-
schd: The running workflow
153-
154-
Returns:
155-
tuple - (state, state_msg)
156-
157-
state:
158-
The WorkflowState.
159-
state_msg:
160-
Text describing the current state (may be an empty string).
151+
def get_workflow_status(schd: 'Scheduler') -> WorkflowStatus:
152+
"""Return the status of the provided workflow."""
153+
if schd.stop_mode is not None:
154+
return WorkflowStatus.STOPPING
155+
if schd.is_paused or schd.reload_pending:
156+
return WorkflowStatus.PAUSED
157+
return WorkflowStatus.RUNNING
161158

162-
"""
163-
status = WorkflowStatus.RUNNING
164-
status_msg = ''
165159

160+
def get_workflow_status_msg(schd: 'Scheduler') -> str:
161+
"""Return a short, concise status message for the provided workflow."""
166162
if schd.stop_mode is not None:
167-
status = WorkflowStatus.STOPPING
168-
status_msg = f'stopping: {schd.stop_mode.explain()}'
169-
elif schd.reload_pending:
170-
status = WorkflowStatus.PAUSED
171-
status_msg = f'reloading: {schd.reload_pending}'
172-
elif schd.is_stalled:
173-
status_msg = 'stalled'
174-
elif schd.is_paused:
175-
status = WorkflowStatus.PAUSED
176-
status_msg = 'paused'
177-
elif schd.pool.hold_point:
178-
status_msg = (
179-
WORKFLOW_STATUS_RUNNING_TO_HOLD %
180-
schd.pool.hold_point)
181-
elif schd.pool.stop_point:
182-
status_msg = (
183-
WORKFLOW_STATUS_RUNNING_TO_STOP %
184-
schd.pool.stop_point)
185-
elif schd.stop_clock_time is not None:
186-
status_msg = (
187-
WORKFLOW_STATUS_RUNNING_TO_STOP %
188-
time2str(schd.stop_clock_time))
189-
elif schd.pool.stop_task_id:
190-
status_msg = (
191-
WORKFLOW_STATUS_RUNNING_TO_STOP %
192-
schd.pool.stop_task_id)
193-
elif schd.config and schd.config.final_point:
194-
status_msg = (
195-
WORKFLOW_STATUS_RUNNING_TO_STOP %
196-
schd.config.final_point)
197-
else:
198-
# fallback - running indefinitely
199-
status_msg = 'running'
200-
201-
return (status.value, status_msg)
163+
return f'stopping: {schd.stop_mode.explain()}'
164+
if schd.reload_pending:
165+
return f'reloading: {schd.reload_pending}'
166+
if schd.is_stalled:
167+
if schd.is_paused:
168+
return 'stalled and paused'
169+
return 'stalled'
170+
if schd.is_paused:
171+
return 'paused'
172+
if schd.stop_clock_time is not None:
173+
return WORKFLOW_STATUS_RUNNING_TO_STOP % time2str(
174+
schd.stop_clock_time
175+
)
176+
stop_point_msg = _get_earliest_stop_point_status_msg(schd.pool)
177+
if stop_point_msg is not None:
178+
return stop_point_msg
179+
if schd.config and schd.config.final_point:
180+
return WORKFLOW_STATUS_RUNNING_TO_STOP % schd.config.final_point
181+
# fallback - running indefinitely
182+
return 'running'
183+
184+
185+
def _get_earliest_stop_point_status_msg(pool: 'TaskPool') -> Optional[str]:
186+
"""Return the status message for the earliest stop point in the pool,
187+
if any."""
188+
template = WORKFLOW_STATUS_RUNNING_TO_STOP
189+
prop: Union[PointBase, str, None] = pool.stop_task_id
190+
min_point: Optional[PointBase] = get_point(
191+
tokenise(pool.stop_task_id, relative=True)['cycle']
192+
if pool.stop_task_id else None
193+
)
194+
for point, tmpl in (
195+
(pool.stop_point, WORKFLOW_STATUS_RUNNING_TO_STOP),
196+
(pool.hold_point, WORKFLOW_STATUS_RUNNING_TO_HOLD)
197+
):
198+
if point is not None and (min_point is None or point < min_point):
199+
template = tmpl
200+
min_point = point
201+
prop = point
202+
if prop is None:
203+
return None
204+
return template % prop
202205

203206

204207
class RunMode:

tests/unit/test_workflow_status.py

Lines changed: 62 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -17,15 +17,20 @@
1717
from types import SimpleNamespace
1818

1919
import pytest
20+
from metomi.isodatetime.data import TimePoint
2021

22+
from cylc.flow.cycling.integer import IntegerPoint
2123
from cylc.flow.workflow_status import (
22-
StopMode,
23-
WorkflowStatus,
2424
WORKFLOW_STATUS_RUNNING_TO_HOLD,
2525
WORKFLOW_STATUS_RUNNING_TO_STOP,
26+
StopMode,
27+
WorkflowStatus,
2628
get_workflow_status,
29+
get_workflow_status_msg,
2730
)
2831

32+
STOP_TIME = TimePoint(year=2006).to_local_time_zone()
33+
2934

3035
def schd(
3136
final_point=None,
@@ -50,6 +55,7 @@ def schd(
5055
stop_task_id=stop_task_id,
5156
),
5257
config=SimpleNamespace(final_point=final_point),
58+
options=SimpleNamespace(utc_mode=True),
5359
)
5460

5561

@@ -73,29 +79,29 @@ def schd(
7379
'stopping: waiting for active jobs to complete'
7480
),
7581
(
76-
{'hold_point': 'point'},
82+
{'hold_point': 2},
7783
WorkflowStatus.RUNNING,
78-
WORKFLOW_STATUS_RUNNING_TO_HOLD % 'point'
84+
WORKFLOW_STATUS_RUNNING_TO_HOLD % 2
7985
),
8086
(
81-
{'stop_point': 'point'},
87+
{'stop_point': 4},
8288
WorkflowStatus.RUNNING,
83-
WORKFLOW_STATUS_RUNNING_TO_STOP % 'point'
89+
WORKFLOW_STATUS_RUNNING_TO_STOP % 4
8490
),
8591
(
86-
{'stop_clock_time': 1234},
92+
{'stop_clock_time': int(STOP_TIME.seconds_since_unix_epoch)},
8793
WorkflowStatus.RUNNING,
88-
WORKFLOW_STATUS_RUNNING_TO_STOP % ''
94+
WORKFLOW_STATUS_RUNNING_TO_STOP % str(STOP_TIME)
8995
),
9096
(
91-
{'stop_task_id': 'foo'},
97+
{'stop_task_id': '6/foo'},
9298
WorkflowStatus.RUNNING,
93-
WORKFLOW_STATUS_RUNNING_TO_STOP % 'foo'
99+
WORKFLOW_STATUS_RUNNING_TO_STOP % '6/foo'
94100
),
95101
(
96-
{'final_point': 'point'},
102+
{'final_point': 8},
97103
WorkflowStatus.RUNNING,
98-
WORKFLOW_STATUS_RUNNING_TO_STOP % 'point'
104+
WORKFLOW_STATUS_RUNNING_TO_STOP % 8
99105
),
100106
(
101107
{'is_stalled': True},
@@ -112,22 +118,58 @@ def schd(
112118
(
113119
# stopping should trump stalled, paused & running
114120
{
115-
'stop_mode': StopMode.AUTO,
121+
'stop_mode': StopMode.REQUEST_NOW,
116122
'is_stalled': True,
117123
'is_paused': True
118124
},
119125
WorkflowStatus.STOPPING,
120-
'stopping'
126+
'stopping: shutting down'
121127
),
122128
(
123-
# stalled should trump paused & running
124129
{'is_stalled': True, 'is_paused': True},
130+
WorkflowStatus.PAUSED,
131+
'stalled and paused',
132+
),
133+
(
134+
# earliest of stop point, hold point and stop task id
135+
{
136+
'stop_point': IntegerPoint(4),
137+
'hold_point': IntegerPoint(2),
138+
'stop_task_id': '6/foo',
139+
},
125140
WorkflowStatus.RUNNING,
126-
'stalled'
141+
WORKFLOW_STATUS_RUNNING_TO_HOLD % 2,
142+
),
143+
(
144+
{
145+
'stop_point': IntegerPoint(11),
146+
'hold_point': IntegerPoint(15),
147+
'stop_task_id': '9/bar',
148+
},
149+
WorkflowStatus.RUNNING,
150+
WORKFLOW_STATUS_RUNNING_TO_STOP % '9/bar',
151+
),
152+
(
153+
{
154+
'stop_point': IntegerPoint(3),
155+
'hold_point': IntegerPoint(3),
156+
},
157+
WorkflowStatus.RUNNING,
158+
WORKFLOW_STATUS_RUNNING_TO_STOP % 3,
159+
),
160+
(
161+
# stop point trumps final point
162+
{
163+
'stop_point': IntegerPoint(1),
164+
'final_point': IntegerPoint(2),
165+
},
166+
WorkflowStatus.RUNNING,
167+
WORKFLOW_STATUS_RUNNING_TO_STOP % 1,
127168
),
128169
]
129170
)
130-
def test_get_workflow_status(kwargs, state, message):
131-
state_, message_ = get_workflow_status(schd(**kwargs))
132-
assert state_ == state.value
133-
assert message in message_
171+
def test_get_workflow_status(kwargs, state, message, set_cycling_type):
172+
set_cycling_type()
173+
scheduler = schd(**kwargs)
174+
assert get_workflow_status(scheduler) == state
175+
assert get_workflow_status_msg(scheduler) == message

0 commit comments

Comments
 (0)