
Commit 4101a16

test: adding tests for Jobs summary tables
1 parent 27b77aa commit 4101a16

File tree: 2 files changed, +106 -4 lines changed


src/DIRAC/WorkloadManagementSystem/DB/JobDB.sql

Lines changed: 4 additions & 4 deletions
@@ -155,10 +155,10 @@ CREATE TABLE `JobsHistorySummary` (
   `JobCount` INT,
   RescheduleSum INT,
   UNIQUE KEY uq_summary (
-    `Status`,
-    `Site`,
-    `Owner`,
-    `OwnerGroup`(32),
+    `Status`,
+    `Site`,
+    `Owner`,
+    `OwnerGroup`(32),
     `VO`,
     `JobGroup`,
     `JobType`,
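
For context, the uq_summary unique key gives each combination of the listed job attributes exactly one row in JobsHistorySummary, which is what allows the table to be maintained incrementally rather than recomputed from Jobs. Below is a minimal sketch of what such maintenance could look like, assuming a MySQL backend and using only column names that appear in this commit; the upsert is an illustration, not necessarily how JobDB actually keeps the table in sync.

# Illustration only: an upsert keyed on uq_summary; the increment logic and the
# %s bind style are assumptions for this sketch, not DIRAC code.
UPSERT_SUMMARY = """
    INSERT INTO JobsHistorySummary
      (Status, MinorStatus, Site, Owner, OwnerGroup, VO, JobGroup, JobType,
       ApplicationStatus, JobCount, RescheduleSum)
    VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, 1, %s)
    ON DUPLICATE KEY UPDATE
      JobCount = JobCount + 1,
      RescheduleSum = RescheduleSum + VALUES(RescheduleSum)
"""

The test added below then only needs to check that this table stays consistent with a plain GROUP BY aggregation over Jobs.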

tests/Integration/WorkloadManagementSystem/Test_JobDB.py

Lines changed: 102 additions & 0 deletions
@@ -6,6 +6,7 @@
 
 # pylint: disable=wrong-import-position, missing-docstring
 
+import csv
 from datetime import datetime, timedelta
 from unittest.mock import MagicMock, patch
 
@@ -443,3 +444,104 @@ def test_attributes(jobDB):
     res = jobDB.getJobsAttributes([jobID_1, jobID_2], ["Status"])
     assert res["OK"], res["Message"]
     assert res["Value"] == {jobID_1: {"Status": JobStatus.DONE}, jobID_2: {"Status": JobStatus.RUNNING}}
+
+
+# Parse date strings into datetime objects
+def process_data(jobIDs, data):
+    converted_data = []
+
+    full_data = []
+
+    for j, d in zip(jobIDs, data):
+        row = list(d)
+        row.insert(0, j)  # Insert JobID at the beginning of the row
+        full_data.append(row)
+
+    for row in full_data:
+        # date fields
+        date_indices = [8, 9, 10, 11, 12, 13]  # Positions of date fields
+        for i in date_indices:
+            if not row[i]:
+                row[i] = None
+            else:
+                try:
+                    row[i] = datetime.strptime(row[i], "%Y-%m-%d %H:%M:%S")
+                except ValueError:
+                    # Handle invalid dates
+                    row[i] = None
+        # Convert other fields to appropriate types
+        int_indices = [17, 18]  # Positions of integer fields
+        for i in int_indices:
+            if not row[i]:
+                row[i] = 0
+            else:
+                try:
+                    row[i] = int(row[i])
+                except ValueError:
+                    # Handle invalid integers
+                    row[i] = 0
+        converted_data.append(tuple(row))
+    return converted_data
+
+
+def test_summarySnapshot():
+    # first delete all jobs
+    jobDB = JobDB()
+    for table in [
+        "InputData",
+        "JobParameters",
+        "AtticJobParameters",
+        "HeartBeatLoggingInfo",
+        "OptimizerParameters",
+        "JobCommands",
+        "Jobs",
+        "JobJDLs",
+    ]:
+        sqlCmd = f"DELETE from `{table}`"
+        jobDB._update(sqlCmd)
+    sql = "DELETE FROM JobsHistorySummary"
+    res = jobDB._update(sql)
+    assert res["OK"], res["Message"]
+
+    # insert some predefined jobs to test the summary snapshot
+    with open("jobs.csv", newline="", encoding="utf-8") as csvfile:
+        csvreader = csv.reader(csvfile)
+        data = list(csvreader)
+
+    # First inserting the JDLs
+    jdlData = [(jdl, "", "")] * len(data)
+    res = jobDB._updatemany("INSERT INTO JobJDLs (JDL, JobRequirements, OriginalJDL) VALUES (%s,%s,%s)", jdlData)
+    assert res["OK"], res["Message"]
+    # Getting which JobIDs were inserted
+    res = jobDB._query("SELECT JobID FROM JobJDLs")
+    assert res["OK"], res["Message"]
+    jobIDs = [row[0] for row in res["Value"]][0 : len(data)]
+
+    # Now inserting the jobs
+    processed_data = process_data(jobIDs, data)
+    placeholders = ",".join(["%s"] * len(processed_data[0]))
+    sql = f"INSERT INTO Jobs (JobID, JobType, JobGroup, Site, JobName, Owner, OwnerGroup, VO, SubmissionTime, RescheduleTime, LastUpdateTime, StartExecTime, HeartBeatTime, EndExecTime, Status, MinorStatus, ApplicationStatus, UserPriority, RescheduleCounter, VerifiedFlag, AccountedFlag) VALUES ({placeholders})"
+    res = jobDB._updatemany(sql, processed_data)
+    assert res["OK"], res["Message"]
+
+    requestedFields = [
+        "Status",
+        "MinorStatus",
+        "Site",
+        "Owner",
+        "OwnerGroup",
+        "VO",
+        "JobGroup",
+        "JobType",
+        "ApplicationStatus",
+    ]
+    defString = ", ".join(requestedFields)
+
+    # Check it corresponds to the basic "GROUP BY" query
+    simple_query = f"SELECT {defString}, COUNT(JobID) AS JobCount, SUM(RescheduleCounter) AS RescheduleSum FROM Jobs GROUP BY {defString} ORDER BY {defString};"
+    res_sq = jobDB._query(simple_query)
+    assert res_sq["OK"], res_sq["Message"]
+    sql = f"SELECT {defString}, JobCount, RescheduleSum FROM JobsHistorySummary ORDER BY {defString};"
+    result_summary = jobDB._query(sql)
+    assert result_summary["OK"], result_summary["Message"]
+    assert sorted(res_sq["Value"]) == sorted(result_summary["Value"])
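
The fixture file jobs.csv is not part of this commit, so its contents are not shown here, but the code above does pin down the expected layout: the CSV columns follow the INSERT column list minus the leading JobID (which process_data prepends from the freshly inserted JobJDLs rows), the six date fields sit at indices 8-13 once JobID is added, and the two integer fields at indices 17-18. A hypothetical row, with invented values, just to make that layout concrete:

# Hypothetical jobs.csv row: the values are invented; only the column order and
# the date/int positions follow from process_data and the INSERT statement above.
example_row = [
    "User",                 # JobType
    "NoGroup",              # JobGroup
    "ANY",                  # Site
    "test-job",             # JobName
    "owner_1",              # Owner
    "group_1",              # OwnerGroup
    "vo.example.org",       # VO
    "2024-01-01 12:00:00",  # SubmissionTime  (index 8 once JobID is prepended)
    "",                     # RescheduleTime  (index 9, empty -> None)
    "2024-01-01 12:05:00",  # LastUpdateTime  (index 10)
    "",                     # StartExecTime   (index 11)
    "",                     # HeartBeatTime   (index 12)
    "",                     # EndExecTime     (index 13)
    "Waiting",              # Status
    "unset",                # MinorStatus
    "Unknown",              # ApplicationStatus
    "1",                    # UserPriority       (index 17, cast to int)
    "0",                    # RescheduleCounter  (index 18, cast to int)
    "False",                # VerifiedFlag
    "False",                # AccountedFlag
]

# process_data([123], [example_row]) would return one 21-element tuple starting
# with 123, with the empty date fields mapped to None and the two numeric
# fields cast to int, ready for the _updatemany insert above.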
