Skip to content

Commit 003d381

Browse files
authored
Merge pull request #6511 from DIRACGridBot/cherry-pick-2-529778972-integration
[sweep:integration] Bug fixes, and minor style fixes for ES
2 parents 4b0e8a8 + a0b10e3 commit 003d381

File tree

16 files changed

+58
-95
lines changed

16 files changed

+58
-95
lines changed

src/DIRAC/Core/Security/VOMS.py

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -273,7 +273,6 @@ def setVOMSAttributes(self, proxy, attribute=None, vo=None):
273273
cmd += [f"{vo}:{attribute}" if attribute and attribute != "NoRole" else vo]
274274
cmd += ["-valid", f"{hours}:{mins}"]
275275
cmd += ["-bits", str(bitStrength)]
276-
tmpDir = False
277276
vomsesPath = self.getVOMSESLocation()
278277
if vomsesPath:
279278
cmd += ["-vomses", vomsesPath]
@@ -283,8 +282,6 @@ def setVOMSAttributes(self, proxy, attribute=None, vo=None):
283282
cmd += ["-timeout", "12"]
284283

285284
result = shellCall(self._secCmdTimeout, shlex.join(cmd))
286-
if tmpDir:
287-
shutil.rmtree(tmpDir)
288285

289286
deleteMultiProxy(proxyDict)
290287

src/DIRAC/Core/Utilities/ElasticSearchDB.py

Lines changed: 34 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -3,25 +3,19 @@
33
It is used to query Elasticsearch instances.
44
"""
55

6-
from datetime import datetime
7-
from datetime import timedelta
8-
from urllib import parse as urlparse
9-
106
import copy
117
import functools
128
import json
9+
from datetime import datetime, timedelta
10+
from urllib import parse as urlparse
1311

1412
import certifi
1513

1614
try:
17-
from opensearch_dsl import Search, Q, A
15+
from opensearch_dsl import A, Q, Search
1816
from opensearchpy import OpenSearch as Elasticsearch
19-
from opensearchpy.exceptions import (
20-
ConnectionError as ElasticConnectionError,
21-
TransportError,
22-
NotFoundError,
23-
RequestError,
24-
)
17+
from opensearchpy.exceptions import ConnectionError as ElasticConnectionError
18+
from opensearchpy.exceptions import NotFoundError, RequestError, TransportError
2519
from opensearchpy.helpers import BulkIndexError, bulk
2620
except ImportError:
2721
from elasticsearch_dsl import Search, Q, A
@@ -34,11 +28,10 @@
3428
)
3529
from elasticsearch.helpers import BulkIndexError, bulk
3630

37-
from DIRAC import gLogger, S_OK, S_ERROR
31+
from DIRAC import S_ERROR, S_OK, gLogger
3832
from DIRAC.Core.Utilities import DErrno, TimeUtilities
3933
from DIRAC.FrameworkSystem.Client.BundleDeliveryClient import BundleDeliveryClient
4034

41-
4235
sLog = gLogger.getSubLogger(__name__)
4336

4437

@@ -49,9 +42,8 @@ def ifConnected(method):
4942
def wrapper_decorator(self, *args, **kwargs):
5043
if self._connected:
5144
return method(self, *args, **kwargs)
52-
else:
53-
sLog.error("Not connected")
54-
return S_ERROR("Not connected")
45+
sLog.error("Not connected")
46+
return S_ERROR("Not connected")
5547

5648
return wrapper_decorator
5749

@@ -208,7 +200,7 @@ def getIndexPrefix(self):
208200
return self.__indexPrefix
209201

210202
@ifConnected
211-
def query(self, index, query):
203+
def query(self, index: str, query):
212204
"""Executes a query and returns its result (uses ES DSL language).
213205
214206
:param self: self reference
@@ -223,18 +215,17 @@ def query(self, index, query):
223215
return S_ERROR(re)
224216

225217
@ifConnected
226-
def update(self, index, query=None, updateByQuery=True, id=None):
218+
def update(self, index: str, query=None, updateByQuery: bool = True, docID: str = None):
227219
"""Executes an update of a document, and returns S_OK/S_ERROR
228220
229-
:param self: self reference
230-
:param str index: index name
231-
:param dict query: It is the query in ElasticSearch DSL language
232-
:param bool updateByQuery: A bool to determine update by query or index values using index function.
233-
:param int id: ID for the document to be created.
221+
:param index: index name
222+
:param query: It is the query in ElasticSearch DSL language
223+
:param updateByQuery: A bool to determine update by query or index values using index function.
224+
:param docID: ID for the document to be created.
234225
235226
"""
236227

