Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 17 additions & 2 deletions .github/workflows/integration.yml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ jobs:
- TEST_NAME: "Backward Compatibility"
ARGS: CLIENT_INSTALLATION_BRANCH=rel-v8r0 PILOT_INSTALLATION_BRANCH=rel-v8r0
- TEST_NAME: "Test DiracX latest"
ARGS: TEST_DIRACX=Yes
ARGS: TEST_DIRACX=Yes --diracx-dist-dir $GITHUB_WORKSPACE/diracx-dist

steps:
- uses: actions/checkout@v4
Expand All @@ -57,7 +57,22 @@ jobs:
packaging \
pyyaml \
requests \
typer
typer \
build
- name: Building wheels
run: |
# Clone diracx
git clone --single-branch --branch robin-pilot-registrations https://github.com/Robin-Van-de-Merghel/diracx.git $GITHUB_WORKSPACE/diracx

# Create dist dir
mkdir -p $GITHUB_WORKSPACE/diracx-dist

# Building diracx
for pkg_dir in $GITHUB_WORKSPACE/diracx/diracx-* $GITHUB_WORKSPACE/diracx; do
echo "Building $pkg_dir"
python -m build --outdir "$GITHUB_WORKSPACE/diracx-dist" $pkg_dir
done

- name: Prepare environment
run: ./integration_tests.py prepare-environment ${{ matrix.ARGS }}
- name: Install server
Expand Down
57 changes: 0 additions & 57 deletions src/DIRAC/Interfaces/API/DiracAdmin.py
Original file line number Diff line number Diff line change
Expand Up @@ -380,63 +380,6 @@ def getJobPilotOutput(self, jobID, directory=""):
self.log.always(f"Outputs retrieved in {outputPath}")
return result

#############################################################################
def getPilotOutput(self, gridReference, directory=""):
"""Retrieve the pilot output (std.out and std.err) for an existing pilot reference.

>>> gLogger.notice(dirac.getJobPilotOutput(12345))
{'OK': True, 'Value': {}}

:param str gridReference: pilot reference
:param str directory: a directory to download logs to.
:return: S_OK,S_ERROR
"""
if not isinstance(gridReference, str):
return self._errorReport("Expected string for pilot reference")

if not directory:
directory = self.currentDir

if not os.path.exists(directory):
return self._errorReport(f"Directory {directory} does not exist")

result = PilotManagerClient().getPilotOutput(gridReference)
if not result["OK"]:
return result

gridReferenceSmall = gridReference.split("/")[-1]
if not gridReferenceSmall:
gridReferenceSmall = "reference"
outputPath = f"{directory}/pilot_{gridReferenceSmall}"

if os.path.exists(outputPath):
self.log.info(f"Remove {outputPath} and retry to continue")
return S_ERROR(f"Remove {outputPath} and retry to continue")

if not os.path.exists(outputPath):
self.log.verbose(f"Creating directory {outputPath}")
os.mkdir(outputPath)

outputs = result["Value"]
if "StdOut" in outputs:
stdout = f"{outputPath}/std.out"
with open(stdout, "w") as fopen:
fopen.write(outputs["StdOut"])
self.log.info(f"Standard output written to {stdout}")
else:
self.log.warn("No standard output returned")

if "StdErr" in outputs:
stderr = f"{outputPath}/std.err"
with open(stderr, "w") as fopen:
fopen.write(outputs["StdErr"])
self.log.info(f"Standard error written to {stderr}")
else:
self.log.warn("No standard error returned")

self.log.always(f"Outputs retrieved in {outputPath}")
return result

