Skip to content

Commit e8e7b13

Browse files
feat: Migrating client toward DiracX
1 parent 44ac56e commit e8e7b13

File tree

4 files changed

+159
-43
lines changed

4 files changed

+159
-43
lines changed
Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
from diracx.client._generated.models import BodyPilotsAddJobsToPilot, BodyPilotsAddPilotStamps, ScalarSearchSpec, SearchParams
2+
from DIRAC.Core.Utilities.ReturnValues import convertToReturnValue
3+
4+
from DIRAC.Core.Security.DiracX import DiracXClient
5+
6+
7+
class PilotManagerClient:
8+
@convertToReturnValue
9+
def addPilotReferences(self, pilot_stamps, VO, gridType="DIRAC", pilot_references={}):
10+
with DiracXClient() as api:
11+
# We will move toward a stamp as identifier for the pilot
12+
return api.pilots.add_pilot_stamps(BodyPilotsAddPilotStamps(
13+
pilot_stamps=pilot_stamps,
14+
vo=VO,
15+
grid_type=gridType,
16+
pilot_references=pilot_references
17+
))
18+
19+
20+
def set_pilot_field(self, pilot_stamp, values_dict):
21+
with DiracXClient() as api:
22+
values_dict["PilotStamp"] = pilot_stamp
23+
return api.pilots.update_pilot_fields(
24+
values_dict
25+
)
26+
27+
@convertToReturnValue
28+
def setPilotBenchmark(self, pilotStamp, mark):
29+
return self.set_pilot_field(pilotStamp, {"BenchMark": mark})
30+
31+
@convertToReturnValue
32+
def setAccountingFlag(self, pilotStamp, flag):
33+
return self.set_pilot_field(pilotStamp, {"AccountingSent": flag})
34+
35+
@convertToReturnValue
36+
def setPilotStatus(self, pilot_stamp, status, destination=None, reason=None, grid_site=None, queue=None):
37+
return self.set_pilot_field(pilot_stamp, {
38+
"Status": status,
39+
"DestinationSite": destination,
40+
"StatusReason": reason,
41+
"GridSite": grid_site,
42+
"Queue": queue
43+
})
44+
45+
@convertToReturnValue
46+
def clearPilots(self, interval=30, aborted_interval=7):
47+
with DiracXClient() as api:
48+
api.pilots.delete_pilots(
49+
age_in_days=interval,
50+
delete_only_aborted=False
51+
)
52+
api.pilots.delete_pilots(
53+
age_in_days=aborted_interval,
54+
delete_only_aborted=True
55+
)
56+
57+
@convertToReturnValue
58+
def deletePilots(self, pilot_stamps):
59+
with DiracXClient() as api:
60+
api.pilots.delete_pilots(
61+
pilot_stamps=pilot_stamps
62+
)
63+
64+
@convertToReturnValue
65+
def setJobForPilot(self, job_id, pilot_stamp, destination=None):
66+
with DiracXClient() as api:
67+
api.pilots.add_jobs_to_pilot(BodyPilotsAddJobsToPilot(
68+
pilot_stamp=pilot_stamp,
69+
job_ids=[job_id]
70+
))
71+
72+
self.set_pilot_field(pilot_stamp, {
73+
"DestinationSite": destination,
74+
})
75+
76+
@convertToReturnValue
77+
def getPilots(self, job_id):
78+
with DiracXClient() as api:
79+
pilot_ids = api.pilots.get_pilot_jobs(
80+
job_id=job_id
81+
)
82+
83+
query = [{"parameter": "PilotID", "operator": "in", "value": pilot_ids}]
84+
85+
return api.pilots.search(
86+
parameters=[],
87+
search=query,
88+
sort=[]
89+
)
90+
91+
92+
@convertToReturnValue
93+
def getPilotInfo(self, pilot_stamp):
94+
with DiracXClient() as api:
95+
query = [{"parameter": "PilotStamp", "operator": "eq", "value": pilot_stamp}]
96+
97+
return api.pilots.search(
98+
parameters=[],
99+
search=query,
100+
sort=[]
101+
)

