Skip to content

Commit 0aa36a0

Browse files
committed
feat: simple implementation of the TornadoSandboxStore handler
1 parent bcef90f commit 0aa36a0

File tree

3 files changed

+121
-32
lines changed

3 files changed

+121
-32
lines changed

src/DIRAC/Core/Tornado/Client/TornadoClient.py

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424

2525
from DIRAC.Core.Tornado.Client.private.TornadoBaseClient import TornadoBaseClient
2626
from DIRAC.Core.Utilities.JEncode import encode
27+
from DIRAC.Core.Utilities.File import getGlobbedTotalSize
2728

2829

2930
class TornadoClient(TornadoBaseClient):
@@ -63,7 +64,7 @@ def executeRPC(self, method, *args):
6364
retVal["rpcStub"] = (self._getBaseStub(), method, list(args))
6465
return retVal
6566

66-
def receiveFile(self, destFile, *args):
67+
def receiveFile(self, destFile, fileId, token=""):
6768
"""
6869
Equivalent of :py:meth:`~DIRAC.Core.DISET.TransferClient.TransferClient.receiveFile`
6970
@@ -78,6 +79,25 @@ def receiveFile(self, destFile, *args):
7879
retVal = self._request(outputFile=destFile, **rpcCall)
7980
return retVal
8081

82+
def sendFile(self, filename, fileID):
83+
"""
84+
Equivalent of :py:meth:`~DIRAC.Core.DISET.TransferClient.TransferClient.sendFile`
85+
86+
In practice, it calls the remote method `streamFromClient` and transfers the file
87+
as a string type value
88+
89+
:param str filename: file (or path) where to store the result
90+
:param any fileID: tuple if file identifiers
91+
:returns: S_OK/S_ERROR result of the remote RPC call
92+
"""
93+
fileSize = getGlobbedTotalSize(filename)
94+
token = ""
95+
96+
with open(filename, "br") as input:
97+
result = self.executeRPC("streamFromClient", *[fileID, token, fileSize, input.read()])
98+
99+
return result
100+
81101

82102
def executeRPCStub(rpcStub):
83103
"""

src/DIRAC/WorkloadManagementSystem/Service/SandboxStoreHandler.py

Lines changed: 77 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import time
1212
import threading
1313
import tempfile
14+
import hashlib
1415

1516
from DIRAC import gLogger, S_OK, S_ERROR
1617
from DIRAC.Core.DISET.RequestHandler import RequestHandler
@@ -24,9 +25,10 @@
2425
from DIRAC.RequestManagementSystem.Client.Operation import Operation
2526
from DIRAC.RequestManagementSystem.Client.File import File
2627
from DIRAC.Resources.Storage.StorageElement import StorageElement
28+
from DIRAC.Core.Utilities.File import getGlobbedTotalSize
2729

2830

29-
class SandboxStoreHandler(RequestHandler):
31+
class SandboxStoreHandlerMixin:
3032
__purgeCount = -1
3133
__purgeLock = threading.Lock()
3234
__purgeWorking = False
@@ -44,10 +46,10 @@ def initializeHandler(cls, serviceInfoDict):
4446
return S_ERROR(f"Can't connect to DB: {repr(excp)}")
4547
return S_OK()
4648

47-
def initialize(self):
49+
def initializeRequest(self):
4850
self.__backend = self.getCSOption("Backend", "local")
4951
self.__localSEName = self.getCSOption("LocalSE", "SandboxSE")
50-
self.__maxUploadBytes = self.getCSOption("MaxSandboxSizeMiB", 10) * 1048576
52+
self._maxUploadBytes = self.getCSOption("MaxSandboxSizeMiB", 10) * 1048576
5153
if self.__backend.lower() == "local" or self.__backend == self.__localSEName:
5254
self.__useLocalStorage = True
5355
self.__seNameToUse = self.__localSEName
@@ -75,13 +77,14 @@ def __getSandboxPath(self, md5):
7577
pathItems.extend([md5[0:3], md5[3:6], md5])
7678
return os.path.join(*pathItems)
7779

78-
def transfer_fromClient(self, fileId, token, fileSize, fileHelper):
80+
def _getFromClient(self, fileId, token, fileSize, fileHelper=None, data=""):
7981
"""
8082
Receive a file as a sandbox
8183
"""
8284

83-
if self.__maxUploadBytes and fileSize > self.__maxUploadBytes:
84-
fileHelper.markAsTransferred()
85+
if self._maxUploadBytes and fileSize > self._maxUploadBytes:
86+
if fileHelper:
87+
fileHelper.markAsTransferred()
8588
return S_ERROR("Sandbox is too big. Please upload it to a grid storage element")
8689

