Skip to content

Commit beafcc5

Browse files
authored
Merge pull request #8069 from DIRACGridBot/cherry-pick-2-9872ba6fc-integration
[sweep:integration] feat: method findFileByMetadata
2 parents 87f2d23 + f725a81 commit beafcc5

File tree

2 files changed

+375
-0
lines changed

2 files changed

+375
-0
lines changed

src/DIRAC/Resources/Catalog/RucioFileCatalogClient.py

Lines changed: 194 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,8 @@ class RucioFileCatalogClient(FileCatalogClientBase):
6060
"resolveDataset",
6161
"getLFNForPFN",
6262
"getUserDirectory",
63+
"getFileUserMetadata",
64+
"findFilesByMetadata",
6365
]
6466

6567
WRITE_METHODS = FileCatalogClientBase.WRITE_METHODS + [
@@ -78,13 +80,15 @@ class RucioFileCatalogClient(FileCatalogClientBase):
7880
"createDataset",
7981
"changePathOwner",
8082
"changePathMode",
83+
"setMetadata",
8184
]
8285

8386
NO_LFN_METHODS = FileCatalogClientBase.NO_LFN_METHODS + [
8487
"getUserDirectory",
8588
"createUserDirectory",
8689
"createUserMapping",
8790
"removeUserDirectory",
91+
"findFilesByMetadata",
8892
]
8993

