Skip to content

Commit 5011800

Browse files
authored
Merge pull request #8252 from fstagni/90_sbstore_fixes
[9.0] SandboxStore simplifications
2 parents ebbf9ba + 5ae6358 commit 5011800

File tree

6 files changed

+37
-142
lines changed

6 files changed

+37
-142
lines changed

src/DIRAC/WorkloadManagementSystem/Agent/JobCleaningAgent.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,9 +36,9 @@
3636
from DIRAC.RequestManagementSystem.Client.Request import Request
3737
from DIRAC.WorkloadManagementSystem.Client import JobStatus
3838
from DIRAC.WorkloadManagementSystem.Client.JobMonitoringClient import JobMonitoringClient
39-
from DIRAC.WorkloadManagementSystem.Client.SandboxStoreClient import SandboxStoreClient
4039
from DIRAC.WorkloadManagementSystem.Client.WMSClient import WMSClient
4140
from DIRAC.WorkloadManagementSystem.DB.JobDB import JobDB
41+
from DIRAC.WorkloadManagementSystem.DB.SandboxMetadataDB import SandboxMetadataDB
4242

4343

4444
class JobCleaningAgent(AgentModule):
@@ -152,8 +152,9 @@ def removeDeletedJobs(self):
152152
return S_OK()
153153

154154
self.log.info("Unassigning sandboxes from soon to be deleted jobs", f"({len(jobList)})")
155-
result = SandboxStoreClient(useCertificates=True).unassignJobs(jobList)
156-
if not result["OK"]:
155+
156+
entitiesList = [f"Job:{jobId}" for jobId in jobList]
157+
if not (result := SandboxMetadataDB().unassignEntities(entitiesList))["OK"]:
157158
self.log.error("Cannot unassign jobs to sandboxes", result["Message"])
158159
return result
159160

src/DIRAC/WorkloadManagementSystem/Client/SandboxStoreClient.py

Lines changed: 0 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020

2121
class SandboxStoreClient:
2222
__validSandboxTypes = ("Input", "Output")
23-
__smdb = None
2423