#############################################################################
def getPilotInfo(self, gridReference):
"""Retrieve info relative to a pilot reference
Expand Down
16 changes: 2 additions & 14 deletions src/DIRAC/Interfaces/scripts/dirac_admin_get_pilot_output.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,22 +21,10 @@ def main():
_, args = Script.parseCommandLine(ignoreErrors=True)

from DIRAC import exit as DIRACExit
from DIRAC.Interfaces.API.DiracAdmin import DiracAdmin

diracAdmin = DiracAdmin()
exitCode = 0
errorList = []
print("This command is not supported anymore with DIRAV V9.")

for gridID in args:
result = diracAdmin.getPilotOutput(gridID)
if not result["OK"]:
errorList.append((gridID, result["Message"]))
exitCode = 2

for error in errorList:
print("ERROR %s: %s" % error)

DIRACExit(exitCode)
DIRACExit(0)


if __name__ == "__main__":
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,17 @@
from DIRAC.Core.Base.Client import Client, createClient


from DIRAC.WorkloadManagementSystem.FutureClient.PilotManagerClient import (
PilotManagerClient as futurePilotManagerClient,
)


@createClient("WorkloadManagement/PilotManager")
class PilotManagerClient(Client):
"""PilotManagerClient sets url for the PilotManagerHandler."""

diracxClient = futurePilotManagerClient

def __init__(self, url=None, **kwargs):
"""
Sets URL for PilotManager handler
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
from DIRAC.Core.Utilities.ReturnValues import convertToReturnValue
from DIRAC.Core.Security.DiracX import DiracXClient, FutureClient


class PilotManagerClient(FutureClient):
def get_pilot_stamps_from_refs(self, pilot_references) -> list[str]:
with DiracXClient() as api:
search = [{"parameter": "PilotJobReference", "operator": "in", "values": pilot_references}]
pilots = api.pilots.search(parameters=["PilotStamp"], search=search, sort=[]) # type: ignore

return [pilot["PilotStamp"] for pilot in pilots]

@convertToReturnValue
def addPilotReferences(self, pilot_references, VO, gridType="DIRAC", pilot_stamps_dict={}):
with DiracXClient() as api:
pilot_stamps = [pilot_stamps_dict.get(ref, ref) for ref in pilot_references]
pilot_ref_dict = dict(zip(pilot_stamps, pilot_references))

# We will move toward a stamp as identifier for the pilot
return api.pilots.add_pilot_stamps(
pilot_stamps=pilot_stamps, vo=VO, grid_type=gridType, pilot_references=pilot_ref_dict
)

def set_pilot_field(self, pilot_stamp, values_dict):
with DiracXClient() as api:
values_dict["PilotStamp"] = pilot_stamp
return api.pilots.update_pilot_fields(pilot_stamps_to_fields_mapping=[values_dict]) # type: ignore

@convertToReturnValue
def setPilotStatus(self, pilot_reference, status, destination=None, reason=None, grid_site=None, queue=None):
# Translate ref to stamp (DiracX relies on stamps whereas DIRAC relies on refs)
pilot_stamps = self.get_pilot_stamps_from_refs([pilot_reference])
pilot_stamp = pilot_stamps[0] # We might raise an error. This is so that we spot the error

return self.set_pilot_field(
pilot_stamp,
{
"Status": status,
"DestinationSite": destination,
"StatusReason": reason,
"GridSite": grid_site,
"Queue": queue,
},
)

@convertToReturnValue
def deletePilot(self, pilot_reference):
# Translate ref to stamp (DiracX relies on stamps whereas DIRAC relies on refs)
pilot_stamps = self.get_pilot_stamps_from_refs([pilot_reference])
# We don't want to raise an error.
if not pilot_stamps:
return None
pilot_stamp = pilot_stamps[0]

with DiracXClient() as api:
pilot_stamps = [pilot_stamp]
return api.pilots.delete_pilots(pilot_stamps=pilot_stamps)

@convertToReturnValue
def getJobsForPilotByStamp(self, pilotStamp):
with DiracXClient() as api:
return api.pilots.get_pilot_jobs(pilot_stamp=pilotStamp)

