|
12 | 12 | import errno
|
13 | 13 | import os
|
14 | 14 | import re
|
| 15 | +import time |
15 | 16 | from datetime import datetime, timedelta
|
| 17 | +from hashlib import md5 |
16 | 18 |
|
17 | 19 | # # from DIRAC
|
18 | 20 | from DIRAC import S_ERROR, S_OK
|
|
24 | 26 | from DIRAC.Core.Utilities.Proxy import executeWithUserProxy
|
25 | 27 | from DIRAC.Core.Utilities.ReturnValues import returnSingleResult
|
26 | 28 | from DIRAC.DataManagementSystem.Client.DataManager import DataManager
|
| 29 | +from DIRAC.RequestManagementSystem.Client.File import File |
| 30 | +from DIRAC.RequestManagementSystem.Client.Operation import Operation |
27 | 31 | from DIRAC.RequestManagementSystem.Client.ReqClient import ReqClient
|
| 32 | +from DIRAC.RequestManagementSystem.Client.Request import Request |
| 33 | +from DIRAC.RequestManagementSystem.private.RequestValidator import RequestValidator |
28 | 34 | from DIRAC.Resources.Catalog.FileCatalog import FileCatalog
|
29 | 35 | from DIRAC.Resources.Catalog.FileCatalogClient import FileCatalogClient
|
30 | 36 | from DIRAC.Resources.Storage.StorageElement import StorageElement
|
@@ -74,6 +80,7 @@ def __init__(self, *args, **kwargs):
|
74 | 80 | self.logSE = "LogSE"
|
75 | 81 | # # enable/disable execution
|
76 | 82 | self.enableFlag = "True"
|
| 83 | + self.cleanWithRMS = False |
77 | 84 |
|
78 | 85 | self.dataProcTTypes = ["MCSimulation", "Merge"]
|
79 | 86 | self.dataManipTTypes = ["Replication", "Removal"]
|
@@ -113,6 +120,7 @@ def initialize(self):
|
113 | 120 | self.logSE = Operations().getValue("/LogStorage/LogSE", self.logSE)
|
114 | 121 | self.log.info(f"Will remove logs found on storage element: {self.logSE}")
|
115 | 122 |
|
| 123 | + self.cleanWithRMS = self.am_getOption("CleanWithRMS", self.cleanWithRMS) |
116 | 124 | # # transformation client
|
117 | 125 | self.transClient = TransformationClient()
|
118 | 126 | # # wms client
|
@@ -387,6 +395,11 @@ def cleanContent(self, directory):
|
387 | 395 | # Executing with shifter proxy
|
388 | 396 | gConfigurationData.setOptionInCFG("/DIRAC/Security/UseServerCertificate", "false")
|
389 | 397 | failed = {}
|
| 398 | + if self.cleanWithRMS: |
| 399 | + res = self.__submitRemovalRequests(filesFound, 0) |
| 400 | + gConfigurationData.setOptionInCFG("/DIRAC/Security/UseServerCertificate", "true") |
| 401 | + return res |
| 402 | + |
390 | 403 | for chunkId, filesChunk in enumerate(breakListIntoChunks(filesFound, 500)):
|
391 | 404 | self.log.info("Removing chunk", chunkId)
|
392 | 405 | res = DataManager().removeFile(filesChunk, force=True)
|
@@ -567,10 +580,13 @@ def cleanMetadataCatalogFiles(self, transID):
|
567 | 580 | self.log.info("No files found for transID", transID)
|
568 | 581 | return S_OK()
|
569 | 582 |
|
570 |
| - # Executing with shifter proxy |
571 |
| - gConfigurationData.setOptionInCFG("/DIRAC/Security/UseServerCertificate", "false") |
572 |
| - res = DataManager().removeFile(fileToRemove, force=True) |
573 |
| - gConfigurationData.setOptionInCFG("/DIRAC/Security/UseServerCertificate", "true") |
| 583 | + if self.cleanWithRMS: |
| 584 | + res = self.__submitRemovalRequests(fileToRemove, transID) |
| 585 | + else: |
| 586 | + # Executing with shifter proxy |
| 587 | + gConfigurationData.setOptionInCFG("/DIRAC/Security/UseServerCertificate", "false") |
| 588 | + res = DataManager().removeFile(fileToRemove, force=True) |
| 589 | + gConfigurationData.setOptionInCFG("/DIRAC/Security/UseServerCertificate", "true") |
574 | 590 |
|
575 | 591 | if not res["OK"]:
|
576 | 592 | return res
|
@@ -697,3 +713,51 @@ def __removeWMSTasks(self, transJobIDs):
|
697 | 713 | return S_ERROR("Failed to remove all the request from RequestDB")
|
698 | 714 | self.log.info("Successfully removed all the associated failover requests")
|
699 | 715 | return S_OK()
|
| 716 | + |
| 717 | + def __submitRemovalRequests(self, lfns, transID=0): |
| 718 | + """Create removal requests for given lfns. |
| 719 | +
|
| 720 | + :param list lfns: list of lfns to be removed |
| 721 | + :param int transID: transformationID, only used in RequestName |
| 722 | + :returns: S_ERROR/S_OK |
| 723 | + """ |
| 724 | + for index, lfnList in enumerate(breakListIntoChunks(lfns, 300)): |
| 725 | + oRequest = Request() |
| 726 | + requestName = "TCA_{transID}_{index}_{md5(repr(time.time())).hexdigest()[:5]}" |
| 727 | + oRequest.RequestName = requestName |
| 728 | + oOperation = Operation() |
| 729 | + oOperation.Type = "RemoveFile" |
| 730 | + oOperation.TargetSE = "All" |
| 731 | + resMeta = self.metadataClient.getFileMetadata(lfnList) |
| 732 | + if not resMeta["OK"]: |
| 733 | + self.log.error("Cannot get file metadata", resMeta["Message"]) |
| 734 | + return resMeta |
| 735 | + if resMeta["Value"]["Failed"]: |
| 736 | + self.log.warning( |
| 737 | + "Could not get the file metadata of the following, so skipping them:", resMeta["Value"]["Failed"] |
| 738 | + ) |
| 739 | + |
| 740 | + for lfn, lfnInfo in resMeta["Value"]["Successful"].items(): |
| 741 | + rarFile = File() |
| 742 | + rarFile.LFN = lfn |
| 743 | + rarFile.ChecksumType = "ADLER32" |
| 744 | + rarFile.Size = lfnInfo["Size"] |
| 745 | + rarFile.Checksum = lfnInfo["Checksum"] |
| 746 | + rarFile.GUID = lfnInfo["GUID"] |
| 747 | + oOperation.addFile(rarFile) |
| 748 | + |
| 749 | + oRequest.addOperation(oOperation) |
| 750 | + isValid = RequestValidator().validate(oRequest) |
| 751 | + if not isValid["OK"]: |
| 752 | + self.log.error("Request is not valid:", isValid["Message"]) |
| 753 | + return isValid |
| 754 | + result = self.reqClient.putRequest(oRequest) |
| 755 | + if not result["OK"]: |
| 756 | + self.log.error("Failed to submit Request: ", result["Message"]) |
| 757 | + return result |
| 758 | + self.log.info( |
| 759 | + "RemoveFiles request %d submitted for %d LFNs" % (result["Value"], len(resMeta["Value"]["Successful"])) |
| 760 | + ) |
| 761 | + |
| 762 | + # after the for loop |
| 763 | + return S_OK() |
0 commit comments