src/DIRAC/WorkloadManagementSystem/Service/PilotManagerHandler.py

Lines changed: 55 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -69,14 +69,6 @@ def export_getCurrentPilotCounters(cls, attrDict={}):
6969

7070
return S_OK(resultDict)
7171

72-
##########################################################################################
73-
types_addPilotReferences = [list, str]
74-
75-
@classmethod
76-
def export_addPilotReferences(cls, pilotRef, VO, gridType="DIRAC", pilotStampDict={}):
77-
"""Add a new pilot job reference"""
78-
return cls.pilotAgentsDB.addPilotReferences(pilotRef, VO, gridType, pilotStampDict)
79-
8072
##############################################################################
8173
types_getPilotOutput = [str]
8274

@@ -202,17 +194,11 @@ def _getRemotePilotOutput(self, pilotReference, pilotDict):
202194
# return res, correct or not
203195
return res
204196

205-
##############################################################################
206-
types_getPilotInfo = [[list, str]]
207-
208-
@classmethod
209-
def export_getPilotInfo(cls, pilotReference):
210-
"""Get the info about a given pilot job reference"""
211-
return cls.pilotAgentsDB.getPilotInfo(pilotReference)
212197

213198
##############################################################################
214199
types_selectPilots = [dict]
215200

201+
# Won't be moved to DiracX: not used at all anywhere.
216202
@classmethod
217203
def export_selectPilots(cls, condDict):
218204
"""Select pilots given the selection conditions"""
@@ -288,21 +274,35 @@ def export_getGroupedPilotSummary(cls, columnList):
288274
"""
289275
return cls.pilotAgentsDB.getGroupedPilotSummary(columnList)
290276

291-
##############################################################################
292-
types_getPilots = [[str, int]]
277+
278+
types_countPilots = [dict]
293279

294280
@classmethod
295-
def export_getPilots(cls, jobID):
296-
"""Get pilots executing/having executed the Job"""
297-
result = cls.pilotAgentsDB.getPilotsForJobID(int(jobID))
298-
if not result["OK"] or not result["Value"]:
299-
return S_ERROR(f"Failed to get pilot for Job {int(jobID)}: {result.get('Message', '')}")
281+
def export_countPilots(cls, condDict, older=None, newer=None, timeStamp="SubmissionTime"):
282+
"""Set the pilot agent status"""
300283

301-
return cls.pilotAgentsDB.getPilotInfo(pilotID=result["Value"])
284+
return cls.pilotAgentsDB.countPilots(condDict, older, newer, timeStamp)
302285

303-
##############################################################################
304-
types_setJobForPilot = [[str, int], str]
305286

287+
# --------------- Moved to DiracX ---------------
288+
289+
#############################################
290+
types_addPilotReferences = [list, str]
291+
292+
# Moved to DiracX
293+
@classmethod
294+
def export_addPilotReferences(cls, pilotStamps, VO, gridType="DIRAC", pilotRefDict={}):
295+
"""Add a new pilot job reference"""
296+
pilot_references = pilotRefDict.values()
297+
pilot_stamp_dict = dict(zip(pilotStamps, pilot_references))
298+
299+
300+
return cls.pilotAgentsDB.addPilotReferences(pilot_references, VO, gridType, pilot_stamp_dict)
301+
302+
303+
#############################################
304+
types_setJobForPilot = [[str, int], str]
305+
306306
@classmethod
307307
def export_setJobForPilot(cls, jobID, pilotRef, destination=None):
308308
"""Report the DIRAC job ID which is executed by the given pilot job"""
@@ -318,23 +318,24 @@ def export_setJobForPilot(cls, jobID, pilotRef, destination=None):
318318

319319
return result
320320

321-
##########################################################################################
321+
#############################################
322322
types_setPilotBenchmark = [str, float]
323323

324324
@classmethod
325325
def export_setPilotBenchmark(cls, pilotRef, mark):
326326
"""Set the pilot agent benchmark"""
327327
return cls.pilotAgentsDB.setPilotBenchmark(pilotRef, mark)
328328

329-
##########################################################################################
329+
#############################################
330330
types_setAccountingFlag = [str]
331331

332332
@classmethod
333333
def export_setAccountingFlag(cls, pilotRef, mark="True"):
334334
"""Set the pilot AccountingSent flag"""
335335
return cls.pilotAgentsDB.setAccountingFlag(pilotRef, mark)
336336

337-
##########################################################################################
337+
338+
#############################################
338339
types_setPilotStatus = [str, str]
339340

340341
@classmethod
@@ -345,22 +346,16 @@ def export_setPilotStatus(cls, pilotRef, status, destination=None, reason=None,
345346
pilotRef, status, destination=destination, statusReason=reason, gridSite=gridSite, queue=queue
346347
)
347348

348-
##########################################################################################
349-
types_countPilots = [dict]
350-
351-
@classmethod
352-
def export_countPilots(cls, condDict, older=None, newer=None, timeStamp="SubmissionTime"):
353-
"""Set the pilot agent status"""
354-
355-
return cls.pilotAgentsDB.countPilots(condDict, older, newer, timeStamp)
356-
357-
##########################################################################################
349+
#############################################
358350
types_deletePilots = [[list, str, int]]
359-
351+
360352
@classmethod
361353
def export_deletePilots(cls, pilotIDs):
362354
if isinstance(pilotIDs, str):
363355
return cls.pilotAgentsDB.deletePilot(pilotIDs)
356+
357+
# And list[str]????
358+
# pilot_id>>>S<<<
364359

365360
if isinstance(pilotIDs, int):
366361
pilotIDs = [
@@ -373,9 +368,29 @@ def export_deletePilots(cls, pilotIDs):
373368

374369
return S_OK()
375370

376-
##############################################################################
371+
#############################################
377372
types_clearPilots = [int, int]
378373

379374
@classmethod
380375
def export_clearPilots(cls, interval=30, aborted_interval=7):
381376
return cls.pilotAgentsDB.clearPilots(interval, aborted_interval)
377+
378+
##############################################################################
379+
types_getPilots = [[str, int]]
380+
381+
@classmethod
382+
def export_getPilots(cls, jobID):
383+
"""Get pilots executing/having executed the Job"""
384+
result = cls.pilotAgentsDB.getPilotsForJobID(int(jobID))
385+
if not result["OK"] or not result["Value"]:
386+
return S_ERROR(f"Failed to get pilot for Job {int(jobID)}: {result.get('Message', '')}")
387+
388+
return cls.pilotAgentsDB.getPilotInfo(pilotID=result["Value"])
389+
390+
##############################################################################
391+
types_getPilotInfo = [[list, str]]
392+
393+
@classmethod
394+
def export_getPilotInfo(cls, pilotReference):
395+
"""Get the info about a given pilot job reference"""
396+
return cls.pilotAgentsDB.getPilotInfo(pilotReference)

src/DIRAC/WorkloadManagementSystem/scripts/dirac_admin_add_pilot.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ def main():
7474
if not DErrno.cmpError(res, DErrno.EWMSNOPILOT):
7575
gLogger.error(res["Message"])
7676
DIRACExit(1)
77-
res = pmc.addPilotReferences([pilotRef], VO, gridType, {pilotRef: pilotStamp})
77+
res = pmc.addPilotReferences(pilotStamp, VO, gridType, {pilotStamp: pilotRef})
7878
if not res["OK"]:
7979
gLogger.error(res["Message"])
8080
DIRACExit(1)

tests/Integration/WorkloadManagementSystem/Test_PilotsClient.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,8 @@ def test_PilotsDB():
2525
webapp = WebAppClient()
2626

2727
# This will allow you to run the test again if necessary
28-
for jobID in ["aPilot", "anotherPilot"]:
29-
pilots.deletePilots(jobID)
28+
for pilot_stamp in ["aPilot", "anotherPilot"]:
29+
pilots.deletePilot(pilot_stamp)
3030

3131
res = pilots.addPilotReferences(["aPilot"], "VO")
3232
assert res["OK"], res["Message"]

0 commit comments

Comments
 (0)