Skip to content

Commit 647abef

Browse files
feat: Migrate PilotManagerClient towards DiracX
1 parent bcc565d commit 647abef

File tree

7 files changed

+226
-167
lines changed

7 files changed

+226
-167
lines changed

.github/workflows/integration.yml

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ jobs:
3838
- TEST_NAME: "Backward Compatibility"
3939
ARGS: CLIENT_INSTALLATION_BRANCH=rel-v8r0 PILOT_INSTALLATION_BRANCH=rel-v8r0
4040
- TEST_NAME: "Test DiracX latest"
41-
ARGS: TEST_DIRACX=Yes
41+
ARGS: TEST_DIRACX=Yes --diracx-dist-dir $GITHUB_WORKSPACE/diracx-dist
4242

4343
steps:
4444
- uses: actions/checkout@v4
@@ -57,7 +57,22 @@ jobs:
5757
packaging \
5858
pyyaml \
5959
requests \
60-
typer
60+
typer \
61+
build
62+
- name: Building wheels
63+
run: |
64+
# Clone diracx
65+
git clone --single-branch --branch robin-pilot-management https://github.com/Robin-Van-de-Merghel/diracx.git $GITHUB_WORKSPACE/diracx
66+
67+
# Create dist dir
68+
mkdir -p $GITHUB_WORKSPACE/diracx-dist
69+
70+
# Building diracx
71+
for pkg_dir in $GITHUB_WORKSPACE/diracx/diracx-* $GITHUB_WORKSPACE/diracx; do
72+
echo "Building $pkg_dir"
73+
python -m build --outdir "$GITHUB_WORKSPACE/diracx-dist" $pkg_dir
74+
done
75+
6176
- name: Prepare environment
6277
run: ./integration_tests.py prepare-environment ${{ matrix.ARGS }}
6378
- name: Install server

src/DIRAC/WorkloadManagementSystem/Client/PilotManagerClient.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,17 @@
44
from DIRAC.Core.Base.Client import Client, createClient
55

66

7+
from DIRAC.WorkloadManagementSystem.FutureClient.PilotManagerClient import (
8+
PilotManagerClient as futurePilotManagerClient,
9+
)
10+
11+
712
@createClient("WorkloadManagement/PilotManager")
813
class PilotManagerClient(Client):
914
"""PilotManagerClient sets url for the PilotManagerHandler."""
1015