9094
ADMIN_METHODS = FileCatalogClientBase.ADMIN_METHODS + [
@@ -697,3 +701,193 @@ def getDirectorySize(self, lfns, longOutput=False, rawFiles=False):
697701
except Exception as err:
698702
return S_ERROR(str(err))
699703
return S_OK(resDict)
704+
705+
@checkCatalogArguments
def getFileUserMetadata(self, path):
    """Return the metadata attached to a file, including the metadata
    inherited from all its parents.

    Only the first entry of ``path`` is looked up.

    :param path: iterable of LFNs; the first one is used
    :return: S_OK with a ``{"Successful": {...}, "Failed": {...}}`` dict keyed by
             that LFN, or S_ERROR carrying the message of any unexpected exception
    """
    lfn = next(iter(path))
    successful = {}
    failed = {}
    try:
        metaIterator = self.client.get_metadata_bulk(
            dids=[self.__getDidsFromLfn(lfn)], inherit=True, plugin="ALL"
        )
        meta = next(metaIterator)
        # Should we also return the metadata for the directories ?
        if meta["did_type"] != "FILE":
            failed[lfn] = "Not a file"
        else:
            successful[lfn] = meta
    except DataIdentifierNotFound:
        failed[lfn] = "No such file or directory"
    except Exception as err:
        return S_ERROR(str(err))
    return S_OK({"Successful": successful, "Failed": failed})
724+
725+
@checkCatalogArguments
def getFileUserMetadataBulk(self, lfns):
    """Return the metadata attached to a list of files, including the
    metadata inherited from all their parents.

    The LFNs are processed in chunks of 1000. Each metadata record returned by
    Rucio is stored under its ``name`` field; LFNs of the chunk that are not
    among the successful keys afterwards are reported as failed.

    :param lfns: LFNs to look up
    :return: S_OK with a ``{"Successful": {...}, "Failed": {...}}`` dict, or
             S_ERROR carrying the message of the first exception raised
    """
    successful = {}
    failed = {}
    for chunk in breakListIntoChunks(lfns, 1000):
        try:
            chunkDids = [self.__getDidsFromLfn(lfn) for lfn in chunk]
            for record in self.client.get_metadata_bulk(dids=chunkDids, inherit=True):
                successful[record["name"]] = record
            missing = [lfn for lfn in chunk if lfn not in successful]
            for lfn in missing:
                failed[lfn] = "No such file or directory"
        except Exception as err:
            return S_ERROR(str(err))
    return S_OK({"Successful": successful, "Failed": failed})
748+
749+
@checkCatalogArguments
def setMetadataBulk(self, pathMetadataDict):
    """Add metadata for the given paths.

    All paths are sent to Rucio in a single ``set_dids_metadata_bulk`` call, so
    the operation is reported as a whole: either the call raises and S_ERROR is
    returned, or every path is listed under ``Successful``.

    :param dict pathMetadataDict: maps each path to the dict of metadata to set on it
    :return: S_OK with a ``{"Successful": {path: True, ...}, "Failed": {}}`` dict,
             or S_ERROR carrying the message of the exception raised
    """
    resDict = {"Successful": {}, "Failed": {}}
    dids = []
    try:
        for path, metadataDict in pathMetadataDict.items():
            did = self.__getDidsFromLfn(path)
            did["meta"] = metadataDict
            dids.append(did)
        self.client.set_dids_metadata_bulk(dids=dids, recursive=False)
    except Exception as err:
        return S_ERROR(str(err))

    # The bulk call did not raise: record every requested path as done so the
    # caller does not receive an empty Successful dict for a successful call.
    for path in pathMetadataDict:
        resDict["Successful"][path] = True
    return S_OK(resDict)
766+
767+
@checkCatalogArguments
def setMetadata(self, path, metadataDict):
    """Add metadata to the given path.

    Only the first entry of ``path`` is used; the work is delegated to
    ``setMetadataBulk`` with a one-entry mapping.

    :param path: iterable of paths; the first one is used
    :param dict metadataDict: metadata to set on that path
    :return: whatever ``setMetadataBulk`` returns
    """
    firstPath = next(iter(path))
    return self.setMetadataBulk({firstPath: metadataDict})
774+
775+
@checkCatalogArguments
def removeMetadata(self, path, metadata):
    """Remove the specified metadata for the given file.

    Each key is deleted with its own Rucio call. A failure on one key does not
    stop the others from being attempted, except when the file itself is not
    found, which aborts immediately.

    :param path: path of the file, passed as-is to the LFN-to-DID conversion
    :param metadata: iterable of metadata keys to remove
    :return: S_OK() when every key was removed; otherwise S_ERROR, which carries
             a ``FailedMetadata`` dict (key -> error message) when individual
             deletions failed
    """
    # NOTE(review): sibling methods take next(iter(path)) before converting to a
    # DID; here path is used directly - confirm what the decorator hands over.
    try:
        did = self.__getDidsFromLfn(path)
        failedMeta = {}
        # TODO : Implement bulk delete_metadata method in Rucio
        for meta in metadata:
            try:
                self.client.delete_metadata(scope=did["scope"], name=did["name"], key=meta)
            except DataIdentifierNotFound:
                return S_ERROR(f"File {path} not found")
            except Exception as err:
                failedMeta[meta] = str(err)
    except Exception as err:
        return S_ERROR(str(err))

    if failedMeta:
        metaExample = next(iter(failedMeta))
        result = S_ERROR(f"Failed to remove {len(failedMeta)} metadata, e.g. {failedMeta[metaExample]}")
        result["FailedMetadata"] = failedMeta
        # Report the partial failure instead of falling through to S_OK().
        return result
    return S_OK()
798+
799+
def findFilesByMetadata(self, metadataFilterDict, path="/", timeout=120):
    """Find the DIDs matching the given metadataFilterDict.

    The DIRAC filter is converted to Rucio filters, then every scope in
    ``self.scopes`` is queried and the results are concatenated.

    :param dict metadataFilterDict: DIRAC metadata query
    :param str path: not used by this implementation
    :param int timeout: not used by this implementation
    :return: S_OK with the list of everything ``list_dids`` yielded, or S_ERROR
             carrying the message of the first exception raised by a query
    """
    rucioFilters = self.__transform_DIRAC_filter_dict_to_Rucio_filter_dict([metadataFilterDict])
    matches = []
    for scope in self.scopes:
        try:
            scopeMatches = self.client.list_dids(scope=scope, filters=rucioFilters, did_type="all")
            matches.extend(scopeMatches)
        except Exception as err:
            return S_ERROR(str(err))
    return S_OK(matches)
809+
810+
def __transform_DIRAC_operator_to_Rucio(self, DIRAC_dict):
    """
    Convert one DIRAC metadata query dictionary into a Rucio filter dictionary.

    Plain values are copied unchanged. When a value is a dictionary of
    ``{operator: operand}``, each known operator becomes a suffix on the key
    ('>' -> '.gt', '<' -> '.lt', '>=' -> '.gte', '<=' and '=<' -> '.lte',
    '!=' -> '.ne', '=' -> no suffix). Operators outside this table produce no
    entry in the result.

    Example:
        input  = {'key1': 'value1', 'key2': {'>': 10}, 'key3': {'=': 10}}
        output = {'key1': 'value1', 'key2.gt': 10, 'key3': 10}
    """
    suffix_by_operator = {
        ">": ".gt",
        "<": ".lt",
        ">=": ".gte",
        "<=": ".lte",
        "=<": ".lte",
        "!=": ".ne",
        "=": "",
    }

    converted = {}
    for field, condition in DIRAC_dict.items():
        if not isinstance(condition, dict):
            converted[field] = condition
            continue
        for dirac_operator, operand in condition.items():
            suffix = suffix_by_operator.get(dirac_operator)
            if suffix is not None:
                converted[f"{field}{suffix}"] = operand

    return converted
832+
833+
def __transform_dict_with_in_operateur(self, DIRAC_dict_with_in_operator_list):
    """
    Expand one 'in' operator per dictionary of the given list.

    For every dictionary, the first key whose value is a dict containing 'in'
    is replaced by each of the listed values in turn, giving one copy of the
    dictionary per value; all other keys are preserved. Dictionaries with no
    'in' operator are passed through unchanged. Only ONE key is expanded per
    dictionary and per call: the caller is expected to call again while the
    returned flag is True.

    Example (single pass):
        input  = [{'particle': {'in': ['proton', 'electron']},
                   'site': {'in': ['LaPalma', 'paranal']},
                   'configuration_id': {'=': 14}}]
        output = ([{'particle': 'proton',
                    'site': {'in': ['LaPalma', 'paranal']},
                    'configuration_id': {'=': 14}},
                   {'particle': 'electron',
                    'site': {'in': ['LaPalma', 'paranal']},
                    'configuration_id': {'=': 14}}],
                  True)

    :param list DIRAC_dict_with_in_operator_list: list of DIRAC filter dictionaries
    :return: tuple (list of dictionaries after this pass,
             True if an 'in' operator was found in any input dictionary)
    :raises TypeError: if the argument is not a list, or one of its elements is not a dict
    """
    if not isinstance(DIRAC_dict_with_in_operator_list, list):
        raise TypeError("DIRAC_dict_with_in_operator_list must be a list of dictionaries")

    combined_dict_list = []  # Final list of transformed dictionaries
    break_reached = False  # Tracks whether an 'in' was found in any dictionary

    for DIRAC_dict_with_in_operator in DIRAC_dict_with_in_operator_list:
        if not isinstance(DIRAC_dict_with_in_operator, dict):
            raise TypeError("Each element in DIRAC_dict_with_in_operator_list must be a dictionary")

        for key, value in DIRAC_dict_with_in_operator.items():
            if isinstance(value, dict) and "in" in value:
                break_reached = True
                # Expand right here rather than testing the key's truthiness
                # afterwards: a falsy key ('' or 0) would otherwise be left
                # unexpanded while still being reported as found, and the
                # caller would loop forever.
                for val in value["in"]:
                    new_dict = DIRAC_dict_with_in_operator.copy()
                    new_dict[key] = val
                    combined_dict_list.append(new_dict)
                break
        else:
            # No 'in' operator in this dictionary: keep it as-is
            combined_dict_list.append(DIRAC_dict_with_in_operator)

    return combined_dict_list, break_reached
875+
876+
def __transform_DIRAC_filter_dict_to_Rucio_filter_dict(self, DIRAC_filter_dict_list):
    """
    Convert a list of DIRAC filter dictionaries into a list of Rucio filter dictionaries.

    The 'in' operators are expanded first, one pass at a time, until a pass
    reports that none was found; each resulting dictionary then has its
    remaining operators converted to the Rucio form.

    Example:
        input  = [{'particle': {'in': ['proton', 'electron']},
                   'site': {'in': ['LaPalma', 'paranal']},
                   'configuration_id': {'=': 14}}]
        output = [{'particle': 'proton', 'site': 'LaPalma', 'configuration_id': 14},
                  {'particle': 'proton', 'site': 'paranal', 'configuration_id': 14},
                  {'particle': 'electron', 'site': 'LaPalma', 'configuration_id': 14},
                  {'particle': 'electron', 'site': 'paranal', 'configuration_id': 14}]
    """
    expanded = DIRAC_filter_dict_list
    while True:
        expanded, in_operator_found = self.__transform_dict_with_in_operateur(expanded)
        if not in_operator_found:
            break
    return [self.__transform_DIRAC_operator_to_Rucio(entry) for entry in expanded]

0 commit comments

Comments
 (0)