Skip to content

Commit f8b6f72

Browse files
authored
Merge pull request #6987 from MetRonnie/est-finish-time
Add estimated finish time to Job schema
2 parents 24f89fc + bfc7ac9 commit f8b6f72

File tree

7 files changed

+171
-83
lines changed

7 files changed

+171
-83
lines changed

changes.d/6987.feat.md

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1 @@
1+
Enable estimated finish times for jobs in the UI.

cylc/flow/data_messages.proto

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -158,6 +158,7 @@ message PbJob {
158158
optional string cycle_point = 31; /* filter item */
159159
repeated string messages = 32;
160160
optional PbRuntime runtime = 33;
161+
optional string estimated_finish_time = 34;
161162
}
162163

163164
message PbTask {

cylc/flow/data_messages_pb2.py

Lines changed: 48 additions & 48 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

cylc/flow/data_messages_pb2.pyi

Lines changed: 4 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -184,7 +184,7 @@ class PbRuntime(_message.Message):
184184
def __init__(self, platform: _Optional[str] = ..., script: _Optional[str] = ..., init_script: _Optional[str] = ..., env_script: _Optional[str] = ..., err_script: _Optional[str] = ..., exit_script: _Optional[str] = ..., pre_script: _Optional[str] = ..., post_script: _Optional[str] = ..., work_sub_dir: _Optional[str] = ..., execution_polling_intervals: _Optional[str] = ..., execution_retry_delays: _Optional[str] = ..., execution_time_limit: _Optional[str] = ..., submission_polling_intervals: _Optional[str] = ..., submission_retry_delays: _Optional[str] = ..., directives: _Optional[str] = ..., environment: _Optional[str] = ..., outputs: _Optional[str] = ..., completion: _Optional[str] = ..., run_mode: _Optional[str] = ...) -> None: ...
185185

186186
class PbJob(_message.Message):
187-
__slots__ = ("stamp", "id", "submit_num", "state", "task_proxy", "submitted_time", "started_time", "finished_time", "job_id", "job_runner_name", "execution_time_limit", "platform", "job_log_dir", "name", "cycle_point", "messages", "runtime")
187+
__slots__ = ("stamp", "id", "submit_num", "state", "task_proxy", "submitted_time", "started_time", "finished_time", "job_id", "job_runner_name", "execution_time_limit", "platform", "job_log_dir", "name", "cycle_point", "messages", "runtime", "estimated_finish_time")
188188
STAMP_FIELD_NUMBER: _ClassVar[int]
189189
ID_FIELD_NUMBER: _ClassVar[int]
190190
SUBMIT_NUM_FIELD_NUMBER: _ClassVar[int]
@@ -202,6 +202,7 @@ class PbJob(_message.Message):
202202
CYCLE_POINT_FIELD_NUMBER: _ClassVar[int]
203203
MESSAGES_FIELD_NUMBER: _ClassVar[int]
204204
RUNTIME_FIELD_NUMBER: _ClassVar[int]
205+
ESTIMATED_FINISH_TIME_FIELD_NUMBER: _ClassVar[int]
205206
stamp: str
206207
id: str
207208
submit_num: int
@@ -219,7 +220,8 @@ class PbJob(_message.Message):
219220
cycle_point: str
220221
messages: _containers.RepeatedScalarFieldContainer[str]
221222
runtime: PbRuntime
222-
def __init__(self, stamp: _Optional[str] = ..., id: _Optional[str] = ..., submit_num: _Optional[int] = ..., state: _Optional[str] = ..., task_proxy: _Optional[str] = ..., submitted_time: _Optional[str] = ..., started_time: _Optional[str] = ..., finished_time: _Optional[str] = ..., job_id: _Optional[str] = ..., job_runner_name: _Optional[str] = ..., execution_time_limit: _Optional[float] = ..., platform: _Optional[str] = ..., job_log_dir: _Optional[str] = ..., name: _Optional[str] = ..., cycle_point: _Optional[str] = ..., messages: _Optional[_Iterable[str]] = ..., runtime: _Optional[_Union[PbRuntime, _Mapping]] = ...) -> None: ...
223+
estimated_finish_time: str
224+
def __init__(self, stamp: _Optional[str] = ..., id: _Optional[str] = ..., submit_num: _Optional[int] = ..., state: _Optional[str] = ..., task_proxy: _Optional[str] = ..., submitted_time: _Optional[str] = ..., started_time: _Optional[str] = ..., finished_time: _Optional[str] = ..., job_id: _Optional[str] = ..., job_runner_name: _Optional[str] = ..., execution_time_limit: _Optional[float] = ..., platform: _Optional[str] = ..., job_log_dir: _Optional[str] = ..., name: _Optional[str] = ..., cycle_point: _Optional[str] = ..., messages: _Optional[_Iterable[str]] = ..., runtime: _Optional[_Union[PbRuntime, _Mapping]] = ..., estimated_finish_time: _Optional[str] = ...) -> None: ...
223225

224226
class PbTask(_message.Message):
225227
__slots__ = ("stamp", "id", "name", "meta", "mean_elapsed_time", "depth", "proxies", "namespace", "parents", "first_parent", "runtime")

cylc/flow/data_store_mgr.py

Lines changed: 44 additions & 24 deletions
Original file line number | Diff line number | Diff line change
@@ -54,23 +54,30 @@
5454
5555
"""
5656

57+
from collections import (
58+
Counter,
59+
deque,
60+
)
5761
from contextlib import suppress
58-
from collections import Counter, deque
5962
from copy import deepcopy
6063
import json
6164
from time import time
6265
from typing import (
66+
TYPE_CHECKING,
6367
Any,
6468
Dict,
65-
Optional,
6669
List,
70+
Literal,
71+
Optional,
6772
Set,
68-
TYPE_CHECKING,
6973
Tuple,
7074
)
7175
import zlib
7276

73-
from cylc.flow import __version__ as CYLC_VERSION, LOG
77+
from cylc.flow import (
78+
LOG,
79+
__version__ as CYLC_VERSION,
80+
)
7481
from cylc.flow.cycling.loader import get_point
7582
from cylc.flow.data_messages_pb2 import (
7683
AllDeltas,
@@ -98,37 +105,46 @@
98105
from cylc.flow.parsec.util import (
99106
listjoin,
100107
pdeepcopy,
101-
poverride
108+
poverride,
102109
)
103110
from cylc.flow.run_modes import RunMode
104-
from cylc.flow.workflow_status import (
105-
get_workflow_status,
106-
get_workflow_status_msg,
111+
from cylc.flow.task_job_logs import (
112+
JOB_LOG_OPTS,
113+
get_task_job_log,
107114
)
108-
from cylc.flow.task_job_logs import JOB_LOG_OPTS, get_task_job_log
109115
from cylc.flow.task_proxy import TaskProxy
110116
from cylc.flow.task_state import (
111-
TASK_STATUS_WAITING,
112-
TASK_STATUS_SUBMITTED,
113-
TASK_STATUS_SUBMIT_FAILED,
117+
TASK_STATUS_FAILED,
114118
TASK_STATUS_RUNNING,
119+
TASK_STATUS_SUBMIT_FAILED,
120+
TASK_STATUS_SUBMITTED,
115121
TASK_STATUS_SUCCEEDED,
116-
TASK_STATUS_FAILED,
117-
TASK_STATUSES_ORDERED
122+
TASK_STATUS_WAITING,
123+
TASK_STATUSES_FINAL,
124+
TASK_STATUSES_ORDERED,
118125
)
119126
from cylc.flow.task_state_prop import extract_group_state
120-
from cylc.flow.taskdef import generate_graph_parents, generate_graph_children
121-
from cylc.flow.task_state import TASK_STATUSES_FINAL
127+
from cylc.flow.taskdef import (
128+
generate_graph_children,
129+
generate_graph_parents,
130+
)
122131
from cylc.flow.util import (
132+
deserialise_set,
123133
serialise_set,
124-
deserialise_set
125134
)
126135
from cylc.flow.wallclock import (
127136
TIME_ZONE_LOCAL_INFO,
128137
TIME_ZONE_UTC_INFO,
129-
get_utc_mode
138+
get_time_string_from_unix_time as time2str,
139+
get_unix_time_from_time_string as str2time,
140+
get_utc_mode,
141+
)
142+
from cylc.flow.workflow_status import (
143+
get_workflow_status,
144+
get_workflow_status_msg,
130145
)
131146

147+
132148
if TYPE_CHECKING:
133149
from cylc.flow.cycling import PointBase
134150
from cylc.flow.flow_mgr import FlowNums
@@ -2874,19 +2890,23 @@ def delta_job_state(
28742890
def delta_job_time(
28752891
self,
28762892
itask: 'TaskProxy',
2877-
event_key: str,
2878-
time_str: Optional[str] = None,
2893+
event_key: Literal['submitted', 'started', 'finished'],
2894+
time_str: str,
28792895
) -> None:
2880-
"""Set an event time in job pool object.
2881-
2882-
Set values of both event_key + '_time' and event_key + '_time_string'.
2883-
"""
2896+
"""Set an event time in job pool object."""
28842897
j_id, job = self.store_node_fetcher(itask.job_tokens)
28852898
if not job:
28862899
return
28872900
j_delta = PbJob(stamp=f'{j_id}@{time()}')
28882901
time_attr = f'{event_key}_time'
28892902
setbuff(j_delta, time_attr, time_str)
2903+
if (
2904+
event_key == 'started'
2905+
and (run_time := task_mean_elapsed_time(itask.tdef)) is not None
2906+
):
2907+
j_delta.estimated_finish_time = (
2908+
time2str(str2time(time_str) + run_time) if time_str else ''
2909+
)
28902910
self.updated[JOBS].setdefault(
28912911
j_id,
28922912
PbJob(id=j_id)

cylc/flow/network/schema.py

Lines changed: 7 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -928,6 +928,13 @@ class Meta:
928928
finished_time = String(
929929
description='The time this job finished running (if it has yet).',
930930
)
931+
estimated_finish_time = String(
932+
description=(
933+
"The estimated time this job will finish, if applicable. "
934+
"This is based on the task's mean run time, or if not available, "
935+
"the execution time limit."
936+
),
937+
)
931938
job_id = ID(
932939
description='The ID of this job in the job runner it was submitted to.'
933940
)

tests/integration/test_data_store_mgr.py

Lines changed: 66 additions & 9 deletions
Original file line number | Diff line number | Diff line change
@@ -14,8 +14,8 @@
1414
# You should have received a copy of the GNU General Public License
1515
# along with this program. If not, see <http://www.gnu.org/licenses/>.
1616

17-
from logging import INFO
1817
import logging
18+
from logging import INFO
1919
from typing import (
2020
Iterable,
2121
List,
@@ -25,12 +25,13 @@
2525

2626
import pytest
2727

28+
from cylc.flow import LOG
2829
from cylc.flow.commands import (
30+
force_trigger_tasks,
2931
run_cmd,
30-
force_trigger_tasks
3132
)
32-
from cylc.flow import LOG
3333
from cylc.flow.data_messages_pb2 import (
34+
PbJob,
3435
PbPrerequisite,
3536
PbTaskProxy,
3637
)
@@ -42,7 +43,10 @@
4243
TASKS,
4344
WORKFLOW,
4445
)
45-
from cylc.flow.id import TaskTokens, Tokens
46+
from cylc.flow.id import (
47+
TaskTokens,
48+
Tokens,
49+
)
4650
from cylc.flow.network.log_stream_handler import ProtobufStreamHandler
4751
from cylc.flow.scheduler import Scheduler
4852
from cylc.flow.task_events_mgr import TaskEventsManager
@@ -51,6 +55,7 @@
5155
TASK_OUTPUT_SUBMITTED,
5256
TASK_OUTPUT_SUCCEEDED,
5357
)
58+
from cylc.flow.task_proxy import TaskProxy
5459
from cylc.flow.task_state import (
5560
TASK_STATUS_FAILED,
5661
TASK_STATUS_PREPARING,
@@ -127,6 +132,11 @@ def get_pb_prereqs(schd: 'Scheduler') -> 'List[PbPrerequisite]':
127132
]
128133

129134

135+
def get_pb_job(schd: Scheduler, itask: TaskProxy) -> PbJob:
136+
"""Get the protobuf job for a given task from the data store."""
137+
return schd.data_store_mgr.data[schd.id][JOBS][itask.job_tokens.id]
138+
139+
130140
@pytest.fixture(scope='module')
131141
async def mod_harness(mod_flow, mod_scheduler, mod_start):
132142
flow_def = {
@@ -834,9 +844,6 @@ async def test_log_events(one: Scheduler, start):
834844

835845
async def test_no_backwards_job_state_change(one: Scheduler, start):
836846
"""It should not allow backwards job state changes."""
837-
def get_job_state(itask):
838-
return one.data_store_mgr.data[one.id][JOBS][itask.job_tokens.id].state
839-
840847
async with start(one):
841848
itask = one.pool.get_tasks()[0]
842849
itask.state_reset(TASK_STATUS_PREPARING)
@@ -845,9 +852,59 @@ def get_job_state(itask):
845852

846853
one.task_events_mgr.process_message(itask, INFO, TASK_OUTPUT_STARTED)
847854
await one.update_data_structure()
848-
assert get_job_state(itask) == TASK_STATUS_RUNNING
855+
assert get_pb_job(one, itask).state == TASK_STATUS_RUNNING
849856

850857
# Simulate late arrival of "submitted" message
851858
one.task_events_mgr.process_message(itask, INFO, TASK_OUTPUT_SUBMITTED)
852859
await one.update_data_structure()
853-
assert get_job_state(itask) == TASK_STATUS_RUNNING
860+
assert get_pb_job(one, itask).state == TASK_STATUS_RUNNING
861+
862+
863+
async def test_job_estimated_finish_time(one_conf, flow, scheduler, start):
864+
"""It should set estimated_finish_time on job elements along with
865+
started_time."""
866+
wid = flow({
867+
**one_conf,
868+
'scheduler': {'UTC mode': True},
869+
'runtime': {
870+
'one': {'execution time limit': 'PT2M'},
871+
},
872+
})
873+
schd: Scheduler = scheduler(wid)
874+
date = '2081-07-02T'
875+
876+
async def start_job(itask: TaskProxy, start_time: str):
877+
if not schd.pool.get_task(itask.point, itask.tdef.name):
878+
schd.pool.add_to_pool(itask)
879+
await schd.update_data_structure()
880+
itask.state_reset(TASK_STATUS_PREPARING)
881+
itask.submit_num += 1
882+
itask.jobs = []
883+
schd.task_events_mgr.process_message(
884+
itask, INFO, TASK_OUTPUT_SUBMITTED # submit time irrelevant
885+
)
886+
await schd.update_data_structure()
887+
schd.task_events_mgr.process_message(
888+
itask, INFO, TASK_OUTPUT_STARTED, f'{date}{start_time}'
889+
)
890+
await schd.update_data_structure()
891+
892+
async with start(schd):
893+
itask = schd.pool.get_tasks()[0]
894+
await start_job(itask, '06:00:00Z')
895+
# 1st job: estimate based on execution time limit:
896+
assert (
897+
get_pb_job(schd, itask).estimated_finish_time
898+
== f'{date}06:02:00Z'
899+
)
900+
901+
# Finish this job and start a new one:
902+
schd.task_events_mgr.process_message(
903+
itask, INFO, TASK_OUTPUT_SUCCEEDED, f'{date}06:00:40Z'
904+
)
905+
await start_job(itask, '06:01:00Z')
906+
# >=2nd job: estimate based on mean of previous jobs:
907+
assert (
908+
get_pb_job(schd, itask).estimated_finish_time
909+
== f'{date}06:01:40Z'
910+
)

0 commit comments

Comments
 (0)