@convertToReturnValue
def getPilots(self, job_id):
with DiracXClient() as api:
pilot_ids = api.pilots.get_pilot_jobs(job_id=job_id)
search = [{"parameter": "PilotID", "operator": "in", "value": pilot_ids}]
return api.pilots.search(parameters=[], search=search, sort=[]) # type: ignore

@convertToReturnValue
def getPilotInfo(self, pilot_reference):
"""Important: We assume that to one stamp is mapped one pilot."""
with DiracXClient() as api:
search = [{"parameter": "PilotJobReference", "operator": "eq", "value": pilot_reference}]
pilot = api.pilots.search(parameters=[], search=search, sort=[])[0] # type: ignore

if not pilot:
# Return an error as in the legacy code
return []

# Convert all bools in pilot to str
for k, v in pilot.items():
if isinstance(v, bool):
pilot[k] = str(v)

# Transform the list of pilots into a dict keyed by PilotJobReference
resDict = {}

pilotRef = pilot.get("PilotJobReference", None)
assert pilot_reference == pilotRef
pilotStamp = pilot.get("PilotStamp", None)

if pilotRef is not None:
resDict[pilotRef] = pilot
else:
# Fallback: use PilotStamp or another key if PilotJobReference is missing
resDict[pilotStamp] = pilot

jobIDs = self.getJobsForPilotByStamp(pilotStamp)
if jobIDs: # Only add if jobs exist
for pilotRef, pilotInfo in resDict.items():
pilotInfo["Jobs"] = jobIDs # Attach the entire list

return resDict

@convertToReturnValue
def getGroupedPilotSummary(self, column_list):
with DiracXClient() as api:
return api.pilots.summary(grouping=column_list)

@convertToReturnValue
def getPilotSummary(self, startdate="", enddate=""):
with DiracXClient() as api:
search_filters = []
if startdate:
search_filters.append({"parameter": "SubmissionTime", "operator": "gt", "value": startdate})
if enddate:
search_filters.append({"parameter": "SubmissionTime", "operator": "lt", "value": enddate})

rows = api.pilots.summary(grouping=["DestinationSite", "Status"], search=search_filters)

# Build nested result: { site: { status: count }, Total: { status: total_count } }
summary_dict = {"Total": {}}
for row in rows:
site = row["DestinationSite"]
status = row["Status"]
count = row["count"]

if site not in summary_dict:
summary_dict[site] = {}

summary_dict[site][status] = count
summary_dict["Total"].setdefault(status, 0)
summary_dict["Total"][status] += count

return summary_dict

@convertToReturnValue
def deletePilots(self, pilot_references):
pilot_ids = []
if pilot_references and isinstance(pilot_references, list):
if isinstance(pilot_references[0], int):
# Multiple elements (int)
pilot_ids = pilot_references # Semantic
elif isinstance(pilot_references, int):
# Only one element (int)
pilot_ids = [pilot_references]
elif isinstance(pilot_references, str):
# Only one element (str)
pilot_references = [pilot_references]
# Else: pilot_stamps should be list[str] (or the input is random)

pilot_stamps = []

# Used by no one, but we won't raise `UnimplementedError` because we still use it in tests.
with DiracXClient() as api:
search = []
if pilot_ids:
# If we have defined pilot_ids, then we have to change them to pilot_stamps
search = [{"parameter": "PilotID", "operator": "in", "values": pilot_ids}]
else:
# If we have defined pilot_ids, then we have to change them to pilot_stamps
search = [{"parameter": "PilotJobReference", "operator": "in", "values": pilot_references}]

pilots = api.pilots.search(parameters=["PilotStamp"], search=search, sort=[]) # type: ignore
pilot_stamps = [pilot["PilotStamp"] for pilot in pilots]

if not pilot_stamps:
# Avoid useless requests
return None

return api.pilots.delete_pilots(pilot_stamps=pilot_stamps) # type: ignore
Loading
Loading