1919
2020import sqlite3
2121import os
22+ from pathlib import Path
2223import tarfile
2324import re
2425from glob import glob
2526from sqlite3 import OperationalError
2627
27- from cylc .rundb import CylcSuiteDAO
28- from cylc .task_state import TASK_STATUS_GROUPS
29- """Provide data access object to the suite runtime database for Cylc Review."""
28+ from cylc .flow . rundb import CylcWorkflowDAO
29+ from cylc .flow . task_state import TASK_STATUS_GROUPS
30+ from cylc . flow . workflow_files import WorkflowFiles
3031
3132
3233class CylcReviewDAO (object ):
@@ -105,29 +106,30 @@ def __init__(self):
105106 self .daos = {}
106107
107108 def _db_init (self , user_name , suite_name ):
108- """Initialise a named CylcSuiteDAO database connection."""
109+ """Initialise a named CylcWorkflowDAO database connection."""
109110 key = (user_name , suite_name )
110111 if key not in self .daos :
111112 prefix = "~"
112113 if user_name :
113114 prefix += user_name
114115 for name in [os .path .join ("log" , "db" ), "cylc-suite.db" ]:
115- db_f_name = os .path .expanduser (os .path .join (
116- prefix , os .path .join ("cylc-run" , suite_name , name )))
117- self .daos [key ] = CylcSuiteDAO (db_f_name , is_public = True )
116+ db_f_name = os .path .expanduser (
117+ os .path .join (
118+ prefix , os .path .join ("cylc-run" , suite_name , name )))
119+ self .daos [key ] = CylcWorkflowDAO (db_f_name , is_public = True )
118120 if os .path .exists (db_f_name ):
119121 break
120122 self .is_cylc8 = self .set_is_cylc8 (user_name , suite_name )
121123 return self .daos [key ]
122124
123125 def _db_close (self , user_name , suite_name ):
124- """Close a named CylcSuiteDAO database connection."""
126+ """Close a named CylcWorkflowDAO database connection."""
125127 key = (user_name , suite_name )
126128 if self .daos .get (key ) is not None :
127129 self .daos [key ].close ()
128130
129131 def _db_exec (self , user_name , suite_name , stmt , stmt_args = None ):
130- """Execute a query on a named CylcSuiteDAO database connection."""
132+ """Execute a query on a named CylcWorkflowDAO database connection."""
131133 daos = self ._db_init (user_name , suite_name )
132134 if stmt_args is None :
133135 stmt_args = []
@@ -140,10 +142,12 @@ def _db_exec(self, user_name, suite_name, stmt, stmt_args=None):
140142 except sqlite3 .OperationalError as exc :
141143 # At Cylc 8.0.1+ Workflows installed but not run will not yet
142144 # have a database.
143- if (os .path .exists (os .path .dirname (
144- self .daos .values ()[0 ].db_file_name ) + '/flow.cylc' ) or
145- os .path .exists (os .path .dirname (
146- self .daos .values ()[0 ].db_file_name ) + '/suite.rc' )):
145+ wf_dir = Path (
146+ list (self .daos .values ())[0 ].db_file_name
147+ ).parent
148+ if (wf_dir / WorkflowFiles .FLOW_FILE ).exists () or (
149+ wf_dir / WorkflowFiles .SUITE_RC
150+ ).exists ():
147151 return []
148152 else :
149153 raise exc
@@ -152,7 +156,7 @@ def get_suite_broadcast_states(self, user_name, suite_name):
152156 """Return broadcast states of a suite.
153157 [[point, name, key, value], ...]
154158 """
155- stmt = CylcSuiteDAO .pre_select_broadcast_states (
159+ stmt = CylcWorkflowDAO .pre_select_broadcast_states (
156160 self ._db_init (user_name , suite_name ), order = "ASC" )[0 ]
157161 broadcast_states = []
158162 for row in self ._db_exec (user_name , suite_name , stmt ):
@@ -164,7 +168,7 @@ def get_suite_broadcast_events(self, user_name, suite_name):
164168 """Return broadcast events of a suite.
165169 [[time, change, point, name, key, value], ...]
166170 """
167- stmt = CylcSuiteDAO .pre_select_broadcast_events (
171+ stmt = CylcWorkflowDAO .pre_select_broadcast_events (
168172 self ._db_init (user_name , suite_name ), order = "DESC" )[0 ]
169173 broadcast_events = []
170174 for row in self ._db_exec (user_name , suite_name , stmt ):
@@ -175,16 +179,27 @@ def get_suite_broadcast_events(self, user_name, suite_name):
175179
176180 @staticmethod
177181 def set_is_cylc8 (user_name , suite_name ):
178- from cylc .review import CylcReviewService
182+ from cylc .flow .review import CylcReviewService
183+
179184 suite_dir = os .path .join (
180185 CylcReviewService ._get_user_home (user_name ),
181186 "cylc-run" ,
182187 suite_name )
183188 return CylcReviewService .is_cylc8 (suite_dir )
184189
185190 def get_suite_job_entries (
186- self , user_name , suite_name , cycles , tasks , task_status ,
187- job_status , order , limit , offset , flow_nums = 'flow_nums' ):
191+ self ,
192+ user_name ,
193+ suite_name ,
194+ cycles ,
195+ tasks ,
196+ task_status ,
197+ job_status ,
198+ order ,
199+ limit ,
200+ offset ,
201+ flow_nums = 'flow_nums' ,
202+ ):
188203 """Query suite runtime database to return a listing of task jobs.
189204 user -- A string containing a valid user ID
190205 suite -- A string containing a valid suite ID
@@ -226,18 +241,20 @@ def get_suite_job_entries(
226241
227242 # Get number of entries
228243 of_n_entries = 0
229- stmt = ("SELECT COUNT(*)" +
230- " FROM task_states LEFT JOIN task_jobs USING (name, cycle)" +
231- where_expr )
244+ stmt = (
245+ "SELECT COUNT(*)"
246+ + " FROM task_states LEFT JOIN task_jobs USING (name, cycle)"
247+ + where_expr
248+ )
232249 try :
233250 for row in self ._db_exec (user_name , suite_name , stmt , where_args ):
234251 of_n_entries = row [0 ]
235252 break
236253 else :
237254 self ._db_close (user_name , suite_name )
238- return ([], 0 )
255+ return ([], 0 , self . is_cylc8 )
239256 except sqlite3 .Error :
240- return ([], 0 )
257+ return ([], 0 , self . is_cylc8 )
241258 if self .is_cylc8 :
242259 stmt = (
243260 "SELECT" +
@@ -293,18 +310,28 @@ def get_suite_job_entries(
293310 if exc .message == self .CANNOT_JOIN_FLOW_NUMS :
294311 stmt = stmt .replace ('flow_nums' , 'submit_num' )
295312 db_data = self ._db_exec (
296- user_name , suite_name , stmt , where_args + limit_args
313+ user_name , suite_name , stmt , where_args + limit_args
297314 )
298315 eight_zero_warning = True
299316 else :
300317 raise exc
301318
302319 for row in db_data :
303320 (
304- cycle , name , submit_num , submit_num_max , task_status ,
305- time_submit , submit_status ,
306- time_run , time_run_exit , run_signal , run_status ,
307- user_at_host , batch_sys_name , batch_sys_job_id
321+ cycle ,
322+ name ,
323+ submit_num ,
324+ submit_num_max ,
325+ task_status ,
326+ time_submit ,
327+ submit_status ,
328+ time_run ,
329+ time_run_exit ,
330+ run_signal ,
331+ run_status ,
332+ user_at_host ,
333+ batch_sys_name ,
334+ batch_sys_job_id ,
308335 ) = row [1 :]
309336 entry = {
310337 "cycle" : cycle ,
@@ -469,7 +496,7 @@ def _get_job_logs(self, user_name, suite_name, entries, entry_of):
469496 entry ["seq_logs_indexes" ][seq_key ] = int_indexes
470497 except ValueError :
471498 pass
472- for filename , log_dict in entry ["logs" ].items ():
499+ for _ , log_dict in entry ["logs" ].items ():
473500 # Unset seq_key for singular items
474501 if log_dict ["seq_key" ] not in entry ["seq_logs_indexes" ]:
475502 log_dict ["seq_key" ] = None
@@ -555,8 +582,8 @@ def get_suite_cycles_summary(
555582 " cycle," +
556583 " max(time_updated)," +
557584 " sum(" + states_stmt ["active" ] + ") AS n_active," +
558- " sum(" + states_stmt ["success" ] + ") AS n_success,"
559- " sum(" + states_stmt ["fail" ] + ") AS n_fail"
585+ " sum(" + states_stmt ["success" ] + ") AS n_success," +
586+ " sum(" + states_stmt ["fail" ] + ") AS n_fail" +
560587 " FROM task_states" +
561588 " GROUP BY cycle" +
562589 " HAVING n_active > 0" +
0 commit comments