|
3 | 3 | It is used to query Elasticsearch instances.
|
4 | 4 | """
|
5 | 5 |
|
6 |
| -from __future__ import absolute_import |
7 |
| -from __future__ import division |
8 |
| -from __future__ import print_function |
9 |
| - |
10 |
| -__RCSID__ = "$Id$" |
11 |
| - |
12 | 6 | from datetime import datetime
|
13 | 7 | from datetime import timedelta
|
14 | 8 |
|
|
17 | 11 | import functools
|
18 | 12 | import json
|
19 | 13 |
|
20 |
| -from elasticsearch import Elasticsearch |
21 |
| -from elasticsearch_dsl import Search, Q, A |
22 |
| -from elasticsearch.exceptions import ConnectionError, TransportError, NotFoundError, RequestError |
23 |
| -from elasticsearch.helpers import BulkIndexError, bulk |
| 14 | +try: |
| 15 | + from opensearchpy import OpenSearch as Elasticsearch |
| 16 | + from opensearch_dsl import Search, Q, A |
| 17 | + from opensearchpy.exceptions import ConnectionError, TransportError, NotFoundError, RequestError |
| 18 | + from opensearchpy.helpers import BulkIndexError, bulk |
| 19 | +except ImportError: |
| 20 | + from elasticsearch import Elasticsearch |
| 21 | + from elasticsearch_dsl import Search, Q, A |
| 22 | + from elasticsearch.exceptions import ConnectionError, TransportError, NotFoundError, RequestError |
| 23 | + from elasticsearch.helpers import BulkIndexError, bulk |
24 | 24 |
|
25 | 25 | from DIRAC import gLogger, S_OK, S_ERROR
|
26 | 26 | from DIRAC.Core.Utilities import Time, DErrno
|
@@ -53,8 +53,7 @@ def generateDocs(data, withTimeStamp=True):
|
53 | 53 | :return: doc
|
54 | 54 | """
|
55 | 55 | for doc in copy.deepcopy(data):
|
56 |
| - if "_type" not in doc: |
57 |
| - doc["_type"] = "_doc" |
| 56 | + |
58 | 57 | if withTimeStamp:
|
59 | 58 | if "timestamp" not in doc:
|
60 | 59 | sLog.warn("timestamp is not given")
|
@@ -231,7 +230,7 @@ def update(self, index, query=None, updateByQuery=True, id=None):
|
231 | 230 | if updateByQuery:
|
232 | 231 | esDSLQueryResult = self.client.update_by_query(index=index, body=query)
|
233 | 232 | else:
|
234 |
| - esDSLQueryResult = self.client.index(index=index, doc_type="_doc", body=query, id=id) |
| 233 | + esDSLQueryResult = self.client.index(index=index, body=query, id=id) |
235 | 234 | return S_OK(esDSLQueryResult)
|
236 | 235 | except RequestError as re:
|
237 | 236 | return S_ERROR(re)
|
@@ -343,11 +342,8 @@ def createIndex(self, indexPrefix, mapping=None, period="day"):
|
343 | 342 |
|
344 | 343 | try:
|
345 | 344 | sLog.info("Create index: ", fullIndex + str(mapping))
|
346 |
| - try: |
347 |
| - self.client.indices.create(index=fullIndex, body={"mappings": mapping}) # ES7 |
348 |
| - except RequestError as re: |
349 |
| - if re.error == "mapper_parsing_exception": |
350 |
| - self.client.indices.create(index=fullIndex, body={"mappings": {"_doc": mapping}}) # ES6 |
| 345 | + self.client.indices.create(index=fullIndex, body={"mappings": mapping}) # ES7 |
| 346 | + |
351 | 347 | return S_OK(fullIndex)
|
352 | 348 | except Exception as e: # pylint: disable=broad-except
|
353 | 349 | sLog.error("Can not create the index:", repr(e))
|
@@ -388,7 +384,7 @@ def index(self, indexName, body=None, docID=None):
|
388 | 384 | return S_ERROR("Missing index or body")
|
389 | 385 |
|
390 | 386 | try:
|
391 |
| - res = self.client.index(index=indexName, doc_type="_doc", body=body, id=docID) |
| 387 | + res = self.client.index(index=indexName, body=body, id=docID) |
392 | 388 | except (RequestError, TransportError) as e:
|
393 | 389 | sLog.exception()
|
394 | 390 | return S_ERROR(e)
|
|
0 commit comments