Skip to content

Commit 599ac83

Browse files
committed
Python 3 and Cylc 8 conversion
1 parent c9025e4 commit 599ac83

26 files changed

+850
-597
lines changed

cylc/flow/review.py

Lines changed: 145 additions & 118 deletions
Large diffs are not rendered by default.

cylc/flow/review_dao.py

Lines changed: 58 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,15 @@
1919

2020
import sqlite3
2121
import os
22+
from pathlib import Path
2223
import tarfile
2324
import re
2425
from glob import glob
2526
from sqlite3 import OperationalError
2627

27-
from cylc.rundb import CylcSuiteDAO
28-
from cylc.task_state import TASK_STATUS_GROUPS
29-
"""Provide data access object to the suite runtime database for Cylc Review."""
28+
from cylc.flow.rundb import CylcWorkflowDAO
29+
from cylc.flow.task_state import TASK_STATUS_GROUPS
30+
from cylc.flow.workflow_files import WorkflowFiles
3031

3132

3233
class CylcReviewDAO(object):
@@ -105,29 +106,30 @@ def __init__(self):
105106
self.daos = {}
106107

107108
def _db_init(self, user_name, suite_name):
108-
"""Initialise a named CylcSuiteDAO database connection."""
109+
"""Initialise a named CylcWorkflowDAO database connection."""
109110
key = (user_name, suite_name)
110111
if key not in self.daos:
111112
prefix = "~"
112113
if user_name:
113114
prefix += user_name
114115
for name in [os.path.join("log", "db"), "cylc-suite.db"]:
115-
db_f_name = os.path.expanduser(os.path.join(
116-
prefix, os.path.join("cylc-run", suite_name, name)))
117-
self.daos[key] = CylcSuiteDAO(db_f_name, is_public=True)
116+
db_f_name = os.path.expanduser(
117+
os.path.join(
118+
prefix, os.path.join("cylc-run", suite_name, name)))
119+
self.daos[key] = CylcWorkflowDAO(db_f_name, is_public=True)
118120
if os.path.exists(db_f_name):
119121
break
120122
self.is_cylc8 = self.set_is_cylc8(user_name, suite_name)
121123
return self.daos[key]
122124

123125
def _db_close(self, user_name, suite_name):
124-
"""Close a named CylcSuiteDAO database connection."""
126+
"""Close a named CylcWorkflowDAO database connection."""
125127
key = (user_name, suite_name)
126128
if self.daos.get(key) is not None:
127129
self.daos[key].close()
128130