2524
def __init__(self, rpcClient=None, transferClient=None, smdb=False, **kwargs):
2625
"""Constructor
@@ -37,21 +36,8 @@ def __init__(self, rpcClient=None, transferClient=None, smdb=False, **kwargs):
3736
self.__transferClient = transferClient
3837
self.__kwargs = kwargs
3938
self.__vo = None
40-
SandboxStoreClient.__smdb = smdb
4139
if "delegatedGroup" in kwargs:
4240
self.__vo = getVOForGroup(kwargs["delegatedGroup"])
43-
if SandboxStoreClient.__smdb is True:
44-
try:
45-
from DIRAC.WorkloadManagementSystem.DB.SandboxMetadataDB import SandboxMetadataDB
46-
47-
SandboxStoreClient.__smdb = SandboxMetadataDB()
48-
result = SandboxStoreClient.__smdb._getConnection() # pylint: disable=protected-access
49-
if not result["OK"]:
50-
SandboxStoreClient.__smdb = False
51-
else:
52-
result["Value"].close()
53-
except (ImportError, RuntimeError, AttributeError):
54-
SandboxStoreClient.__smdb = False
5541

5642
def __getRPCClient(self):
5743
"""Get an RPC client for SB service"""
@@ -227,29 +213,6 @@ def downloadSandbox(self, sbLocation, destinationDir="", inMemory=False, unpack=
227213
##############
228214
# Jobs
229215

230-
def assignSandboxesToJob(self, jobId, sbList, ownerName="", ownerGroup=""):
231-
"""
232-
Assign sandboxes to a job.
233-
sbList must be a list of sandboxes and relation types
234-
sbList = [ ( "SB:SEName|SEPFN", "Input" ), ( "SB:SEName|SEPFN", "Output" ) ]
235-
"""
236-
eId = f"Job:{jobId}"
237-
for sbT in sbList:
238-
if sbT[1] not in self.__validSandboxTypes:
239-
return S_ERROR(f"Invalid Sandbox type {sbT[1]}")
240-
if SandboxStoreClient.__smdb and ownerName and ownerGroup:
241-
return SandboxStoreClient.__smdb.assignSandboxesToEntities({eId: sbList}, ownerName, ownerGroup)
242-
return self.__getRPCClient().assignSandboxesToEntities({eId: sbList}, ownerName, ownerGroup)
243-
244-
def unassignJobs(self, jobIdList):
245-
"""Unassign SB to a job"""
246-
if isinstance(jobIdList, int):
247-
jobIdList = [jobIdList]
248-
entitiesList = []
249-
for jobId in jobIdList:
250-
entitiesList.append(f"Job:{jobId}")
251-
return self.__getRPCClient().unassignEntities(entitiesList)
252-
253216
def downloadSandboxForJob(self, jobId, sbType, destinationPath="", inMemory=False, unpack=True):
254217
"""Download SB for a job"""
255218
result = self.__getRPCClient().getSandboxesAssignedToEntity(f"Job:{jobId}")

src/DIRAC/WorkloadManagementSystem/DB/SandboxMetadataDB.py

Lines changed: 25 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
""" SandboxMetadataDB class is a front-end to the metadata for sandboxes
22
"""
3-
from DIRAC import S_ERROR, S_OK, gLogger
3+
from DIRAC import S_ERROR, S_OK
44
from DIRAC.ConfigurationSystem.Client.Helpers import Registry
55
from DIRAC.Core.Base.DB import DB
66
from DIRAC.Core.Security import Properties
@@ -220,48 +220,22 @@ def assignSandboxesToEntities(self, enDict, requesterName, requesterGroup, owner
220220
return result
221221
return S_OK(assigned)
222222

223-
def __entitiesByRequesterCond(self, requesterName, requesterGroup):
224-
sqlCond = []
225-
requesterProps = Registry.getPropertiesForEntity(requesterGroup, name=requesterName)
226-
if Properties.JOB_ADMINISTRATOR in requesterProps:
227-
# Do nothing, just ensure it doesn't fit in the other cases
228-
pass
229-
elif Properties.JOB_SHARING in requesterProps:
230-
sqlCond.append(f"o.OwnerGroup='{requesterGroup}'")
231-
elif Properties.NORMAL_USER in requesterProps:
232-
sqlCond.append(f"o.OwnerGroup='{requesterGroup}'")
233-
sqlCond.append(f"o.Owner='{requesterName}'")
234-
else:
235-
return S_ERROR("Not authorized to access sandbox")
236-
return sqlCond
237-
238223
@convertToReturnValue
239-
def unassignEntities(self, entities, requesterName, requesterGroup):
224+
def unassignEntities(self, entities: list):
240225
"""
241-
Unassign jobs to sandboxes
226+
Unassign entities to sandboxes. Entities are a list of strings, e.g. ['job:1234', 'job:5678'].
242227
243228
:param list entities: list of entities to unassign
244229
"""
245230
if not entities:
246231
return None
247-
conds = self.__entitiesByRequesterCond(requesterName, requesterGroup)
248232

249-
sqlCmd = "CREATE TEMPORARY TABLE to_delete_EntityId (EntityId VARCHAR(128) NOT NULL, PRIMARY KEY (EntityId)) ENGINE=MEMORY;"
233+
sqlCmd = "CREATE TEMPORARY TABLE to_delete_EntityId (EntityId VARCHAR(128) NOT NULL, PRIMARY KEY (EntityId)) ENGINE=MEMORY;"
250234
returnValueOrRaise(self._update(sqlCmd))
251235
try:
252236
sqlCmd = "INSERT INTO to_delete_EntityId (EntityId) VALUES ( %s )"
253237
returnValueOrRaise(self._updatemany(sqlCmd, [(e,) for e in entities]))
254238
sqlCmd = "DELETE m from `sb_EntityMapping` m JOIN to_delete_EntityId t USING (EntityId)"
255-
if conds:
256-
sqlCmd = " ".join(
257-
[
258-
sqlCmd,
259-
"JOIN `sb_SandBoxes` s ON s.SBId = m.SBId",
260-
"JOIN `sb_Owners` o ON s.OwnerId = o.OwnerId",
261-
"WHERE",
262-
" AND ".join(conds),
263-
]
264-
)
265239
returnValueOrRaise(self._update(sqlCmd))
266240
finally:
267241
sqlCmd = "DROP TEMPORARY TABLE to_delete_EntityId"
@@ -313,16 +287,30 @@ def getUnusedSandboxes(self):
313287
sqlCmd = f"SELECT SBId, SEName, SEPFN FROM `sb_SandBoxes` WHERE SEPFN not like '/S3/%' AND (( {' ) OR ( '.join(sqlCond)} ))"
314288
return self._query(sqlCmd)
315289

290+
@convertToReturnValue
316291
def deleteSandboxes(self, SBIdList):
317292
"""
318-
Delete sandboxes
293+
Delete sandboxes using a temporary table for efficiency and consistency.
319294
"""
320-
sqlSBList = ", ".join([str(sbid) for sbid in SBIdList])
321-
for table in ("sb_SandBoxes", "sb_EntityMapping"):
322-
sqlCmd = f"DELETE FROM `{table}` WHERE SBId IN ( {sqlSBList} )"
323-
result = self._update(sqlCmd)
324-
if not result["OK"]:
325-
return result
295+
if not SBIdList:
296+
return S_OK()
297+
# Create temporary table
298+
sqlCmd = "CREATE TEMPORARY TABLE to_delete_SBId (SBId INTEGER(10) UNSIGNED NOT NULL, PRIMARY KEY (SBId)) ENGINE=MEMORY;"
299+
returnValueOrRaise(self._update(sqlCmd))
300+
try:
301+
# Insert SBIds into temporary table
302+
sqlCmd = "INSERT INTO to_delete_SBId (SBId) VALUES (%s)"
303+
returnValueOrRaise(self._updatemany(sqlCmd, [(sbid,) for sbid in SBIdList]))
304+
# Delete from sb_EntityMapping first (to respect FK constraints if any)
305+
sqlCmd = "DELETE FROM `sb_EntityMapping` WHERE SBId IN (SELECT SBId FROM to_delete_SBId)"
306+
returnValueOrRaise(self._update(sqlCmd))
307+
# Delete from sb_SandBoxes
308+
sqlCmd = "DELETE FROM `sb_SandBoxes` WHERE SBId IN (SELECT SBId FROM to_delete_SBId)"
309+
returnValueOrRaise(self._update(sqlCmd))
310+
finally:
311+
# Drop temporary table
312+
sqlCmd = "DROP TEMPORARY TABLE to_delete_SBId"
313+
returnValueOrRaise(self._update(sqlCmd))
326314
return S_OK()
327315

328316
def getSandboxId(self, SEName, SEPFN, requesterName, requesterGroup, field="SBId"):

src/DIRAC/WorkloadManagementSystem/Executor/JobSanity.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
""" The Job Sanity executor assigns sandboxes to the job """
22
from DIRAC import S_OK
3-
from DIRAC.WorkloadManagementSystem.Client.SandboxStoreClient import SandboxStoreClient
3+
from DIRAC.WorkloadManagementSystem.DB.SandboxMetadataDB import SandboxMetadataDB
44
from DIRAC.WorkloadManagementSystem.Executor.Base.OptimizerExecutor import OptimizerExecutor
55

66

@@ -15,7 +15,6 @@ class JobSanity(OptimizerExecutor):
1515
@classmethod
1616
def initializeOptimizer(cls):
1717
"""Initialize specific parameters for JobSanityAgent."""
18-
cls.sandboxClient = SandboxStoreClient(useCertificates=True, smdb=True)
1918
return S_OK()
2019

2120
def optimizeJob(self, jid, jobState):
@@ -57,7 +56,8 @@ def checkInputSandbox(self, jobState, manifest):
5756
if not numSBsToAssign:
5857
return S_OK(0)
5958
self.jobLog.info("Assigning sandboxes", f"({numSBsToAssign} on behalf of {ownerName}@{ownerGroup}@{vo})")
60-
result = self.sandboxClient.assignSandboxesToJob(jobState.jid, sbsToAssign, ownerName, ownerGroup)
59+
eId = f"Job:{jobState.jid}"
60+
result = SandboxMetadataDB().assignSandboxesToEntities({eId: sbsToAssign}, ownerName, ownerGroup)
6161
if not result["OK"]:
6262
self.jobLog.error("Could not assign sandboxes in the SandboxStore")
6363
return result

src/DIRAC/WorkloadManagementSystem/Service/SandboxStoreHandler.py

Lines changed: 5 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
from DIRAC.Core.Security import Properties
2323
from DIRAC.Core.Utilities.File import getGlobbedTotalSize, mkDir
2424
from DIRAC.Core.Utilities.ObjectLoader import ObjectLoader
25-
from DIRAC.DataManagementSystem.Service.StorageElementHandler import getDiskSpace
2625
from DIRAC.FrameworkSystem.Utilities.diracx import TheImpersonator
2726

2827

@@ -139,7 +138,7 @@ def _getFromClient(self, fileId, token, fileSize, fileHelper=None, data=""):
139138
gLogger.debug("Sandbox already exists in storage backend", res.pfn)
140139

141140
assignTo = {key: [(res.pfn, assignTo[key])] for key in assignTo}
142-
result = self.export_assignSandboxesToEntities(assignTo)
141+
result = self.sandboxDB.assignSandboxesToEntities(assignTo, credDict["username"], credDict["group"])
143142
if not result["OK"]:
144143
return result
145144
return S_OK(res.pfn)
@@ -153,7 +152,7 @@ def _getFromClient(self, fileId, token, fileSize, fileHelper=None, data=""):
153152
fileHelper.markAsTransferred()
154153
sbURL = f"SB:{self.__localSEName}|{sbPath}"
155154
assignTo = {key: [(sbURL, assignTo[key])] for key in assignTo}
156-
result = self.export_assignSandboxesToEntities(assignTo)
155+
result = self.sandboxDB.assignSandboxesToEntities(assignTo, credDict["username"], credDict["group"])
157156
if not result["OK"]:
158157
return result
159158
return S_OK(sbURL)
@@ -210,7 +209,9 @@ def _getFromClient(self, fileId, token, fileSize, fileHelper=None, data=""):
210209

211210
sbURL = f"SB:{self.__localSEName}|{sbPath}"
212211
assignTo = {key: [(sbURL, assignTo[key])] for key in assignTo}
213-
if not (result := self.export_assignSandboxesToEntities(assignTo))["OK"]:
212+
if not (result := self.sandboxDB.assignSandboxesToEntities(assignTo, credDict["username"], credDict["group"]))[
213+
"OK"
214+
]:
214215
return result
215216
return S_OK(sbURL)
216217

@@ -328,33 +329,6 @@ def __moveToFinalLocation(self, localFilePath, sbPath):
328329

329330
return result
330331

331-
##################
332-
# Assigning sbs to jobs
333-
334-
types_assignSandboxesToEntities = [dict]
335-
336-
def export_assignSandboxesToEntities(self, enDict, ownerName="", ownerGroup=""):
337-
"""
338-
Assign sandboxes to jobs.
339-
Expects a dict of { entityId : [ ( SB, SBType ), ... ] }
340-
"""
341-
credDict = self.getRemoteCredentials()
342-
return self.sandboxDB.assignSandboxesToEntities(
343-
enDict, credDict["username"], credDict["group"], ownerName, ownerGroup
344-
)
345-
346-
##################
347-
# Unassign sbs to jobs
348-
349-
types_unassignEntities = [(list, tuple)]
350-
351-
def export_unassignEntities(self, entitiesList):
352-
"""
353-
Unassign a list of jobs
354-
"""
355-
credDict = self.getRemoteCredentials()
356-
return self.sandboxDB.unassignEntities(entitiesList, credDict["username"], credDict["group"])
357-
358332
##################
359333
# Getting assigned sandboxes
360334

@@ -376,26 +350,6 @@ def export_getSandboxesAssignedToEntity(self, entityId):
376350
sbDict[SBType].append(f"SB:{SEName}|{SEPFN}")
377351
return S_OK(sbDict)
378352

379-
##################
380-
# Disk space left management
381-
382-
types_getFreeDiskSpace = []
383-
384-
def export_getFreeDiskSpace(self):
385-
"""Get the free disk space of the storage element
386-
If no size is specified, terabytes will be used by default.
387-
"""
388-
389-
return getDiskSpace(self.getCSOption("BasePath", "/opt/dirac/storage/sandboxes"))
390-
391-
types_getTotalDiskSpace = []
392-
393-
def export_getTotalDiskSpace(self):
394-
"""Get the total disk space of the storage element
395-
If no size is specified, terabytes will be used by default.
396-
"""
397-
return getDiskSpace(self.getCSOption("BasePath", "/opt/dirac/storage/sandboxes"), total=True)
398-
399353
##################
400354
# Download sandboxes
401355

tests/Integration/WorkloadManagementSystem/Test_SandboxStoreClient.py

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -38,26 +38,15 @@
3838
from DIRAC.tests.Utilities.utils import find_all
3939
from DIRAC.WorkloadManagementSystem.Client.SandboxStoreClient import SandboxStoreClient
4040

41-
4241
gLogger.setLevel("DEBUG")
4342

4443

4544
def test_SSCChain():
4645
"""full test of functionalities"""
4746
ssc = SandboxStoreClient()
4847

49-
jobId = 1
50-
5148
exeScriptLocation = find_all("exe-script.py", "../..", "/DIRAC/tests/Integration")[0]
5249
fileList = [exeScriptLocation]
5350

5451
res = ssc.uploadFilesAsSandbox(fileList)
5552
assert res["OK"], res["Message"]
56-
57-
# TODO : FIXME
58-
# res = ssc.downloadSandboxForJob(jobId, "Input") # to run this we need the RSS on
59-
# print(res) # for debug...
60-
# assert res["OK"], res["Message"]
61-
62-
res = ssc.unassignJobs([jobId])
63-
assert res["OK"], res["Message"]

0 commit comments

Comments
 (0)