Skip to content

Commit bfc7ac9

Browse files
committed
Add estimated finish time to Job schema
1 parent 46e44eb commit bfc7ac9

File tree

7 files changed

+171
-83
lines changed

7 files changed

+171
-83
lines changed

changes.d/6987.feat.md

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1 @@
1+
Enable estimated finish times for jobs in the UI.

cylc/flow/data_messages.proto

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -158,6 +158,7 @@ message PbJob {
158158
optional string cycle_point = 31; /* filter item */
159159
repeated string messages = 32;
160160
optional PbRuntime runtime = 33;
161+
optional string estimated_finish_time = 34;
161162
}
162163

163164
message PbTask {

cylc/flow/data_messages_pb2.py

Lines changed: 48 additions & 48 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

cylc/flow/data_messages_pb2.pyi

Lines changed: 4 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -184,7 +184,7 @@ class PbRuntime(_message.Message):
184184
def __init__(self, platform: _Optional[str] = ..., script: _Optional[str] = ..., init_script: _Optional[str] = ..., env_script: _Optional[str] = ..., err_script: _Optional[str] = ..., exit_script: _Optional[str] = ..., pre_script: _Optional[str] = ..., post_script: _Optional[str] = ..., work_sub_dir: _Optional[str] = ..., execution_polling_intervals: _Optional[str] = ..., execution_retry_delays: _Optional[str] = ..., execution_time_limit: _Optional[str] = ..., submission_polling_intervals: _Optional[str] = ..., submission_retry_delays: _Optional[str] = ..., directives: _Optional[str] = ..., environment: _Optional[str] = ..., outputs: _Optional[str] = ..., completion: _Optional[str] = ..., run_mode: _Optional[str] = ...) -> None: ...
185185

186186
class PbJob(_message.Message):
187-
__slots__ = ("stamp", "id", "submit_num", "state", "task_proxy", "submitted_time", "started_time", "finished_time", "job_id", "job_runner_name", "execution_time_limit", "platform", "job_log_dir", "name", "cycle_point", "messages", "runtime")
187+
__slots__ = ("stamp", "id", "submit_num", "state", "task_proxy", "submitted_time", "started_time", "finished_time", "job_id", "job_runner_name", "execution_time_limit", "platform", "job_log_dir", "name", "cycle_point", "messages", "runtime", "estimated_finish_time")
188188
STAMP_FIELD_NUMBER: _ClassVar[int]
189189
ID_FIELD_NUMBER: _ClassVar[int]
190190
SUBMIT_NUM_FIELD_NUMBER: _ClassVar[int]
@@ -202,6 +202,7 @@ class PbJob(_message.Message):
202202
CYCLE_POINT_FIELD_NUMBER: _ClassVar[int]
203203
MESSAGES_FIELD_NUMBER: _ClassVar[int]
204204
RUNTIME_FIELD_NUMBER: _ClassVar[int]
205+
ESTIMATED_FINISH_TIME_FIELD_NUMBER: _ClassVar[int]
205206
stamp: str
206207
id: str
207208
submit_num: int
@@ -219,7 +220,8 @@ class PbJob(_message.Message):
219220
cycle_point: str
220221
messages: _containers.RepeatedScalarFieldContainer[str]
221222
runtime: PbRuntime
222-
def __init__(self, stamp: _Optional[str] = ..., id: _Optional[str] = ..., submit_num: _Optional[int] = ..., state: _Optional[str] = ..., task_proxy: _Optional[str] = ..., submitted_time: _Optional[str] = ..., started_time: _Optional[str] = ..., finished_time: _Optional[str] = ..., job_id: _Optional[str] = ..., job_runner_name: _Optional[str] = ..., execution_time_limit: _Optional[float] = ..., platform: _Optional[str] = ..., job_log_dir: _Optional[str] = ..., name: _Optional[str] = ..., cycle_point: _Optional[str] = ..., messages: _Optional[_Iterable[str]] = ..., runtime: _Optional[_Union[PbRuntime, _Mapping]] = ...) -> None: ...
223+
estimated_finish_time: str
224+
def __init__(self, stamp: _Optional[str] = ..., id: _Optional[str] = ..., submit_num: _Optional[int] = ..., state: _Optional[str] = ..., task_proxy: _Optional[str] = ..., submitted_time: _Optional[str] = ..., started_time: _Optional[str] = ..., finished_time: _Optional[str] = ..., job_id: _Optional[str] = ..., job_runner_name: _Optional[str] = ..., execution_time_limit: _Optional[float] = ..., platform: _Optional[str] = ..., job_log_dir: _Optional[str] = ..., name: _Optional[str] = ..., cycle_point: _Optional[str] = ..., messages: _Optional[_Iterable[str]] = ..., runtime: _Optional[_Union[PbRuntime, _Mapping]] = ..., estimated_finish_time: _Optional[str] = ...) -> None: ...
223225

224226
class PbTask(_message.Message):
225227
__slots__ = ("stamp", "id", "name", "meta", "mean_elapsed_time", "depth", "proxies", "namespace", "parents", "first_parent", "runtime")

cylc/flow/data_store_mgr.py

Lines changed: 44 additions & 24 deletions
Original file line number | Diff line number | Diff line change
@@ -54,23 +54,30 @@
5454
5555
"""
5656

57+
from collections import (
58+
Counter,
59+
deque,
60+
)
5761
from contextlib import suppress
58-
from collections import Counter, deque
5962
from copy import deepcopy
6063
import json
6164
from time import time
6265
from typing import (
66+
TYPE_CHECKING,
6367
Any,
6468
Dict,
65-
Optional,
6669
List,
70+
Literal,
71+
Optional,
6772
Set,
68-
TYPE_CHECKING,
6973
Tuple,
7074
)
7175
import zlib
7276

73-
from cylc.flow import __version__ as CYLC_VERSION, LOG
77+
from cylc.flow import (
78+
LOG,
79+
__version__ as CYLC_VERSION,
80+
)
7481
from cylc.flow.cycling.loader import get_point
7582
from cylc.flow.data_messages_pb2 import (
7683
AllDeltas,
@@ -98,37 +105,46 @@
98105
from cylc.flow.parsec.util import (
99106
listjoin,
100107
pdeepcopy,
101-
poverride
108+
poverride,
102109
)
103110
from cylc.flow.run_modes import RunMode
104-
from cylc.flow.workflow_status import (
105-
get_workflow_status,
106-
get_workflow_status_msg,
111+
from cylc.flow.task_job_logs import (
112+
JOB_LOG_OPTS,
113+
get_task_job_log,
107114
)
108-
from cylc.flow.task_job_logs import JOB_LOG_OPTS, get_task_job_log
109115
from cylc.flow.task_proxy import TaskProxy
110116
from cylc.flow.task_state import (
111-
TASK_STATUS_WAITING,
112-
TASK_STATUS_SUBMITTED,
113-
TASK_STATUS_SUBMIT_FAILED,
117+
TASK_STATUS_FAILED,
114118
TASK_STATUS_RUNNING,
119+
TASK_STATUS_SUBMIT_FAILED,
120+
TASK_STATUS_SUBMITTED,
115121
TASK_STATUS_SUCCEEDED,
116-
TASK_STATUS_FAILED,
117-
TASK_STATUSES_ORDERED
122+
TASK_STATUS_WAITING,
123+
TASK_STATUSES_FINAL,
124+
TASK_STATUSES_ORDERED,
118125
)
119126
from cylc.flow.task_state_prop import extract_group_state
120-
from cylc.flow.taskdef import generate_graph_parents, generate_graph_children
121-
from cylc.flow.task_state import TASK_STATUSES_FINAL
127+
from cylc.flow.taskdef import (
128+
generate_graph_children,
129+
generate_graph_parents,
130+
)
122131
from cylc.flow.util import (
132+
deserialise_set,
123133
serialise_set,
124-
deserialise_set
125134
)
126135
from cylc.flow.wallclock import (
127136
TIME_ZONE_LOCAL_INFO,
128137
TIME_ZONE_UTC_INFO,
129-
get_utc_mode
138+
get_time_string_from_unix_time as time2str,
139+
get_unix_time_from_time_string as str2time,
140+
get_utc_mode,
141+
)
142+
from cylc.flow.workflow_status import (
143+
get_workflow_status,
144+
get_workflow_status_msg,
130145
)
131146

147+
132148
if TYPE_CHECKING:
133149
from cylc.flow.cycling import PointBase
134150
from cylc.flow.flow_mgr import FlowNums
@@ -2809,19 +2825,23 @@ def delta_job_state(
28092825
def delta_job_time(
28102826
self,
28112827
itask: 'TaskProxy',
2812-
event_key: str,
2813-
time_str: Optional[str] = None,
2828+
event_key: Literal['submitted', 'started', 'finished'],
2829+
time_str: str,
28142830
) -> None:
2815-
"""Set an event time in job pool object.
2816-
2817-
Set values of both event_key + '_time' and event_key + '_time_string'.
2818-
"""
2831+
"""Set an event time in job pool object."""
28192832
j_id, job = self.store_node_fetcher(itask.job_tokens)
28202833
if not job:
28212834
return
28222835
j_delta = PbJob(stamp=f'{j_id}@{time()}')
28232836
time_attr = f'{event_key}_time'
28242837
setbuff(j_delta, time_attr, time_str)
2838+
if (
2839+
event_key == 'started'
2840+
and (run_time := task_mean_elapsed_time(itask.tdef)) is not None
2841+
):
2842+
j_delta.estimated_finish_time = (
2843+
time2str(str2time(time_str) + run_time) if time_str else ''
2844+
)
28252845
self.updated[JOBS].setdefault(
28262846
j_id,
28272847
PbJob(id=j_id)

cylc/flow/network/schema.py

Lines changed: 7 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -936,6 +936,13 @@ class Meta:
936936
finished_time = String(
937937
description='The time this job finished running (if it has yet).',
938938
)
939+
estimated_finish_time = String(
940+
description=(
941+
"The estimated time this job will finish, if applicable. "
942+
"This is based on the task's mean run time, or if not available, "
943+
"the execution time limit."
944+
),
945+
)
939946
job_id = ID(
940947
description='The ID of this job in the job runner it was submitted to.'
941948
)

tests/integration/test_data_store_mgr.py

Lines changed: 66 additions & 9 deletions
Original file line number | Diff line number | Diff line change
@@ -14,8 +14,8 @@
1414
# You should have received a copy of the GNU General Public License
1515
# along with this program. If not, see <http://www.gnu.org/licenses/>.
1616

17-
from logging import INFO
1817
import logging
18+
from logging import INFO
1919
from typing import (
2020
Iterable,
2121
List,
@@ -25,12 +25,13 @@
2525

2626
import pytest
2727

28+
from cylc.flow import LOG
2829
from cylc.flow.commands import (
30+
force_trigger_tasks,
2931
run_cmd,
30-
force_trigger_tasks
3132
)
32-
from cylc.flow import LOG
3333
from cylc.flow.data_messages_pb2 import (
34+
PbJob,
3435
PbPrerequisite,
3536
PbTaskProxy,
3637
)
@@ -42,7 +43,10 @@
4243
TASKS,
4344
WORKFLOW,
4445
)
45-
from cylc.flow.id import TaskTokens, Tokens
46+
from cylc.flow.id import (
47+
TaskTokens,
48+
Tokens,
49+
)
4650
from cylc.flow.network.log_stream_handler import ProtobufStreamHandler
4751
from cylc.flow.scheduler import Scheduler
4852
from cylc.flow.task_events_mgr import TaskEventsManager
@@ -51,6 +55,7 @@
5155
TASK_OUTPUT_SUBMITTED,
5256
TASK_OUTPUT_SUCCEEDED,
5357
)
58+
from cylc.flow.task_proxy import TaskProxy
5459
from cylc.flow.task_state import (
5560
TASK_STATUS_FAILED,
5661
TASK_STATUS_PREPARING,
@@ -125,6 +130,11 @@ def get_pb_prereqs(schd: 'Scheduler') -> 'List[PbPrerequisite]':
125130
]
126131

127132

133+
def get_pb_job(schd: Scheduler, itask: TaskProxy) -> PbJob:
134+
"""Get the protobuf job for a given task from the data store."""
135+
return schd.data_store_mgr.data[schd.id][JOBS][itask.job_tokens.id]
136+
137+
128138
@pytest.fixture(scope='module')
129139
async def mod_harness(mod_flow, mod_scheduler, mod_start):
130140
flow_def = {
@@ -832,9 +842,6 @@ async def test_log_events(one: Scheduler, start):
832842

833843
async def test_no_backwards_job_state_change(one: Scheduler, start):
834844
"""It should not allow backwards job state changes."""
835-
def get_job_state(itask):
836-
return one.data_store_mgr.data[one.id][JOBS][itask.job_tokens.id].state
837-
838845
async with start(one):
839846
itask = one.pool.get_tasks()[0]
840847
itask.state_reset(TASK_STATUS_PREPARING)
@@ -843,9 +850,59 @@ def get_job_state(itask):
843850

844851
one.task_events_mgr.process_message(itask, INFO, TASK_OUTPUT_STARTED)
845852
await one.update_data_structure()
846-
assert get_job_state(itask) == TASK_STATUS_RUNNING
853+
assert get_pb_job(one, itask).state == TASK_STATUS_RUNNING
847854

848855
# Simulate late arrival of "submitted" message
849856
one.task_events_mgr.process_message(itask, INFO, TASK_OUTPUT_SUBMITTED)
850857
await one.update_data_structure()
851-
assert get_job_state(itask) == TASK_STATUS_RUNNING
858+
assert get_pb_job(one, itask).state == TASK_STATUS_RUNNING
859+
860+
861+
async def test_job_estimated_finish_time(one_conf, flow, scheduler, start):
862+
"""It should set estimated_finish_time on job elements along with
863+
started_time."""
864+
wid = flow({
865+
**one_conf,
866+
'scheduler': {'UTC mode': True},
867+
'runtime': {
868+
'one': {'execution time limit': 'PT2M'},
869+
},
870+
})
871+
schd: Scheduler = scheduler(wid)
872+
date = '2081-07-02T'
873+
874+
async def start_job(itask: TaskProxy, start_time: str):
875+
if not schd.pool.get_task(itask.point, itask.tdef.name):
876+
schd.pool.add_to_pool(itask)
877+
await schd.update_data_structure()
878+
itask.state_reset(TASK_STATUS_PREPARING)
879+
itask.submit_num += 1
880+
itask.jobs = []
881+
schd.task_events_mgr.process_message(
882+
itask, INFO, TASK_OUTPUT_SUBMITTED # submit time irrelevant
883+
)
884+
await schd.update_data_structure()
885+
schd.task_events_mgr.process_message(
886+
itask, INFO, TASK_OUTPUT_STARTED, f'{date}{start_time}'
887+
)
888+
await schd.update_data_structure()
889+
890+
async with start(schd):
891+
itask = schd.pool.get_tasks()[0]
892+
await start_job(itask, '06:00:00Z')
893+
# 1st job: estimate based on execution time limit:
894+
assert (
895+
get_pb_job(schd, itask).estimated_finish_time
896+
== f'{date}06:02:00Z'
897+
)
898+
899+
# Finish this job and start a new one:
900+
schd.task_events_mgr.process_message(
901+
itask, INFO, TASK_OUTPUT_SUCCEEDED, f'{date}06:00:40Z'
902+
)
903+
await start_job(itask, '06:01:00Z')
904+
# >=2nd job: estimate based on mean of previous jobs:
905+
assert (
906+
get_pb_job(schd, itask).estimated_finish_time
907+
== f'{date}06:01:40Z'
908+
)

0 commit comments

Comments
 (0)