
Commit bad218c

remove deletion operation in es connector (#570)
In the ES connector, the function delete_by_record_id is not necessary: when a document_id already exists in the index, we want to replace the record with new content, which is equivalent to an upsert operation. Deleting existing records is also dangerous because it may inadvertently remove unintended content. For more details see: https://linear.app/unstructured/issue/ENG-360/upsert-in-elastic-incorrectly-implemented
1 parent ac0a0ab commit bad218c
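
For context, Elasticsearch's default write operation overwrites an existing document that has the same `_id` and creates it otherwise, so re-running an upload already behaves like an upsert with no prior delete needed. Below is a minimal sketch of that semantic with the official Python client; the cluster URL, index name, id, and document bodies are illustrative only, not values used by the connector.

```python
from elasticsearch import Elasticsearch

# Hypothetical local cluster and index, used only for illustration.
client = Elasticsearch("http://localhost:9200")

doc_id = "element-123"  # a stable, deterministic id per element

# First write creates the document.
client.index(index="demo-index", id=doc_id, document={"text": "original content"})

# Re-indexing with the same _id replaces the document in place (an upsert),
# so there is no need to delete the old record first.
client.index(index="demo-index", id=doc_id, document={"text": "updated content"})

client.indices.refresh(index="demo-index")
print(client.count(index="demo-index")["count"])  # still 1 document
```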

File tree

4 files changed: +60 additions, -27 deletions


CHANGELOG.md

Lines changed: 6 additions & 2 deletions
@@ -1,14 +1,18 @@
+## 1.1.3
+
+* **Fix: Remove unnecessary deletion operation in ES connector**
+
 ## 1.1.2
 
-* **Fix**: DeltaTableConnectionConfig default assignment is compliant with stricter typing in Pydantic
+* **Fix: DeltaTableConnectionConfig default assignment is compliant with stricter typing in Pydantic**
 
 ## 1.1.1
 
 * **Fix: Update examples**
 
 ## 1.1.0
 
-* **Feature**: Embedding with OpenAI (or Azure OpenAI) can trust custom certificate authority by specifying environment variable REQUESTS_CA_BUNDLE.
+* **Feature: Embedding with OpenAI (or Azure OpenAI) can trust custom certificate authority by specifying environment variable REQUESTS_CA_BUNDLE.**
 
 ## 1.0.59
 

test/integration/connectors/elasticsearch/test_elasticsearch.py

Lines changed: 47 additions & 5 deletions
@@ -280,11 +280,6 @@ async def test_elasticsearch_destination(
     with get_client() as client:
         validate_count(client=client, expected_count=expected_count, index_name=destination_index)
 
-    # Rerun and make sure the same documents get updated
-    uploader.run(path=staged_filepath, file_data=file_data)
-    with get_client() as client:
-        validate_count(client=client, expected_count=expected_count, index_name=destination_index)
-
 
 @pytest.mark.tags(CONNECTOR_TYPE, DESTINATION_TAG, NOSQL_TAG)
 def test_elasticsearch_destination_precheck_fail():
@@ -333,3 +328,50 @@ def test_elasticsearch_stager(
         stager=stager,
         tmp_dir=tmp_path,
     )
+
+
+@pytest.mark.asyncio
+@pytest.mark.tags(CONNECTOR_TYPE, DESTINATION_TAG, NOSQL_TAG)
+async def test_elasticsearch_upsert_destination(
+    upload_file: Path,
+    destination_index: str,
+    tmp_path: Path,
+):
+    file_data = FileData(
+        source_identifiers=SourceIdentifiers(fullpath=upload_file.name, filename=upload_file.name),
+        connector_type=CONNECTOR_TYPE,
+        identifier="mock file data",
+    )
+    connection_config = ElasticsearchConnectionConfig(
+        access_config=ElasticsearchAccessConfig(password=ES_PASSWORD),
+        username=ES_USERNAME,
+        hosts=["http://localhost:9200"],
+    )
+    stager = ElasticsearchUploadStager(
+        upload_stager_config=ElasticsearchUploadStagerConfig(index_name=destination_index)
+    )
+
+    uploader = ElasticsearchUploader(
+        connection_config=connection_config,
+        upload_config=ElasticsearchUploaderConfig(index_name=destination_index),
+    )
+    staged_filepath = stager.run(
+        elements_filepath=upload_file,
+        file_data=file_data,
+        output_dir=tmp_path,
+        output_filename=upload_file.name,
+    )
+    uploader.precheck()
+    uploader.run(path=staged_filepath, file_data=file_data)
+
+    # Run validation
+    with staged_filepath.open() as f:
+        staged_elements = json.load(f)
+    expected_count = len(staged_elements)
+    with get_client() as client:
+        validate_count(client=client, expected_count=expected_count, index_name=destination_index)
+
+    # Rerun and make sure the same documents get updated
+    uploader.run(path=staged_filepath, file_data=file_data)
+    with get_client() as client:
+        validate_count(client=client, expected_count=expected_count, index_name=destination_index)
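
The rerun at the end of the new test is what actually exercises the upsert path: the stager assigns each element a deterministic `_id`, so the second `uploader.run` overwrites the same documents and the index count must not grow. The `validate_count` helper is defined elsewhere in the test suite; a hypothetical equivalent using the Python client could look like the sketch below (the refresh-then-count pattern and the assertion message are assumptions, not the repository's actual helper).

```python
from elasticsearch import Elasticsearch


def validate_count(client: Elasticsearch, expected_count: int, index_name: str) -> None:
    # Hypothetical stand-in for the helper used by the tests above.
    # Refresh so documents written by the bulk upload are visible to count().
    client.indices.refresh(index=index_name)
    current_count = client.count(index=index_name)["count"]
    assert current_count == expected_count, (
        f"expected {expected_count} documents in {index_name}, found {current_count}"
    )
```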

unstructured_ingest/__version__.py

Lines changed: 1 addition & 1 deletion
@@ -1 +1 @@
-__version__ = "1.1.2" # pragma: no cover
+__version__ = "1.1.3" # pragma: no cover

unstructured_ingest/processes/connectors/elasticsearch/elasticsearch.py

Lines changed: 6 additions & 19 deletions
@@ -19,7 +19,6 @@
     DestinationConnectionError,
     SourceConnectionError,
     SourceConnectionNetworkError,
-    WriteError,
 )
 from unstructured_ingest.interfaces import (
     AccessConfig,
@@ -336,6 +335,8 @@ class ElasticsearchUploadStager(UploadStager):
 
     def conform_dict(self, element_dict: dict, file_data: FileData) -> dict:
         data = element_dict.copy()
+        # when _op_type is not specified, it defaults to "index":
+        # Overwrites if exists, creates if not.
         resp = {
             "_index": self.upload_stager_config.index_name,
             "_id": get_enhanced_element_id(element_dict=data, file_data=file_data),
@@ -397,23 +398,6 @@ def load_parallel_bulk(self):
 
         return parallel_bulk
 
-    def delete_by_record_id(self, client, file_data: FileData) -> None:
-        logger.debug(
-            f"deleting any content with metadata {RECORD_ID_LABEL}={file_data.identifier} "
-            f"from {self.upload_config.index_name} index"
-        )
-        delete_resp = client.delete_by_query(
-            index=self.upload_config.index_name,
-            body={"query": {"match": {self.upload_config.record_id_key: file_data.identifier}}},
-        )
-        logger.info(
-            "deleted {} records from index {}".format(
-                delete_resp["deleted"], self.upload_config.index_name
-            )
-        )
-        if failures := delete_resp.get("failures"):
-            raise WriteError(f"failed to delete records: {failures}")
-
     @requires_dependencies(["elasticsearch"], extras="elasticsearch")
     def run_data(self, data: list[dict], file_data: FileData, **kwargs: Any) -> None: # noqa: E501
         from elasticsearch.helpers.errors import BulkIndexError
@@ -429,7 +413,6 @@ def run_data(self, data: list[dict], file_data: FileData, **kwargs: Any) -> None
         )
 
         with self.connection_config.get_client() as client:
-            self.delete_by_record_id(client=client, file_data=file_data)
             if not client.indices.exists(index=self.upload_config.index_name):
                 logger.warning(
                     f"{(self.__class__.__name__).replace('Uploader', '')} index does not exist: "
@@ -446,6 +429,10 @@ def run_data(self, data: list[dict], file_data: FileData, **kwargs: Any) -> None
                         thread_count=self.upload_config.num_threads,
                     )
                     collections.deque(iterator, maxlen=0)
+                    logger.info(
+                        f"uploaded batch of {len(batch)} elements to index "
+                        f"{self.upload_config.index_name}"
+                    )
             except BulkIndexError as e:
                 sanitized_errors = [
                     self._sanitize_bulk_index_error(error) for error in e.errors
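
The comment added in conform_dict is the crux of the change: each bulk action carries a deterministic `_id` and no explicit `_op_type`, so the bulk helpers fall back to the `index` operation, which replaces an existing document rather than requiring a prior delete_by_query. A minimal, self-contained sketch of that pattern with the elasticsearch bulk helpers follows; the cluster URL, index name, ids, documents, and thread count are illustrative assumptions, not the connector's real values.

```python
import collections

from elasticsearch import Elasticsearch
from elasticsearch.helpers import parallel_bulk

client = Elasticsearch("http://localhost:9200")  # hypothetical local cluster

# Actions shaped like the stager's output: no _op_type, so "index" is used,
# meaning an existing document with the same _id is overwritten.
actions = [
    {"_index": "demo-index", "_id": f"element-{i}", "text": f"content {i}"}
    for i in range(3)
]

# Consume the generator without keeping results in memory, as run_data does.
collections.deque(parallel_bulk(client, actions, thread_count=2), maxlen=0)

# Re-running the same actions upserts the same three documents.
collections.deque(parallel_bulk(client, actions, thread_count=2), maxlen=0)

client.indices.refresh(index="demo-index")
print(client.count(index="demo-index")["count"])  # 3
```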
