Skip to content

Commit 979ac4c

Browse files
Robin-Van-de-MerghelRobin VAN DE MERGHEL
authored andcommitted
feat: Migrating client toward DiracX
1 parent 5f54d5a commit 979ac4c

File tree

4 files changed

+159
-43
lines changed

4 files changed

+159
-43
lines changed
Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
from diracx.client._generated.models import BodyPilotsAddJobsToPilot, BodyPilotsAddPilotStamps, ScalarSearchSpec, SearchParams
2+
from DIRAC.Core.Utilities.ReturnValues import convertToReturnValue
3+
4+
from DIRAC.Core.Security.DiracX import DiracXClient
5+
6+
7+
class PilotManagerClient:
8+
@convertToReturnValue
9+
def addPilotReferences(self, pilot_stamps, VO, gridType="DIRAC", pilot_references={}):
10+
with DiracXClient() as api:
11+
# We will move toward a stamp as identifier for the pilot
12+
return api.pilots.add_pilot_stamps(BodyPilotsAddPilotStamps(
13+
pilot_stamps=pilot_stamps,
14+
vo=VO,
15+
grid_type=gridType,
16+
pilot_references=pilot_references
17+
))
18+
19+
20+
def set_pilot_field(self, pilot_stamp, values_dict):
21+
with DiracXClient() as api:
22+
values_dict["PilotStamp"] = pilot_stamp
23+
return api.pilots.update_pilot_fields(
24+
values_dict
25+
)
26+
27+
@convertToReturnValue
28+
def setPilotBenchmark(self, pilotStamp, mark):
29+
return self.set_pilot_field(pilotStamp, {"BenchMark": mark})
30+
31+
@convertToReturnValue
32+
def setAccountingFlag(self, pilotStamp, flag):
33+
return self.set_pilot_field(pilotStamp, {"AccountingSent": flag})
34+
35+
@convertToReturnValue
36+
def setPilotStatus(self, pilot_stamp, status, destination=None, reason=None, grid_site=None, queue=None):
37+
return self.set_pilot_field(pilot_stamp, {
38+
"Status": status,
39+
"DestinationSite": destination,
40+
"StatusReason": reason,
41+
"GridSite": grid_site,
42+
"Queue": queue
43+
})
44+
45+
@convertToReturnValue
46+
def clearPilots(self, interval=30, aborted_interval=7):
47+
with DiracXClient() as api:
48+
api.pilots.delete_pilots(
49+
age_in_days=interval,
50+
delete_only_aborted=False
51+
)
52+
api.pilots.delete_pilots(
53+
age_in_days=aborted_interval,
54+
delete_only_aborted=True
55+
)
56+
57+
@convertToReturnValue
58+
def deletePilots(self, pilot_stamps):
59+
with DiracXClient() as api:
60+
api.pilots.delete_pilots(
61+
pilot_stamps=pilot_stamps
62+
)
63+
64+
@convertToReturnValue
65+
def setJobForPilot(self, job_id, pilot_stamp, destination=None):
66+
with DiracXClient() as api:
67+
api.pilots.add_jobs_to_pilot(BodyPilotsAddJobsToPilot(
68+
pilot_stamp=pilot_stamp,
69+
job_ids=[job_id]
70+
))
71+
72+
self.set_pilot_field(pilot_stamp, {
73+
"DestinationSite": destination,
74+
})
75+
76+
@convertToReturnValue
77+
def getPilots(self, job_id):
78+
with DiracXClient() as api:
79+
pilot_ids = api.pilots.get_pilot_jobs(
80+
job_id=job_id
81+
)
82+
83+
query = [{"parameter": "PilotID", "operator": "in", "value": pilot_ids}]
84+
85+
return api.pilots.search(
86+
parameters=[],
87+
search=query,
88+
sort=[]
89+
)
90+
91+
92+
@convertToReturnValue
93+
def getPilotInfo(self, pilot_stamp):
94+
with DiracXClient() as api:
95+
query = [{"parameter": "PilotStamp", "operator": "eq", "value": pilot_stamp}]
96+
97+
return api.pilots.search(
98+
parameters=[],
99+
search=query,
100+
sort=[]
101+
)

