Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 16 additions & 1 deletion .github/workflows/integration.yml
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,22 @@ jobs:
packaging \
pyyaml \
requests \
typer
typer \
build
- name: Building wheels
run: |
# Clone diracx
git clone --single-branch --branch robin-update-job-monitoring https://github.com/Robin-Van-de-Merghel/diracx.git $GITHUB_WORKSPACE/diracx

# Create dist dir
mkdir -p $GITHUB_WORKSPACE/diracx-dist

# Building diracx
for pkg_dir in $GITHUB_WORKSPACE/diracx/diracx-* $GITHUB_WORKSPACE/diracx; do
echo "Building $pkg_dir"
python -m build --outdir "$GITHUB_WORKSPACE/diracx-dist" $pkg_dir
done

- name: Prepare environment
run: ./integration_tests.py prepare-environment ${{ matrix.ARGS }}
- name: Install server
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ def __init__(self, **kwargs):
super().__init__(**kwargs)
self.setServer("WorkloadManagement/JobMonitoring")

diracxClient = futureJobMonitoringClient

@ignoreEncodeWarning
def getJobsStatus(self, jobIDs):
res = self._getRPC().getJobsStatus(jobIDs)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
from DIRAC.Core.Utilities.ReturnValues import convertToReturnValue

from DIRAC.Core.Security.DiracX import DiracXClient
from DIRAC.WorkloadManagementSystem.DB.JobDBUtils import extractJDL


class JobMonitoringClient:
def fetch(self, parameters, jobIDs):
if not isinstance(jobIDs, list):
jobIDs = [jobIDs]
with DiracXClient() as api:
jobs = api.jobs.search(
parameters=["JobID"] + parameters,
search=[{"parameter": "JobID", "operator": "in", "values": jobIDs}],
search=[{"parameter": "JobID", "operator": "in", "values": jobIDs}], # type: ignore
)
return {j["JobID"]: {param: j[param] for param in parameters} for j in jobs}

Expand All @@ -20,6 +23,185 @@ def getJobsMinorStatus(self, jobIDs):
def getJobsStates(self, jobIDs):
return self.fetch(["Status", "MinorStatus", "ApplicationStatus"], jobIDs)

@convertToReturnValue
def getJobsApplicationStatus(self, jobIDs):
return self.fetch(["ApplicationStatus"], jobIDs)

@convertToReturnValue
def getJobsSites(self, jobIDs):
return self.fetch(["Site"], jobIDs)

@convertToReturnValue
def getJobsStatus(self, jobIDs):
return self.fetch(["Status"], jobIDs)

@convertToReturnValue
def getJobParameter(self, jobID, parName=""):
with DiracXClient() as api:
par = [] if not parName else [parName]
return api.jobs.get_job_parameters(job_id=jobID, parameters=[par])

# @convertToReturnValue
# def getJobParameters(self, jobIDs, parName=""):
# with DiracXClient() as api:
# par = [] if not parName else [parName]
# return api.jobs.get_job_parameters(job_id=jobID, parameters=[parName])

@convertToReturnValue
def getInputData(self, jobIDs):
with DiracXClient() as api:
# It's a mess
if isinstance(jobIDs, int):
jobIDs = [jobIDs]
elif isinstance(jobIDs, str):
jobIDs = [int(jobIDs)]
elif isinstance(jobIDs, list):
if isinstance(jobIDs[0], str):
jobIDs = [int(jobID) for jobID in jobIDs]

reqDict = {}

for jobID in jobIDs:
reqDict[jobID] = api.jobs.get_input_data(jobID) # type: ignore

def cleanLFN(data):
lfn = data["LFN"]
if lfn.lower().startswith("lfn:"):
return lfn[4:]
return lfn

# WARNING /!\: Old code is super bizarre.
# It can return list as well as dict...
# By default, it's better to return a dict.

return_dict = {}
for data in reqDict:
job_id = data["JobID"]
if not job_id in return_dict:
return_dict[job_id] = []

return_dict[job_id].append(cleanLFN(data))

return return_dict

def convert_condDict_to_searchSpec(self, condDict):
"""
Convert condDict to list of SearchSpec dicts.

- If key is str: one SearchSpec
- If key is tuple: multiple SearchSpec (one per key in tuple)
- If value is dict: skip this condDict entry
- list value -> operator 'in'
- scalar value -> operator 'eq'
"""
search_list = []

if not condDict:
return search_list

for key, val in condDict.items():
if isinstance(val, dict):
# abandon this condition entry
continue

# Normalize keys: if tuple, treat each element as a separate key
keys = [key] if isinstance(key, str) else list(key)

for k in keys:
if not isinstance(k, str):
# skip non-str keys silently (or raise error if you want)
continue

if isinstance(val, list):
search_list.append({"parameter": k, "operator": "in", "values": val})
else:
search_list.append({"parameter": k, "operator": "eq", "value": val})

return search_list

@convertToReturnValue
def getJobs(self, attrDict={}, cutDate=None):
# Get job >>>IDS<<<
with DiracXClient() as api:
search = self.convert_condDict_to_searchSpec(attrDict)

if cutDate:
# TODO: Verify the date format!
search.append({"parameter": "LastUpdateTime", "operator": "gt", "value": str(cutDate)})

jobs = api.jobs.search(parameters=["JobID"], search=search)

# Returns a **string**??, cf test_JobStateUpdateAndJobMonitoringMultiple
return [str(job["JobID"]) for job in jobs]

@convertToReturnValue
def getJobJDL(self, jobID, original):
with DiracXClient() as api:
jdl = api.jobs.get_job_jdl(job_id=jobID)

if not jdl:
return ""

if original:
return extractJDL(jdl.original_jdl)

return extractJDL(jdl.jdl)

@convertToReturnValue
def getJobLoggingInfo(self, jobID):
fields = ["DateTime", "Source", "Status", "MinorStatus", "ApplicationStatus"] # CF Dirac.py

jobs = self.fetch(parameters=["LoggingInfo"] + fields, jobIDs=[jobID])

# Normally, only one job
if len(jobs) == 0:
return []
job = jobs[0]

# Rearrange into a list, CF Dirac.py
res = []
for field in fields:
res.append(job.get(field, ""))

return res

@convertToReturnValue
def getJobSummary(self, jobID):
with DiracXClient() as api:
# TODO: Fix diracx client...
jobs = api.jobs.summary(
grouping=[], search=[{"parameter": "JobID", "operator": "eq", "value": jobID}] # type: ignore
)

if jobs:
return jobs[0]
return []

@convertToReturnValue
def getJobsSummary(self, jobIDs):
return self.fetch(parameters=[], jobIDs=jobIDs)

@convertToReturnValue
def getJobAttributes(self, jobID, attrList=None):
if not attrList:
attrList = []

return self.fetch(parameters=attrList, jobIDs=[jobID])

@convertToReturnValue
def getJobAttribute(self, jobID, attribute):
return self.fetch(parameters=[attribute], jobIDs=[jobID])

@convertToReturnValue
def getJobHeartBeatData(self, jobID):
with DiracXClient() as api:
res = api.jobs.get_job_heartbeat_info(job_id=jobID)

result = []
for row in res:
name = row["Name"]
value = str(row["Value"])
heartbeattime = row["HeartBeatTime"]
# ('name', '"0.1"', 'time')
# :)
result.append((str(name), "%.01f" % (float(value.replace('"', ""))), str(heartbeattime)))
Loading