Skip to content

Commit 1909dcc

Browse files
authored
Merge pull request DIRACGrid#7905 from fstagni/cherry-pick-2-ec0126231-integration
[sweep:integration] adding possibility to bulk insert in JobLoggingDB
2 parents 6f18538 + 1168ca0 commit 1909dcc

File tree

5 files changed

+127
-26
lines changed

5 files changed

+127
-26
lines changed

src/DIRAC/Core/Utilities/MySQL.py

Lines changed: 40 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -529,20 +529,19 @@ def __del__(self):
529529
except Exception:
530530
pass
531531

532-
def _except(self, methodName, x, err, cmd="", print=True):
532+
def _except(self, methodName, x, err, cmd="", debug=True):
533533
"""
534534
print MySQL error or exception
535535
return S_ERROR with Exception
536536
"""
537-
538537
try:
539538
raise x
540539
except MySQLdb.Error as e:
541-
if print:
540+
if debug:
542541
self.log.error(f"{methodName} ({self._safeCmd(cmd)}): {err}", "%d: %s" % (e.args[0], e.args[1]))
543542
return S_ERROR(DErrno.EMYSQL, "%s: ( %d: %s )" % (err, e.args[0], e.args[1]))
544543
except Exception as e:
545-
if print:
544+
if debug:
546545
self.log.error(f"{methodName} ({self._safeCmd(cmd)}): {err}", repr(e))
547546
return S_ERROR(DErrno.EMYSQL, f"{err}: ({repr(e)})")
548547

@@ -757,8 +756,8 @@ def _update(self, cmd, *, conn=None, debug=True):
757756
758757
:param debug: print or not the errors
759758
760-
return S_OK with number of updated registers upon success
761-
return S_ERROR upon error
759+
:return: S_OK with number of updated registers upon success.
760+
S_ERROR upon error.
762761
"""
763762

764763
self.log.debug(f"_update: {self._safeCmd(cmd)}")
@@ -786,6 +785,41 @@ def _update(self, cmd, *, conn=None, debug=True):
786785

787786
return retDict
788787

788+
@captureOptimizerTraces
789+
def _updatemany(self, cmd, data, *, conn=None, debug=True):
790+
"""execute MySQL updatemany command
791+
792+
:param debug: print or not the errors
793+
794+
:return: S_OK with number of updated registers upon success.
795+
S_ERROR upon error.
796+
"""
797+
798+
self.log.debug(f"_updatemany: {self._safeCmd(cmd)}")
799+
if conn:
800+
connection = conn
801+
else:
802+
retDict = self._getConnection()
803+
if not retDict["OK"]:
804+
return retDict
805+
connection = retDict["Value"]
806+
807+
try:
808+
cursor = connection.cursor()
809+
res = cursor.executemany(cmd, data)
810+
retDict = S_OK(res)
811+
if cursor.lastrowid:
812+
retDict["lastRowId"] = cursor.lastrowid
813+
except Exception as x:
814+
retDict = self._except("_updatemany", x, "Execution failed.", cmd, debug)
815+
816+
try:
817+
cursor.close()
818+
except Exception:
819+
pass
820+
821+
return retDict
822+
789823
def _transaction(self, cmdList, conn=None):
790824
"""dummy transaction support
791825

