|
1 | 1 | import logging |
2 | | -from typing import List, Optional, Union |
| 2 | +from typing import List, Optional, Union, Dict, Any |
3 | 3 |
|
4 | 4 | from haystack.lazy_imports import LazyImport |
| 5 | +from haystack import Document |
5 | 6 |
|
6 | 7 | with LazyImport("Run 'pip install farm-haystack[elasticsearch8]'") as es_import: |
7 | 8 | from elasticsearch import Elasticsearch, RequestError |
@@ -294,3 +295,63 @@ def _init_elastic_client( |
294 | 295 | f"correct credentials if you are using a secured Elasticsearch instance." |
295 | 296 | ) |
296 | 297 | return client |
| 298 | + |
def _index_exists(self, index_name: str, headers: Optional[Dict[str, str]] = None) -> bool:
    """
    Check whether `index_name` exists.

    When DEBUG logging is enabled, an alias lookup for the same name is made first and a debug
    message is logged if that lookup is truthy.

    :param index_name: Name to look up.
    :param headers: Optional custom headers handed to `client.options()` for every call made here.
    :return: The result of `indices.exists` for `index_name`.
    """
    debug_enabled = logger.isEnabledFor(logging.DEBUG)
    # Short-circuit keeps the extra alias lookup from running unless its outcome would be logged.
    if debug_enabled and self.client.options(headers=headers).indices.exists_alias(name=index_name):
        logger.debug("Index name %s is an alias.", index_name)

    scoped_client = self.client.options(headers=headers)
    return scoped_client.indices.exists(index=index_name)
| 305 | + |
def _index_delete(self, index):
    """
    Delete `index` when `_index_exists` reports it; otherwise do nothing.

    :param index: Name of the index to delete.
    """
    if not self._index_exists(index):
        return

    # 400 and 404 are handed to the client as statuses to ignore for the delete call.
    tolerant_client = self.client.options(ignore_status=[400, 404])
    tolerant_client.indices.delete(index=index)
    logger.info("Index '%s' deleted.", index)
| 310 | + |
def _index_refresh(self, index, headers):
    """
    Refresh `index` if it exists; do nothing otherwise.

    :param index: Name of the index to refresh.
    :param headers: Custom headers applied to both the existence check and the refresh call.
    """
    # Forward the headers to the existence check as well, so it is issued with the same
    # caller-supplied headers as the refresh itself.
    if self._index_exists(index, headers=headers):
        self.client.options(headers=headers).indices.refresh(index=index)
| 314 | + |
def _index_create(self, *args, **kwargs):
    """
    Forward to `indices.create` on a client configured with the optional `headers` keyword.

    `headers` is removed from `kwargs` (an empty dict is used when absent); every other
    argument is passed through unchanged.

    :return: Whatever `indices.create` returns.
    """
    request_headers = kwargs.pop("headers", {})
    scoped_client = self.client.options(headers=request_headers)
    return scoped_client.indices.create(*args, **kwargs)
| 318 | + |
def _index_get(self, *args, **kwargs):
    """
    Forward to `indices.get` on a client configured with the optional `headers` keyword.

    `headers` is removed from `kwargs` (an empty dict is used when absent); every other
    argument is passed through unchanged.

    :return: Whatever `indices.get` returns.
    """
    request_headers = kwargs.pop("headers", {})
    scoped_client = self.client.options(headers=request_headers)
    return scoped_client.indices.get(*args, **kwargs)
| 322 | + |
def _index_put_mapping(self, *args, **kwargs):
    """
    Forward to `indices.put_mapping`, expanding `body` into keyword arguments.

    `headers` and `body` are removed from `kwargs`. `headers` (empty dict when absent) is
    given to `client.options()`; the entries of `body` are passed to `indices.put_mapping`
    as individual keyword arguments next to the remaining `kwargs`.

    :return: Whatever `indices.put_mapping` returns.
    """
    headers = kwargs.pop("headers", {})
    # An explicit `body=None` is treated like a missing body; unpacking None with `**`
    # would otherwise raise a TypeError.
    body = kwargs.pop("body", None) or {}
    return self.client.options(headers=headers).indices.put_mapping(*args, **kwargs, **body)
| 327 | + |
def _search(self, *args, **kwargs):
    """
    Forward to `search` on a client configured with the optional `headers` keyword.

    `headers` is removed from `kwargs` (an empty dict is used when absent); every other
    argument is passed through unchanged.

    :return: Whatever `search` returns.
    """
    request_headers = kwargs.pop("headers", {})
    scoped_client = self.client.options(headers=request_headers)
    return scoped_client.search(*args, **kwargs)
| 331 | + |
def _update(self, *args, **kwargs):
    """
    Forward to `update` on a client configured with the optional `headers` keyword.

    `headers` is removed from `kwargs` (an empty dict is used when absent); every other
    argument is passed through unchanged.

    :return: Whatever `update` returns.
    """
    request_headers = kwargs.pop("headers", {})
    scoped_client = self.client.options(headers=request_headers)
    return scoped_client.update(*args, **kwargs)
| 335 | + |
def _count(self, *args, **kwargs):
    """
    Forward to `count`, expanding `body` into keyword arguments.

    `headers` and `body` are removed from `kwargs`. `headers` (empty dict when absent) is
    given to `client.options()`; the entries of `body` are passed to `count` as individual
    keyword arguments next to the remaining `kwargs`.

    :return: Whatever `count` returns.
    """
    headers = kwargs.pop("headers", {})
    # An explicit `body=None` is treated like a missing body; unpacking None with `**`
    # would otherwise raise a TypeError.
    body = kwargs.pop("body", None) or {}
    return self.client.options(headers=headers).count(*args, **kwargs, **body)
| 340 | + |
def _delete_by_query(self, *args, **kwargs):
    """
    Forward to `delete_by_query`, expanding `body` into keyword arguments.

    `headers`, `ignore` and `body` are removed from `kwargs`. `headers` (empty dict when
    absent) and `ignore` (empty list when absent, passed on as `ignore_status`) are given to
    `client.options()`; the entries of `body` are passed to `delete_by_query` as individual
    keyword arguments next to the remaining `kwargs`.

    :return: Whatever `delete_by_query` returns.
    """
    headers = kwargs.pop("headers", {})
    ignore_status = kwargs.pop("ignore", [])
    # An explicit `body=None` is treated like a missing body; unpacking None with `**`
    # would otherwise raise a TypeError.
    body = kwargs.pop("body", None) or {}
    return self.client.options(headers=headers, ignore_status=ignore_status).delete_by_query(
        *args, **kwargs, **body
    )
| 348 | + |
def _execute_msearch(self, index: str, body: List[Dict[str, Any]], scale_score: bool) -> List[List[Document]]:
    """
    Run a multi-search and convert the hits of every response entry into documents.

    :param index: Index passed to `client.msearch`.
    :param body: Request body passed to `client.msearch`.
    :param scale_score: Forwarded to `_convert_es_hit_to_document` for every hit.
    :return: One list of converted hits per entry of the result's `"responses"`, in the same order.
    """
    msearch_result = self.client.msearch(index=index, body=body)
    # Outer level: one item per response entry; inner level: that entry's hits, converted.
    return [
        [self._convert_es_hit_to_document(hit, scale_score=scale_score) for hit in single["hits"]["hits"]]
        for single in msearch_result["responses"]
    ]
0 commit comments