1919
2020import sqlite3
2121import os
22+ from pathlib import Path
2223import tarfile
2324import re
2425from glob import glob
2526from sqlite3 import OperationalError
2627
27- from cylc .rundb import CylcSuiteDAO
28- from cylc .task_state import TASK_STATUS_GROUPS
29- """Provide data access object to the suite runtime database for Cylc Review."""
28+ from cylc .flow . rundb import CylcWorkflowDAO
29+ from cylc .flow . task_state import TASK_STATUS_GROUPS
30+ from cylc . flow . workflow_files import WorkflowFiles
3031
3132
3233class CylcReviewDAO (object ):
@@ -105,29 +106,30 @@ def __init__(self):
105106 self .daos = {}
106107
107108 def _db_init (self , user_name , suite_name ):
108- """Initialise a named CylcSuiteDAO database connection."""
109+ """Initialise a named CylcWorkflowDAO database connection."""
109110 key = (user_name , suite_name )
110111 if key not in self .daos :
111112 prefix = "~"
112113 if user_name :
113114 prefix += user_name
114115 for name in [os .path .join ("log" , "db" ), "cylc-suite.db" ]:
115- db_f_name = os .path .expanduser (os .path .join (
116- prefix , os .path .join ("cylc-run" , suite_name , name )))
117- self .daos [key ] = CylcSuiteDAO (db_f_name , is_public = True )
116+ db_f_name = os .path .expanduser (
117+ os .path .join (
118+ prefix , os .path .join ("cylc-run" , suite_name , name )))
119+ self .daos [key ] = CylcWorkflowDAO (db_f_name , is_public = True )
118120 if os .path .exists (db_f_name ):
119121 break
120122 self .is_cylc8 = self .set_is_cylc8 (user_name , suite_name )
121123 return self .daos [key ]
122124
123125 def _db_close (self , user_name , suite_name ):
124- """Close a named CylcSuiteDAO database connection."""
126+ """Close a named CylcWorkflowDAO database connection."""
125127 key = (user_name , suite_name )
126128 if self .daos .get (key ) is not None :
127129 self .daos [key ].close ()
128130
129131 def _db_exec (self , user_name , suite_name , stmt , stmt_args = None ):
130- """Execute a query on a named CylcSuiteDAO database connection."""
132+ """Execute a query on a named CylcWorkflowDAO database connection."""
131133 daos = self ._db_init (user_name , suite_name )
132134 if stmt_args is None :
133135 stmt_args = []
@@ -140,10 +142,12 @@ def _db_exec(self, user_name, suite_name, stmt, stmt_args=None):
140142 except sqlite3 .OperationalError as exc :
141143 # At Cylc 8.0.1+ Workflows installed but not run will not yet
142144 # have a database.
143- if (os .path .exists (os .path .dirname (
144- self .daos .values ()[0 ].db_file_name ) + '/flow.cylc' ) or
145- os .path .exists (os .path .dirname (
146- self .daos .values ()[0 ].db_file_name ) + '/suite.rc' )):
145+ wf_dir = Path (
146+ list (self .daos .values ())[0 ].db_file_name
147+ ).parent
148+ if (wf_dir / WorkflowFiles .FLOW_FILE ).exists () or (
149+ wf_dir / WorkflowFiles .SUITE_RC
150+ ).exists ():
147151 return []
148152 else :
149153 raise exc
@@ -152,7 +156,7 @@ def get_suite_broadcast_states(self, user_name, suite_name):
152156 """Return broadcast states of a suite.
153157 [[point, name, key, value], ...]
154158 """
155- stmt = CylcSuiteDAO .pre_select_broadcast_states (
159+ stmt = CylcWorkflowDAO .pre_select_broadcast_states (
156160 self ._db_init (user_name , suite_name ), order = "ASC" )[0 ]
157161 broadcast_states = []
158162 for row in self ._db_exec (user_name , suite_name , stmt ):
@@ -164,7 +168,7 @@ def get_suite_broadcast_events(self, user_name, suite_name):
164168 """Return broadcast events of a suite.
165169 [[time, change, point, name, key, value], ...]
166170 """
167- stmt = CylcSuiteDAO .pre_select_broadcast_events (
171+ stmt = CylcWorkflowDAO .pre_select_broadcast_events (
168172 self ._db_init (user_name , suite_name ), order = "DESC" )[0 ]
169173 broadcast_events = []
170174 for row in self ._db_exec (user_name , suite_name , stmt ):
@@ -175,16 +179,27 @@ def get_suite_broadcast_events(self, user_name, suite_name):
175179
176180 @staticmethod
177181 def set_is_cylc8 (user_name , suite_name ):
178- from cylc .review import CylcReviewService
182+ from cylc .flow .review import CylcReviewService
183+
179184 suite_dir = os .path .join (
180185 CylcReviewService ._get_user_home (user_name ),
181186 "cylc-run" ,
182187 suite_name )
183188 return CylcReviewService .is_cylc8 (suite_dir )
184189
185190 def get_suite_job_entries (
186- self , user_name , suite_name , cycles , tasks , task_status ,
187- job_status , order , limit , offset , flow_nums = 'flow_nums' ):
191+ self ,
192+ user_name ,
193+ suite_name ,
194+ cycles ,
195+ tasks ,
196+ task_status ,
197+ job_status ,
198+ order ,
199+ limit ,
200+ offset ,
201+ flow_nums = 'flow_nums' ,
202+ ):
188203 """Query suite runtime database to return a listing of task jobs.
189204 user -- A string containing a valid user ID
190205 suite -- A string containing a valid suite ID
@@ -226,18 +241,20 @@ def get_suite_job_entries(
226241
227242 # Get number of entries
228243 of_n_entries = 0
229- stmt = ("SELECT COUNT(*)" +
230- " FROM task_states LEFT JOIN task_jobs USING (name, cycle)" +
231- where_expr )
244+ stmt = (
245+ "SELECT COUNT(*)"
246+ + " FROM task_states LEFT JOIN task_jobs USING (name, cycle)"
247+ + where_expr
248+ )
232249 try :
233250 for row in self ._db_exec (user_name , suite_name , stmt , where_args ):
234251 of_n_entries = row [0 ]
235252 break
236253 else :
237254 self ._db_close (user_name , suite_name )
238- return ([], 0 )
255+ return ([], 0 , self . is_cylc8 )
239256 except sqlite3 .Error :
240- return ([], 0 )
257+ return ([], 0 , self . is_cylc8 )
241258 if self .is_cylc8 :
242259 stmt = (
243260 "SELECT" +
@@ -250,7 +267,9 @@ def get_suite_job_entries(
250267 " time_run, time_run_exit, run_signal, run_status," +
251268 " platform_name, job_runner_name, job_id" +
252269 " FROM task_states LEFT JOIN task_jobs USING " +
253- "(cycle, name, " + flow_nums + ") " +
270+ "(cycle, name, " +
271+ flow_nums +
272+ ") " +
254273 where_expr +
255274 " ORDER BY " +
256275 self .JOB_ORDERS .get (order , self .JOB_ORDERS ["time_desc" ])
@@ -293,18 +312,28 @@ def get_suite_job_entries(
293312 if exc .message == self .CANNOT_JOIN_FLOW_NUMS :
294313 stmt = stmt .replace ('flow_nums' , 'submit_num' )
295314 db_data = self ._db_exec (
296- user_name , suite_name , stmt , where_args + limit_args
315+ user_name , suite_name , stmt , where_args + limit_args
297316 )
298317 eight_zero_warning = True
299318 else :
300319 raise exc
301320
302321 for row in db_data :
303322 (
304- cycle , name , submit_num , submit_num_max , task_status ,
305- time_submit , submit_status ,
306- time_run , time_run_exit , run_signal , run_status ,
307- user_at_host , batch_sys_name , batch_sys_job_id
323+ cycle ,
324+ name ,
325+ submit_num ,
326+ submit_num_max ,
327+ task_status ,
328+ time_submit ,
329+ submit_status ,
330+ time_run ,
331+ time_run_exit ,
332+ run_signal ,
333+ run_status ,
334+ user_at_host ,
335+ batch_sys_name ,
336+ batch_sys_job_id ,
308337 ) = row [1 :]
309338 entry = {
310339 "cycle" : cycle ,
@@ -469,7 +498,7 @@ def _get_job_logs(self, user_name, suite_name, entries, entry_of):
469498 entry ["seq_logs_indexes" ][seq_key ] = int_indexes
470499 except ValueError :
471500 pass
472- for filename , log_dict in entry ["logs" ].items ():
501+ for _ , log_dict in entry ["logs" ].items ():
473502 # Unset seq_key for singular items
474503 if log_dict ["seq_key" ] not in entry ["seq_logs_indexes" ]:
475504 log_dict ["seq_key" ] = None
@@ -499,8 +528,10 @@ def get_suite_cycles_summary(
499528 """
500529
501530 of_n_entries = 0
502- stmt = ("SELECT COUNT(DISTINCT cycle) FROM task_states WHERE " +
503- "submit_num > 0" )
531+ stmt = (
532+ "SELECT COUNT(DISTINCT cycle) FROM task_states WHERE "
533+ + "submit_num > 0"
534+ )
504535 try :
505536 for row in self ._db_exec (user_name , suite_name , stmt ):
506537 of_n_entries = row [0 ]
@@ -555,7 +586,7 @@ def get_suite_cycles_summary(
555586 " cycle," +
556587 " max(time_updated)," +
557588 " sum(" + states_stmt ["active" ] + ") AS n_active," +
558- " sum(" + states_stmt ["success" ] + ") AS n_success,"
589+ " sum(" + states_stmt ["success" ] + ") AS n_success," +
559590 " sum(" + states_stmt ["fail" ] + ") AS n_fail"
560591 " FROM task_states" +
561592 " GROUP BY cycle" +
@@ -612,12 +643,13 @@ def get_suite_cycles_summary(
612643 " FROM task_jobs GROUP BY cycle" )
613644 else :
614645 fail_events_stmt = " OR " .join (
615- ["event=='%s'" % (name )
616- for name in TASK_STATUS_GROUPS [ "fail" ]] )
646+ ["event=='%s'" % (name ) for name in TASK_STATUS_GROUPS [ "fail" ]]
647+ )
617648 stmt = (
618649 "SELECT cycle," +
619650 " sum(" + fail_events_stmt + ") AS n_job_fail" +
620- " FROM task_events GROUP BY cycle" )
651+ " FROM task_events GROUP BY cycle"
652+ )
621653 self ._db_close (user_name , suite_name )
622654 for cycle , n_job_active , n_job_success , n_job_fail in self ._db_exec (
623655 user_name , suite_name , stmt ):
0 commit comments