diff --git a/docs/source/AdministratorGuide/Systems/Transformation/index.rst b/docs/source/AdministratorGuide/Systems/Transformation/index.rst index 1fefb4646db..1cc30b546ae 100644 --- a/docs/source/AdministratorGuide/Systems/Transformation/index.rst +++ b/docs/source/AdministratorGuide/Systems/Transformation/index.rst @@ -82,7 +82,6 @@ The TS is a standard DIRAC system, and therefore it is composed by components in | Tables_in_TransformationDB | +------------------------------+ | AdditionalParameters | - | DataFiles | | TaskInputs | | TransformationFileTasks | | TransformationFiles | diff --git a/docs/source/DeveloperGuide/Systems/Transformation/architecture.rst b/docs/source/DeveloperGuide/Systems/Transformation/architecture.rst index 2bb24ae1843..eeaea9daee7 100644 --- a/docs/source/DeveloperGuide/Systems/Transformation/architecture.rst +++ b/docs/source/DeveloperGuide/Systems/Transformation/architecture.rst @@ -30,7 +30,6 @@ A technical drawing explaining the interactions between the various components f | Tables_in_TransformationDB | +------------------------------+ | AdditionalParameters | - | DataFiles | | TaskInputs | | TransformationFileTasks | | TransformationFiles | diff --git a/src/DIRAC/Resources/Catalog/TSCatalogClient.py b/src/DIRAC/Resources/Catalog/TSCatalogClient.py index bb74cba2890..fbde343a363 100644 --- a/src/DIRAC/Resources/Catalog/TSCatalogClient.py +++ b/src/DIRAC/Resources/Catalog/TSCatalogClient.py @@ -3,8 +3,8 @@ """ from DIRAC import S_OK from DIRAC.Core.Utilities.List import breakListIntoChunks -from DIRAC.Resources.Catalog.Utilities import checkCatalogArguments from DIRAC.Resources.Catalog.FileCatalogClientBase import FileCatalogClientBase +from DIRAC.Resources.Catalog.Utilities import checkCatalogArguments class TSCatalogClient(FileCatalogClientBase): @@ -12,7 +12,7 @@ class TSCatalogClient(FileCatalogClientBase): """Exposes the catalog functionality available in the DIRAC/TransformationHandler""" # List of common File Catalog methods implemented by this client - WRITE_METHODS = FileCatalogClientBase.WRITE_METHODS + ["addFile", "removeFile", "setMetadata"] + WRITE_METHODS = FileCatalogClientBase.WRITE_METHODS + ["addFile", "setMetadata"] NO_LFN_METHODS = ["setMetadata"] @@ -25,21 +25,6 @@ def addFile(self, lfns, force=False): rpcClient = self._getRPC() return rpcClient.addFile(lfns, force) - @checkCatalogArguments - def removeFile(self, lfns): - rpcClient = self._getRPC() - successful = {} - failed = {} - listOfLists = breakListIntoChunks(lfns, 100) - for fList in listOfLists: - res = rpcClient.removeFile(fList) - if not res["OK"]: - return res - successful.update(res["Value"]["Successful"]) - failed.update(res["Value"]["Failed"]) - resDict = {"Successful": successful, "Failed": failed} - return S_OK(resDict) - def setMetadata(self, path, metadatadict): """Set metadata parameter for the given path diff --git a/src/DIRAC/TransformationSystem/Client/TransformationCLI.py b/src/DIRAC/TransformationSystem/Client/TransformationCLI.py index 1821893d53c..d98983b4319 100644 --- a/src/DIRAC/TransformationSystem/Client/TransformationCLI.py +++ b/src/DIRAC/TransformationSystem/Client/TransformationCLI.py @@ -4,13 +4,13 @@ DIRAC.initialize() # Initialize configuration -from DIRAC.Core.Base.CLI import CLI from DIRAC.Core.Base.API import API +from DIRAC.Core.Base.CLI import CLI from DIRAC.Core.Utilities.Subprocess import shellCall +from DIRAC.Resources.Catalog.FileCatalog import FileCatalog from DIRAC.TransformationSystem.Client import TransformationFilesStatus from DIRAC.TransformationSystem.Client.Transformation import Transformation from DIRAC.TransformationSystem.Client.TransformationClient import TransformationClient -from DIRAC.Resources.Catalog.FileCatalog import FileCatalog def printDict(dictionary): @@ -563,25 +563,6 @@ def do_addFile(self, args): for lfn in sorted(res["Value"]["Successful"]): print(f"added {lfn}") - def do_removeFile(self, args): - """Remove file from transformation DB - - usage: removeFile [lfn] - """ - argss = args.split() - if not len(argss) > 0: - print("no files supplied") - return - res = self.transClient.removeFile(argss) - if not res["OK"]: - print(f"failed to remove any files: {res['Message']}") - return - for lfn in sorted(res["Value"]["Failed"]): - error = res["Value"]["Failed"][lfn] - print(f"failed to remove {lfn}: {error}") - for lfn in sorted(res["Value"]["Successful"]): - print(f"removed {lfn}") - def do_addReplica(self, args): """Add new replica to the transformation DB diff --git a/src/DIRAC/TransformationSystem/Client/TransformationClient.py b/src/DIRAC/TransformationSystem/Client/TransformationClient.py index 2ff3945e45f..3e57b495703 100644 --- a/src/DIRAC/TransformationSystem/Client/TransformationClient.py +++ b/src/DIRAC/TransformationSystem/Client/TransformationClient.py @@ -46,7 +46,6 @@ class TransformationClient(Client): File/directory manipulation methods (the remainder of the interface can be found below) getFileSummary(lfns) - exists(lfns) Web monitoring tools diff --git a/src/DIRAC/TransformationSystem/DB/TransformationDB.py b/src/DIRAC/TransformationSystem/DB/TransformationDB.py index a08c95669ed..ae61d5fea26 100755 --- a/src/DIRAC/TransformationSystem/DB/TransformationDB.py +++ b/src/DIRAC/TransformationSystem/DB/TransformationDB.py @@ -7,21 +7,20 @@ """ import re -import time import threading - +import time from errno import ENOENT -from DIRAC import gLogger, S_OK, S_ERROR +from DIRAC import S_ERROR, S_OK, gLogger +from DIRAC.ConfigurationSystem.Client.Helpers.Operations import Operations from DIRAC.Core.Base.DB import DB -from DIRAC.Core.Utilities.DErrno import cmpError -from DIRAC.Resources.Catalog.FileCatalog import FileCatalog from DIRAC.Core.Security.ProxyInfo import getProxyInfo -from DIRAC.Core.Utilities.List import stringListToString, intListToString, breakListIntoChunks +from DIRAC.Core.Utilities.DErrno import cmpError +from DIRAC.Core.Utilities.List import breakListIntoChunks, intListToString, stringListToString from DIRAC.Core.Utilities.Shifter import setupShifterProxyInEnv -from DIRAC.ConfigurationSystem.Client.Helpers.Operations import Operations from DIRAC.Core.Utilities.Subprocess import pythonCall from DIRAC.DataManagementSystem.Client.MetaQuery import MetaQuery +from DIRAC.Resources.Catalog.FileCatalog import FileCatalog MAX_ERROR_COUNT = 10 @@ -85,7 +84,7 @@ def __init__(self, dbname=None, dbconfig=None, dbIn=None, parentLogger=None): self.TRANSFILEPARAMS = [ "TransformationID", - "FileID", + "LFN", "Status", "TaskID", "TargetSE", @@ -95,7 +94,7 @@ def __init__(self, dbname=None, dbconfig=None, dbIn=None, parentLogger=None): "InsertedTime", ] - self.TRANSFILETASKPARAMS = ["TransformationID", "FileID", "TaskID"] + self.TRANSFILETASKPARAMS = ["TransformationID", "LFN", "TaskID"] self.TASKSPARAMS = [ "TaskID", @@ -242,7 +241,7 @@ def addTransformation( gLogger.error("Could not insert files, now deleting", res["Message"]) return self.deleteTransformation(transID, connection=connection) - # Add files to the DataFiles table + # Add files to the TransformationFiles table catalog = FileCatalog() if addFiles and inputMetaQuery: res = catalog.findFilesByMetadata(inputMetaQuery) @@ -253,16 +252,7 @@ def addTransformation( gLogger.notice("filesToAdd", filesToAdd) if filesToAdd: connection = self.__getConnection(connection) - res = self.__addDataFiles(filesToAdd, connection=connection) - if not res["OK"]: - return res - lfnFileIDs = res["Value"] - # Add the files to the transformations - fileIDs = [] - for lfn in filesToAdd: - if lfn in lfnFileIDs: - fileIDs.append(lfnFileIDs[lfn]) - res = self.__addFilesToTransformation(transID, fileIDs, connection=connection) + res = self.__addFilesToTransformation(transID, filesToAdd, connection=connection) if not res["OK"]: gLogger.error("Failed to add files to transformation", f"{transID} {res['Message']}") message = f"Created transformation {transID}" @@ -558,7 +548,7 @@ def addFilesToTransformation(self, transName, lfns, connection=False): """Add a list of LFNs to the transformation directly""" gLogger.info( "TransformationDB.addFilesToTransformation:" - " Attempting to add %s files to transformations: %s" % (len(lfns), transName) + f"Attempting to add {len(lfns)} files to transformation {transName}" ) if not lfns: return S_ERROR("Zero length LFN list") @@ -567,20 +557,13 @@ def addFilesToTransformation(self, transName, lfns, connection=False): return res connection = res["Value"]["Connection"] transID = res["Value"]["TransformationID"] - # Add missing files if necessary (__addDataFiles does the job) - res = self.__addDataFiles(lfns, connection=connection) - if not res["OK"]: - return res - fileIDs = {fileID: lfn for lfn, fileID in res["Value"].items()} - # Attach files to transformation successful = {} - if fileIDs: - res = self.__addFilesToTransformation(transID, list(fileIDs), connection=connection) + if lfns: + res = self.__addFilesToTransformation(transID, lfns, connection=connection) if not res["OK"]: return res - for fileID in fileIDs: - lfn = fileIDs[fileID] - successful[lfn] = "Added" if fileID in res["Value"] else "Present" + for lfn in lfns: + successful[lfn] = "Added" if lfn in res["Value"] else "Present" resDict = {"Successful": successful, "Failed": {}} return S_OK(resDict) @@ -607,8 +590,6 @@ def getTransformationFiles( req = ", ".join(f"df.{x}" if x == "LFN" else f"tf.{x}" for x in columns) req = f"SELECT {req} FROM TransformationFiles tf" - if "LFN" in columns or (condDict and "LFN" in condDict): - req = f"{req} JOIN DataFiles df ON tf.FileID = df.FileID" fixedCondDict = {} if condDict: @@ -713,21 +694,21 @@ def getTransformationFilesCount(self, transName, field, selection=None, connecti countDict["Total"] = sum(countDict.values()) return S_OK(countDict) - def __addFilesToTransformation(self, transID, fileIDs, connection=False): - req = "SELECT FileID from TransformationFiles" - req = req + " WHERE TransformationID = %d AND FileID IN (%s);" % (transID, intListToString(fileIDs)) + def __addFilesToTransformation(self, transID, LFNs, connection=False): + req = "SELECT LFN from TransformationFiles" + req = req + " WHERE TransformationID = %d AND LFN IN (%s);" % (transID, LFNs) res = self._query(req, conn=connection) if not res["OK"]: return res for tupleIn in res["Value"]: - fileIDs.remove(tupleIn[0]) - if not fileIDs: + LFNs.remove(tupleIn[0]) + if not LFNs: return S_OK([]) - values = [(transID, fileID) for fileID in fileIDs] - req = "INSERT INTO TransformationFiles (TransformationID,FileID,LastUpdate,InsertedTime) VALUES (%s, %s, UTC_TIMESTAMP(), UTC_TIMESTAMP())" + values = [(transID, lfn) for lfn in LFNs] + req = "INSERT INTO TransformationFiles (TransformationID,LFN,LastUpdate,InsertedTime) VALUES (%s, %s, UTC_TIMESTAMP(), UTC_TIMESTAMP())" if not (res := self._updatemany(req, values, conn=connection))["OK"]: return res - return S_OK(fileIDs) + return S_OK(LFNs) def __insertExistingTransformationFiles(self, transID, fileTuplesList, connection=False): """Inserting already transformation files in TransformationFiles table (e.g. for deriving transformations)""" @@ -738,13 +719,13 @@ def __insertExistingTransformationFiles(self, transID, fileTuplesList, connectio gLogger.verbose( f"Adding first {len(fileTuples)} files in TransformationFiles (out of {len(fileTuplesList)})" ) - req = "INSERT INTO TransformationFiles (TransformationID,Status,TaskID,FileID,TargetSE,UsedSE,LastUpdate) VALUES" + req = ( + "INSERT INTO TransformationFiles (TransformationID,Status,TaskID,LFN,TargetSE,UsedSE,LastUpdate) VALUES" + ) candidates = False for ft in fileTuples: - _lfn, originalID, fileID, status, taskID, targetSE, usedSE, _errorCount, _lastUpdate, _insertTime = ft[ - :10 - ] + _lfn, originalID, lfn, status, taskID, targetSE, usedSE, _errorCount, _lastUpdate, _insertTime = ft[:10] if status not in ("Removed",): candidates = True if not re.search("-", status): @@ -752,7 +733,7 @@ def __insertExistingTransformationFiles(self, transID, fileTuplesList, connectio if taskID: # Should be readable up to 999,999 tasks: that field is an int(11) in the DB, not a string taskID = 1000000 * int(originalID) + int(taskID) - req = f"{req} ({transID},'{status}','{taskID}',{fileID},'{targetSE}','{usedSE}',UTC_TIMESTAMP())," + req = f"{req} ({transID},'{status}','{taskID}',{lfn},'{targetSE}','{usedSE}',UTC_TIMESTAMP())," if not candidates: continue @@ -763,71 +744,38 @@ def __insertExistingTransformationFiles(self, transID, fileTuplesList, connectio return S_OK() - def __assignTransformationFile(self, transID, taskID, se, fileIDs, connection=False): + def __assignTransformationFile(self, transID, taskID, se, LFNs, connection=False): """Make necessary updates to the TransformationFiles table for the newly created task""" req = "UPDATE TransformationFiles SET TaskID='%d',UsedSE='%s',Status='Assigned',LastUpdate=UTC_TIMESTAMP()" - req = (req + " WHERE TransformationID = %d AND FileID IN (%s);") % ( + req = (req + " WHERE TransformationID = %d AND LFN IN (%s);") % ( taskID, se, transID, - intListToString(fileIDs), + LFNs, ) res = self._update(req, conn=connection) if not res["OK"]: gLogger.error("Failed to assign file to task", res["Message"]) - values = [(transID, fileID, taskID) for fileID in fileIDs] - req = "INSERT INTO TransformationFileTasks (TransformationID,FileID,TaskID) VALUES (%s, %s, %s)" + values = [(transID, lfn, taskID) for lfn in LFNs] + req = "INSERT INTO TransformationFileTasks (TransformationID,LFN,TaskID) VALUES (%s, %s, %s)" res = self._updatemany(req, values, conn=connection) if not res["OK"]: gLogger.error("Failed to assign file to task", res["Message"]) return res - def __setTransformationFileStatus(self, fileIDs, status, connection=False): - req = f"UPDATE TransformationFiles SET Status = '{status}' WHERE FileID IN ({intListToString(fileIDs)});" - res = self._update(req, conn=connection) - if not res["OK"]: - gLogger.error("Failed to update file status", res["Message"]) - return res - - def __setTransformationFileUsedSE(self, fileIDs, usedSE, connection=False): - req = f"UPDATE TransformationFiles SET UsedSE = '{usedSE}' WHERE FileID IN ({intListToString(fileIDs)});" - res = self._update(req, conn=connection) - if not res["OK"]: - gLogger.error("Failed to update file usedSE", res["Message"]) - return res - - def __resetTransformationFile(self, transID, taskID, connection=False): - req = ( - "UPDATE TransformationFiles SET TaskID=NULL, UsedSE='Unknown', Status='Unused'\ - WHERE TransformationID = %d AND TaskID=%d;" - % (transID, taskID) + def __resetTransformationFile(self, transformationID, taskID, connection=False): + query = ( + f"UPDATE TransformationFiles SET TaskID=NULL, UsedSE='Unknown', Status='Unused' " + f"WHERE TransformationID = {transformationID} AND TaskID={taskID};" ) - res = self._update(req, conn=connection) - if not res["OK"]: - gLogger.error("Failed to reset transformation file", res["Message"]) - return res + result = self._update(query, conn=connection) + if not result["OK"]: + gLogger.error("Failed to reset transformation file", result["Message"]) + return result def __deleteTransformationFiles(self, transID, connection=False): - """Remove the files associated to a transformation. - It also tries to remove the associated DataFiles. - If these DataFiles are still used by other transformations, they - will be kept thanks to the ForeignKey constraint. - In the very unlikely event of removing a file that was juuuuuuuust about to be - used by another transformation, well, tough luck, but the other transformation - will succeed at the next attempt to insert the file. - """ - # The IGNORE keyword will make sure we do not abort the full removal - # on a foreign key error - # https://dev.mysql.com/doc/refman/5.7/en/sql-mode.html#ignore-strict-comparison - req = ( - "DELETE IGNORE tf, df \ - FROM TransformationFiles tf \ - JOIN DataFiles df \ - ON tf.FileID=df.FileID \ - WHERE TransformationID = %d;" - % transID - ) - res = self._update(req, conn=connection) + """Remove the files associated to a transformation.""" + res = self._update(f"DELETE FROM TransformationFiles WHERE TransformationID = {transID};", conn=connection) if not res["OK"]: gLogger.error("Failed to delete transformation files", res["Message"]) return res @@ -1269,87 +1217,31 @@ def __deleteTransformationLog(self, transID, connection=False): req = f"DELETE FROM TransformationLog WHERE TransformationID={transID}" return self._update(req, conn=connection) - ########################################################################### - # - # These methods manipulate the DataFiles table - # - - def __getFileIDsForLfns(self, lfns, connection=False): - """Get file IDs for the given list of lfns - warning: if the file is not present, we'll see no errors - """ - req = f"SELECT LFN,FileID FROM DataFiles WHERE LFN in ({stringListToString(lfns)});" - res = self._query(req, conn=connection) - if not res["OK"]: - return res - lfns = dict(res["Value"]) - # Reverse dictionary - fids = {fileID: lfn for lfn, fileID in lfns.items()} - return S_OK((fids, lfns)) - - def __getLfnsForFileIDs(self, fileIDs, connection=False): - """Get lfns for the given list of fileIDs""" - req = f"SELECT LFN,FileID FROM DataFiles WHERE FileID in ({stringListToString(fileIDs)});" - res = self._query(req, conn=connection) - if not res["OK"]: - return res - fids = dict(res["Value"]) - # Reverse dictionary - lfns = {fileID: lfn for lfn, fileID in fids.items()} - return S_OK((fids, lfns)) - - def __addDataFiles(self, lfns, connection=False): - """Add a file to the DataFiles table and retrieve the FileIDs""" - res = self.__getFileIDsForLfns(lfns, connection=connection) - if not res["OK"]: - return res - # Insert only files not found, and assume the LFN is unique in the table - lfnFileIDs = res["Value"][1] - for lfn in set(lfns) - set(lfnFileIDs): - req = f"INSERT INTO DataFiles (LFN,Status) VALUES ('{lfn}','New');" - res = self._update(req, conn=connection) - # If the LFN is duplicate we get an error and ignore it - if res["OK"]: - lfnFileIDs[lfn] = res["lastRowId"] - # If two transformations are adding files at the same time we will have missed some LFNs - missedLfns = set(lfns) - set(lfnFileIDs) - if missedLfns: - res = self.__getFileIDsForLfns(missedLfns, connection=connection) - if not res["OK"]: - return res - lfnFileIDs.update(res["Value"][1]) - return S_OK(lfnFileIDs) - - def __setDataFileStatus(self, fileIDs, status, connection=False): - """Set the status of the supplied files""" - req = f"UPDATE DataFiles SET Status = '{status}' WHERE FileID IN ({intListToString(fileIDs)});" - return self._update(req, conn=connection) - ########################################################################### # # These methods manipulate multiple tables # - def addTaskForTransformation(self, transID, lfns=None, se="Unknown", connection=False): + def addTaskForTransformation(self, transID, LFNs=None, se="Unknown", connection=False): """Create a new task with the supplied files for a transformation.""" res = self._getConnectionTransID(connection, transID) if not res["OK"]: return res - if lfns is None: - lfns = [] + if LFNs is None: + LFNs = [] connection = res["Value"]["Connection"] transID = res["Value"]["TransformationID"] # Be sure the all the supplied LFNs are known to the database for the supplied transformation - fileIDs = [] - if lfns: + lfns = [] + if LFNs: res = self.getTransformationFiles( - condDict={"TransformationID": transID, "LFN": lfns}, connection=connection + condDict={"TransformationID": transID, "LFN": LFNs}, connection=connection ) if not res["OK"]: return res foundLfns = set() for fileDict in res["Value"]: - fileIDs.append(fileDict["FileID"]) + lfns.append(fileDict["LFN"]) lfn = fileDict["LFN"] if fileDict["Status"] in self.allowedStatusForTasks: foundLfns.add(lfn) @@ -1390,7 +1282,7 @@ def addTaskForTransformation(self, transID, lfns=None, se="Unknown", connection= if not res["OK"]: self.__removeTransformationTask(transID, taskID, connection=connection) return res - res = self.__assignTransformationFile(transID, taskID, se, fileIDs, connection=connection) + res = self.__assignTransformationFile(transID, taskID, se, lfns, connection=connection) if not res["OK"]: self.__removeTransformationTask(transID, taskID, connection=connection) return res @@ -1514,21 +1406,6 @@ def _getConnectionTransID(self, connection, transName): # #################################################################################### - def exists(self, lfns, connection=False): - """Check the presence of the lfn in the TransformationDB DataFiles table""" - gLogger.info(f"TransformationDB.exists: Attempting to determine existence of {len(lfns)} files.") - res = self.__getFileIDsForLfns(lfns, connection=connection) - if not res["OK"]: - return res - fileIDs = res["Value"][0] - failed = {} - successful = {} - fileIDsValues = set(fileIDs.values()) - for lfn in lfns: - successful[lfn] = lfn in fileIDsValues - resDict = {"Successful": successful, "Failed": failed} - return S_OK(resDict) - def addFile(self, fileDicts, force=False, connection=False): """Add the supplied lfn to the Transformations and to the DataFiles table if it passes the filter""" gLogger.info(f"TransformationDB.addFile: Attempting to add {len(fileDicts)} files.") @@ -1575,34 +1452,6 @@ def addFile(self, fileDicts, force=False, connection=False): res = S_OK({"Successful": successful, "Failed": failed}) return res - def removeFile(self, lfns, connection=False): - """Remove file specified by lfn from the ProcessingDB""" - gLogger.info(f"TransformationDB.removeFile: Attempting to remove {len(lfns)} files.") - failed = {} - successful = {} - connection = self.__getConnection(connection) - if not lfns: - return S_ERROR("No LFNs supplied") - res = self.__getFileIDsForLfns(lfns, connection=connection) - if not res["OK"]: - return res - fileIDs, lfnFilesIDs = res["Value"] - for lfn in lfns: - if lfn not in lfnFilesIDs: - successful[lfn] = "File does not exist" - if fileIDs: - res = self.__setTransformationFileStatus(list(fileIDs), "Deleted", connection=connection) - if not res["OK"]: - return res - res = self.__setDataFileStatus(list(fileIDs), "Deleted", connection=connection) - if not res["OK"]: - return S_ERROR("TransformationDB.removeFile: Failed to remove files.") - for lfn in lfnFilesIDs: - if lfn not in failed: - successful[lfn] = True - resDict = {"Successful": successful, "Failed": failed} - return S_OK(resDict) - def addDirectory(self, path, force=False): """Adds all the files stored in a given directory in file catalog""" gLogger.info(f"TransformationDB.addDirectory: Attempting to populate {path}.") diff --git a/src/DIRAC/TransformationSystem/DB/TransformationDB.sql b/src/DIRAC/TransformationSystem/DB/TransformationDB.sql index 1a56a0f27c6..7e2343d94fe 100755 --- a/src/DIRAC/TransformationSystem/DB/TransformationDB.sql +++ b/src/DIRAC/TransformationSystem/DB/TransformationDB.sql @@ -50,16 +50,6 @@ CREATE TABLE Transformations( INDEX(Type) ) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4; --- ------------------------------------------------------------------------------- -DROP TABLE IF EXISTS DataFiles; -CREATE TABLE DataFiles( - FileID INTEGER NOT NULL AUTO_INCREMENT, - LFN VARCHAR(255) UNIQUE NOT NULL DEFAULT '', - Status varchar(32) DEFAULT 'AprioriGood', - INDEX(Status), - INDEX(LFN), - PRIMARY KEY(FileID) -) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4; -- ------------------------------------------------------------------------------- DROP TABLE IF EXISTS AdditionalParameters; @@ -108,34 +98,33 @@ FOR EACH ROW SET NEW.TaskID = (SELECT @last:= IFNULL(MAX(TaskID) + 1, 1) FROM Tr -- ------------------------------------------------------------------------------- DROP TABLE IF EXISTS TransformationFiles; -CREATE TABLE TransformationFiles( +CREATE TABLE TransformationFiles ( TransformationID INTEGER NOT NULL, - FileID INTEGER NOT NULL, + LFN VARCHAR(255) NOT NULL DEFAULT '', Status VARCHAR(32) DEFAULT 'Unused', - ErrorCount INTEGER(4) NOT NULL DEFAULT 0, + ErrorCount INTEGER NOT NULL DEFAULT 0, TaskID INTEGER, TargetSE VARCHAR(255) DEFAULT 'Unknown', UsedSE VARCHAR(255) DEFAULT 'Unknown', LastUpdate DATETIME, InsertedTime DATETIME, - PRIMARY KEY(TransformationID, FileID), - INDEX(TransformationID), - INDEX(Status), - INDEX(FileID), - INDEX(TransformationID,Status), - FOREIGN KEY(TransformationID) REFERENCES Transformations(TransformationID), - FOREIGN KEY(FileID) REFERENCES DataFiles(FileID) + PRIMARY KEY (TransformationID, LFN), + INDEX (TransformationID), + INDEX (Status), + INDEX (LFN), + INDEX (TransformationID, Status), + FOREIGN KEY (TransformationID) REFERENCES Transformations(TransformationID) ) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4; -- ------------------------------------------------------------------------------- DROP TABLE IF EXISTS TransformationFileTasks; CREATE TABLE TransformationFileTasks( TransformationID INTEGER NOT NULL, - FileID INTEGER NOT NULL, + LFN VARCHAR(255) NOT NULL DEFAULT '', TaskID INTEGER NOT NULL, - PRIMARY KEY(TransformationID, FileID, TaskID), + PRIMARY KEY(TransformationID, LFN, TaskID), FOREIGN KEY(TransformationID) REFERENCES Transformations(TransformationID), - FOREIGN KEY(TransformationID, FileID) REFERENCES TransformationFiles(TransformationID, FileID), + FOREIGN KEY(TransformationID, LFN) REFERENCES TransformationFiles(TransformationID, LFN), FOREIGN KEY(TransformationID, TaskID) REFERENCES TransformationTasks(TransformationID, TaskID) ) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4; diff --git a/src/DIRAC/TransformationSystem/Service/TransformationManagerHandler.py b/src/DIRAC/TransformationSystem/Service/TransformationManagerHandler.py index ee89cfad3cf..e6446c0dcd8 100644 --- a/src/DIRAC/TransformationSystem/Service/TransformationManagerHandler.py +++ b/src/DIRAC/TransformationSystem/Service/TransformationManagerHandler.py @@ -501,12 +501,6 @@ def export_getFileSummary(cls, lfns): def export_addDirectory(cls, path, force=False): return cls.transformationDB.addDirectory(path, force=force) - types_exists = [list] - - @classmethod - def export_exists(cls, lfns): - return cls.transformationDB.exists(lfns) - types_addFile = [[list, dict, str]] @classmethod @@ -514,15 +508,6 @@ def export_addFile(cls, fileDicts, force=False): """Interface provides { LFN1 : { PFN1, SE1, ... }, LFN2 : { PFN2, SE2, ... } }""" return cls.transformationDB.addFile(fileDicts, force=force) - types_removeFile = [[list, dict]] - - @classmethod - def export_removeFile(cls, lfns): - """Interface provides [ LFN1, LFN2, ... ]""" - if isinstance(lfns, dict): - lfns = list(lfns) - return cls.transformationDB.removeFile(lfns) - types_setMetadata = [str, dict] @classmethod