Skip to content

Commit 2acd553

Browse files
authored
Merge pull request #7316 from fstagni/cherry-pick-2-71c456a1b-integration
[sweep:integration] ElasticJobParametersDB: do not configure the IndexPrefix name
2 parents af7ab51 + 66d4307 commit 2acd553

File tree

4 files changed

+45
-62
lines changed

4 files changed

+45
-62
lines changed

src/DIRAC/Core/Utilities/ElasticSearchDB.py

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -15,21 +15,25 @@
1515
try:
1616
from opensearchpy import OpenSearch as Elasticsearch
1717
from opensearchpy.exceptions import (
18-
ConnectionError as ElasticConnectionError,
19-
TransportError,
18+
ConflictError,
2019
NotFoundError,
2120
RequestError,
22-
ConflictError,
21+
TransportError,
22+
)
23+
from opensearchpy.exceptions import (
24+
ConnectionError as ElasticConnectionError,
2325
)
2426
from opensearchpy.helpers import BulkIndexError, bulk
2527
except ImportError:
2628
from elasticsearch import Elasticsearch
2729
from elasticsearch.exceptions import (
28-
ConnectionError as ElasticConnectionError,
29-
TransportError,
30+
ConflictError,
3031
NotFoundError,
3132
RequestError,
32-
ConflictError,
33+
TransportError,
34+
)
35+
from elasticsearch.exceptions import (
36+
ConnectionError as ElasticConnectionError,
3337
)
3438
from elasticsearch.helpers import BulkIndexError, bulk
3539

@@ -39,7 +43,7 @@
3943
except ImportError:
4044
from opensearch_dsl import A, Q, Search
4145
except ImportError:
42-
from elasticsearch_dsl import Search, Q, A
46+
from elasticsearch_dsl import A, Q, Search
4347

4448

4549
from DIRAC import S_ERROR, S_OK, gLogger

src/DIRAC/Interfaces/API/Dirac.py