129131
def _db_exec(self, user_name, suite_name, stmt, stmt_args=None):
130-
"""Execute a query on a named CylcSuiteDAO database connection."""
132+
"""Execute a query on a named CylcWorkflowDAO database connection."""
131133
daos = self._db_init(user_name, suite_name)
132134
if stmt_args is None:
133135
stmt_args = []
@@ -140,10 +142,12 @@ def _db_exec(self, user_name, suite_name, stmt, stmt_args=None):
140142
except sqlite3.OperationalError as exc:
141143
# At Cylc 8.0.1+ Workflows installed but not run will not yet
142144
# have a database.
143-
if (os.path.exists(os.path.dirname(
144-
self.daos.values()[0].db_file_name) + '/flow.cylc') or
145-
os.path.exists(os.path.dirname(
146-
self.daos.values()[0].db_file_name) + '/suite.rc')):
145+
wf_dir = Path(
146+
list(self.daos.values())[0].db_file_name
147+
).parent
148+
if (wf_dir / WorkflowFiles.FLOW_FILE).exists() or (
149+
wf_dir / WorkflowFiles.SUITE_RC
150+
).exists():
147151
return []
148152
else:
149153
raise exc
@@ -152,7 +156,7 @@ def get_suite_broadcast_states(self, user_name, suite_name):
152156
"""Return broadcast states of a suite.
153157
[[point, name, key, value], ...]
154158
"""
155-
stmt = CylcSuiteDAO.pre_select_broadcast_states(
159+
stmt = CylcWorkflowDAO.pre_select_broadcast_states(
156160
self._db_init(user_name, suite_name), order="ASC")[0]
157161
broadcast_states = []
158162
for row in self._db_exec(user_name, suite_name, stmt):
@@ -164,7 +168,7 @@ def get_suite_broadcast_events(self, user_name, suite_name):
164168
"""Return broadcast events of a suite.
165169
[[time, change, point, name, key, value], ...]
166170
"""
167-
stmt = CylcSuiteDAO.pre_select_broadcast_events(
171+
stmt = CylcWorkflowDAO.pre_select_broadcast_events(
168172
self._db_init(user_name, suite_name), order="DESC")[0]
169173
broadcast_events = []
170174
for row in self._db_exec(user_name, suite_name, stmt):
@@ -175,16 +179,27 @@ def get_suite_broadcast_events(self, user_name, suite_name):
175179

176180
@staticmethod
177181
def set_is_cylc8(user_name, suite_name):
178-
from cylc.review import CylcReviewService
182+
from cylc.flow.review import CylcReviewService
183+
179184
suite_dir = os.path.join(
180185
CylcReviewService._get_user_home(user_name),
181186
"cylc-run",
182187
suite_name)
183188
return CylcReviewService.is_cylc8(suite_dir)
184189

185190
def get_suite_job_entries(
186-
self, user_name, suite_name, cycles, tasks, task_status,
187-
job_status, order, limit, offset, flow_nums='flow_nums'):
191+
self,
192+
user_name,
193+
suite_name,
194+
cycles,
195+
tasks,
196+
task_status,
197+
job_status,
198+
order,
199+
limit,
200+
offset,
201+
flow_nums='flow_nums',
202+
):
188203
"""Query suite runtime database to return a listing of task jobs.
189204
user -- A string containing a valid user ID
190205
suite -- A string containing a valid suite ID
@@ -226,18 +241,20 @@ def get_suite_job_entries(
226241

227242
# Get number of entries
228243
of_n_entries = 0
229-
stmt = ("SELECT COUNT(*)" +
230-
" FROM task_states LEFT JOIN task_jobs USING (name, cycle)" +
231-
where_expr)
244+
stmt = (
245+
"SELECT COUNT(*)"
246+
+ " FROM task_states LEFT JOIN task_jobs USING (name, cycle)"
247+
+ where_expr
248+
)
232249
try:
233250
for row in self._db_exec(user_name, suite_name, stmt, where_args):
234251
of_n_entries = row[0]
235252
break
236253
else:
237254
self._db_close(user_name, suite_name)
238-
return ([], 0)
255+
return ([], 0, self.is_cylc8)
239256
except sqlite3.Error:
240-
return ([], 0)
257+
return ([], 0, self.is_cylc8)
241258
if self.is_cylc8:
242259
stmt = (
243260
"SELECT" +
@@ -293,18 +310,28 @@ def get_suite_job_entries(
293310
if exc.message == self.CANNOT_JOIN_FLOW_NUMS:
294311
stmt = stmt.replace('flow_nums', 'submit_num')
295312
db_data = self._db_exec(
296-
user_name, suite_name, stmt, where_args + limit_args
313+
user_name, suite_name, stmt, where_args + limit_args
297314
)
298315
eight_zero_warning = True
299316
else:
300317
raise exc
301318

302319
for row in db_data:
303320
(
304-
cycle, name, submit_num, submit_num_max, task_status,
305-
time_submit, submit_status,
306-
time_run, time_run_exit, run_signal, run_status,
307-
user_at_host, batch_sys_name, batch_sys_job_id
321+
cycle,
322+
name,
323+
submit_num,
324+
submit_num_max,
325+
task_status,
326+
time_submit,
327+
submit_status,
328+
time_run,
329+
time_run_exit,
330+
run_signal,
331+
run_status,
332+
user_at_host,
333+
batch_sys_name,
334+
batch_sys_job_id,
308335
) = row[1:]
309336
entry = {
310337
"cycle": cycle,
@@ -469,7 +496,7 @@ def _get_job_logs(self, user_name, suite_name, entries, entry_of):
469496
entry["seq_logs_indexes"][seq_key] = int_indexes
470497
except ValueError:
471498
pass
472-
for filename, log_dict in entry["logs"].items():
499+
for _, log_dict in entry["logs"].items():
473500
# Unset seq_key for singular items
474501
if log_dict["seq_key"] not in entry["seq_logs_indexes"]:
475502
log_dict["seq_key"] = None
@@ -555,8 +582,8 @@ def get_suite_cycles_summary(
555582
" cycle," +
556583
" max(time_updated)," +
557584
" sum(" + states_stmt["active"] + ") AS n_active," +
558-
" sum(" + states_stmt["success"] + ") AS n_success,"
559-
" sum(" + states_stmt["fail"] + ") AS n_fail"
585+
" sum(" + states_stmt["success"] + ") AS n_success," +
586+
" sum(" + states_stmt["fail"] + ") AS n_fail" +
560587
" FROM task_states" +
561588
" GROUP BY cycle" +
562589
" HAVING n_active > 0" +

cylc/flow/rundb.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -625,6 +625,15 @@ def select_broadcast_states(self, callback, sort=None):
625625
for row_idx, row in enumerate(self.connect().execute(stmt)):
626626
callback(row_idx, list(row))
627627

628+
def pre_select_broadcast_events(self, order=None):
629+
"""Query statement and args formation for select_broadcast_events."""
630+
form_stmt = r"SELECT time,change,point,namespace,key,value FROM %s"
631+
if order == "DESC":
632+
ordering = (" ORDER BY " +
633+
"time DESC, point DESC, namespace DESC, key DESC")
634+
form_stmt = form_stmt + ordering
635+
return form_stmt % self.TABLE_BROADCAST_EVENTS, []
636+
628637
def select_workflow_params(self) -> Iterable[Tuple[str, Optional[str]]]:
629638
"""Select all from workflow_params.
630639

cylc/flow/scripts/cylc.py

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -293,9 +293,6 @@ def get_version(long=False):
293293
'cylc set-outputs (cylc 8.0-8.2) has been replaced by cylc set',
294294
'restart':
295295
'cylc run & cylc restart have been replaced by cylc play',
296-
'review':
297-
'cylc review has been removed; the latest Cylc 7 version is forward'
298-
' compatible with Cylc 8.',
299296
'suite-state':
300297
'cylc suite-state has been replaced by cylc workflow-state',
301298
'run':

cylc/flow/scripts/review.py

Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
#!/usr/bin/env python3
2+
# THIS FILE IS PART OF THE CYLC SUITE ENGINE.
3+
# Copyright (C) NIWA & British Crown (Met Office) & Contributors.
4+
#
5+
# This program is free software: you can redistribute it and/or modify
6+
# it under the terms of the GNU General Public License as published by
7+
# the Free Software Foundation, either version 3 of the License, or
8+
# (at your option) any later version.
9+
#
10+
# This program is distributed in the hope that it will be useful,
11+
# but WITHOUT ANY WARRANTY; without even the implied warranty of
12+
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13+
# GNU General Public License for more details.
14+
#
15+
# You should have received a copy of the GNU General Public License
16+
# along with this program. If not, see <http://www.gnu.org/licenses/>.
17+
"""cylc review [start|stop]
18+
19+
Start/stop ad-hoc Cylc Review web service server for browsing users' suite
20+
logs via an HTTP interface.
21+
22+
With no arguments, the status of the ad-hoc web service server is printed.
23+
"""
24+
25+
import os
26+
import signal
27+
28+
from cylc.flow.option_parsers import CylcOptionParser as COP
29+
from cylc.flow.review import CylcReviewService
30+
from cylc.flow.terminal import cli_function
31+
from cylc.flow.ws import _ws_init, _get_server_status
32+
33+
34+
IRRELEVANT_OPTS = [
35+
'--host',
36+
'--user',
37+
'--verbose',
38+
'--debug',
39+
'--quiet',
40+
'--timestamp',
41+
'--no-timestamp',
42+
'--color',
43+
]
44+
START = 'start'
45+
STOP = 'stop'
46+
47+
48+
def get_option_parser():
49+
parser = COP(
50+
__doc__,
51+
argdoc=[
52+
COP.optional(("start", "Start ad-hoc web service server.")),
53+
COP.optional(("stop", "Stop ad-hoc web service server.")),
54+
],
55+
)
56+
57+
parser.add_option(
58+
'--port',
59+
'-p',
60+
help="Port to use for Cylc Review (for start only).",
61+
default=8080,
62+
action='store',
63+
type=int,
64+
)
65+
parser.add_option(
66+
"--non-interactive",
67+
"--yes",
68+
"-y",
69+
help="Switch off interactive prompting i.e. answer yes to everything"
70+
" (for stop only).",
71+
action="store_true",
72+
default=False,
73+
dest="non_interactive",
74+
)
75+
parser.add_option(
76+
"--service-root",
77+
"-R",
78+
help="Include web service name under root of URL (for start only).",
79+
action="store",
80+
default='/',
81+
dest="service_root",
82+
)
83+
return parser
84+
85+
86+
@cli_function(get_option_parser, remove_opts=IRRELEVANT_OPTS)
87+
def main(_, opts, *args):
88+
"""Start/Stop the Cylc Review server."""
89+
subcmd = args[0] if args else ''
90+
91+
# Get current server status:
92+
status = _get_server_status(CylcReviewService)
93+
94+
# User has asked to start the server, and it's not already running:
95+
if subcmd == START:
96+
_ws_init(
97+
service_cls=CylcReviewService,
98+
port=opts.port,
99+
service_root=opts.service_root,
100+
)
101+
# User has asked to stop or get info on server, server _not_ running:
102+
elif not status:
103+
print(f'No {CylcReviewService.TITLE} service server running.')
104+
else:
105+
# Report on status of server:
106+
for key, value in sorted(status.items()):
107+
print(f'{key}={value}')
108+
109+
# User has asked to stop the server:
110+
if (
111+
subcmd == STOP # User asked for server stop
112+
and status.get('pid') # Server is running
113+
and ( # User really wants to stop the server
114+
opts.non_interactive
115+
or input("Stop server by termination? y/n(default=n)") == "y"
116+
)
117+
):
118+
try:
119+
os.killpg(int(status["pid"]), signal.SIGTERM)
120+
except OSError:
121+
print("Termination signal failed.")

cylc/flow/task_state.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -173,6 +173,13 @@
173173
TASK_STATUS_SUCCEEDED: TASK_OUTPUT_SUCCEEDED,
174174
}
175175

176+
# Tasks statuses grouped for cylc review 'create_suite_cycles_summary()'
177+
TASK_STATUS_GROUPS = {
178+
"active": list(TASK_STATUSES_ACTIVE | TASK_STATUSES_NEVER_ACTIVE),
179+
"fail": list(TASK_STATUSES_FAILURE),
180+
"success": list(TASK_STATUSES_SUCCESS)
181+
}
182+
176183

177184
def status_leq(status_a, status_b):
178185
""""Return True if status_a <= status_b"""

0 commit comments

Comments
 (0)