|
6 | 6 | import copy
|
7 | 7 | import functools
|
8 | 8 | import json
|
| 9 | +import time |
9 | 10 | from datetime import datetime, timedelta
|
10 | 11 | from urllib import parse as urlparse
|
11 | 12 |
|
|
15 | 16 | from opensearch_dsl import A, Q, Search
|
16 | 17 | from opensearchpy import OpenSearch as Elasticsearch
|
17 | 18 | from opensearchpy.exceptions import ConnectionError as ElasticConnectionError
|
18 |
| - from opensearchpy.exceptions import NotFoundError, RequestError, TransportError |
| 19 | + from opensearchpy.exceptions import NotFoundError, RequestError, TransportError, ConflictError |
19 | 20 | from opensearchpy.helpers import BulkIndexError, bulk
|
20 | 21 | except ImportError:
|
21 | 22 | from elasticsearch_dsl import Search, Q, A
|
|
25 | 26 | TransportError,
|
26 | 27 | NotFoundError,
|
27 | 28 | RequestError,
|
| 29 | + ConflictError, |
28 | 30 | )
|
29 | 31 | from elasticsearch.helpers import BulkIndexError, bulk
|
30 | 32 |
|
@@ -76,7 +78,7 @@ def generateDocs(data, withTimeStamp=True):
|
76 | 78 | sLog.error("Wrong timestamp", e)
|
77 | 79 | doc["timestamp"] = int(TimeUtilities.toEpochMilliSeconds())
|
78 | 80 |
|
79 |
| - sLog.debug(f"yielding {doc}") |
| 81 | + sLog.debug("yielding", doc) |
80 | 82 | yield doc
|
81 | 83 |
|
82 | 84 |
|
@@ -265,9 +267,15 @@ def updateDoc(self, index: str, docID: str, body) -> dict:
|
265 | 267 | """
|
266 | 268 | sLog.debug(f"Updating document {docID} in index {index}")
|
267 | 269 | try:
|
268 |
| - return S_OK(self.client.update(index, docID, body)) |
| 270 | + self.client.update(index, docID, body) |
| 271 | + except ConflictError: |
| 272 | + # updates are rather "heavy" operations from ES point of view, needing seqNo to be updated. |
| 273 | + # Not ideal, but we just wait and retry. |
| 274 | + time.sleep(1) |
| 275 | + self.client.update(index, docID, body, params={"retry_on_conflict": 3}) |
269 | 276 | except RequestError as re:
|
270 | 277 | return S_ERROR(re)
|
| 278 | + return S_OK() |
271 | 279 |
|
272 | 280 | @ifConnected
|
273 | 281 | def deleteDoc(self, index: str, docID: str):
|
|
0 commit comments