Skip to content

Commit 7556f34

Browse files
authored
remove cleaning old job code (#1137)
1 parent aa2409a commit 7556f34

File tree

2 files changed

+57
-46
lines changed

2 files changed

+57
-46
lines changed

src/ClusterManager/db_manager.py

Lines changed: 21 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@
88
import time
99
import yaml
1010

11-
1211
from cluster_manager import setup_exporter_thread, \
1312
manager_iteration_histogram, \
1413
register_stack_trace_dump, \
@@ -55,33 +54,42 @@ def delete_old_cluster_status(days_ago):
5554
def delete_old_inactive_jobs(days_ago):
5655
table = "jobs"
5756
with DataHandler() as data_handler:
58-
logger.info("Deleting inactive job records from table %s older than %s "
59-
"day(s)", table, days_ago)
57+
logger.info(
58+
"Deleting inactive job records from table %s older than %s "
59+
"day(s)", table, days_ago)
6060

61-
cond = {
62-
"jobStatus": ("IN", ["finished", "failed", "killed", "error"])
63-
}
61+
cond = {"jobStatus": ("IN", ["finished", "failed", "killed", "error"])}
6462
ret = data_handler.delete_rows_from_table_older_than_days(
6563
table, days_ago, col="lastUpdated", cond=cond)
6664
ret_status = "succeeded" if ret is True else "failed"
67-
logger.info("Deleting inactive job records from table %s older than %s "
68-
"day(s) %s", table, days_ago, ret_status)
65+
logger.info(
66+
"Deleting inactive job records from table %s older than %s "
67+
"day(s) %s", table, days_ago, ret_status)
68+
69+
70+
def sleep_with_update(time_to_sleep, fn):
71+
for _ in range(int(time_to_sleep / 100)):
72+
fn()
73+
time.sleep(100)
6974

7075

7176
def run():
7277
register_stack_trace_dump()
7378
create_log()
79+
80+
update = lambda: update_file_modification_time("db_manager")
7481
while True:
75-
update_file_modification_time("db_manager")
82+
update()
7683

7784
with manager_iteration_histogram.labels("db_manager").time():
7885
try:
7986
delete_old_cluster_status(CLUSTER_STATUS_EXPIRY)
80-
delete_old_inactive_jobs(JOBS_EXPIRY)
87+
# query below is too time consuming since lastUpdated in job table is not indexed
88+
# delete_old_inactive_jobs(JOBS_EXPIRY)
8189
except:
82-
logger.exception("Deleting old cluster status failed",
83-
exc_info=True)
84-
time.sleep(86400)
90+
logger.exception("Deleting old cluster status failed")
91+
92+
sleep_with_update(86400, update)
8593

8694

8795
if __name__ == '__main__':

src/utils/MySQLDataHandler.py

Lines changed: 36 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -292,7 +292,7 @@ def AddStorage(self, vcName, url, storageType, metadata, defaultMountPath):
292292
cursor.close()
293293
return True
294294
except Exception as e:
295-
logger.error('AddStorage Exception: %s', str(e))
295+
logger.exception('AddStorage Exception: %s', str(e))
296296
return False
297297

298298
@record
@@ -306,7 +306,7 @@ def DeleteStorage(self, vcName, url):
306306
cursor.close()
307307
return True
308308
except Exception as e:
309-
logger.error('DeleteStorage Exception: %s', str(e))
309+
logger.exception('DeleteStorage Exception: %s', str(e))
310310
return False
311311

312312
@record
@@ -327,7 +327,7 @@ def ListStorages(self, vcName):
327327
record["defaultMountPath"] = defaultMountPath
328328
ret.append(record)
329329
except Exception as e:
330-
logger.error('ListStorages Exception: %s', str(e))
330+
logger.exception('ListStorages Exception: %s', str(e))
331331
pass
332332
self.conn.commit()
333333
cursor.close()
@@ -346,7 +346,7 @@ def UpdateStorage(self, vcName, url, storageType, metadata,
346346
cursor.close()
347347
return True
348348
except Exception as e:
349-
logger.error('Exception: %s', str(e))
349+
logger.exception('Exception: %s', str(e))
350350
return False
351351

352352
def init_vc_sqls(self, config):
@@ -374,7 +374,7 @@ def AddVC(self, vcName, quota, metadata, res_quota, res_meta):
374374
cursor.close()
375375
return True
376376
except Exception as e:
377-
logger.error('AddVC Exception: %s', str(e))
377+
logger.exception('AddVC Exception: %s', str(e))
378378
return False
379379

380380
@record
@@ -394,7 +394,7 @@ def ListVCs(self):
394394
}
395395
ret.append(rec)
396396
except Exception as e:
397-
logger.error('Exception: %s', str(e))
397+
logger.exception('Exception: %s', str(e))
398398
pass
399399
self.conn.commit()
400400
cursor.close()
@@ -411,7 +411,7 @@ def DeleteVC(self, vcName):
411411
cursor.close()
412412
return True
413413
except Exception as e:
414-
logger.error('DeleteVC Exception: %s', str(e))
414+
logger.exception('DeleteVC Exception: %s', str(e))
415415
return False
416416

417417
@record
@@ -425,7 +425,7 @@ def UpdateVC(self, vcName, quota, metadata):
425425
cursor.close()
426426
return True
427427
except Exception as e:
428-
logger.error('Exception: %s', str(e))
428+
logger.exception('Exception: %s', str(e))
429429
return False
430430

431431
@record
@@ -502,7 +502,7 @@ def UpdateAce(self, identityName, identityId, resource, permissions,
502502
cursor.close()
503503
return True
504504
except Exception as e:
505-
logger.error('UpdateAce Exception: %s', str(e))
505+
logger.exception('UpdateAce Exception: %s', str(e))
506506
return False
507507

508508
@record
@@ -517,7 +517,7 @@ def UpdateAclIdentityId(self, identityName, identityId):
517517
cursor.close()
518518
return True
519519
except Exception as e:
520-
logger.error('Exception: %s', str(e))
520+
logger.exception('Exception: %s', str(e))
521521
return False
522522

523523
@record
@@ -533,7 +533,7 @@ def DeleteResourceAcl(self, resource):
533533
cursor.close()
534534
return True
535535
except Exception as e:
536-
logger.error('Exception: %s', str(e))
536+
logger.exception('Exception: %s', str(e))
537537
return False
538538

539539
@record
@@ -549,7 +549,7 @@ def DeleteAce(self, identityName, resource):
549549
cursor.close()
550550
return True
551551
except Exception as e:
552-
logger.error('DeleteAce Exception: %s', str(e))
552+
logger.exception('DeleteAce Exception: %s', str(e))
553553
return False
554554

555555
@record
@@ -570,7 +570,7 @@ def GetAcl(self):
570570
record["isDeny"] = isDeny
571571
ret.append(record)
572572
except Exception as e:
573-
logger.error('Exception: %s', str(e))
573+
logger.exception('Exception: %s', str(e))
574574
pass
575575
self.conn.commit()
576576
cursor.close()
@@ -594,7 +594,7 @@ def GetResourceAcl(self, resource):
594594
record["isDeny"] = isDeny
595595
ret.append(record)
596596
except Exception as e:
597-
logger.error('Exception: %s', str(e))
597+
logger.exception('Exception: %s', str(e))
598598
self.conn.commit()
599599
cursor.close()
600600
return ret
@@ -613,7 +613,7 @@ def AddJob(self, jobParams):
613613
cursor.close()
614614
return True
615615
except Exception as e:
616-
logger.error('Exception: %s', str(e))
616+
logger.exception('Exception: %s', str(e))
617617
return False
618618

619619
@record
@@ -674,7 +674,7 @@ def GetJobList(self,
674674
record["lastUpdated"] = lastUpdated
675675
ret.append(record)
676676
except Exception as e:
677-
logger.error('Exception: %s', str(e))
677+
logger.exception('Exception: %s', str(e))
678678
self.conn.commit()
679679
cursor.close()
680680
return ret
@@ -745,7 +745,7 @@ def GetJobListV2(self,
745745
ret["finishedJobs"].append(record)
746746
self.conn.commit()
747747
except Exception as e:
748-
logger.error('GetJobListV2 Exception: %s', str(e))
748+
logger.exception('GetJobListV2 Exception: %s', str(e))
749749
finally:
750750
if cursor is not None:
751751
cursor.close()
@@ -858,9 +858,9 @@ def get_union_job_list(self, username, vc_name, num, status):
858858

859859
self.conn.commit()
860860
except:
861-
logger.error("Exception in getting union job list. status %s",
862-
status,
863-
exc_info=True)
861+
logger.exception("Exception in getting union job list. status %s",
862+
status,
863+
exc_info=True)
864864
finally:
865865
if cursor is not None:
866866
cursor.close()
@@ -1033,7 +1033,7 @@ def GetActiveJobList(self):
10331033
record["jobStatus"] = jobStatus
10341034
ret.append(record)
10351035
except Exception as e:
1036-
logger.error('GetActiveJobList Exception: %s', str(e))
1036+
logger.exception('GetActiveJobList Exception: %s', str(e))
10371037
self.conn.commit()
10381038
cursor.close()
10391039
return ret
@@ -1086,7 +1086,7 @@ def GetJobV2(self, jobId):
10861086
ret.append(record)
10871087
self.conn.commit()
10881088
except Exception as e:
1089-
logger.error('GetJobV2 Exception: %s', str(e))
1089+
logger.exception('GetJobV2 Exception: %s', str(e))
10901090
finally:
10911091
if cursor is not None:
10921092
cursor.close()
@@ -1295,7 +1295,7 @@ def GetJobTextField(self, jobId, field):
12951295
for (jobId, value) in cursor:
12961296
ret = value
12971297
except Exception as e:
1298-
logger.error('Exception: %s', str(e))
1298+
logger.exception('Exception: %s', str(e))
12991299
pass
13001300
self.conn.commit()
13011301
cursor.close()
@@ -1321,7 +1321,7 @@ def GetJobTextFields(self, jobId, fields):
13211321
ret = dict(list(zip(columns, item)))
13221322
self.conn.commit()
13231323
except Exception as e:
1324-
logger.error('GetJobTextFields Exception: %s', str(e))
1324+
logger.exception('GetJobTextFields Exception: %s', str(e))
13251325
finally:
13261326
if cursor is not None:
13271327
cursor.close()
@@ -1362,7 +1362,7 @@ def UpdateClusterStatus(self, clusterStatus):
13621362
cursor.close()
13631363
return True
13641364
except Exception as e:
1365-
logger.error('Exception: %s', str(e))
1365+
logger.exception('Exception: %s', str(e))
13661366
return False
13671367

13681368
@record
@@ -1378,7 +1378,7 @@ def GetClusterStatus(self):
13781378
ret = json.loads(base64decode(value))
13791379
time = t
13801380
except Exception as e:
1381-
logger.error('GetClusterStatus Exception: %s', str(e))
1381+
logger.exception('GetClusterStatus Exception: %s', str(e))
13821382
self.conn.commit()
13831383
cursor.close()
13841384
return ret, time
@@ -1394,7 +1394,7 @@ def GetUsers(self):
13941394
for (identityName, uid, public_key, private_key) in cursor:
13951395
ret.append((identityName, uid, public_key, private_key))
13961396
except Exception as e:
1397-
logger.error('Exception: %s', str(e))
1397+
logger.exception('Exception: %s', str(e))
13981398
self.conn.commit()
13991399
cursor.close()
14001400
return ret
@@ -1452,7 +1452,7 @@ def UpdateTemplate(self, name, scope, json):
14521452
cursor.close()
14531453
return True
14541454
except Exception as e:
1455-
logger.error('Exception: %s', str(e))
1455+
logger.exception('Exception: %s', str(e))
14561456
return False
14571457

14581458
@record
@@ -1465,7 +1465,7 @@ def DeleteTemplate(self, name, scope):
14651465
cursor.close()
14661466
return True
14671467
except Exception as e:
1468-
logger.error('Exception: %s', str(e))
1468+
logger.exception('Exception: %s', str(e))
14691469
return False
14701470

14711471
@record
@@ -1598,14 +1598,17 @@ def count_rows(self, table):
15981598

15991599
self.conn.commit()
16001600
except:
1601-
logger.error("Exception in counting rows for table %s", table)
1601+
logger.exception("Exception in counting rows for table %s", table)
16021602
finally:
16031603
if cursor is not None:
16041604
cursor.close()
16051605
return ret
16061606

1607-
def delete_rows_from_table_older_than_days(self, table, days_ago,
1608-
col="time", cond=None):
1607+
def delete_rows_from_table_older_than_days(self,
1608+
table,
1609+
days_ago,
1610+
col="time",
1611+
cond=None):
16091612
cursor = None
16101613
ret = False
16111614
try:
@@ -1623,7 +1626,7 @@ def delete_rows_from_table_older_than_days(self, table, days_ago,
16231626
self.conn.commit()
16241627
ret = True
16251628
except:
1626-
logger.error(
1629+
logger.exception(
16271630
"Exception in deleting rows older than %s in col %s "
16281631
"for table %s", days_ago, col, table)
16291632
finally:

0 commit comments

Comments
 (0)