src/DIRAC/WorkloadManagementSystem/DB/JobDB.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -449,6 +449,9 @@ def setJobAttribute(self, jobID, attrName, attrValue, update=False, myDate=None,
449449
:return: S_OK/S_ERROR
450450
"""
451451

452+
if not jobID:
453+
return S_OK()
454+
452455
if attrName not in self.jobAttributeNames:
453456
return S_ERROR(EWMSJMAN, "Request to set non-existing job attribute")
454457

@@ -506,6 +509,9 @@ def setJobAttributes(self, jobID, attrNames, attrValues, update=False, myDate=No
506509
:return: S_OK/S_ERROR
507510
"""
508511

512+
if not jobID:
513+
return S_OK()
514+
509515
jobIDList = jobID
510516
if not isinstance(jobID, (list, tuple)):
511517
jobIDList = [jobID]

src/DIRAC/WorkloadManagementSystem/DB/JobLoggingDB.py

Lines changed: 43 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -46,33 +46,61 @@ def addLoggingRecord(
4646
event = f"status/minor/app={status}/{minorStatus}/{applicationStatus}"
4747
self.log.info("Adding record for job ", str(jobID) + ": '" + event + "' from " + source)
4848

49-
try:
49+
def _get_date(date):
50+
# We need to specify that timezone is UTC because otherwise timestamp
51+
# assumes local time while we mean UTC.
5052
if not date:
51-
# Make the UTC datetime string and float
52-
_date = datetime.datetime.utcnow()
53+
# Make the UTC datetime
54+
return datetime.datetime.utcnow()
5355
elif isinstance(date, str):
5456
# The date is provided as a string in UTC
55-
_date = TimeUtilities.fromString(date)
57+
return TimeUtilities.fromString(date)
5658
elif isinstance(date, datetime.datetime):
57-
_date = date
59+
return date
60+
else:
61+
raise Exception("Incorrect date for the logging record")
62+
63+
try:
64+
if isinstance(date, list):
65+
_date = []
66+
for d in date:
67+
try:
68+
_date.append(_get_date(d))
69+
except Exception:
70+
self.log.exception("Exception while date evaluation")
71+
_date.append(datetime.datetime.utcnow())
5872
else:
59-
self.log.error("Incorrect date for the logging record")
60-
_date = datetime.datetime.utcnow()
73+
_date = _get_date(date)
6174
except Exception:
6275
self.log.exception("Exception while date evaluation")
63-
_date = datetime.datetime.utcnow()
64-
65-
# We need to specify that timezone is UTC because otherwise timestamp
66-
# assumes local time while we mean UTC.
67-
epoc = _date.replace(tzinfo=datetime.timezone.utc).timestamp() - MAGIC_EPOC_NUMBER
76+
_date = [datetime.datetime.utcnow()]
6877

6978
cmd = (
7079
"INSERT INTO LoggingInfo (JobId, Status, MinorStatus, ApplicationStatus, "
71-
+ "StatusTime, StatusTimeOrder, StatusSource) VALUES (%d,'%s','%s','%s','%s',%f,'%s')"
72-
% (int(jobID), status, minorStatus, applicationStatus[:255], str(_date), epoc, source[:32])
80+
+ "StatusTime, StatusTimeOrder, StatusSource) VALUES "
7381
)
7482

75-
return self._update(cmd)
83+
if not isinstance(jobID, list):
84+
jobID = [jobID]
85+
86+
if isinstance(status, str):
87+
status = [status] * len(jobID)
88+
if isinstance(minorStatus, str):
89+
minorStatus = [minorStatus] * len(jobID)
90+
if isinstance(applicationStatus, str):
91+
applicationStatus = [applicationStatus[:255]] * len(jobID)
92+
if isinstance(_date, datetime.datetime):
93+
_date = [_date] * len(jobID)
94+
95+
epocs = []
96+
for dt in _date:
97+
epoc = dt.replace(tzinfo=datetime.timezone.utc).timestamp() - MAGIC_EPOC_NUMBER
98+
epocs.append(epoc)
99+
cmd = cmd + "(%s, %s, %s, %s, %s, %s, %s)"
100+
data = list(
101+
zip(jobID, status, minorStatus, applicationStatus, _date, epocs, [source[:32]] * len(jobID), strict=True)
102+
)
103+
return self._updatemany(cmd, data)
76104

77105
#############################################################################
78106
def getJobLoggingInfo(self, jobID):

src/DIRAC/WorkloadManagementSystem/Service/JobManagerHandler.py

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,9 @@ def export_submitJob(self, jobDesc):
169169
jobDescList = [jobDesc]
170170

171171
jobIDList = []
172+
statusList = []
173+
minorStatusList = []
174+
timeStampList = []
172175

173176
if parametricJob:
174177
initialStatus = JobStatus.SUBMITTING
@@ -206,13 +209,25 @@ def export_submitJob(self, jobDesc):
206209
return result
207210

208211
jobID = result["JobID"]
209-
self.log.info(f'Job added to the JobDB", "{jobID} for {self.owner}/{self.ownerGroup}')
210-
211-
self.jobLoggingDB.addLoggingRecord(
212-
jobID, result["Status"], result["MinorStatus"], date=result["TimeStamp"], source="JobManager"
213-
)
212+
self.log.info(f"Job added to the JobDB", f"{jobID} for {self.owner}/{self.ownerGroup}")
214213

215214
jobIDList.append(jobID)
215+
statusList.append(result["Status"])
216+
minorStatusList.append(result["MinorStatus"])
217+
timeStampList.append(result["TimeStamp"])
218+
219+
# insert records in logging DB
220+
221+
# For parametric jobs I can insert logging records in a bulk
222+
if parametricJob and len(set(jobIDList)) == len(jobIDList):
223+
result = self.jobLoggingDB.addLoggingRecord(
224+
jobIDList, statusList, minorStatusList, date=timeStampList, source="JobManager"
225+
)
226+
else:
227+
for jobID, status, minorStatus, timeStamp in zip(jobIDList, statusList, minorStatusList, timeStampList):
228+
result = self.jobLoggingDB.addLoggingRecord(
229+
jobID, status, minorStatus, date=timeStamp, source="JobManager"
230+
)
216231

217232
if parametricJob:
218233
result = S_OK(jobIDList)

tests/Integration/WorkloadManagementSystem/Test_JobLoggingDB.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,4 +46,22 @@ def test_JobStatus(jobLoggingDB: JobLoggingDB):
4646
result = jobLoggingDB.getWMSTimeStamps(1)
4747
assert result["OK"] is True, result["Message"]
4848

49+
now = datetime.datetime.utcnow()
50+
result = jobLoggingDB.addLoggingRecord(
51+
[2, 3, 4, 5],
52+
status=["testing", "testing", "testing", "testing"],
53+
minorStatus=["mn", "mn", "mn", "mn"],
54+
date=[now, now, now, now],
55+
source="Unittest",
56+
)
57+
assert result["OK"] is True, result["Message"]
58+
59+
result = jobLoggingDB.getJobLoggingInfo(2)
60+
assert result["OK"] is True, result["Message"]
61+
assert result["Value"][-1][0:3] == ("testing", "mn", "Unknown")
62+
63+
result = jobLoggingDB.getJobLoggingInfo(5)
64+
assert result["OK"] is True, result["Message"]
65+
assert result["Value"][-1][0:3] == ("testing", "mn", "Unknown")
66+
4967
jobLoggingDB.deleteJob(1)

0 commit comments

Comments
 (0)