
Commit 4e282b4

Merge pull request #7049 from atsareg/tornado-sandbox
[8.0] Simple TornadoSandboxStore handler
2 parents 37a2629 + 3f0571a commit 4e282b4

9 files changed: +148 additions, -26 deletions

src/DIRAC/Core/Tornado/Client/TornadoClient.py

Lines changed: 20 additions & 0 deletions
@@ -24,6 +24,7 @@
 
 from DIRAC.Core.Tornado.Client.private.TornadoBaseClient import TornadoBaseClient
 from DIRAC.Core.Utilities.JEncode import encode
+from DIRAC.Core.Utilities.File import getGlobbedTotalSize
 
 
 class TornadoClient(TornadoBaseClient):
@@ -78,6 +79,25 @@ def receiveFile(self, destFile, *args):
         retVal = self._request(outputFile=destFile, **rpcCall)
         return retVal
 
+    def sendFile(self, filename, fileID):
+        """
+        Equivalent of :py:meth:`~DIRAC.Core.DISET.TransferClient.TransferClient.sendFile`
+
+        In practice, it calls the remote method `streamFromClient` and transfers the file
+        content as a string-typed value
+
+        :param str filename: name (or path) of the file to send
+        :param any fileID: tuple of file identifiers
+        :returns: S_OK/S_ERROR result of the remote RPC call
+        """
+        fileSize = getGlobbedTotalSize(filename)
+        token = ""
+
+        with open(filename, "br") as input:
+            result = self.executeRPC("streamFromClient", *[fileID, token, fileSize, input.read()])
+
+        return result
+
 
 def executeRPCStub(rpcStub):
     """

src/DIRAC/FrameworkSystem/Client/TokenManagerClient.py

Lines changed: 1 addition & 1 deletion
@@ -57,7 +57,7 @@ def getToken(
         idpObj = result["Value"]
 
         # Search for an existing token in tokensCache
-        cachedKey = getCachedKey(idpObj.name, username, userGroup, scope, audience)
+        cachedKey = getCachedKey(idpObj, username, userGroup, scope, audience)
         result = getCachedToken(self.__tokensCache, cachedKey, requiredTimeLeft)
         if result["OK"]:
             # A valid token has been found and is returned

src/DIRAC/FrameworkSystem/Service/TokenManagerHandler.py

Lines changed: 2 additions & 2 deletions
@@ -213,7 +213,7 @@ def export_getToken(
         idpObj = result["Value"]
 
         # Search for an existing token in tokensCache
-        cachedKey = getCachedKey(idpObj.name, username, userGroup, scope, audience)
+        cachedKey = getCachedKey(idpObj, username, userGroup, scope, audience)
         result = getCachedToken(self.__tokensCache, cachedKey, requiredTimeLeft)
         if result["OK"]:
             # A valid token has been found and is returned
@@ -259,7 +259,7 @@ def export_getToken(
             result = self.__checkProperties(dn, userGroup)
             if result["OK"]:
                 # refresh token with requested scope
-                result = idpObj.refreshToken(tokens.get("refresh_token"))
+                result = idpObj.refreshToken(tokens.get("refresh_token"), group=userGroup, scope=scope)
                 if result["OK"]:
                     # caching new tokens
                     self.__tokensCache.add(

src/DIRAC/Resources/IdProvider/Utilities.py

Lines changed: 2 additions & 1 deletion
@@ -39,7 +39,8 @@ def getIdProviderIdentifierFromIssuerAndClientID(issuer, clientID):
 
     for identifier in result["Value"]:
         testedClientID = gConfig.getValue(f"/Resources/IdProviders/{identifier}/client_id")
-        if testedClientID and testedClientID == clientID:
+        # clientID is not always available, e.g. in the case of a token of a particular user
+        if not clientID or (testedClientID and testedClientID == clientID):
             # Found the client ID but need to check the issuer
             # 2 different issuers could theoretically have a same client ID
             testedIssuer = gConfig.getValue(f"/Resources/IdProviders/{identifier}/issuer")
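
The relaxed condition above can be read as a small predicate; the helper below is a hypothetical restatement (not part of the patch), with the issuer comparison deliberately simplified.

# Hypothetical helper restating the matching rule above (not part of the patch).
def matchesIdProvider(testedClientID, testedIssuer, clientID, issuer):
    # clientID may be unknown, e.g. for a token issued directly to a particular user;
    # in that case we fall back to matching on the issuer alone.
    clientMatches = not clientID or (testedClientID and testedClientID == clientID)
    return bool(clientMatches and testedIssuer == issuer)  # issuer check simplified here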

src/DIRAC/Resources/Storage/DIPStorage.py

Lines changed: 1 addition & 1 deletion
@@ -160,7 +160,7 @@ def getFile(self, path, localPath=False):
 
     def __getFile(self, src_url, dest_file):
         transferClient = TransferClient(self.url)
-        res = transferClient.receiveFile(dest_file, src_url, token=self.checkSum)
+        res = transferClient.receiveFile(dest_file, src_url, self.checkSum)
         if not res["OK"]:
             return res
         if not os.path.exists(dest_file):

src/DIRAC/WorkloadManagementSystem/ConfigTemplate.cfg

Lines changed: 22 additions & 0 deletions
@@ -149,6 +149,28 @@ Services
     }
   }
   ##END
+  ##BEGIN TornadoSandboxStore
+  TornadoSandboxStore
+  {
+    Protocol = https
+    LocalSE = ProductionSandboxSE
+    MaxThreads = 200
+    toClientMaxThreads = 100
+    Backend = local
+    MaxSandboxSizeMiB = 10
+    SandboxPrefix = Sandbox
+    BasePath = /opt/dirac/storage/sandboxes
+    DelayedExternalDeletion = True
+    Authorization
+    {
+      Default = authenticated
+      FileTransfer
+      {
+        Default = authenticated
+      }
+    }
+  }
+  ##END
   OptimizationMind
   {
     Port = 9175

src/DIRAC/WorkloadManagementSystem/Service/SandboxStoreHandler.py

Lines changed: 72 additions & 21 deletions
@@ -11,6 +11,7 @@
 import time
 import threading
 import tempfile
+import hashlib
 
 from DIRAC import gLogger, S_OK, S_ERROR
 from DIRAC.Core.DISET.RequestHandler import RequestHandler
@@ -24,9 +25,10 @@
 from DIRAC.RequestManagementSystem.Client.Operation import Operation
 from DIRAC.RequestManagementSystem.Client.File import File
 from DIRAC.Resources.Storage.StorageElement import StorageElement
+from DIRAC.Core.Utilities.File import getGlobbedTotalSize
 
 
-class SandboxStoreHandler(RequestHandler):
+class SandboxStoreHandlerMixin:
     __purgeCount = -1
     __purgeLock = threading.Lock()
     __purgeWorking = False
@@ -44,10 +46,10 @@ def initializeHandler(cls, serviceInfoDict):
             return S_ERROR(f"Can't connect to DB: {repr(excp)}")
         return S_OK()
 
-    def initialize(self):
+    def initializeRequest(self):
         self.__backend = self.getCSOption("Backend", "local")
         self.__localSEName = self.getCSOption("LocalSE", "SandboxSE")
-        self.__maxUploadBytes = self.getCSOption("MaxSandboxSizeMiB", 10) * 1048576
+        self._maxUploadBytes = self.getCSOption("MaxSandboxSizeMiB", 10) * 1048576
         if self.__backend.lower() == "local" or self.__backend == self.__localSEName:
             self.__useLocalStorage = True
             self.__seNameToUse = self.__localSEName
@@ -75,13 +77,14 @@ def __getSandboxPath(self, md5):
         pathItems.extend([md5[0:3], md5[3:6], md5])
         return os.path.join(*pathItems)
 
-    def transfer_fromClient(self, fileId, token, fileSize, fileHelper):
+    def _getFromClient(self, fileId, token, fileSize, fileHelper=None, data=""):
         """
         Receive a file as a sandbox
         """
 
-        if self.__maxUploadBytes and fileSize > self.__maxUploadBytes:
-            fileHelper.markAsTransferred()
+        if self._maxUploadBytes and fileSize > self._maxUploadBytes:
+            if fileHelper:
+                fileHelper.markAsTransferred()
             return S_ERROR("Sandbox is too big. Please upload it to a grid storage element")
 
         if isinstance(fileId, (list, tuple)):
@@ -113,7 +116,8 @@ def transfer_fromClient(self, fileId, token, fileSize, fileHelper):
             result = self.sandboxDB.getSandboxId(seName, sePFN, credDict["username"], credDict["group"])
             if result["OK"]:
                 gLogger.info("Sandbox already exists. Skipping upload")
-                fileHelper.markAsTransferred()
+                if fileHelper:
+                    fileHelper.markAsTransferred()
                 sbURL = f"SB:{seName}|{sePFN}"
                 assignTo = {key: [(sbURL, assignTo[key])] for key in assignTo}
                 result = self.export_assignSandboxesToEntities(assignTo)
@@ -126,14 +130,35 @@ def transfer_fromClient(self, fileId, token, fileSize, fileHelper):
         else:
             hdPath = False
         # Write to local file
-        result = self.__networkToFile(fileHelper, hdPath)
+
+        if fileHelper:
+            result = self.__networkToFile(fileHelper, hdPath)
+        elif data:
+            hdPath = os.path.realpath(hdPath)
+            mkDir(os.path.dirname(hdPath))
+            with open(hdPath, "bw") as output:
+                output.write(data)
+            result = S_OK(hdPath)
+        else:
+            result = S_ERROR("No data provided")
+
         if not result["OK"]:
             gLogger.error("Error while receiving sandbox file", result["Message"])
             return result
         hdPath = result["Value"]
         gLogger.info("Wrote sandbox to file", hdPath)
         # Check hash!
-        if fileHelper.getHash() != aHash:
+        if fileHelper:
+            hdHash = fileHelper.getHash()
+        else:
+            oMD5 = hashlib.md5()
+            with open(hdPath, "rb") as fd:
+                bData = fd.read(10240)
+                while bData:
+                    oMD5.update(bData)
+                    bData = fd.read(10240)
+            hdHash = oMD5.hexdigest()
+        if hdHash != aHash:
             self.__secureUnlinkFile(hdPath)
             gLogger.error("Hashes don't match! Client defined hash is different with received data hash!")
             return S_ERROR("Hashes don't match!")
@@ -147,13 +172,14 @@ def transfer_fromClient(self, fileId, token, fileSize, fileHelper):
         sbPath = result["Value"][1]
         # Register!
         gLogger.info("Registering sandbox in the DB with", f"SB:{self.__seNameToUse}|{sbPath}")
+        fSize = getGlobbedTotalSize(hdPath)
         result = self.sandboxDB.registerAndGetSandbox(
             credDict["username"],
             credDict["DN"],
             credDict["group"],
             self.__seNameToUse,
             sbPath,
-            fileHelper.getTransferedBytes(),
+            fSize,
         )
         if not result["OK"]:
             self.__secureUnlinkFile(hdPath)
@@ -166,6 +192,13 @@ def transfer_fromClient(self, fileId, token, fileSize, fileHelper):
                 return result
         return S_OK(sbURL)
 
+    def transfer_fromClient(self, fileId, token, fileSize, fileHelper):
+        """
+        Receive a file as a sandbox
+        """
+
+        return self._getFromClient(fileId, token, fileSize, fileHelper=fileHelper)
+
     def transfer_bulkFromClient(self, fileId, token, _fileSize, fileHelper):
         """Receive files packed into a tar archive by the fileHelper logic.
         token is used for access rights confirmation.
@@ -259,7 +292,7 @@ def __networkToFile(self, fileHelper, destFileName=False):
                 fd = tfd
             else:
                 fd = open(destFileName, "wb")
-            result = fileHelper.networkToDataSink(fd, maxFileSize=self.__maxUploadBytes)
+            result = fileHelper.networkToDataSink(fd, maxFileSize=self._maxUploadBytes)
             fd.close()
         except Exception as e:
             gLogger.error("Cannot open to write destination file", f"{destFileName}: {repr(e).replace(',)', ')')}")
@@ -328,7 +361,7 @@ def export_assignSandboxesToEntities(self, enDict, ownerName="", ownerGroup="",
         Expects a dict of { entityId : [ ( SB, SBType ), ... ] }
         """
         if not entitySetup:
-            entitySetup = self.serviceInfoDict["clientSetup"]
+            entitySetup = self.diracSetup
         credDict = self.getRemoteCredentials()
         return self.sandboxDB.assignSandboxesToEntities(
             enDict, credDict["username"], credDict["group"], entitySetup, ownerName, ownerGroup
@@ -344,7 +377,7 @@ def export_unassignEntities(self, entitiesList, entitiesSetup=False):
         Unassign a list of jobs
         """
         if not entitiesSetup:
-            entitiesSetup = self.serviceInfoDict["clientSetup"]
+            entitiesSetup = self.diracSetup
         credDict = self.getRemoteCredentials()
         return self.sandboxDB.unassignEntities({entitiesSetup: entitiesList}, credDict["username"], credDict["group"])
 
@@ -358,7 +391,7 @@ def export_getSandboxesAssignedToEntity(self, entityId, entitySetup=False):
         Get the sandboxes associated to a job and the association type
         """
         if not entitySetup:
-            entitySetup = self.serviceInfoDict["clientSetup"]
+            entitySetup = self.diracSetup
         credDict = self.getRemoteCredentials()
         result = self.sandboxDB.getSandboxesAssignedToEntity(
             entityId, entitySetup, credDict["username"], credDict["group"]
@@ -400,6 +433,10 @@ def transfer_toClient(self, fileID, token, fileHelper):
         fileID is the local file name in the SE.
         token is used for access rights confirmation.
         """
+
+        return self._sendToClient(fileID, token, fileHelper=fileHelper)
+
+    def _sendToClient(self, fileID, token, fileHelper=None, raw=False):
         credDict = self.getRemoteCredentials()
         serviceURL = self.serviceInfoDict["URL"]
         filePath = fileID.replace(serviceURL, "")
@@ -412,13 +449,20 @@
         hdPath = self.__sbToHDPath(filePath)
         if not os.path.isfile(hdPath):
             return S_ERROR("Sandbox does not exist")
-        result = fileHelper.getFileDescriptor(hdPath, "rb")
-        if not result["OK"]:
-            return S_ERROR(f"Failed to get file descriptor: {result['Message']}")
-        fd = result["Value"]
-        result = fileHelper.FDToNetwork(fd)
-        fileHelper.oFile.close()
-        return result
+
+        if fileHelper:
+            result = fileHelper.getFileDescriptor(hdPath, "rb")
+            if not result["OK"]:
+                return result
+            fd = result["Value"]
+            result = fileHelper.FDToNetwork(fd)
+            fileHelper.oFile.close()
+            return result
+
+        with open(hdPath, "rb") as fd:
+            if raw:
+                return fd.read()
+            return S_OK(fd.read())
 
     ##################
     # Purge sandboxes
@@ -531,3 +575,10 @@ def __deleteSandboxFromExternalBackend(self, SEName, SEPFN):
         except Exception:
             gLogger.exception("RM raised an exception while trying to delete a remote sandbox")
             return S_ERROR("RM raised an exception while trying to delete a remote sandbox")
+
+
+class SandboxStoreHandler(SandboxStoreHandlerMixin, RequestHandler):
+    def initialize(self):
+        # we need it still in 8.0
+        self.diracSetup = self.serviceInfoDict["clientSetup"]
+        return self.initializeRequest()
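
The point of the refactoring above is that a single method body now serves both transports: the DIPS path streams data through a fileHelper, while the HTTPS path hands the whole payload over as an in-memory value. A miniature stand-alone sketch of that dual-path dispatch (illustrative only, not part of the patch):

# Miniature sketch of the fileHelper/data dual path used by _getFromClient (illustrative only).
from DIRAC import S_OK, S_ERROR


def receivePayload(hdPath, fileHelper=None, data=b""):
    if fileHelper:
        # DIPS transport: stream from the network straight into the destination file
        with open(hdPath, "wb") as fd:
            return fileHelper.networkToDataSink(fd)
    if data:
        # HTTPS transport: the whole body already arrived as an in-memory value
        with open(hdPath, "wb") as fd:
            fd.write(data)
        return S_OK(hdPath)
    return S_ERROR("No data provided")
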
src/DIRAC/WorkloadManagementSystem/Service/TornadoSandboxStoreHandler.py

Lines changed: 26 additions & 0 deletions
@@ -0,0 +1,26 @@
+""" Tornado-based HTTPS SandboxStore service.
+
+.. literalinclude:: ../ConfigTemplate.cfg
+    :start-after: ##BEGIN TornadoSandboxStore
+    :end-before: ##END
+    :dedent: 2
+    :caption: SandboxStore options
+
+"""
+from DIRAC import gLogger, S_OK, S_ERROR
+from DIRAC.Core.Tornado.Server.TornadoService import TornadoService
+from DIRAC.WorkloadManagementSystem.Service.SandboxStoreHandler import SandboxStoreHandlerMixin
+
+
+class TornadoSandboxStoreHandler(SandboxStoreHandlerMixin, TornadoService):
+    def initializeRequest(self):
+        self.diracSetup = self.get_argument("clientSetup")
+        # Ugly, but makes DIPS and HTTPS handlers compatible, TBD properly
+        self.serviceInfoDict = self._serviceInfoDict
+        return SandboxStoreHandlerMixin.initializeRequest(self)
+
+    def export_streamFromClient(self, fileId, token, fileSize, data):
+        return self._getFromClient(fileId, token, fileSize, data=data)
+
+    def export_streamToClient(self, fileId, token=""):
+        return self._sendToClient(fileId, token, raw=True)
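
Putting the pieces together, the new Tornado handler maps the HTTPS entry points onto the shared mixin: streamFromClient receives the payload sent by TornadoClient.sendFile(), and streamToClient returns the raw file content consumed by receiveFile(). A hedged round-trip sketch (service name and remote sandbox path are illustrative; the download call is assumed to invoke streamToClient under the hood):

# Hedged round-trip sketch; illustrative names only, not part of this commit.
from DIRAC.Core.Tornado.Client.TornadoClient import TornadoClient

client = TornadoClient("WorkloadManagement/TornadoSandboxStore")  # hypothetical service name

# Upload: sendFile() -> remote streamFromClient -> SandboxStoreHandlerMixin._getFromClient(data=...)
upload = client.sendFile("/tmp/sandbox.tar.bz2", ("sandbox.tar.bz2",))

# Download: receiveFile() is assumed to call the remote streamToClient, which returns the raw
# file content (raw=True) that the client then writes to the destination path.
download = client.receiveFile("/tmp/sandbox.copy.tar.bz2", "/some/remote/sandbox/path")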

tests/Jenkins/dirac-cfg-update-server.py

Lines changed: 2 additions & 0 deletions
@@ -25,6 +25,8 @@
 
 csAPI.setOption("Systems/WorkloadManagement/Production/Services/SandboxStore/BasePath", f"{setupName}/sandboxes")
 csAPI.setOption("Systems/WorkloadManagement/Production/Services/SandboxStore/LogLevel", "DEBUG")
+csAPI.setOption("Systems/WorkloadManagement/Production/Services/TornadoSandboxStore/BasePath", f"{setupName}/sandboxes")
+csAPI.setOption("Systems/WorkloadManagement/Production/Services/TornadoSandboxStore/LogLevel", "DEBUG")
 
 # Now setting a SandboxSE as the following:
