diff --git a/docs/source/AdministratorGuide/Resources/storage.rst b/docs/source/AdministratorGuide/Resources/storage.rst index 232faad659d..0dc907bcdce 100644 --- a/docs/source/AdministratorGuide/Resources/storage.rst +++ b/docs/source/AdministratorGuide/Resources/storage.rst @@ -59,7 +59,8 @@ Configuration options are: * ``SpaceReservation``: just a name of a zone of the physical storage which can have some space reserved. Extends the SRM ``SpaceToken`` concept. * ``ArchiveTimeout``: for tape SE only. If set to a value in seconds, enables the `FTS Archive Monitoring feature `_ * ``BringOnlineTimeout``: for tape SE only. If set to a value in seconds, specify the BringOnline parameter for FTS transfers. Otherwise, the default is whatever is in the ``FTS3Job`` class. -* ``WLCGTokenBasePath``: EXPERIMENTAL Path from which the token should be relative to +* ``WLCGTokenBasePath``: EXPERIMENTAL Path from which the token should be relative to (only used for FTS transfers for now) +* ``TokenSupport``: EXPERIMENTAL Default False. If true, allow using tokens for FTS3 transfers. VO specific paths ----------------- diff --git a/docs/source/AdministratorGuide/Systems/DataManagement/fts3.rst b/docs/source/AdministratorGuide/Systems/DataManagement/fts3.rst index 7822ce7c1a1..35b3aff85ed 100644 --- a/docs/source/AdministratorGuide/Systems/DataManagement/fts3.rst +++ b/docs/source/AdministratorGuide/Systems/DataManagement/fts3.rst @@ -198,16 +198,40 @@ Token support .. versionadded:: v8.0.51 .. warning:: - Very experimental feature + Experimental feature -The current state is the one in which LHCb ran the DC24 challenge. It only worked for dCache site, as there is still not a uniform way for storages to understand permissions... +Currently used in production by LHCb for all disk to disk transfers. + A transfer will happen with token if: * ``UseTokens`` is true in the FTSAgent configuration * ``WLCGTokenBasePath`` is set for both the source and the destination + * ``TokenSupport`` is true for both the source and the destination + +The token issued are file specific, long lived, and unmanaged (i.e. FTS will not refresh them). + +You will need to define a specific client in IAM with the following scopes: + + * fts + * storage.modify:/ + * storage.read:/ + +Obviously, you can adapt the ``/`` if needed. This client then needs to be added to your DIRAC IAM IdP configuration as: ``fts_client_id`` and ``fts_client_secret``. For example + + .. code-block:: guess -The tokens use specific file path, and not generic wildcard permissions. + Resources + { + IdProviders + { + + { + fts_client_id = + fts_client_secret = + } + } + } .. warning:: Token support is as experimental as can be in any layer of the stack (DIRAC, storage, FTS... even the model is experimental) diff --git a/src/DIRAC/DataManagementSystem/Client/FTS3Job.py b/src/DIRAC/DataManagementSystem/Client/FTS3Job.py index 4d17ca1b430..abf5c993f7c 100644 --- a/src/DIRAC/DataManagementSystem/Client/FTS3Job.py +++ b/src/DIRAC/DataManagementSystem/Client/FTS3Job.py @@ -1,9 +1,11 @@ -""" FTS3Job module containing only the FTS3Job class """ +"""FTS3Job module containing only the FTS3Job class""" import datetime import errno +import os from packaging.version import Version +from cachetools import cachedmethod, LRUCache # Requires at least version 3.3.3 from fts3 import __version__ as fts3_version @@ -26,8 +28,9 @@ from DIRAC.FrameworkSystem.Client.Logger import gLogger from DIRAC.FrameworkSystem.Client.TokenManagerClient import gTokenManager +from DIRAC.FrameworkSystem.Utilities.TokenManagementUtilities import getIdProviderClient -from DIRAC.Core.Utilities.ReturnValues import S_OK, S_ERROR +from DIRAC.Core.Utilities.ReturnValues import S_OK, S_ERROR, returnValueOrRaise from DIRAC.Core.Utilities.DErrno import cmpError from DIRAC.Core.Utilities.JEncode import JSerializable @@ -36,6 +39,13 @@ # 3 days in seconds BRING_ONLINE_TIMEOUT = 259200 +# Number of IdP to keep in cache. Should correspond roughly +# to the number of groups performing transfers +IDP_CACHE_SIZE = 8 + +# POC CHRIS Tape Tokens +# Done following https://indico.cern.ch/event/1542060/contributions/6723217/ + class FTS3Job(JSerializable): """Abstract class to represent a job to be executed by FTS. It belongs @@ -78,6 +88,8 @@ class FTS3Job(JSerializable): "userGroup", ] + _idp_cache = LRUCache(maxsize=IDP_CACHE_SIZE) + def __init__(self): self.submitTime = None self.lastUpdate = None @@ -113,6 +125,11 @@ def __init__(self): # when a job is in a final state self.accountingDict = None + @classmethod + @cachedmethod(lambda cls: cls._idp_cache) + def _getIdpClient(cls, group_name: str): + return returnValueOrRaise(getIdProviderClient(group_name, None, client_name_prefix="fts")) + def monitor(self, context=None, ftsServer=None, ucert=None): """Queries the fts server to monitor the job. The internal state of the object is updated depending on the @@ -365,6 +382,10 @@ def _constructTransferJob(self, pinTime, allLFNs, target_spacetoken, protocols=N copy_pin_lifetime = None bring_online = None archive_timeout = None + ### POC CHRIS tape token + tape_token_src = None + tape_token_dst = None + ### transfers = [] @@ -388,6 +409,9 @@ def _constructTransferJob(self, pinTime, allLFNs, target_spacetoken, protocols=N log.error("Could not get source SURL", f"{lfn} {reason}") allSrcDstSURLs = res["Value"]["Successful"] + ### POC CHRIS Tape token + common_lfns_prefix = os.path.commonprefix(list(allSrcDstSURLs)) + ### srcProto, destProto = res["Value"]["Protocols"] # If the source is a tape SE, we should set the @@ -398,13 +422,54 @@ def _constructTransferJob(self, pinTime, allLFNs, target_spacetoken, protocols=N copy_pin_lifetime = pinTime bring_online = srcSE.options.get("BringOnlineTimeout", BRING_ONLINE_TIMEOUT) + ### POC CHRIS Tape token + # Problem here: the grouping at FTS will not + # be efficient as it needs to group based on scope as well + # It is more optimal to keep "storage.stage:/" as scope + # The risk is that "stage" is a superset of "read", which + # is not a problem at the moment, but could be for + # communities like astrophysics + # See https://github.com/WLCG-AuthZ-WG/common-jwt-profile/blob/master/profile.md + if tokensEnabled and self.__seTokenSupport(srcSE): + res = srcSE.getWLCGTokenPath(common_lfns_prefix) + if not res["OK"]: + return res + tape_token_src_path = res["Value"] + + res = self._getIdpClient(self.userGroup).fetchToken( + grant_type="client_credentials", + scope=[f"storage.stage:/{tape_token_src_path}", f"storage.stat:/{tape_token_src_path}"], + # TODO: add a specific audience + ) + if not res["OK"]: + return res + tape_token_src = res["Value"]["access_token"] + ### + # If the destination is a tape, and the protocol supports it, # check if we want to have an archive timeout # In case of multihop, this is relevant only for the # final target, but again, code factorization is more important dstIsTape = self.__isTapeSE(hopDstSEName, self.vo) if dstIsTape and destProto in dstSE.localStageProtocolList: + tape_token_src_path = res["Value"] archive_timeout = dstSE.options.get("ArchiveTimeout") + ### POC CHRIS Tape token + if tokensEnabled and self.__seTokenSupport(dstSE): + res = dstSE.getWLCGTokenPath(common_lfns_prefix) + if not res["OK"]: + return res + tape_token_dst_path = res["Value"] + + res = self._getIdpClient(self.userGroup).fetchToken( + grant_type="client_credentials", + scope=[f"storage.stat:/{tape_token_dst_path}"], + # TODO: add a specific audience + ) + if not res["OK"]: + return res + tape_dst_src = res["Value"]["access_token"] + ### # This contains the staging URLs if they are different from the transfer URLs # (CTA...) @@ -509,11 +574,10 @@ def _constructTransferJob(self, pinTime, allLFNs, target_spacetoken, protocols=N if not res["OK"]: return res srcTokenPath = res["Value"] - res = gTokenManager.getToken( - userGroup=self.userGroup, - requiredTimeLeft=3600, + res = self._getIdpClient(self.userGroup).fetchToken( + grant_type="client_credentials", scope=[f"storage.read:/{srcTokenPath}", "offline_access"], - useCache=False, + # TODO: add a specific audience ) if not res["OK"]: return res @@ -528,11 +592,17 @@ def _constructTransferJob(self, pinTime, allLFNs, target_spacetoken, protocols=N if not res["OK"]: return res dstTokenPath = res["Value"] - res = gTokenManager.getToken( - userGroup=self.userGroup, - requiredTimeLeft=3600, - scope=[f"storage.modify:/{dstTokenPath}", f"storage.read:/{dstTokenPath}", "offline_access"], - useCache=False, + res = self._getIdpClient(self.userGroup).fetchToken( + grant_type="client_credentials", + scope=[ + f"storage.modify:/{dstTokenPath}", + f"storage.read:/{dstTokenPath}", + # Needed because CNAF + # https://ggus.eu/index.php?mode=ticket_info&ticket_id=165048 + f"storage.read:/{os.path.dirname(dstTokenPath)}", + "offline_access", + ], + # TODO: add a specific audience ) if not res["OK"]: return res @@ -589,6 +659,10 @@ def _constructTransferJob(self, pinTime, allLFNs, target_spacetoken, protocols=N metadata=job_metadata, priority=self.priority, archive_timeout=archive_timeout, + ### POC Chris tape token + stage_access_token=tape_token_src_path, + tape_poll_access_token=tape_token_dst or tape_token_src_path, + ### **dest_spacetoken, ) @@ -728,6 +802,7 @@ def _constructStagingJob(self, pinTime, allLFNs, target_spacetoken): retry=3, metadata=job_metadata, priority=self.priority, + unmanaged_tokens=True, **dest_spacetoken, ) diff --git a/src/DIRAC/FrameworkSystem/Utilities/TokenManagementUtilities.py b/src/DIRAC/FrameworkSystem/Utilities/TokenManagementUtilities.py index d2572c1d3f7..e059dcb309e 100644 --- a/src/DIRAC/FrameworkSystem/Utilities/TokenManagementUtilities.py +++ b/src/DIRAC/FrameworkSystem/Utilities/TokenManagementUtilities.py @@ -10,11 +10,12 @@ DEFAULT_AT_EXPIRATION_TIME = 1200 -def getIdProviderClient(userGroup: str, idProviderClientName: str = None): +def getIdProviderClient(userGroup: str, idProviderClientName: str = None, client_name_prefix: str = ""): """Get an IdProvider client :param userGroup: group name :param idProviderClientName: name of an identity provider in the DIRAC CS + :param client_name_prefix: prefix of the client in the CS options """ # Get IdProvider credentials from CS if not idProviderClientName and userGroup: @@ -23,7 +24,7 @@ def getIdProviderClient(userGroup: str, idProviderClientName: str = None): return S_ERROR(f"The {userGroup} group belongs to the VO that is not tied to any Identity Provider.") # Prepare the client instance of the appropriate IdP - return IdProviderFactory().getIdProvider(idProviderClientName) + return IdProviderFactory().getIdProvider(idProviderClientName, client_name_prefix=client_name_prefix) def getCachedKey( diff --git a/src/DIRAC/Resources/IdProvider/IdProviderFactory.py b/src/DIRAC/Resources/IdProvider/IdProviderFactory.py index a0399ccae7b..a165b1b681c 100644 --- a/src/DIRAC/Resources/IdProvider/IdProviderFactory.py +++ b/src/DIRAC/Resources/IdProvider/IdProviderFactory.py @@ -1,6 +1,7 @@ -""" The Identity Provider Factory instantiates IdProvider objects - according to their configuration +"""The Identity Provider Factory instantiates IdProvider objects +according to their configuration """ + import jwt from DIRAC import S_OK, S_ERROR, gLogger, gConfig @@ -40,11 +41,12 @@ def getIdProviderFromToken(self, accessToken): return result return self.getIdProvider(result["Value"]) - def getIdProvider(self, name, **kwargs): + def getIdProvider(self, name, client_name_prefix="", **kwargs): """This method returns a IdProvider instance corresponding to the supplied name. :param str name: the name of the Identity Provider client + :param str client_name_prefix: name of the client of the IdP :return: S_OK(IdProvider)/S_ERROR() """ @@ -68,8 +70,14 @@ def getIdProvider(self, name, **kwargs): if not result["OK"]: self.log.error("Failed to read configuration", f"{name}: {result['Message']}") return result + pDict = result["Value"] + if client_name_prefix: + client_name_prefix = client_name_prefix + "_" + pDict["client_id"] = pDict[f"{client_name_prefix}client_id"] + pDict["client_secret"] = pDict[f"{client_name_prefix}client_secret"] + pDict.update(kwargs) pDict["ProviderName"] = name