Lines changed: 26 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,7 @@ def submitJob(self, job, mode="wms"):
127127
128128
Example usage:
129129
130-
>>> print dirac.submitJob(job)
130+
>>> print(dirac.submitJob(job))
131131
{'OK': True, 'Value': '12345'}
132132
133133
:param job: Instance of Job class or JDL string
@@ -218,7 +218,7 @@ def getInputDataCatalog(self, lfns, siteName="", fileName="pool_xml_catalog.xml"
218218
219219
Example usage:
220220
221-
>>> print print d.getInputDataCatalog('/lhcb/a/b/c/00001680_00000490_5.dst',None,'myCat.xml')
221+
>>> print(getInputDataCatalog('/lhcb/a/b/c/00001680_00000490_5.dst',None,'myCat.xml'))
222222
{'Successful': {'<LFN>': {'pfntype': 'ROOT_All', 'protocol': 'SRM2',
223223
'pfn': '<PFN>', 'turl': '<TURL>', 'guid': '3E3E097D-0AC0-DB11-9C0A-00188B770645',
224224
'se': 'CERN-disk'}}, 'Failed': [], 'OK': True, 'Value': ''}
@@ -677,7 +677,7 @@ def getReplicas(self, lfns, active=True, preferDisk=False, diskOnly=False, print
677677
678678
Example usage:
679679
680-
>>> print dirac.getReplicas('/lhcb/data/CCRC08/RDST/00000106/0000/00000106_00006321_1.rdst')
680+
>>> print(dirac.getReplicas('/lhcb/data/CCRC08/RDST/00000106/0000/00000106_00006321_1.rdst'))
681681
{'OK': True, 'Value': {'Successful': {'/lhcb/data/CCRC08/RDST/00000106/0000/00000106_00006321_1.rdst':
682682
{'CERN-RDST':
683683
'srm://srm-lhcb.cern.ch/castor/cern.ch/grid/lhcb/data/CCRC08/RDST/00000106/0000/00000106_00006321_1.rdst'}},
@@ -733,7 +733,7 @@ def getReplicasForJobs(self, lfns, diskOnly=False, printOutput=False):
733733
734734
Example usage:
735735
736-
>>> print dirac.getReplicasForJobs('/lhcb/data/CCRC08/RDST/00000106/0000/00000106_00006321_1.rdst')
736+
>>> print(dirac.getReplicasForJobs('/lhcb/data/CCRC08/RDST/00000106/0000/00000106_00006321_1.rdst'))
737737
{'OK': True, 'Value': {'Successful': {'/lhcb/data/CCRC08/RDST/00000106/0000/00000106_00006321_1.rdst':
738738
{'CERN-RDST':
739739
'srm://srm-lhcb.cern.ch/castor/cern.ch/grid/lhcb/data/CCRC08/RDST/00000106/0000/00000106_00006321_1.rdst'}},
@@ -786,7 +786,7 @@ def getAllReplicas(self, lfns, printOutput=False):
786786
787787
Example usage:
788788
789-
>>> print dirac.getAllReplicas('/lhcb/data/CCRC08/RDST/00000106/0000/00000106_00006321_1.rdst')
789+
>>> print(dirac.getAllReplicas('/lhcb/data/CCRC08/RDST/00000106/0000/00000106_00006321_1.rdst'))
790790
{'OK': True, 'Value': {'Successful': {'/lhcb/data/CCRC08/RDST/00000106/0000/00000106_00006321_1.rdst':
791791
{'CERN-RDST':
792792
'srm://srm-lhcb.cern.ch/castor/cern.ch/grid/lhcb/data/CCRC08/RDST/00000106/0000/00000106_00006321_1.rdst'}},
@@ -908,7 +908,7 @@ def getLfnMetadata(self, lfns, printOutput=False):
908908
909909
Example usage:
910910
911-
>>> print dirac.getLfnMetadata('/lhcb/data/CCRC08/RDST/00000106/0000/00000106_00006321_1.rdst')
911+
>>> print(dirac.getLfnMetadata('/lhcb/data/CCRC08/RDST/00000106/0000/00000106_00006321_1.rdst'))
912912
{'OK': True, 'Value': {'Successful': {'/lhcb/data/CCRC08/RDST/00000106/0000/00000106_00006321_1.rdst':
913913
{'Status': '-', 'Size': 619475828L, 'GUID': 'E871FBA6-71EA-DC11-8F0C-000E0C4DEB4B', 'ChecksumType': 'AD',
914914
'CheckSumValue': ''}}, 'Failed': {}}}
@@ -962,7 +962,7 @@ def addFile(self, lfn, fullPath, diracSE, fileGuid=None, printOutput=False):
962962
963963
Example Usage:
964964
965-
>>> print dirac.addFile('/lhcb/user/p/paterson/myFile.tar.gz','myFile.tar.gz','CERN-USER')
965+
>>> print(dirac.addFile('/lhcb/user/p/paterson/myFile.tar.gz','myFile.tar.gz','CERN-USER'))
966966
{'OK': True, 'Value':{'Failed': {},
967967
'Successful': {'/lhcb/user/p/paterson/test/myFile.tar.gz': {'put': 64.246301889419556,
968968
'register': 1.1102778911590576}}}}
@@ -1003,7 +1003,7 @@ def getFile(self, lfn, destDir="", printOutput=False):
10031003
10041004
Example Usage:
10051005
1006-
>>> print dirac.getFile('/lhcb/user/p/paterson/myFile.tar.gz')
1006+
>>> print(dirac.getFile('/lhcb/user/p/paterson/myFile.tar.gz'))
10071007
{'OK': True, 'Value':{'Failed': {},
10081008
'Successful': {'/lhcb/user/p/paterson/test/myFile.tar.gz': '/afs/cern.ch/user/p/paterson/myFile.tar.gz'}}}
10091009
@@ -1042,7 +1042,7 @@ def replicateFile(self, lfn, destinationSE, sourceSE="", localCache="", printOut
10421042
10431043
Example Usage:
10441044
1045-
>>> print dirac.replicateFile('/lhcb/user/p/paterson/myFile.tar.gz','CNAF-USER')
1045+
>>> print(dirac.replicateFile('/lhcb/user/p/paterson/myFile.tar.gz','CNAF-USER'))
10461046
{'OK': True, 'Value':{'Failed': {},
10471047
'Successful': {'/lhcb/user/p/paterson/test/myFile.tar.gz': {'register': 0.44766902923583984,
10481048
'replicate': 56.42345404624939}}}}
@@ -1105,7 +1105,7 @@ def replicate(self, lfn, destinationSE, sourceSE="", printOutput=False):
11051105
11061106
Example Usage:
11071107
1108-
>>> print dirac.replicate('/lhcb/user/p/paterson/myFile.tar.gz','CNAF-USER')
1108+
>>> print(dirac.replicate('/lhcb/user/p/paterson/myFile.tar.gz','CNAF-USER'))
11091109
{'OK': True, 'Value':{'Failed': {},
11101110
'Successful': {'/lhcb/user/p/paterson/test/myFile.tar.gz': {'register': 0.44766902923583984}}}}
11111111
@@ -1145,7 +1145,7 @@ def getAccessURL(self, lfn, storageElement, printOutput=False, protocol=False):
11451145
11461146
Example Usage:
11471147
1148-
>>> print dirac.getAccessURL('/lhcb/data/CCRC08/DST/00000151/0000/00000151_00004848_2.dst','CERN-RAW')
1148+
>>> print(dirac.getAccessURL('/lhcb/data/CCRC08/DST/00000151/0000/00000151_00004848_2.dst','CERN-RAW'))
11491149
{'OK': True, 'Value': {'Successful': {'srm://...': {'SRM2': 'rfio://...'}}, 'Failed': {}}}
11501150
11511151
:param lfn: Logical File Name (LFN)
@@ -1178,7 +1178,7 @@ def getPhysicalFileAccessURL(self, pfn, storageElement, printOutput=False):
11781178
11791179
Example Usage:
11801180
1181-
>>> print dirac.getPhysicalFileAccessURL('srm://srm-lhcb.cern.ch/castor/cern.ch/grid/lhcb/data/CCRC08/DST/00000151/0000/00000151_00004848_2.dst','CERN_M-DST')
1181+
>>> print(dirac.getPhysicalFileAccessURL('srm://srm-lhcb.cern.ch/castor/cern.ch/grid/lhcb/data/CCRC08/DST/00000151/0000/00000151_00004848_2.dst','CERN_M-DST'))
11821182
{'OK': True, 'Value':{'Failed': {},
11831183
'Successful': {'srm://srm-lhcb.cern.ch/castor/cern.ch/grid/lhcb/data/CCRC08/DST/00000151/0000/00000151_00004848_2.dst': {'RFIO': 'castor://...'}}}}
11841184
@@ -1210,7 +1210,7 @@ def getPhysicalFileMetadata(self, pfn, storageElement, printOutput=False):
12101210
12111211
Example Usage:
12121212
1213-
>>> print dirac.getPhysicalFileMetadata('srm://srm.grid.sara.nl/pnfs/grid.sara.nl/data
1213+
>>> print(dirac.getPhysicalFileMetadata('srm://srm.grid.sara.nl/pnfs/grid.sara.nl/data)
12141214
/lhcb/data/CCRC08/RAW/LHCb/CCRC/23341/023341_0000039571.raw','NIKHEF-RAW')
12151215
{'OK': True, 'Value': {'Successful': {'srm://...': {'SRM2': 'rfio://...'}}, 'Failed': {}}}
12161216
@@ -1241,7 +1241,7 @@ def removeFile(self, lfn, printOutput=False):
12411241
12421242
Example Usage:
12431243
1244-
>>> print dirac.removeFile('LFN:/lhcb/data/CCRC08/RAW/LHCb/CCRC/22808/022808_0000018443.raw')
1244+
>>> print(dirac.removeFile('LFN:/lhcb/data/CCRC08/RAW/LHCb/CCRC/22808/022808_0000018443.raw'))
12451245
{'OK': True, 'Value':...}
12461246
12471247
:param lfn: Logical File Name (LFN)
@@ -1269,7 +1269,7 @@ def removeReplica(self, lfn, storageElement, printOutput=False):
12691269
12701270
Example Usage:
12711271
1272-
>>> print dirac.removeReplica('LFN:/lhcb/user/p/paterson/myDST.dst','CERN-USER')
1272+
>>> print(dirac.removeReplica('LFN:/lhcb/user/p/paterson/myDST.dst','CERN-USER'))
12731273
{'OK': True, 'Value':...}
12741274
12751275
:param lfn: Logical File Name (LFN)
@@ -1300,7 +1300,7 @@ def getInputSandbox(self, jobID, outputDir=None):
13001300
13011301
Example Usage:
13021302
1303-
>>> print dirac.getInputSandbox(12345)
1303+
>>> print(dirac.getInputSandbox(12345))
13041304
{'OK': True, 'Value': ['Job__Sandbox__.tar.bz2']}
13051305
13061306
:param jobID: JobID
@@ -1348,7 +1348,7 @@ def getOutputSandbox(self, jobID, outputDir=None, oversized=True, noJobDir=False
13481348
13491349
Example Usage:
13501350
1351-
>>> print dirac.getOutputSandbox(12345)
1351+
>>> print(dirac.getOutputSandbox(12345))
13521352
{'OK': True, 'Value': ['Job__Sandbox__.tar.bz2']}
13531353
13541354
:param jobID: JobID
@@ -1436,7 +1436,7 @@ def deleteJob(self, jobID):
14361436
14371437
Example Usage:
14381438
1439-
>>> print dirac.deleteJob(12345)
1439+
>>> print(dirac.deleteJob(12345))
14401440
{'OK': True, 'Value': [12345]}
14411441
14421442
:param jobID: JobID
@@ -1468,7 +1468,7 @@ def rescheduleJob(self, jobID):
14681468
14691469
Example Usage:
14701470
1471-
>>> print dirac.rescheduleJob(12345)
1471+
>>> print(dirac.rescheduleJob(12345))
14721472
{'OK': True, 'Value': [12345]}
14731473
14741474
:param jobID: JobID
@@ -1524,7 +1524,7 @@ def getJobStatus(self, jobID):
15241524
15251525
Example Usage:
15261526
1527-
>>> print dirac.getJobStatus(79241)
1527+
>>> print(dirac.getJobStatus(79241))
15281528
{79241: {'Status': 'Done',
15291529
'MinorStatus': 'Execution Complete',
15301530
'ApplicationStatus': 'some app status'
@@ -2030,7 +2030,7 @@ def getJobAttributes(self, jobID, printOutput=False):
20302030
20312031
Example Usage:
20322032
2033-
>>> print dirac.getJobAttributes(79241)
2033+
>>> print(dirac.getJobAttributes(79241))
20342034
{'AccountedFlag': 'False','ApplicationNumStatus': '0',
20352035
'ApplicationStatus': 'Job Finished Successfully',
20362036
'CPUTime': '0.0'}
@@ -2067,7 +2067,7 @@ def getJobParameters(self, jobID, printOutput=False):
20672067
20682068
Example Usage:
20692069
2070-
>>> print dirac.getJobParameters(79241)
2070+
>>> print(dirac.getJobParameters(79241))
20712071
{'OK': True, 'Value': {'JobPath': 'JobPath,JobSanity,JobPolicy,InputData,JobScheduling,TaskQueue',
20722072
'JobSanityCheck': 'Job: 768 JDL: OK, InputData: 2 LFNs OK, '}
20732073
@@ -2106,7 +2106,7 @@ def getJobLoggingInfo(self, jobID, printOutput=False):
21062106
21072107
Example Usage:
21082108
2109-
>>> print dirac.getJobLoggingInfo(79241)
2109+
>>> print(dirac.getJobLoggingInfo(79241))
21102110
{'OK': True, 'Value': [('Received', 'JobPath', 'Unknown', '2008-01-29 15:37:09', 'JobPathAgent'),
21112111
('Checking', 'JobSanity', 'Unknown', '2008-01-29 15:37:14', 'JobSanityAgent')]}
21122112
@@ -2149,7 +2149,7 @@ def peekJob(self, jobID, printOutput=False):
21492149
21502150
Example Usage:
21512151
2152-
>>> print dirac.peekJob(1484)
2152+
>>> print(dirac.peekJob(1484))
21532153
{'OK': True, 'Value': 'Job peek result'}
21542154
21552155
:param jobID: JobID
@@ -2187,7 +2187,7 @@ def pingService(self, system, service, printOutput=False, url=None):
21872187
21882188
Example Usage:
21892189
2190-
>>> print dirac.pingService('WorkloadManagement','JobManager')
2190+
>>> print(dirac.pingService('WorkloadManagement','JobManager'))
21912191
{'OK': True, 'Value': 'Job ping result'}
21922192
21932193
:param system: system
@@ -2235,7 +2235,7 @@ def getJobJDL(self, jobID, original=False, printOutput=False):
22352235
22362236
Example Usage:
22372237
2238-
>>> print dirac.getJobJDL(12345)
2238+
>>> print(dirac.getJobJDL(12345))
22392239
{'Arguments': 'jobDescription.xml',...}
22402240
22412241
:param jobID: JobID

src/DIRAC/WorkloadManagementSystem/DB/ElasticJobParametersDB.py

Lines changed: 5 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,37 +1,19 @@
11
""" Module containing a front-end to the ElasticSearch-based ElasticJobParametersDB.
2-
This module interacts with one ES index: "ElasticJobParametersDB",
3-
which is a drop-in replacement for MySQL-based table JobDB.JobParameters.
4-
While JobDB.JobParameters in MySQL is defined as::
5-
6-
CREATE TABLE `JobParameters` (
7-
`JobID` INT(11) UNSIGNED NOT NULL,
8-
`Name` VARCHAR(100) NOT NULL,
9-
`Value` TEXT NOT NULL,
10-
PRIMARY KEY (`JobID`,`Name`),
11-
FOREIGN KEY (`JobID`) REFERENCES `Jobs`(`JobID`)
12-
) ENGINE=InnoDB DEFAULT CHARSET=latin1;
13-
14-
Here we define a dynamic mapping with the constant fields::
15-
16-
"JobID": {"type": "long"},
17-
"timestamp": {"type": "date"},
18-
19-
and all other custom fields added dynamically.
2+
This is a drop-in replacement for MySQL-based table JobDB.JobParameters.
203
214
The reason for switching to a ES-based JobParameters lies in the extended searching
22-
capabilities of ES..
5+
capabilities of ES.
236
This results in higher traceability for DIRAC jobs.
247
258
The following class methods are provided for public usage
269
- getJobParameters()
2710
- setJobParameter()
2811
- deleteJobParameters()
2912
"""
30-
from DIRAC import S_OK, S_ERROR, gConfig
31-
from DIRAC.Core.Utilities import TimeUtilities
32-
from DIRAC.ConfigurationSystem.Client.PathFinder import getDatabaseSection
13+
from DIRAC import S_ERROR, S_OK
3314
from DIRAC.ConfigurationSystem.Client.Helpers import CSGlobals
3415
from DIRAC.Core.Base.ElasticDB import ElasticDB
16+
from DIRAC.Core.Utilities import TimeUtilities
3517

3618

3719
mapping = {
@@ -57,8 +39,7 @@ def __init__(self, parentLogger=None):
5739
"""Standard Constructor"""
5840

5941
try:
60-
section = getDatabaseSection("WorkloadManagement/ElasticJobParametersDB")
61-
indexPrefix = gConfig.getValue(f"{section}/IndexPrefix", CSGlobals.getSetup()).lower()
42+
indexPrefix = CSGlobals.getSetup().lower()
6243

6344
# Connecting to the ES cluster
6445
super().__init__("WorkloadManagement/ElasticJobParametersDB", indexPrefix, parentLogger=parentLogger)

src/DIRAC/WorkloadManagementSystem/Service/JobManagerHandler.py

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,10 @@
1111
"""
1212
from pydantic import ValidationError
1313

14-
from DIRAC import S_OK, S_ERROR
15-
from DIRAC.ConfigurationSystem.Client.Helpers.Registry import getDNForUsername
16-
from DIRAC.ConfigurationSystem.Client.Helpers.Registry import getVOForGroup
17-
from DIRAC.Core.DISET.RequestHandler import RequestHandler
14+
from DIRAC import S_ERROR, S_OK
15+
from DIRAC.ConfigurationSystem.Client.Helpers.Registry import getDNForUsername, getVOForGroup
1816
from DIRAC.Core.DISET.MessageClient import MessageClient
17+
from DIRAC.Core.DISET.RequestHandler import RequestHandler
1918
from DIRAC.Core.Utilities.ClassAd.ClassAdLight import ClassAd
2019
from DIRAC.Core.Utilities.DErrno import EWMSJDL, EWMSSUBM
2120
from DIRAC.Core.Utilities.JDL import jdlToBaseJobDescriptionModel
@@ -352,7 +351,6 @@ def export_rescheduleJob(self, jobIDs):
352351
)
353352
for jobID in validJobList:
354353
self.taskQueueDB.deleteJob(jobID)
355-
# gJobDB.deleteJobFromQueue(jobID)
356354
result = self.jobDB.rescheduleJob(jobID)
357355
self.log.debug(str(result))
358356
if not result["OK"]:

0 commit comments

Comments
 (0)