8790
if isinstance(fileId, (list, tuple)):
@@ -113,7 +116,8 @@ def transfer_fromClient(self, fileId, token, fileSize, fileHelper):
113116
result = self.sandboxDB.getSandboxId(seName, sePFN, credDict["username"], credDict["group"])
114117
if result["OK"]:
115118
gLogger.info("Sandbox already exists. Skipping upload")
116-
fileHelper.markAsTransferred()
119+
if fileHelper:
120+
fileHelper.markAsTransferred()
117121
sbURL = f"SB:{seName}|{sePFN}"
118122
assignTo = {key: [(sbURL, assignTo[key])] for key in assignTo}
119123
result = self.export_assignSandboxesToEntities(assignTo)
@@ -126,14 +130,35 @@ def transfer_fromClient(self, fileId, token, fileSize, fileHelper):
126130
else:
127131
hdPath = False
128132
# Write to local file
129-
result = self.__networkToFile(fileHelper, hdPath)
133+
134+
if fileHelper:
135+
result = self.__networkToFile(fileHelper, hdPath)
136+
elif data:
137+
hdPath = os.path.realpath(hdPath)
138+
mkDir(os.path.dirname(hdPath))
139+
with open(hdPath, "bw") as output:
140+
output.write(data)
141+
result = S_OK(hdPath)
142+
else:
143+
result = S_ERROR("No data provided")
144+
130145
if not result["OK"]:
131146
gLogger.error("Error while receiving sandbox file", result["Message"])
132147
return result
133148
hdPath = result["Value"]
134149
gLogger.info("Wrote sandbox to file", hdPath)
135150
# Check hash!
136-
if fileHelper.getHash() != aHash:
151+
if fileHelper:
152+
hdHash = fileHelper.getHash()
153+
else:
154+
oMD5 = hashlib.md5()
155+
with open(hdPath, "rb") as fd:
156+
bData = fd.read(10240)
157+
while bData:
158+
oMD5.update(bData)
159+
bData = fd.read(10240)
160+
hdHash = oMD5.hexdigest()
161+
if hdHash != aHash:
137162
self.__secureUnlinkFile(hdPath)
138163
gLogger.error("Hashes don't match! Client defined hash is different with received data hash!")
139164
return S_ERROR("Hashes don't match!")
@@ -147,13 +172,14 @@ def transfer_fromClient(self, fileId, token, fileSize, fileHelper):
147172
sbPath = result["Value"][1]
148173
# Register!
149174
gLogger.info("Registering sandbox in the DB with", f"SB:{self.__seNameToUse}|{sbPath}")
175+
fSize = getGlobbedTotalSize(hdPath)
150176
result = self.sandboxDB.registerAndGetSandbox(
151177
credDict["username"],
152178
credDict["DN"],
153179
credDict["group"],
154180
self.__seNameToUse,
155181
sbPath,
156-
fileHelper.getTransferedBytes(),
182+
fSize,
157183
)
158184
if not result["OK"]:
159185
self.__secureUnlinkFile(hdPath)
@@ -166,6 +192,13 @@ def transfer_fromClient(self, fileId, token, fileSize, fileHelper):
166192
return result
167193
return S_OK(sbURL)
168194