16+
diracxClient = futurePilotManagerClient
17+
1118
def __init__(self, url=None, **kwargs):
1219
"""
1320
Sets URL for PilotManager handler
Lines changed: 145 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,145 @@
1+
from DIRAC.Core.Utilities.ReturnValues import convertToReturnValue
2+
from DIRAC.Core.Security.DiracX import DiracXClient, FutureClient
3+
4+
5+
class PilotManagerClient(FutureClient):
6+
@convertToReturnValue
7+
def addPilotReferences(self, pilot_references, VO, gridType="DIRAC", pilot_stamps_dict={}):
8+
with DiracXClient() as api:
9+
pilot_stamps = pilot_stamps_dict.values()
10+
pilot_ref_dict = dict(zip(pilot_stamps, pilot_references))
11+
12+
# We will move toward a stamp as identifier for the pilot
13+
return api.pilots.add_pilot_stamps(
14+
{"pilot_stamps": pilot_stamps, "vo": VO, "grid_type": gridType, "pilot_references": pilot_ref_dict} # type: ignore
15+
)
16+
17+
def set_pilot_field(self, pilot_stamp, values_dict):
18+
with DiracXClient() as api:
19+
values_dict["PilotStamp"] = pilot_stamp
20+
return api.pilots.update_pilot_fields({"pilot_stamps_to_fields_mapping": [values_dict]}) # type: ignore
21+
22+
@convertToReturnValue
23+
def setPilotStatus(self, pilot_stamp, status, destination=None, reason=None, grid_site=None, queue=None):
24+
return self.set_pilot_field(
25+
pilot_stamp,
26+
{
27+
"Status": status,
28+
"DestinationSite": destination,
29+
"StatusReason": reason,
30+
"GridSite": grid_site,
31+
"Queue": queue,
32+
},
33+
)
34+
35+
@convertToReturnValue
36+
def deletePilot(self, pilot_stamp):
37+
with DiracXClient() as api:
38+
pilot_stamps = [pilot_stamp]
39+
return api.pilots.delete_pilots(pilot_stamps=pilot_stamps)
40+
41+
@convertToReturnValue
42+
def getJobsForPilotByStamp(self, pilotStamp):
43+
with DiracXClient() as api:
44+
return api.pilots.get_pilot_jobs(pilot_stamp=pilotStamp)
45+
46+
@convertToReturnValue
47+
def getPilots(self, job_id):
48+
with DiracXClient() as api:
49+
pilot_ids = api.pilots.get_pilot_jobs(job_id=job_id)
50+
search = [{"parameter": "PilotID", "operator": "in", "value": pilot_ids}]
51+
return api.pilots.search(parameters=[], search=search, sort=[]) # type: ignore
52+
53+
@convertToReturnValue
54+
def getPilotInfo(self, pilot_reference):
55+
"""Important: We assume that to one stamp is mapped one pilot."""
56+
with DiracXClient() as api:
57+
search = [{"parameter": "PilotJobReference", "operator": "eq", "value": pilot_reference}]
58+
pilot = api.pilots.search(parameters=[], search=search, sort=[])[0] # type: ignore
59+
60+
if not pilot:
61+
# Return an error as in the legacy code
62+
return []
63+
64+
# Convert all bools in pilot to str
65+
for k, v in pilot.items():
66+
if isinstance(v, bool):
67+
pilot[k] = str(v)
68+
69+
# Transform the list of pilots into a dict keyed by PilotJobReference
70+
resDict = {}
71+
72+
pilotRef = pilot.get("PilotJobReference", None)
73+
assert pilot_reference == pilotRef
74+
pilotStamp = pilot.get("PilotStamp", None)
75+
76+
if pilotRef is not None:
77+
resDict[pilotRef] = pilot
78+
else:
79+
# Fallback: use PilotStamp or another key if PilotJobReference is missing
80+
resDict[pilotStamp] = pilot
81+
82+
jobIDs = self.getJobsForPilotByStamp(pilotStamp)
83+
if jobIDs: # Only add if jobs exist
84+
for pilotRef, pilotInfo in resDict.items():
85+
pilotInfo["Jobs"] = jobIDs # Attach the entire list
86+
87+
return resDict
88+
89+
@convertToReturnValue
90+
def getGroupedPilotSummary(self, column_list):
91+
with DiracXClient() as api:
92+
return api.pilots.summary(grouping=column_list)
93+
94+
@convertToReturnValue
95+
def getPilotSummary(self, startdate="", enddate=""):
96+
with DiracXClient() as api:
97+
search_filters = []
98+
if startdate:
99+
search_filters.append({"parameter": "SubmissionTime", "operator": "gt", "value": startdate})
100+
if enddate:
101+
search_filters.append({"parameter": "SubmissionTime", "operator": "lt", "value": enddate})
102+
103+
rows = api.pilots.summary(grouping=["DestinationSite", "Status"], search=search_filters)
104+
105+
# Build nested result: { site: { status: count }, Total: { status: total_count } }
106+
summary_dict = {"Total": {}}
107+
for row in rows:
108+
site = row["DestinationSite"]
109+
status = row["Status"]
110+
count = row["count"]
111+
112+
if site not in summary_dict:
113+
summary_dict[site] = {}
114+
115+
summary_dict[site][status] = count
116+
summary_dict["Total"].setdefault(status, 0)
117+
summary_dict["Total"][status] += count
118+
119+
return summary_dict
120+
121+
@convertToReturnValue
122+
def deletePilots(self, pilot_stamps):
123+
# Used by no one, but we won't raise `UnimplementedError` because we still use it in tests.
124+
with DiracXClient() as api:
125+
pilot_ids = None
126+
if pilot_stamps and isinstance(pilot_stamps, list):
127+
if isinstance(pilot_stamps[0], int):
128+
# Multiple elements (int)
129+
pilot_ids = pilot_stamps # Semantic
130+
elif isinstance(pilot_stamps, int):
131+
# Only one element (int)
132+
pilot_ids = [pilot_stamps]
133+
elif isinstance(pilot_stamps, str):
134+
# Only one element (str)
135+
pilot_stamps = [pilot_stamps]
136+
# Else: pilot_stamps should be list[str] (or the input is random)
137+
138+
if pilot_ids:
139+
# If we have defined pilot_ids, then we have to change them to pilot_stamps
140+
search = [{"parameter": "PilotID", "operator": "in", "value": pilot_ids}]
141+
142+
pilots = api.pilots.search(parameters=["PilotStamp"], search=search, sort=[]) # type: ignore
143+
pilot_stamps = [pilot["PilotStamp"] for pilot in pilots]
144+
145+
return api.pilots.delete_pilots(pilot_stamps=pilot_stamps) # type: ignore

0 commit comments

Comments
 (0)