237-
sLog.debug(f"Updating {index} with {query}, updateByQuery={updateByQuery}, id={id}")
228+
sLog.debug(f"Updating {index} with {query}, updateByQuery={updateByQuery}, docID={docID}")
238229

239230
if not index or not query:
240231
return S_ERROR("Missing index or query")
@@ -243,64 +234,64 @@ def update(self, index, query=None, updateByQuery=True, id=None):
243234
if updateByQuery:
244235
esDSLQueryResult = self.client.update_by_query(index=index, body=query)
245236
else:
246-
esDSLQueryResult = self.client.index(index=index, body=query, id=id)
237+
esDSLQueryResult = self.client.index(index=index, body=query, id=docID)
247238
return S_OK(esDSLQueryResult)
248239
except RequestError as re:
249240
return S_ERROR(re)
250241

251242
@ifConnected
252-
def getDoc(self, index: str, id: str) -> dict:
243+
def getDoc(self, index: str, docID: str) -> dict:
253244
"""Retrieves a document in an index.
254245
255246
:param index: name of the index
256-
:param id: document ID
247+
:param docID: document ID
257248
"""
258-
sLog.debug(f"Retrieving document {id} in index {index}")
249+
sLog.debug(f"Retrieving document {docID} in index {index}")
259250
try:
260-
return S_OK(self.client.get(index, id)["_source"])
251+
return S_OK(self.client.get(index, docID)["_source"])
261252
except NotFoundError:
262253
sLog.warn("Could not find the document in index", index)
263254
return S_OK({})
264255
except RequestError as re:
265256
return S_ERROR(re)
266257

267258
@ifConnected
268-
def updateDoc(self, index: str, id: str, body: dict) -> dict:
259+
def updateDoc(self, index: str, docID: str, body) -> dict:
269260
"""Update an existing document with a script or partial document
270261
271262
:param index: name of the index
272-
:param id: document ID
263+
:param docID: document ID
273264
:param body: The request definition requires either `script` or
274265
partial `doc`
275266
"""
276-
sLog.debug(f"Updating document {id} in index {index}")
267+
sLog.debug(f"Updating document {docID} in index {index}")
277268
try:
278-
return S_OK(self.client.update(index, id, body))
269+
return S_OK(self.client.update(index, docID, body))
279270
except RequestError as re:
280271
return S_ERROR(re)
281272

282273
@ifConnected
283-
def deleteDoc(self, index: str, id: str):
274+
def deleteDoc(self, index: str, docID: str):
284275
"""Deletes a document in an index.
285276
286277
:param index: name of the index
287-
:param id: document ID
278+
:param docID: document ID
288279
"""
289-
sLog.debug(f"Deleting document {id} in index {index}")
280+
sLog.debug(f"Deleting document {docID} in index {index}")
290281
try:
291-
return S_OK(self.client.delete(index, id))
282+
return S_OK(self.client.delete(index, docID))
292283
except RequestError as re:
293284
return S_ERROR(re)
294285

295286
@ifConnected
296-
def existsDoc(self, index: str, id: str) -> bool:
287+
def existsDoc(self, index: str, docID: str) -> bool:
297288
"""Returns information about whether a document exists in an index.
298289
299290
:param index: name of the index
300-
:param id: document ID
291+
:param docID: document ID
301292
"""
302-
sLog.debug(f"Checking if document {id} in index {index} exists")
303-
return self.client.exists(index, id)
293+
sLog.debug(f"Checking if document {docID} in index {index} exists")
294+
return self.client.exists(index, docID)
304295