src/DIRAC/WorkloadManagementSystem/Service/PilotManagerHandler.py

Lines changed: 55 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -69,14 +69,6 @@ def export_getCurrentPilotCounters(cls, attrDict={}):
6969

7070
return S_OK(resultDict)
7171

72-
##########################################################################################
73-
types_addPilotReferences = [list, str]
74-
75-
@classmethod
76-
def export_addPilotReferences(cls, pilotRef, VO, gridType="DIRAC", pilotStampDict={}):
77-
"""Add a new pilot job reference"""
78-
return cls.pilotAgentsDB.addPilotReferences(pilotRef, VO, gridType, pilotStampDict)
79-
8072
##############################################################################
8173
types_getPilotOutput = [str]
8274

@@ -205,17 +197,11 @@ def _getRemotePilotOutput(self, pilotReference, pilotDict):
205197
# return res, correct or not
206198
return res
207199

208-
##############################################################################
209-
types_getPilotInfo = [[list, str]]
210-
211-
@classmethod
212-
def export_getPilotInfo(cls, pilotReference):
213-
"""Get the info about a given pilot job reference"""
214-
return cls.pilotAgentsDB.getPilotInfo(pilotReference)
215200

216201
##############################################################################
217202
types_selectPilots = [dict]
218203

204+
# Won't be moved to DiracX: not used at all anywhere.
219205
@classmethod
220206
def export_selectPilots(cls, condDict):
221207
"""Select pilots given the selection conditions"""
@@ -291,21 +277,35 @@ def export_getGroupedPilotSummary(cls, columnList):
291277
"""
292278
return cls.pilotAgentsDB.getGroupedPilotSummary(columnList)
293279

294-
##############################################################################
295-
types_getPilots = [[str, int]]
280+
281+
types_countPilots = [dict]
296282

297283
@classmethod
298-
def export_getPilots(cls, jobID):
299-
"""Get pilots executing/having executed the Job"""
300-
result = cls.pilotAgentsDB.getPilotsForJobID(int(jobID))
301-
if not result["OK"] or not result["Value"]:
302-
return S_ERROR(f"Failed to get pilot for Job {int(jobID)}: {result.get('Message', '')}")
284+
def export_countPilots(cls, condDict, older=None, newer=None, timeStamp="SubmissionTime"):
285+
"""Set the pilot agent status"""
303286

304-
return cls.pilotAgentsDB.getPilotInfo(pilotID=result["Value"])
287+
return cls.pilotAgentsDB.countPilots(condDict, older, newer, timeStamp)
305288

306-
##############################################################################
307-
types_setJobForPilot = [[str, int], str]
308289

290+
# --------------- Moved to DiracX ---------------
291+
292+
#############################################
293+
types_addPilotReferences = [list, str]
294+
295+
# Moved to DiracX
296+
@classmethod
297+
def export_addPilotReferences(cls, pilotStamps, VO, gridType="DIRAC", pilotRefDict={}):
298+
"""Add a new pilot job reference"""
299+
pilot_references = pilotRefDict.values()
300+
pilot_stamp_dict = dict(zip(pilotStamps, pilot_references))
301+
302+
303+
return cls.pilotAgentsDB.addPilotReferences(pilot_references, VO, gridType, pilot_stamp_dict)
304+
305+
306+
#############################################
307+
types_setJobForPilot = [[str, int], str]
308+
309309
@classmethod
310310
def export_setJobForPilot(cls, jobID, pilotRef, destination=None):
311311
"""Report the DIRAC job ID which is executed by the given pilot job"""
@@ -321,23 +321,24 @@ def export_setJobForPilot(cls, jobID, pilotRef, destination=None):
321321

322322
return result
323323

324-
##########################################################################################
324+
#############################################
325325
types_setPilotBenchmark = [str, float]
326326

327327
@classmethod
328328
def export_setPilotBenchmark(cls, pilotRef, mark):
329329
"""Set the pilot agent benchmark"""
330330
return cls.pilotAgentsDB.setPilotBenchmark(pilotRef, mark)
331331

332-
##########################################################################################
332+
#############################################
333333
types_setAccountingFlag = [str]
334334

335335
@classmethod
336336
def export_setAccountingFlag(cls, pilotRef, mark="True"):
337337
"""Set the pilot AccountingSent flag"""
338338
return cls.pilotAgentsDB.setAccountingFlag(pilotRef, mark)
339339

340-
##########################################################################################
340+
341+
#############################################
341342
types_setPilotStatus = [str, str]
342343

343344
@classmethod
@@ -348,22 +349,16 @@ def export_setPilotStatus(cls, pilotRef, status, destination=None, reason=None,
348349
pilotRef, status, destination=destination, statusReason=reason, gridSite=gridSite, queue=queue
349350
)
350351

351-
##########################################################################################
352-
types_countPilots = [dict]
353-
354-
@classmethod
355-
def export_countPilots(cls, condDict, older=None, newer=None, timeStamp="SubmissionTime"):
356-
"""Set the pilot agent status"""
357-
358-
return cls.pilotAgentsDB.countPilots(condDict, older, newer, timeStamp)
359-
360-
##########################################################################################
352+
#############################################
361353
types_deletePilots = [[list, str, int]]
362-
354+
363355
@classmethod
364356
def export_deletePilots(cls, pilotIDs):
365357
if isinstance(pilotIDs, str):
366358
return cls.pilotAgentsDB.deletePilot(pilotIDs)
359+
360+
# And list[str]????
361+
# pilot_id>>>S<<<
367362

368363
if isinstance(pilotIDs, int):
369364
pilotIDs = [
@@ -376,9 +371,29 @@ def export_deletePilots(cls, pilotIDs):
376371

377372
return S_OK()
378373

379-
##############################################################################
374+
#############################################
380375
types_clearPilots = [int, int]
381376

382377
@classmethod
383378
def export_clearPilots(cls, interval=30, aborted_interval=7):
384379
return cls.pilotAgentsDB.clearPilots(interval, aborted_interval)
380+
381+
##############################################################################
382+
types_getPilots = [[str, int]]
383+
384+
@classmethod
385+
def export_getPilots(cls, jobID):
386+
"""Get pilots executing/having executed the Job"""
387+
result = cls.pilotAgentsDB.getPilotsForJobID(int(jobID))
388+
if not result["OK"] or not result["Value"]:
389+
return S_ERROR(f"Failed to get pilot for Job {int(jobID)}: {result.get('Message', '')}")
390+
391+
return cls.pilotAgentsDB.getPilotInfo(pilotID=result["Value"])
392+
393+
##############################################################################
394+
types_getPilotInfo = [[list, str]]
395+
396+
@classmethod
397+
def export_getPilotInfo(cls, pilotReference):
398+
"""Get the info about a given pilot job reference"""
399+
return cls.pilotAgentsDB.getPilotInfo(pilotReference)

src/DIRAC/WorkloadManagementSystem/scripts/dirac_admin_add_pilot.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ def main():
7474
if not DErrno.cmpError(res, DErrno.EWMSNOPILOT):
7575
gLogger.error(res["Message"])
7676
DIRACExit(1)
77-
res = pmc.addPilotReferences([pilotRef], VO, gridType, {pilotRef: pilotStamp})
77+
res = pmc.addPilotReferences(pilotStamp, VO, gridType, {pilotStamp: pilotRef})
7878
if not res["OK"]:
7979
gLogger.error(res["Message"])
8080
DIRACExit(1)

tests/Integration/WorkloadManagementSystem/Test_PilotsClient.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,8 @@ def test_PilotsDB():
2525
webapp = WebAppClient()
2626

2727
# This will allow you to run the test again if necessary
28-
for jobID in ["aPilot", "anotherPilot"]:
29-
pilots.deletePilots(jobID)
28+
for pilot_stamp in ["aPilot", "anotherPilot"]:
29+
pilots.deletePilot(pilot_stamp)
3030

3131
res = pilots.addPilotReferences(["aPilot"], "VO")
3232
assert res["OK"], res["Message"]

0 commit comments

Comments
 (0)