195+
def transfer_fromClient(self, fileId, token, fileSize, fileHelper):
196+
"""
197+
Receive a file as a sandbox
198+
"""
199+
200+
return self._getFromFile(fileId, token, fileSize, fileHelper=fileHelper)
201+
169202
def transfer_bulkFromClient(self, fileId, token, _fileSize, fileHelper):
170203
"""Receive files packed into a tar archive by the fileHelper logic.
171204
token is used for access rights confirmation.
@@ -322,47 +355,39 @@ def __copyToExternalSE(self, localFilePath, sbPath):
322355

323356
types_assignSandboxesToEntities = [dict]
324357

325-
def export_assignSandboxesToEntities(self, enDict, ownerName="", ownerGroup="", entitySetup=False):
358+
def export_assignSandboxesToEntities(self, enDict, ownerName="", ownerGroup=""):
326359
"""
327360
Assign sandboxes to jobs.
328361
Expects a dict of { entityId : [ ( SB, SBType ), ... ] }
329362
"""
330-
if not entitySetup:
331-
entitySetup = self.serviceInfoDict["clientSetup"]
332363
credDict = self.getRemoteCredentials()
333364
return self.sandboxDB.assignSandboxesToEntities(
334-
enDict, credDict["username"], credDict["group"], entitySetup, ownerName, ownerGroup
365+
enDict, credDict["username"], credDict["group"], ownerName, ownerGroup
335366
)
336367

337368
##################
338369
# Unassign sbs to jobs
339370

340371
types_unassignEntities = [(list, tuple)]
341372

342-
def export_unassignEntities(self, entitiesList, entitiesSetup=False):
373+
def export_unassignEntities(self, entitiesList):
343374
"""
344375
Unassign a list of jobs
345376
"""
346-
if not entitiesSetup:
347-
entitiesSetup = self.serviceInfoDict["clientSetup"]
348377
credDict = self.getRemoteCredentials()
349-
return self.sandboxDB.unassignEntities({entitiesSetup: entitiesList}, credDict["username"], credDict["group"])
378+
return self.sandboxDB.unassignEntities(entitiesList, credDict["username"], credDict["group"])
350379

351380
##################
352381
# Getting assigned sandboxes
353382

354383
types_getSandboxesAssignedToEntity = [str]
355384

356-
def export_getSandboxesAssignedToEntity(self, entityId, entitySetup=False):
385+
def export_getSandboxesAssignedToEntity(self, entityId):
357386
"""
358387
Get the sandboxes associated to a job and the association type
359388
"""
360-
if not entitySetup:
361-
entitySetup = self.serviceInfoDict["clientSetup"]
362389
credDict = self.getRemoteCredentials()
363-
result = self.sandboxDB.getSandboxesAssignedToEntity(
364-
entityId, entitySetup, credDict["username"], credDict["group"]
365-
)
390+
result = self.sandboxDB.getSandboxesAssignedToEntity(entityId, credDict["username"], credDict["group"])
366391
if not result["OK"]:
367392
return result
368393
sbDict = {}
@@ -400,6 +425,10 @@ def transfer_toClient(self, fileID, token, fileHelper):
400425
fileID is the local file name in the SE.
401426
token is used for access rights confirmation.
402427
"""
428+
429+
return self._sendToClient(fileID, token, fileHelper=fileHelper)
430+
431+
def _sendToClient(self, fileID, token, fileHelper=None, raw=False):
403432
credDict = self.getRemoteCredentials()
404433
serviceURL = self.serviceInfoDict["URL"]
405434
filePath = fileID.replace(serviceURL, "")
@@ -412,13 +441,21 @@ def transfer_toClient(self, fileID, token, fileHelper):
412441
hdPath = self.__sbToHDPath(filePath)
413442
if not os.path.isfile(hdPath):
414443
return S_ERROR("Sandbox does not exist")
415-
result = fileHelper.getFileDescriptor(hdPath, "rb")
416-
if not result["OK"]:
417-
return S_ERROR(f"Failed to get file descriptor: {result['Message']}")
418-
fd = result["Value"]
419-
result = fileHelper.FDToNetwork(fd)
420-
fileHelper.oFile.close()
421-
return result
444+
445+
if fileHelper:
446+
result = fileHelper.getFileDescriptor(hdPath, "rb")
447+
if not result["OK"]:
448+
return S_ERROR(f"Failed to get file descriptor: {result['Message']}")
449+
fd = result["Value"]
450+
result = fileHelper.FDToNetwork(fd)
451+
fileHelper.oFile.close()
452+
return result
453+
454+
with open(hdPath, "rb") as fd:
455+
if raw:
456+
return fd.read()
457+
else:
458+
return S_OK(fd.read())
422459

423460
##################
424461
# Purge sandboxes
@@ -531,3 +568,12 @@ def __deleteSandboxFromExternalBackend(self, SEName, SEPFN):
531568
except Exception:
532569
gLogger.exception("RM raised an exception while trying to delete a remote sandbox")
533570
return S_ERROR("RM raised an exception while trying to delete a remote sandbox")
571+
572+
def export_sendFile(self, filename, fileID, data=""):
573+
print(filename, fileID, data)
574+
575+
return S_OK(filename)
576+
577+
578+
class SandboxStoreHandler(SandboxStoreHandlerMixin, RequestHandler):
579+
pass
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
""" Tornado-based HTTPs SandboxStore service.
2+
3+
.. literalinclude:: ../ConfigTemplate.cfg
4+
:start-after: ##BEGIN TornadoSandboxStore
5+
:end-before: ##END
6+
:dedent: 2
7+
:caption: SandboxStore options
8+
9+
"""
10+
from DIRAC import gLogger, S_OK, S_ERROR
11+
from DIRAC.Core.Tornado.Server.TornadoService import TornadoService
12+
from DIRAC.WorkloadManagementSystem.Service.SandboxStoreHandler import SandboxStoreHandlerMixin
13+
14+
15+
class TornadoSandboxStoreHandler(SandboxStoreHandlerMixin, TornadoService):
16+
def initializeRequest(self):
17+
return SandboxStoreHandlerMixin.initializeRequest(self)
18+
19+
def export_streamFromClient(self, fileId, token, fileSize, data):
20+
return self._getFromClient(fileId, token, fileSize, data=data)
21+
22+
def export_streamToClient(self, fileId, token=""):
23+
return self._sendToClient(fileId, token, raw=True)

0 commit comments

Comments
 (0)