3
3
It is used to query Elasticsearch instances.
4
4
"""
5
5
6
- from datetime import datetime
7
- from datetime import timedelta
8
- from urllib import parse as urlparse
9
-
10
6
import copy
11
7
import functools
12
8
import json
9
+ from datetime import datetime , timedelta
10
+ from urllib import parse as urlparse
13
11
14
12
import certifi
15
13
16
14
try :
17
- from opensearch_dsl import Search , Q , A
15
+ from opensearch_dsl import A , Q , Search
18
16
from opensearchpy import OpenSearch as Elasticsearch
19
- from opensearchpy .exceptions import (
20
- ConnectionError as ElasticConnectionError ,
21
- TransportError ,
22
- NotFoundError ,
23
- RequestError ,
24
- )
17
+ from opensearchpy .exceptions import ConnectionError as ElasticConnectionError
18
+ from opensearchpy .exceptions import NotFoundError , RequestError , TransportError
25
19
from opensearchpy .helpers import BulkIndexError , bulk
26
20
except ImportError :
27
21
from elasticsearch_dsl import Search , Q , A
34
28
)
35
29
from elasticsearch .helpers import BulkIndexError , bulk
36
30
37
- from DIRAC import gLogger , S_OK , S_ERROR
31
+ from DIRAC import S_ERROR , S_OK , gLogger
38
32
from DIRAC .Core .Utilities import DErrno , TimeUtilities
39
33
from DIRAC .FrameworkSystem .Client .BundleDeliveryClient import BundleDeliveryClient
40
34
41
-
42
35
sLog = gLogger .getSubLogger (__name__ )
43
36
44
37
@@ -49,9 +42,8 @@ def ifConnected(method):
49
42
def wrapper_decorator (self , * args , ** kwargs ):
50
43
if self ._connected :
51
44
return method (self , * args , ** kwargs )
52
- else :
53
- sLog .error ("Not connected" )
54
- return S_ERROR ("Not connected" )
45
+ sLog .error ("Not connected" )
46
+ return S_ERROR ("Not connected" )
55
47
56
48
return wrapper_decorator
57
49
@@ -208,7 +200,7 @@ def getIndexPrefix(self):
208
200
return self .__indexPrefix
209
201
210
202
@ifConnected
211
- def query (self , index , query ):
203
+ def query (self , index : str , query ):
212
204
"""Executes a query and returns its result (uses ES DSL language).
213
205
214
206
:param self: self reference
@@ -223,18 +215,17 @@ def query(self, index, query):
223
215
return S_ERROR (re )
224
216
225
217
@ifConnected
226
- def update (self , index , query = None , updateByQuery = True , id = None ):
218
+ def update (self , index : str , query = None , updateByQuery : bool = True , docID : str = None ):
227
219
"""Executes an update of a document, and returns S_OK/S_ERROR
228
220
229
- :param self: self reference
230
- :param str index: index name
231
- :param dict query: It is the query in ElasticSearch DSL language
232
- :param bool updateByQuery: A bool to determine update by query or index values using index function.
233
- :param int id: ID for the document to be created.
221
+ :param index: index name
222
+ :param query: It is the query in ElasticSearch DSL language
223
+ :param updateByQuery: A bool to determine update by query or index values using index function.
224
+ :param docID: ID for the document to be created.
234
225
235
226
"""
236
227
237
- sLog .debug (f"Updating { index } with { query } , updateByQuery={ updateByQuery } , id= { id } " )
228
+ sLog .debug (f"Updating { index } with { query } , updateByQuery={ updateByQuery } , docID= { docID } " )
238
229
239
230
if not index or not query :
240
231
return S_ERROR ("Missing index or query" )
@@ -243,64 +234,64 @@ def update(self, index, query=None, updateByQuery=True, id=None):
243
234
if updateByQuery :
244
235
esDSLQueryResult = self .client .update_by_query (index = index , body = query )
245
236
else :
246
- esDSLQueryResult = self .client .index (index = index , body = query , id = id )
237
+ esDSLQueryResult = self .client .index (index = index , body = query , id = docID )
247
238
return S_OK (esDSLQueryResult )
248
239
except RequestError as re :
249
240
return S_ERROR (re )
250
241
251
242
@ifConnected
252
- def getDoc (self , index : str , id : str ) -> dict :
243
+ def getDoc (self , index : str , docID : str ) -> dict :
253
244
"""Retrieves a document in an index.
254
245
255
246
:param index: name of the index
256
- :param id : document ID
247
+ :param docID : document ID
257
248
"""
258
- sLog .debug (f"Retrieving document { id } in index { index } " )
249
+ sLog .debug (f"Retrieving document { docID } in index { index } " )
259
250
try :
260
- return S_OK (self .client .get (index , id )["_source" ])
251
+ return S_OK (self .client .get (index , docID )["_source" ])
261
252
except NotFoundError :
262
253
sLog .warn ("Could not find the document in index" , index )
263
254
return S_OK ({})
264
255
except RequestError as re :
265
256
return S_ERROR (re )
266
257
267
258
@ifConnected
268
- def updateDoc (self , index : str , id : str , body : dict ) -> dict :
259
+ def updateDoc (self , index : str , docID : str , body ) -> dict :
269
260
"""Update an existing document with a script or partial document
270
261
271
262
:param index: name of the index
272
- :param id : document ID
263
+ :param docID : document ID
273
264
:param body: The request definition requires either `script` or
274
265
partial `doc`
275
266
"""
276
- sLog .debug (f"Updating document { id } in index { index } " )
267
+ sLog .debug (f"Updating document { docID } in index { index } " )
277
268
try :
278
- return S_OK (self .client .update (index , id , body ))
269
+ return S_OK (self .client .update (index , docID , body ))
279
270
except RequestError as re :
280
271
return S_ERROR (re )
281
272
282
273
@ifConnected
283
- def deleteDoc (self , index : str , id : str ):
274
+ def deleteDoc (self , index : str , docID : str ):
284
275
"""Deletes a document in an index.
285
276
286
277
:param index: name of the index
287
- :param id : document ID
278
+ :param docID : document ID
288
279
"""
289
- sLog .debug (f"Deleting document { id } in index { index } " )
280
+ sLog .debug (f"Deleting document { docID } in index { index } " )
290
281
try :
291
- return S_OK (self .client .delete (index , id ))
282
+ return S_OK (self .client .delete (index , docID ))
292
283
except RequestError as re :
293
284
return S_ERROR (re )
294
285
295
286
@ifConnected
296
- def existsDoc (self , index : str , id : str ) -> bool :
287
+ def existsDoc (self , index : str , docID : str ) -> bool :
297
288
"""Returns information about whether a document exists in an index.
298
289
299
290
:param index: name of the index
300
- :param id : document ID
291
+ :param docID : document ID
301
292
"""
302
- sLog .debug (f"Checking if document { id } in index { index } exists" )
303
- return self .client .exists (index , id )
293
+ sLog .debug (f"Checking if document { docID } in index { index } exists" )
294
+ return self .client .exists (index , docID )
304
295
305
296
@ifConnected
306
297
def _Search (self , indexname ):
@@ -329,9 +320,9 @@ def getIndexes(self, indexName=None):
329
320
"""
330
321
if not indexName :
331
322
indexName = self .__indexPrefix
332
- sLog .debug ("Getting indices alias of %s" % indexName )
323
+ sLog .debug (f "Getting indices alias of { indexName } " )
333
324
# we only return indexes which belong to a specific prefix for example 'lhcb-production' or 'dirac-production etc.
334
- return list (self .client .indices .get_alias ("%s*" % indexName ))
325
+ return list (self .client .indices .get_alias (f" { indexName } *" ))
335
326
336
327
@ifConnected
337
328
def getDocTypes (self , indexName ):
0 commit comments