305296
@ifConnected
306297
def _Search(self, indexname):
@@ -329,9 +320,9 @@ def getIndexes(self, indexName=None):
329320
"""
330321
if not indexName:
331322
indexName = self.__indexPrefix
332-
sLog.debug("Getting indices alias of %s" % indexName)
323+
sLog.debug(f"Getting indices alias of {indexName}")
333324
# we only return indexes which belong to a specific prefix for example 'lhcb-production' or 'dirac-production etc.
334-
return list(self.client.indices.get_alias("%s*" % indexName))
325+
return list(self.client.indices.get_alias(f"{indexName}*"))
335326

336327
@ifConnected
337328
def getDocTypes(self, indexName):

src/DIRAC/Core/Utilities/Graphs/GraphData.py

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -570,17 +570,6 @@ def getPlotNumData(self):
570570

571571
return zip(self.num_keys, self.values, self.errors)
572572

573-
def getPlotDataForKeys(self, keys):
574-
575-
result_pairs = []
576-
for key in keys:
577-
if key in self.parsed_data:
578-
result_pairs.append(key, self.parsed_data[key], self.parsed_errors[key])
579-
else:
580-
result_pairs.append(key, None, 0.0)
581-
582-
return result_pairs
583-
584573
def getPlotDataForNumKeys(self, num_keys, zeroes=False):
585574

586575
result_pairs = []

src/DIRAC/Core/Workflow/Parameter.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -438,7 +438,7 @@ def linkUp(self, opt, prefix="", postfix="", objname="self"):
438438
if par is None:
439439
print("ERROR ParameterCollection.linkUp() can not find parameter with the name=%s" % (s))
440440
else:
441-
par.link(objname, prefix + p.getName() + postfix)
441+
par.link(objname, prefix + par.getName() + postfix)
442442
elif isinstance(opt, str):
443443
par = self.find(opt)
444444
if par is None:
@@ -480,7 +480,7 @@ def unlink(self, opt):
480480
elif isinstance(opt, str):
481481
par = self.find(opt)
482482
if par is None:
483-
print("ERROR ParameterCollection.unlink() can not find parameter with the name=%s" % (s))
483+
print("ERROR ParameterCollection.unlink() can not find parameter with the name=%s" % (opt))
484484
else:
485485
par.unlink()
486486
else:

src/DIRAC/DataManagementSystem/Agent/RequestOperations/test/Test_ArchiveFiles.py

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -163,10 +163,7 @@ def test_run_IgnoreMissingFiles(archiveFiles, _myMocker, listOfLFNs):
163163
)
164164
for index, opFile in enumerate(archiveFiles.operation):
165165
LOG.debug("%s", opFile) # lazy evaluation of the argument!
166-
if index == 5:
167-
assert opFile.Status == "Done"
168-
else:
169-
assert opFile.Status == "Done"
166+
assert opFile.Status == "Done"
170167

171168

172169
def test_checkFilePermissions(archiveFiles, _myMocker):

src/DIRAC/DataManagementSystem/DB/FileCatalogComponents/FileMetadata/FileMetadata.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -457,7 +457,7 @@ def __createMetaSelection(self, value):
457457
result = self.db._escapeValues(value)
458458
if not result["OK"]:
459459
return result
460-
query = "( $s )" % ", ".join(result["Value"])
460+
query = "( %s )" % ", ".join(result["Value"])
461461
queryList.append(("IN", query))
462462
elif isinstance(value, dict):
463463
for operation, operand in value.items():

src/DIRAC/DataManagementSystem/scripts/dirac_dms_directory_sync.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -273,7 +273,7 @@ def doUpload(fc, dm, result, source_dir, dest_dir, storage, delete, nthreads):
273273
if len(lfns) > 0:
274274
res = removeRemoteFiles(dm, lfns)
275275
if not res["OK"]:
276-
gLogger.fatal("Deleting of files: " + lfns + " -X- [FAILED]" + res["Message"])
276+
gLogger.fatal("Deleting of files: " + str(lfns) + " -X- [FAILED]" + res["Message"])
277277
DIRAC.exit(1)
278278
else:
279279
gLogger.notice("Deleting " + ", ".join(lfns) + " -> [DONE]")

src/DIRAC/FrameworkSystem/scripts/dirac_proxy_destroy.py

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -112,9 +112,6 @@ def deleteLocalProxy(proxyLoc):
112112
except OSError:
113113
gLogger.error("IOError: Failed to delete local proxy.")
114114
return
115-
except OSError:
116-
gLogger.error("OSError: Failed to delete local proxy.")
117-
return
118115
gLogger.notice("Local proxy deleted.")
119116

120117

src/DIRAC/Interfaces/scripts/dirac_admin_sync_users_from_file.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ def main():
4040
try:
4141
usersCFG = CFG().loadFromFile(args[0])
4242
except Exception as e:
43-
errorList.append("file open", f"Can't parse file {args[0]}: {str(e)}")
43+
errorList.append(("file open", f"Can't parse file {args[0]}: {str(e)}"))
4444
errorCode = 1
4545
else:
4646
if not diracAdmin.csSyncUsersWithCFG(usersCFG):

src/DIRAC/Resources/Computing/BatchSystems/TimeLeft/LSFResourceUsage.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -186,8 +186,6 @@ def __init__(self):
186186
self.log.info("Host Normalization", f"{self.host}: {self.hostNorm}")
187187
except ValueError as e:
188188
self.log.exception("Exception parsing lshosts output", l1, e)
189-
finally:
190-
break
191189

192190
if self.hostNorm and self.normRef:
193191
self.hostNorm /= self.normRef

0 commit comments

Comments
 (0)