-
Notifications
You must be signed in to change notification settings - Fork 183
[9.0] feat: Migrating pilot client toward DiracX #8233
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
Robin-Van-de-Merghel
wants to merge
3
commits into
DIRACGrid:integration
Choose a base branch
from
Robin-Van-de-Merghel:robin-migrate-client
base: integration
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from all commits
Commits
Show all changes
3 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
173 changes: 173 additions & 0 deletions
173
src/DIRAC/WorkloadManagementSystem/FutureClient/PilotManagerClient.py
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,173 @@ | ||
from DIRAC.Core.Utilities.ReturnValues import convertToReturnValue | ||
from DIRAC.Core.Security.DiracX import DiracXClient, FutureClient | ||
|
||
|
||
class PilotManagerClient(FutureClient): | ||
def get_pilot_stamps_from_refs(self, pilot_references) -> list[str]: | ||
with DiracXClient() as api: | ||
search = [{"parameter": "PilotJobReference", "operator": "in", "values": pilot_references}] | ||
pilots = api.pilots.search(parameters=["PilotStamp"], search=search, sort=[]) # type: ignore | ||
|
||
return [pilot["PilotStamp"] for pilot in pilots] | ||
|
||
@convertToReturnValue | ||
def addPilotReferences(self, pilot_references, VO, gridType="DIRAC", pilot_stamps_dict={}): | ||
with DiracXClient() as api: | ||
pilot_stamps = [pilot_stamps_dict.get(ref, ref) for ref in pilot_references] | ||
pilot_ref_dict = dict(zip(pilot_stamps, pilot_references)) | ||
|
||
# We will move toward a stamp as identifier for the pilot | ||
return api.pilots.add_pilot_stamps( | ||
pilot_stamps=pilot_stamps, vo=VO, grid_type=gridType, pilot_references=pilot_ref_dict | ||
) | ||
|
||
def set_pilot_field(self, pilot_stamp, values_dict): | ||
with DiracXClient() as api: | ||
values_dict["PilotStamp"] = pilot_stamp | ||
return api.pilots.update_pilot_fields(pilot_stamps_to_fields_mapping=[values_dict]) # type: ignore | ||
|
||
@convertToReturnValue | ||
def setPilotStatus(self, pilot_reference, status, destination=None, reason=None, grid_site=None, queue=None): | ||
# Translate ref to stamp (DiracX relies on stamps whereas DIRAC relies on refs) | ||
pilot_stamps = self.get_pilot_stamps_from_refs([pilot_reference]) | ||
pilot_stamp = pilot_stamps[0] # We might raise an error. This is so that we spot the error | ||
|
||
return self.set_pilot_field( | ||
pilot_stamp, | ||
{ | ||
"Status": status, | ||
"DestinationSite": destination, | ||
"StatusReason": reason, | ||
"GridSite": grid_site, | ||
"Queue": queue, | ||
}, | ||
) | ||
|
||
@convertToReturnValue | ||
def deletePilot(self, pilot_reference): | ||
# Translate ref to stamp (DiracX relies on stamps whereas DIRAC relies on refs) | ||
pilot_stamps = self.get_pilot_stamps_from_refs([pilot_reference]) | ||
# We don't want to raise an error. | ||
if not pilot_stamps: | ||
return None | ||
pilot_stamp = pilot_stamps[0] | ||
|
||
with DiracXClient() as api: | ||
pilot_stamps = [pilot_stamp] | ||
return api.pilots.delete_pilots(pilot_stamps=pilot_stamps) | ||
|
||
@convertToReturnValue | ||
def getJobsForPilotByStamp(self, pilotStamp): | ||
with DiracXClient() as api: | ||
return api.pilots.get_pilot_jobs(pilot_stamp=pilotStamp) | ||
|
||
@convertToReturnValue | ||
def getPilots(self, job_id): | ||
with DiracXClient() as api: | ||
pilot_ids = api.pilots.get_pilot_jobs(job_id=job_id) | ||
search = [{"parameter": "PilotID", "operator": "in", "value": pilot_ids}] | ||
return api.pilots.search(parameters=[], search=search, sort=[]) # type: ignore | ||
|
||
@convertToReturnValue | ||
def getPilotInfo(self, pilot_reference): | ||
"""Important: We assume that to one stamp is mapped one pilot.""" | ||
with DiracXClient() as api: | ||
search = [{"parameter": "PilotJobReference", "operator": "eq", "value": pilot_reference}] | ||
pilot = api.pilots.search(parameters=[], search=search, sort=[])[0] # type: ignore | ||
|
||
if not pilot: | ||
# Return an error as in the legacy code | ||
return [] | ||
|
||
# Convert all bools in pilot to str | ||
for k, v in pilot.items(): | ||
if isinstance(v, bool): | ||
pilot[k] = str(v) | ||
|
||
# Transform the list of pilots into a dict keyed by PilotJobReference | ||
resDict = {} | ||
|
||
pilotRef = pilot.get("PilotJobReference", None) | ||
assert pilot_reference == pilotRef | ||
pilotStamp = pilot.get("PilotStamp", None) | ||
|
||
if pilotRef is not None: | ||
resDict[pilotRef] = pilot | ||
else: | ||
# Fallback: use PilotStamp or another key if PilotJobReference is missing | ||
resDict[pilotStamp] = pilot | ||
|
||
jobIDs = self.getJobsForPilotByStamp(pilotStamp) | ||
if jobIDs: # Only add if jobs exist | ||
for pilotRef, pilotInfo in resDict.items(): | ||
pilotInfo["Jobs"] = jobIDs # Attach the entire list | ||
|
||
return resDict | ||
|
||
@convertToReturnValue | ||
def getGroupedPilotSummary(self, column_list): | ||
with DiracXClient() as api: | ||
return api.pilots.summary(grouping=column_list) | ||
|
||
@convertToReturnValue | ||
def getPilotSummary(self, startdate="", enddate=""): | ||
with DiracXClient() as api: | ||
search_filters = [] | ||
if startdate: | ||
search_filters.append({"parameter": "SubmissionTime", "operator": "gt", "value": startdate}) | ||
if enddate: | ||
search_filters.append({"parameter": "SubmissionTime", "operator": "lt", "value": enddate}) | ||
|
||
rows = api.pilots.summary(grouping=["DestinationSite", "Status"], search=search_filters) | ||
|
||
# Build nested result: { site: { status: count }, Total: { status: total_count } } | ||
summary_dict = {"Total": {}} | ||
for row in rows: | ||
site = row["DestinationSite"] | ||
status = row["Status"] | ||
count = row["count"] | ||
|
||
if site not in summary_dict: | ||
summary_dict[site] = {} | ||
|
||
summary_dict[site][status] = count | ||
summary_dict["Total"].setdefault(status, 0) | ||
summary_dict["Total"][status] += count | ||
|
||
return summary_dict | ||
|
||
@convertToReturnValue | ||
def deletePilots(self, pilot_references): | ||
pilot_ids = [] | ||
if pilot_references and isinstance(pilot_references, list): | ||
if isinstance(pilot_references[0], int): | ||
# Multiple elements (int) | ||
pilot_ids = pilot_references # Semantic | ||
elif isinstance(pilot_references, int): | ||
# Only one element (int) | ||
pilot_ids = [pilot_references] | ||
elif isinstance(pilot_references, str): | ||
# Only one element (str) | ||
pilot_references = [pilot_references] | ||
# Else: pilot_stamps should be list[str] (or the input is random) | ||
|
||
pilot_stamps = [] | ||
|
||
# Used by no one, but we won't raise `UnimplementedError` because we still use it in tests. | ||
with DiracXClient() as api: | ||
search = [] | ||
if pilot_ids: | ||
# If we have defined pilot_ids, then we have to change them to pilot_stamps | ||
search = [{"parameter": "PilotID", "operator": "in", "values": pilot_ids}] | ||
else: | ||
# If we have defined pilot_ids, then we have to change them to pilot_stamps | ||
search = [{"parameter": "PilotJobReference", "operator": "in", "values": pilot_references}] | ||
|
||
pilots = api.pilots.search(parameters=["PilotStamp"], search=search, sort=[]) # type: ignore | ||
pilot_stamps = [pilot["PilotStamp"] for pilot in pilots] | ||
|
||
if not pilot_stamps: | ||
# Avoid useless requests | ||
return None | ||
|
||
return api.pilots.delete_pilots(pilot_stamps=pilot_stamps) # type: ignore |
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.