From 0dca8ec03ccc38dca8278fb6bb7fbbd8a6c0633f Mon Sep 17 00:00:00 2001 From: Hubert Rutkowski <157481729+hubert-rutkowski85@users.noreply.github.com> Date: Mon, 2 Dec 2024 17:49:06 +0100 Subject: [PATCH 01/15] fix/Kafka cloud source couldn't connect, add test (#257) --- .github/workflows/e2e.yml | 3 + CHANGELOG.md | 10 ++ test/integration/connectors/test_kafka.py | 115 +++++++++++++++++- unstructured_ingest/__version__.py | 2 +- .../v2/processes/connectors/kafka/cloud.py | 16 +-- .../v2/processes/connectors/kafka/kafka.py | 11 +- .../v2/processes/connectors/kafka/local.py | 2 +- 7 files changed, 141 insertions(+), 18 deletions(-) diff --git a/.github/workflows/e2e.yml b/.github/workflows/e2e.yml index c51d38504..3a501ed03 100644 --- a/.github/workflows/e2e.yml +++ b/.github/workflows/e2e.yml @@ -104,6 +104,9 @@ jobs: ASTRA_DB_API_ENDPOINT: ${{ secrets.ASTRA_DB_ENDPOINT }} MONGODB_URI: ${{ secrets.MONGODB_URI }} MONGODB_DATABASE: ${{ secrets.MONGODB_DATABASE_NAME }} + KAFKA_API_KEY: ${{ secrets.KAFKA_API_KEY }} + KAFKA_SECRET: ${{ secrets.KAFKA_SECRET }} + KAFKA_BOOTSTRAP_SERVER: ${{ secrets.KAFKA_BOOTSTRAP_SERVER }} run : | source .venv/bin/activate make install-test diff --git a/CHANGELOG.md b/CHANGELOG.md index 8668ba87f..d169d487d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,13 @@ +## 0.3.7-dev1 + +### Fixes + +* **Fix Kafka source connection problems** + +### Enhancements + +* **Kafka source connector has new field: group_id** + ## 0.3.6 ### Fixes diff --git a/test/integration/connectors/test_kafka.py b/test/integration/connectors/test_kafka.py index ecd9e7486..1ffd93402 100644 --- a/test/integration/connectors/test_kafka.py +++ b/test/integration/connectors/test_kafka.py @@ -1,4 +1,5 @@ import json +import os import tempfile import time from pathlib import Path @@ -17,8 +18,17 @@ ValidationConfigs, source_connector_validation, ) +from test.integration.utils import requires_env from unstructured_ingest.error import DestinationConnectionError, SourceConnectionError from unstructured_ingest.v2.interfaces import FileData, SourceIdentifiers +from unstructured_ingest.v2.processes.connectors.kafka.cloud import ( + CloudKafkaAccessConfig, + CloudKafkaConnectionConfig, + CloudKafkaDownloader, + CloudKafkaDownloaderConfig, + CloudKafkaIndexer, + CloudKafkaIndexerConfig, +) from unstructured_ingest.v2.processes.connectors.kafka.local import ( CONNECTOR_TYPE, LocalKafkaConnectionConfig, @@ -47,20 +57,27 @@ def docker_compose_ctx(): yield ctx -def wait_for_topic(topic: str, retries: int = 10, interval: int = 1): - admin_client = get_admin_client() +def wait_for_topic( + topic: str, + retries: int = 10, + interval: int = 1, + exists: bool = True, + admin_client=None, +): + if admin_client is None: + admin_client = get_admin_client() current_topics = admin_client.list_topics().topics attempts = 0 - while topic not in current_topics and attempts < retries: + while (topic not in current_topics) == exists and attempts < retries: attempts += 1 print( - "Attempt {}: Waiting for topic {} to exist in {}".format( - attempts, topic, ", ".join(current_topics) + "Attempt {}: Waiting for topic {} to {} exist. 
Current topics: [{}]".format( + attempts, topic, "" if exists else "not", ", ".join(current_topics) ) ) time.sleep(interval) current_topics = admin_client.list_topics().topics - if topic not in current_topics: + if (topic not in current_topics) == exists: raise TimeoutError(f"Timeout out waiting for topic {topic} to exist") @@ -110,6 +127,92 @@ async def test_kafka_source_local(kafka_seed_topic: str): ) +@pytest.fixture +def kafka_seed_topic_cloud(expected_messages: int = 5) -> int: + conf = { + "bootstrap.servers": os.environ["KAFKA_BOOTSTRAP_SERVER"], + "sasl.username": os.environ["KAFKA_API_KEY"], + "sasl.password": os.environ["KAFKA_SECRET"], + "sasl.mechanism": "PLAIN", + "security.protocol": "SASL_SSL", + } + admin_client = AdminClient(conf) + try: + res = admin_client.delete_topics([TOPIC], operation_timeout=10) + for topic, f in res.items(): + f.result() + print(f"Topic {topic} removed") + wait_for_topic(TOPIC, 5, 1, False, admin_client) + except Exception: + pass + + cluster_meta = admin_client.list_topics() + current_topics = [topic for topic in cluster_meta.topics if topic != "__consumer_offsets"] + + assert TOPIC not in current_topics, f"Topic {TOPIC} shouldn't exist" + + # Kafka Cloud allows to use replication_factor=1 only for Dedicated clusters. + topic_obj = NewTopic(TOPIC, num_partitions=1, replication_factor=3) + + res = admin_client.create_topics([topic_obj], operation_timeout=10, validate_only=False) + for topic, f in res.items(): + f.result() + + producer = Producer(conf) + for i in range(expected_messages): + message = f"This is some text for message {i}" + producer.produce(topic=TOPIC, value=message) + producer.flush(timeout=10) + return expected_messages + + +@pytest.mark.asyncio +@pytest.mark.tags(CONNECTOR_TYPE, SOURCE_TAG) +@requires_env("KAFKA_API_KEY", "KAFKA_SECRET", "KAFKA_BOOTSTRAP_SERVER") +async def test_kafka_source_cloud(kafka_seed_topic_cloud: int): + """ + In order to have this test succeed, you need to create cluster on Confluent Cloud, + and create the API key with admin privileges. By default, user account keys have it. 
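+    The cluster address and credentials are read from the KAFKA_BOOTSTRAP_SERVER,
+    KAFKA_API_KEY and KAFKA_SECRET environment variables (see the requires_env marker above).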
+ """ + + expected_messages = kafka_seed_topic_cloud + + connection_config = CloudKafkaConnectionConfig( + bootstrap_server=os.environ["KAFKA_BOOTSTRAP_SERVER"], + port=9092, + access_config=CloudKafkaAccessConfig( + kafka_api_key=os.environ["KAFKA_API_KEY"], + secret=os.environ["KAFKA_SECRET"], + ), + ) + + with tempfile.TemporaryDirectory() as tempdir: + tempdir_path = Path(tempdir) + download_config = CloudKafkaDownloaderConfig(download_dir=tempdir_path) + indexer = CloudKafkaIndexer( + connection_config=connection_config, + index_config=CloudKafkaIndexerConfig( + topic=TOPIC, + num_messages_to_consume=expected_messages, + ), + ) + downloader = CloudKafkaDownloader( + connection_config=connection_config, download_config=download_config + ) + indexer.precheck() + await source_connector_validation( + indexer=indexer, + downloader=downloader, + configs=ValidationConfigs( + test_id="kafka", + exclude_fields_extend=["connector_type"], + expected_num_files=expected_messages, + validate_downloaded_files=True, + validate_file_data=True, + ), + ) + + @pytest.mark.tags(CONNECTOR_TYPE, SOURCE_TAG) def test_kafka_source_local_precheck_fail_no_cluster(): connection_config = LocalKafkaConnectionConfig(bootstrap_server="localhost", port=29092) diff --git a/unstructured_ingest/__version__.py b/unstructured_ingest/__version__.py index 5f1a14e65..54ce214a6 100644 --- a/unstructured_ingest/__version__.py +++ b/unstructured_ingest/__version__.py @@ -1 +1 @@ -__version__ = "0.3.6" # pragma: no cover +__version__ = "0.3.7-dev1" # pragma: no cover diff --git a/unstructured_ingest/v2/processes/connectors/kafka/cloud.py b/unstructured_ingest/v2/processes/connectors/kafka/cloud.py index 5fd8a9616..b4c2fd4ef 100644 --- a/unstructured_ingest/v2/processes/connectors/kafka/cloud.py +++ b/unstructured_ingest/v2/processes/connectors/kafka/cloud.py @@ -1,6 +1,6 @@ import socket from dataclasses import dataclass -from typing import TYPE_CHECKING, Optional +from typing import TYPE_CHECKING from pydantic import Field, Secret, SecretStr @@ -26,10 +26,10 @@ class CloudKafkaAccessConfig(KafkaAccessConfig): - api_key: Optional[SecretStr] = Field( - description="Kafka API key to connect at the server", alias="kafka_api_key", default=None + kafka_api_key: SecretStr = Field( + description="Kafka API key to connect at the server", default=None ) - secret: Optional[SecretStr] = Field(description="", default=None) + secret: SecretStr = Field(description="", default=None) class CloudKafkaConnectionConfig(KafkaConnectionConfig): @@ -43,11 +43,11 @@ def get_consumer_configuration(self) -> dict: conf = { "bootstrap.servers": f"{bootstrap}:{port}", "client.id": socket.gethostname(), - "group.id": "default_group_id", + "group.id": self.group_id, "enable.auto.commit": "false", "auto.offset.reset": "earliest", - "sasl.username": access_config.api_key, - "sasl.password": access_config.secret, + "sasl.username": access_config.kafka_api_key.get_secret_value(), + "sasl.password": access_config.secret.get_secret_value(), "sasl.mechanism": "PLAIN", "security.protocol": "SASL_SSL", } @@ -61,7 +61,7 @@ def get_producer_configuration(self) -> dict: conf = { "bootstrap.servers": f"{bootstrap}:{port}", - "sasl.username": access_config.api_key, + "sasl.username": access_config.kafka_api_key, "sasl.password": access_config.secret, "sasl.mechanism": "PLAIN", "security.protocol": "SASL_SSL", diff --git a/unstructured_ingest/v2/processes/connectors/kafka/kafka.py b/unstructured_ingest/v2/processes/connectors/kafka/kafka.py index 2842fb7a4..85c35d675 
100644 --- a/unstructured_ingest/v2/processes/connectors/kafka/kafka.py +++ b/unstructured_ingest/v2/processes/connectors/kafka/kafka.py @@ -43,6 +43,11 @@ class KafkaConnectionConfig(ConnectionConfig, ABC): access_config: Secret[KafkaAccessConfig] bootstrap_server: str port: int + group_id: str = Field( + description="A consumer group is a way to allow a pool of consumers " + "to divide the consumption of data over topics and partitions.", + default="default_group_id", + ) @abstractmethod def get_consumer_configuration(self) -> dict: @@ -75,7 +80,7 @@ def get_producer(self) -> "Producer": class KafkaIndexerConfig(IndexerConfig): topic: str = Field(description="which topic to consume from") num_messages_to_consume: Optional[int] = 100 - timeout: Optional[float] = Field(default=1.0, description="polling timeout") + timeout: Optional[float] = Field(default=3.0, description="polling timeout", ge=3.0) def update_consumer(self, consumer: "Consumer") -> None: consumer.subscribe([self.topic]) @@ -157,7 +162,9 @@ async def run_async(self, file_data: FileData, **kwargs: Any) -> DownloadRespons def precheck(self): try: with self.get_consumer() as consumer: - cluster_meta = consumer.list_topics(timeout=self.index_config.timeout) + # timeout needs at least 3 secs, more info: + # https://forum.confluent.io/t/kafkacat-connect-failure-to-confcloud-ssl/2513 + cluster_meta = consumer.list_topics(timeout=5) current_topics = [ topic for topic in cluster_meta.topics if topic != "__consumer_offsets" ] diff --git a/unstructured_ingest/v2/processes/connectors/kafka/local.py b/unstructured_ingest/v2/processes/connectors/kafka/local.py index 3fe7e7d7b..d0dbc2d06 100644 --- a/unstructured_ingest/v2/processes/connectors/kafka/local.py +++ b/unstructured_ingest/v2/processes/connectors/kafka/local.py @@ -39,7 +39,7 @@ def get_consumer_configuration(self) -> dict: conf = { "bootstrap.servers": f"{bootstrap}:{port}", - "group.id": "default_group_id", + "group.id": self.group_id, "enable.auto.commit": "false", "auto.offset.reset": "earliest", } From 6b1a05ac9ca1e8f29a3677912a21271c8bd522c2 Mon Sep 17 00:00:00 2001 From: Roman Isecke <136338424+rbiseck3@users.noreply.github.com> Date: Mon, 2 Dec 2024 12:27:39 -0500 Subject: [PATCH 02/15] feat/add release branch to PR triggers (#284) * add release branch to PR triggers * omit vectar dest e2e test --- .github/workflows/e2e.yml | 2 +- .github/workflows/unit_tests.yml | 2 +- test_e2e/test-dest.sh | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/.github/workflows/e2e.yml b/.github/workflows/e2e.yml index 3a501ed03..009a2a6fd 100644 --- a/.github/workflows/e2e.yml +++ b/.github/workflows/e2e.yml @@ -4,7 +4,7 @@ on: push: branches: [ main ] pull_request: - branches: [ main ] + branches: [ main, release/* ] merge_group: branches: [ main ] diff --git a/.github/workflows/unit_tests.yml b/.github/workflows/unit_tests.yml index 07605d2e9..dc2b950b0 100644 --- a/.github/workflows/unit_tests.yml +++ b/.github/workflows/unit_tests.yml @@ -4,7 +4,7 @@ on: push: branches: [ main ] pull_request: - branches: [ main ] + branches: [ main, release/* ] merge_group: branches: [ main ] diff --git a/test_e2e/test-dest.sh b/test_e2e/test-dest.sh index 20f16fac8..dbf3d7845 100755 --- a/test_e2e/test-dest.sh +++ b/test_e2e/test-dest.sh @@ -25,7 +25,7 @@ all_tests=( 's3.sh' 's3_no_access.sh' 'sharepoint-embed-cog-index.sh' - 'vectara.sh' + # 'vectara.sh' # 'kdbai.sh' ) From ee6000a24726c717e149a68bf6e54323e2b22987 Mon Sep 17 00:00:00 2001 From: Hubert Rutkowski 
<157481729+hubert-rutkowski85@users.noreply.github.com> Date: Tue, 3 Dec 2024 12:24:05 +0100 Subject: [PATCH 03/15] fix/Azure AI search - reuse client and close connections (#282) --- CHANGELOG.md | 6 +++ unstructured_ingest/__version__.py | 2 +- .../processes/connectors/azure_ai_search.py | 41 ++++++++++--------- 3 files changed, 29 insertions(+), 20 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index d169d487d..a1db69355 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,9 @@ +## 0.3.7-dev2 + +### Fixes + +* **Fix Azure AI Search session handling** + ## 0.3.7-dev1 ### Fixes diff --git a/unstructured_ingest/__version__.py b/unstructured_ingest/__version__.py index 54ce214a6..d27bb18df 100644 --- a/unstructured_ingest/__version__.py +++ b/unstructured_ingest/__version__.py @@ -1 +1 @@ -__version__ = "0.3.7-dev1" # pragma: no cover +__version__ = "0.3.7-dev2" # pragma: no cover diff --git a/unstructured_ingest/v2/processes/connectors/azure_ai_search.py b/unstructured_ingest/v2/processes/connectors/azure_ai_search.py index b776a0923..6bf30db2d 100644 --- a/unstructured_ingest/v2/processes/connectors/azure_ai_search.py +++ b/unstructured_ingest/v2/processes/connectors/azure_ai_search.py @@ -173,8 +173,10 @@ class AzureAISearchUploader(Uploader): connector_type: str = CONNECTOR_TYPE def query_docs(self, record_id: str, index_key: str) -> list[str]: - client = self.connection_config.get_search_client() - results = list(client.search(filter=f"record_id eq '{record_id}'", select=[index_key])) + with self.connection_config.get_search_client() as search_client: + results = list( + search_client.search(filter=f"record_id eq '{record_id}'", select=[index_key]) + ) return [result[index_key] for result in results] def delete_by_record_id(self, file_data: FileData, index_key: str) -> None: @@ -186,10 +188,10 @@ def delete_by_record_id(self, file_data: FileData, index_key: str) -> None: doc_ids_to_delete = self.query_docs(record_id=file_data.identifier, index_key=index_key) if not doc_ids_to_delete: return - client: SearchClient = self.connection_config.get_search_client() - results = client.delete_documents( - documents=[{index_key: doc_id} for doc_id in doc_ids_to_delete] - ) + with self.connection_config.get_search_client() as search_client: + results = search_client.delete_documents( + documents=[{index_key: doc_id} for doc_id in doc_ids_to_delete] + ) errors = [] success = [] for result in results: @@ -207,7 +209,9 @@ def delete_by_record_id(self, file_data: FileData, index_key: str) -> None: @DestinationConnectionError.wrap @requires_dependencies(["azure"], extras="azure-ai-search") - def write_dict(self, elements_dict: list[dict[str, Any]]) -> None: + def write_dict( + self, elements_dict: list[dict[str, Any]], search_client: "SearchClient" + ) -> None: import azure.core.exceptions logger.info( @@ -215,12 +219,10 @@ def write_dict(self, elements_dict: list[dict[str, Any]]) -> None: f"index at {self.connection_config.index}", ) try: - results = self.connection_config.get_search_client().upload_documents( - documents=elements_dict - ) - + results = search_client.upload_documents(documents=elements_dict) except azure.core.exceptions.HttpResponseError as http_error: raise WriteError(f"http error: {http_error}") from http_error + errors = [] success = [] for result in results: @@ -240,8 +242,8 @@ def write_dict(self, elements_dict: list[dict[str, Any]]) -> None: ) def can_delete(self) -> bool: - search_index_client = self.connection_config.get_search_index_client() - index = 
search_index_client.get_index(name=self.connection_config.index) + with self.connection_config.get_search_index_client() as search_index_client: + index = search_index_client.get_index(name=self.connection_config.index) index_fields = index.fields record_id_fields = [ field for field in index_fields if field.name == self.upload_config.record_id_key @@ -252,8 +254,8 @@ def can_delete(self) -> bool: return record_id_field.filterable def get_index_key(self) -> str: - search_index_client = self.connection_config.get_search_index_client() - index = search_index_client.get_index(name=self.connection_config.index) + with self.connection_config.get_search_index_client() as search_index_client: + index = search_index_client.get_index(name=self.connection_config.index) index_fields = index.fields key_fields = [field for field in index_fields if field.key] if not key_fields: @@ -262,8 +264,8 @@ def get_index_key(self) -> str: def precheck(self) -> None: try: - client = self.connection_config.get_search_client() - client.get_document_count() + with self.connection_config.get_search_client() as search_client: + search_client.get_document_count() except Exception as e: logger.error(f"failed to validate connection: {e}", exc_info=True) raise DestinationConnectionError(f"failed to validate connection: {e}") @@ -284,8 +286,9 @@ def run(self, path: Path, file_data: FileData, **kwargs: Any) -> None: logger.warning("criteria for deleting previous content not met, skipping") batch_size = self.upload_config.batch_size - for chunk in batch_generator(elements_dict, batch_size): - self.write_dict(elements_dict=chunk) # noqa: E203 + with self.connection_config.get_search_client() as search_client: + for chunk in batch_generator(elements_dict, batch_size): + self.write_dict(elements_dict=chunk, search_client=search_client) # noqa: E203 azure_ai_search_destination_entry = DestinationRegistryEntry( From 4f43b4a0558dfb94b440a919b62ea50fd7fe8afc Mon Sep 17 00:00:00 2001 From: Roman Isecke <136338424+rbiseck3@users.noreply.github.com> Date: Tue, 3 Dec 2024 09:10:36 -0500 Subject: [PATCH 04/15] support personal access token for confluence auth (#275) --- CHANGELOG.md | 10 ++--- test/unit/v2/connectors/test_confluence.py | 39 +++++++++++++++++++ unstructured_ingest/__version__.py | 2 +- .../v2/processes/connectors/confluence.py | 16 +++++++- 4 files changed, 57 insertions(+), 10 deletions(-) create mode 100644 test/unit/v2/connectors/test_confluence.py diff --git a/CHANGELOG.md b/CHANGELOG.md index a1db69355..885fc8022 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,18 +1,14 @@ -## 0.3.7-dev2 - -### Fixes - -* **Fix Azure AI Search session handling** - -## 0.3.7-dev1 +## 0.3.7-dev3 ### Fixes * **Fix Kafka source connection problems** +* **Fix Azure AI Search session handling** ### Enhancements * **Kafka source connector has new field: group_id** +* * **Support personal access token for confluence auth** ## 0.3.6 diff --git a/test/unit/v2/connectors/test_confluence.py b/test/unit/v2/connectors/test_confluence.py new file mode 100644 index 000000000..1c62f480b --- /dev/null +++ b/test/unit/v2/connectors/test_confluence.py @@ -0,0 +1,39 @@ +import pytest +from pydantic import ValidationError + +from unstructured_ingest.v2.processes.connectors.confluence import ( + ConfluenceAccessConfig, + ConfluenceConnectionConfig, +) + + +def test_connection_config_multiple_auth(): + with pytest.raises(ValidationError): + ConfluenceConnectionConfig( + access_config=ConfluenceAccessConfig( + api_token="api_token", + 
access_token="access_token", + ), + user_email="user_email", + url="url", + ) + + +def test_connection_config_no_auth(): + with pytest.raises(ValidationError): + ConfluenceConnectionConfig(access_config=ConfluenceAccessConfig(), url="url") + + +def test_connection_config_basic_auth(): + ConfluenceConnectionConfig( + access_config=ConfluenceAccessConfig(api_token="api_token"), + url="url", + user_email="user_email", + ) + + +def test_connection_config_pat_auth(): + ConfluenceConnectionConfig( + access_config=ConfluenceAccessConfig(access_token="access_token"), + url="url", + ) diff --git a/unstructured_ingest/__version__.py b/unstructured_ingest/__version__.py index d27bb18df..d646f0b86 100644 --- a/unstructured_ingest/__version__.py +++ b/unstructured_ingest/__version__.py @@ -1 +1 @@ -__version__ = "0.3.7-dev2" # pragma: no cover +__version__ = "0.3.7-dev3" # pragma: no cover diff --git a/unstructured_ingest/v2/processes/connectors/confluence.py b/unstructured_ingest/v2/processes/connectors/confluence.py index 604e467dd..dbb296066 100644 --- a/unstructured_ingest/v2/processes/connectors/confluence.py +++ b/unstructured_ingest/v2/processes/connectors/confluence.py @@ -30,16 +30,28 @@ class ConfluenceAccessConfig(AccessConfig): - api_token: str = Field(description="Confluence API token") + api_token: Optional[str] = Field(description="Confluence API token", default=None) + access_token: Optional[str] = Field( + description="Confluence Personal Access Token", default=None + ) class ConfluenceConnectionConfig(ConnectionConfig): url: str = Field(description="URL of the Confluence instance") - user_email: str = Field(description="User email for authentication") + user_email: Optional[str] = Field(description="User email for authentication", default=None) access_config: Secret[ConfluenceAccessConfig] = Field( description="Access configuration for Confluence" ) + def model_post_init(self, __context): + access_configs = self.access_config.get_secret_value() + basic_auth = self.user_email and access_configs.api_token + pat_auth = access_configs.access_token + if basic_auth and pat_auth: + raise ValueError("both forms of auth provided, only one allowed") + if not (basic_auth or pat_auth): + raise ValueError("neither forms of auth provided") + @requires_dependencies(["atlassian"], extras="confluence") def get_client(self) -> "Confluence": from atlassian import Confluence From d841befe468795de725925ba08b08c682e8814a9 Mon Sep 17 00:00:00 2001 From: Roman Isecke <136338424+rbiseck3@users.noreply.github.com> Date: Wed, 4 Dec 2024 09:53:13 -0500 Subject: [PATCH 05/15] feat/determistic ids for uploads (#286) * create deterministic id for upload use * fix id in sql connector --- CHANGELOG.md | 5 +++-- unstructured_ingest/__version__.py | 2 +- .../v2/processes/connectors/azure_ai_search.py | 4 ++-- unstructured_ingest/v2/processes/connectors/chroma.py | 11 ++++++----- .../connectors/elasticsearch/elasticsearch.py | 4 ++-- unstructured_ingest/v2/processes/connectors/kdbai.py | 4 ++-- .../v2/processes/connectors/pinecone.py | 4 ++-- .../v2/processes/connectors/qdrant/qdrant.py | 10 ++++++---- .../v2/processes/connectors/sql/sql.py | 4 ++-- unstructured_ingest/v2/utils.py | 9 +++++++++ 10 files changed, 35 insertions(+), 22 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 885fc8022..2f1cecbaa 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,4 +1,4 @@ -## 0.3.7-dev3 +## 0.3.7-dev4 ### Fixes @@ -8,7 +8,8 @@ ### Enhancements * **Kafka source connector has new field: group_id** -* * **Support personal 
access token for confluence auth** +* **Support personal access token for confluence auth** +* **Leverage deterministic id for uploaded content** ## 0.3.6 diff --git a/unstructured_ingest/__version__.py b/unstructured_ingest/__version__.py index d646f0b86..8742e4223 100644 --- a/unstructured_ingest/__version__.py +++ b/unstructured_ingest/__version__.py @@ -1 +1 @@ -__version__ = "0.3.7-dev3" # pragma: no cover +__version__ = "0.3.7-dev4" # pragma: no cover diff --git a/unstructured_ingest/v2/processes/connectors/azure_ai_search.py b/unstructured_ingest/v2/processes/connectors/azure_ai_search.py index 6bf30db2d..0978b9409 100644 --- a/unstructured_ingest/v2/processes/connectors/azure_ai_search.py +++ b/unstructured_ingest/v2/processes/connectors/azure_ai_search.py @@ -1,5 +1,4 @@ import json -import uuid from dataclasses import dataclass, field from pathlib import Path from typing import TYPE_CHECKING, Any @@ -24,6 +23,7 @@ DestinationRegistryEntry, ) from unstructured_ingest.v2.processes.connectors.utils import parse_datetime +from unstructured_ingest.v2.utils import get_enhanced_element_id if TYPE_CHECKING: from azure.search.documents import SearchClient @@ -100,7 +100,7 @@ def conform_dict(data: dict, file_data: FileData) -> dict: Azure Cognitive Search index """ - data["id"] = str(uuid.uuid4()) + data["id"] = get_enhanced_element_id(element_dict=data, file_data=file_data) data[RECORD_ID_LABEL] = file_data.identifier if points := data.get("metadata", {}).get("coordinates", {}).get("points"): diff --git a/unstructured_ingest/v2/processes/connectors/chroma.py b/unstructured_ingest/v2/processes/connectors/chroma.py index fb4a19b90..96db9b92c 100644 --- a/unstructured_ingest/v2/processes/connectors/chroma.py +++ b/unstructured_ingest/v2/processes/connectors/chroma.py @@ -1,5 +1,4 @@ import json -import uuid from dataclasses import dataclass, field from datetime import date, datetime from pathlib import Path @@ -23,6 +22,7 @@ ) from unstructured_ingest.v2.logger import logger from unstructured_ingest.v2.processes.connector_registry import DestinationRegistryEntry +from unstructured_ingest.v2.utils import get_enhanced_element_id from .utils import conform_string_to_dict @@ -83,13 +83,12 @@ def parse_date_string(date_string: str) -> date: return parser.parse(date_string) @staticmethod - def conform_dict(data: dict) -> dict: + def conform_dict(data: dict, file_data: FileData) -> dict: """ Prepares dictionary in the format that Chroma requires """ - element_id = data.get("element_id", str(uuid.uuid4())) return { - "id": element_id, + "id": get_enhanced_element_id(element_dict=data, file_data=file_data), "embedding": data.pop("embeddings", None), "document": data.pop("text", None), "metadata": flatten_dict(data, separator="-", flatten_lists=True, remove_none=True), @@ -105,7 +104,9 @@ def run( ) -> Path: with open(elements_filepath) as elements_file: elements_contents = json.load(elements_file) - conformed_elements = [self.conform_dict(data=element) for element in elements_contents] + conformed_elements = [ + self.conform_dict(data=element, file_data=file_data) for element in elements_contents + ] output_path = Path(output_dir) / Path(f"{output_filename}.json") with open(output_path, "w") as output_file: json.dump(conformed_elements, output_file) diff --git a/unstructured_ingest/v2/processes/connectors/elasticsearch/elasticsearch.py b/unstructured_ingest/v2/processes/connectors/elasticsearch/elasticsearch.py index d2235311e..6424b26e3 100644 --- 
a/unstructured_ingest/v2/processes/connectors/elasticsearch/elasticsearch.py +++ b/unstructured_ingest/v2/processes/connectors/elasticsearch/elasticsearch.py @@ -1,7 +1,6 @@ import hashlib import json import sys -import uuid from contextlib import contextmanager from dataclasses import dataclass, field from pathlib import Path @@ -41,6 +40,7 @@ DestinationRegistryEntry, SourceRegistryEntry, ) +from unstructured_ingest.v2.utils import get_enhanced_element_id if TYPE_CHECKING: from elasticsearch import Elasticsearch as ElasticsearchClient @@ -326,7 +326,7 @@ class ElasticsearchUploadStager(UploadStager): def conform_dict(self, data: dict, file_data: FileData) -> dict: resp = { "_index": self.upload_stager_config.index_name, - "_id": str(uuid.uuid4()), + "_id": get_enhanced_element_id(element_dict=data, file_data=file_data), "_source": { "element_id": data.pop("element_id", None), "embeddings": data.pop("embeddings", None), diff --git a/unstructured_ingest/v2/processes/connectors/kdbai.py b/unstructured_ingest/v2/processes/connectors/kdbai.py index e9bc36feb..0ad703f22 100644 --- a/unstructured_ingest/v2/processes/connectors/kdbai.py +++ b/unstructured_ingest/v2/processes/connectors/kdbai.py @@ -1,5 +1,4 @@ import json -import uuid from dataclasses import dataclass, field from pathlib import Path from typing import TYPE_CHECKING, Any, Optional @@ -24,6 +23,7 @@ from unstructured_ingest.v2.processes.connector_registry import ( DestinationRegistryEntry, ) +from unstructured_ingest.v2.utils import get_enhanced_element_id if TYPE_CHECKING: from kdbai_client import Database, Session, Table @@ -81,7 +81,7 @@ def run( for element in elements_contents: data.append( { - "id": str(uuid.uuid4()), + "id": get_enhanced_element_id(element_dict=element, file_data=file_data), "element_id": element.get("element_id"), "document": element.pop("text", None), "embeddings": element.get("embeddings"), diff --git a/unstructured_ingest/v2/processes/connectors/pinecone.py b/unstructured_ingest/v2/processes/connectors/pinecone.py index 0202f861b..889330c9c 100644 --- a/unstructured_ingest/v2/processes/connectors/pinecone.py +++ b/unstructured_ingest/v2/processes/connectors/pinecone.py @@ -1,5 +1,4 @@ import json -import uuid from dataclasses import dataclass, field from pathlib import Path from typing import TYPE_CHECKING, Any, Optional @@ -21,6 +20,7 @@ ) from unstructured_ingest.v2.logger import logger from unstructured_ingest.v2.processes.connector_registry import DestinationRegistryEntry +from unstructured_ingest.v2.utils import get_enhanced_element_id if TYPE_CHECKING: from pinecone import Index as PineconeIndex @@ -149,7 +149,7 @@ def conform_dict(self, element_dict: dict, file_data: FileData) -> dict: metadata[RECORD_ID_LABEL] = file_data.identifier return { - "id": str(uuid.uuid4()), + "id": get_enhanced_element_id(element_dict=element_dict, file_data=file_data), "values": embeddings, "metadata": metadata, } diff --git a/unstructured_ingest/v2/processes/connectors/qdrant/qdrant.py b/unstructured_ingest/v2/processes/connectors/qdrant/qdrant.py index 85ac59c13..c38a73839 100644 --- a/unstructured_ingest/v2/processes/connectors/qdrant/qdrant.py +++ b/unstructured_ingest/v2/processes/connectors/qdrant/qdrant.py @@ -1,6 +1,5 @@ import asyncio import json -import uuid from abc import ABC, abstractmethod from contextlib import asynccontextmanager from dataclasses import dataclass, field @@ -22,6 +21,7 @@ UploadStagerConfig, ) from unstructured_ingest.v2.logger import logger +from unstructured_ingest.v2.utils import 
get_enhanced_element_id if TYPE_CHECKING: from qdrant_client import AsyncQdrantClient @@ -64,10 +64,10 @@ class QdrantUploadStager(UploadStager, ABC): ) @staticmethod - def conform_dict(data: dict) -> dict: + def conform_dict(data: dict, file_data: FileData) -> dict: """Prepares dictionary in the format that Chroma requires""" return { - "id": str(uuid.uuid4()), + "id": get_enhanced_element_id(element_dict=data, file_data=file_data), "vector": data.pop("embeddings", {}), "payload": { "text": data.pop("text", None), @@ -91,7 +91,9 @@ def run( with open(elements_filepath) as elements_file: elements_contents = json.load(elements_file) - conformed_elements = [self.conform_dict(data=element) for element in elements_contents] + conformed_elements = [ + self.conform_dict(data=element, file_data=file_data) for element in elements_contents + ] output_path = Path(output_dir) / Path(f"{output_filename}.json") with open(output_path, "w") as output_file: diff --git a/unstructured_ingest/v2/processes/connectors/sql/sql.py b/unstructured_ingest/v2/processes/connectors/sql/sql.py index ee1ebb866..0fe587aa6 100644 --- a/unstructured_ingest/v2/processes/connectors/sql/sql.py +++ b/unstructured_ingest/v2/processes/connectors/sql/sql.py @@ -1,7 +1,6 @@ import hashlib import json import sys -import uuid from abc import ABC, abstractmethod from contextlib import contextmanager from dataclasses import dataclass, field, replace @@ -35,6 +34,7 @@ download_responses, ) from unstructured_ingest.v2.logger import logger +from unstructured_ingest.v2.utils import get_enhanced_element_id _COLUMNS = ( "id", @@ -251,7 +251,7 @@ def conform_dict(data: dict, file_data: FileData) -> pd.DataFrame: element.update(data_source) element.update(coordinates) - element["id"] = str(uuid.uuid4()) + element["id"] = get_enhanced_element_id(element_dict=element, file_data=file_data) # remove extraneous, not supported columns element = {k: v for k, v in element.items() if k in _COLUMNS} diff --git a/unstructured_ingest/v2/utils.py b/unstructured_ingest/v2/utils.py index 4a46fe2ad..e09f6dd9e 100644 --- a/unstructured_ingest/v2/utils.py +++ b/unstructured_ingest/v2/utils.py @@ -3,10 +3,13 @@ from inspect import isclass from pathlib import Path from typing import Any +from uuid import NAMESPACE_DNS, uuid5 from pydantic import BaseModel from pydantic.types import _SecretBase +from unstructured_ingest.v2.interfaces import FileData + def is_secret(value: Any) -> bool: # Case Secret[int] @@ -50,3 +53,9 @@ def json_serial(obj): # Support json dumps kwargs such as sort_keys return json.dumps(model_dict, default=json_serial, **json_kwargs) + + +def get_enhanced_element_id(element_dict: dict, file_data: FileData) -> str: + element_id = element_dict.get("element_id") + new_data = f"{element_id}{file_data.identifier}" + return str(uuid5(NAMESPACE_DNS, new_data)) From 4dd4d3023c01380cce6c41d3599392bf4efcba34 Mon Sep 17 00:00:00 2001 From: Filip Knefel <158048836+ds-filipknefel@users.noreply.github.com> Date: Thu, 5 Dec 2024 12:32:34 +0100 Subject: [PATCH 06/15] fix: FSSPEC invalid metadata date field types (#279) Few fsspec connectors: SFTP, Azure, Box and GCS havedate_modified and date_created fields of FileDataSourceMetadata class were of type float | None instead of str | None, modified code creating the metadata to cast float timestamps to strings. 
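
Illustrative sketch of the conversion (simplified; the helper name is hypothetical, not the connector code):

    from datetime import datetime, timezone
    from typing import Optional

    def to_metadata_timestamp(value: Optional[datetime]) -> Optional[str]:
        # FileDataSourceMetadata expects str | None, so stringify the epoch value
        return str(value.timestamp()) if value is not None else None

    assert to_metadata_timestamp(datetime(2024, 12, 5, tzinfo=timezone.utc)) == "1733356800.0"
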
--------- Co-authored-by: Filip Knefel --- CHANGELOG.md | 3 ++- unstructured_ingest/__version__.py | 2 +- .../v2/processes/connectors/fsspec/azure.py | 8 ++++++-- unstructured_ingest/v2/processes/connectors/fsspec/box.py | 4 ++-- unstructured_ingest/v2/processes/connectors/fsspec/gcs.py | 4 ++-- .../v2/processes/connectors/fsspec/sftp.py | 5 ++--- 6 files changed, 15 insertions(+), 11 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 2f1cecbaa..a99841b2e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,7 +1,8 @@ -## 0.3.7-dev4 +## 0.3.7-dev5 ### Fixes +* **Correct fsspec connectors date metadata field types** - sftp, azure, box and gcs * **Fix Kafka source connection problems** * **Fix Azure AI Search session handling** diff --git a/unstructured_ingest/__version__.py b/unstructured_ingest/__version__.py index 8742e4223..032737740 100644 --- a/unstructured_ingest/__version__.py +++ b/unstructured_ingest/__version__.py @@ -1 +1 @@ -__version__ = "0.3.7-dev4" # pragma: no cover +__version__ = "0.3.7-dev5" # pragma: no cover diff --git a/unstructured_ingest/v2/processes/connectors/fsspec/azure.py b/unstructured_ingest/v2/processes/connectors/fsspec/azure.py index 4f120bcea..dc3ec8a7d 100644 --- a/unstructured_ingest/v2/processes/connectors/fsspec/azure.py +++ b/unstructured_ingest/v2/processes/connectors/fsspec/azure.py @@ -110,10 +110,14 @@ def run(self, **kwargs: Any) -> Generator[FileData, None, None]: def get_metadata(self, file_data: dict) -> FileDataSourceMetadata: path = file_data["name"] date_created = ( - file_data.get("creation_time").timestamp() if "creation_time" in file_data else None + str(file_data.get("creation_time").timestamp()) + if "creation_time" in file_data + else None ) date_modified = ( - file_data.get("last_modified").timestamp() if "last_modified" in file_data else None + str(file_data.get("last_modified").timestamp()) + if "last_modified" in file_data + else None ) file_size = file_data.get("size") if "size" in file_data else None diff --git a/unstructured_ingest/v2/processes/connectors/fsspec/box.py b/unstructured_ingest/v2/processes/connectors/fsspec/box.py index 2fb83f185..b8d865cd9 100644 --- a/unstructured_ingest/v2/processes/connectors/fsspec/box.py +++ b/unstructured_ingest/v2/processes/connectors/fsspec/box.py @@ -80,9 +80,9 @@ def get_metadata(self, file_data: dict) -> FileDataSourceMetadata: date_created = None date_modified = None if modified_at_str := file_data.get("modified_at"): - date_modified = parser.parse(modified_at_str).timestamp() + date_modified = str(parser.parse(modified_at_str).timestamp()) if created_at_str := file_data.get("created_at"): - date_created = parser.parse(created_at_str).timestamp() + date_created = str(parser.parse(created_at_str).timestamp()) file_size = file_data.get("size") if "size" in file_data else None diff --git a/unstructured_ingest/v2/processes/connectors/fsspec/gcs.py b/unstructured_ingest/v2/processes/connectors/fsspec/gcs.py index 4f84e5966..4bbe59434 100644 --- a/unstructured_ingest/v2/processes/connectors/fsspec/gcs.py +++ b/unstructured_ingest/v2/processes/connectors/fsspec/gcs.py @@ -113,9 +113,9 @@ def get_metadata(self, file_data: dict) -> FileDataSourceMetadata: date_created = None date_modified = None if modified_at_str := file_data.get("updated"): - date_modified = parser.parse(modified_at_str).timestamp() + date_modified = str(parser.parse(modified_at_str).timestamp()) if created_at_str := file_data.get("timeCreated"): - date_created = parser.parse(created_at_str).timestamp() + date_created = 
str(parser.parse(created_at_str).timestamp()) file_size = file_data.get("size") if "size" in file_data else None diff --git a/unstructured_ingest/v2/processes/connectors/fsspec/sftp.py b/unstructured_ingest/v2/processes/connectors/fsspec/sftp.py index f2c235e19..de1aa0f4d 100644 --- a/unstructured_ingest/v2/processes/connectors/fsspec/sftp.py +++ b/unstructured_ingest/v2/processes/connectors/fsspec/sftp.py @@ -30,7 +30,6 @@ class SftpIndexerConfig(FsspecIndexerConfig): - def model_post_init(self, __context: Any) -> None: super().model_post_init(__context) _, ext = os.path.splitext(self.remote_url) @@ -99,8 +98,8 @@ def precheck(self) -> None: def get_metadata(self, file_data: dict) -> FileDataSourceMetadata: path = file_data["name"] - date_created = file_data.get("time").timestamp() if "time" in file_data else None - date_modified = file_data.get("mtime").timestamp() if "mtime" in file_data else None + date_created = str(file_data.get("time").timestamp()) if "time" in file_data else None + date_modified = str(file_data.get("mtime").timestamp()) if "mtime" in file_data else None file_size = file_data.get("size") if "size" in file_data else None From d80aa0436b4250429c0f2ebecd4a4a342f20eb1c Mon Sep 17 00:00:00 2001 From: mpolomdeepsense <124889668+mpolomdeepsense@users.noreply.github.com> Date: Thu, 5 Dec 2024 15:32:33 +0100 Subject: [PATCH 07/15] [DS-303] SQL Connectors prevent syntax errors and SQL injection (#273) * Snowflake query data binding fix * Enable SingleSource source connector entry * Fix Snowflake nan issue * Make Singlestore connector more robust against SQL injection * Clean sql upload and add query debug log * Make SQLite connector more robust against sql injection * SQL injection fixes; Changelog and version update * Optimize memory usage of Snowflake uploader * Changelog update: Optimize memory usage of Snowflake uploader --- CHANGELOG.md | 7 ++++++- unstructured_ingest/__version__.py | 2 +- .../v2/processes/connectors/sql/__init__.py | 3 ++- .../processes/connectors/sql/singlestore.py | 16 +++++++-------- .../v2/processes/connectors/sql/snowflake.py | 20 ++++++++++++++----- .../v2/processes/connectors/sql/sql.py | 7 ++++++- .../v2/processes/connectors/sql/sqlite.py | 15 ++++++-------- 7 files changed, 43 insertions(+), 27 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index a99841b2e..1d24ae0d0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,16 +1,21 @@ -## 0.3.7-dev5 +## 0.3.7-dev6 ### Fixes * **Correct fsspec connectors date metadata field types** - sftp, azure, box and gcs * **Fix Kafka source connection problems** * **Fix Azure AI Search session handling** +* **Fixes issue with SingleStore Source Connector not being available** +* **Fixes issue with SQLite Source Connector using wrong Indexer** - Caused indexer config parameter error when trying to use SQLite Source +* **Fixes issue with Snowflake Destination Connector `nan` values** - `nan` values were not properly replaced with `None` ### Enhancements * **Kafka source connector has new field: group_id** * **Support personal access token for confluence auth** * **Leverage deterministic id for uploaded content** +* **Makes multiple SQL connectors (Snowflake, SingleStore, SQLite) more robust against SQL injection.** +* **Optimizes memory usage of Snowflake Destination Connector.** ## 0.3.6 diff --git a/unstructured_ingest/__version__.py b/unstructured_ingest/__version__.py index 032737740..5417231d0 100644 --- a/unstructured_ingest/__version__.py +++ b/unstructured_ingest/__version__.py @@ -1 +1 @@ 
-__version__ = "0.3.7-dev5" # pragma: no cover +__version__ = "0.3.7-dev6" # pragma: no cover diff --git a/unstructured_ingest/v2/processes/connectors/sql/__init__.py b/unstructured_ingest/v2/processes/connectors/sql/__init__.py index d765a9b2a..edde0d077 100644 --- a/unstructured_ingest/v2/processes/connectors/sql/__init__.py +++ b/unstructured_ingest/v2/processes/connectors/sql/__init__.py @@ -8,7 +8,7 @@ from .postgres import CONNECTOR_TYPE as POSTGRES_CONNECTOR_TYPE from .postgres import postgres_destination_entry, postgres_source_entry from .singlestore import CONNECTOR_TYPE as SINGLESTORE_CONNECTOR_TYPE -from .singlestore import singlestore_destination_entry +from .singlestore import singlestore_destination_entry, singlestore_source_entry from .snowflake import CONNECTOR_TYPE as SNOWFLAKE_CONNECTOR_TYPE from .snowflake import snowflake_destination_entry, snowflake_source_entry from .sqlite import CONNECTOR_TYPE as SQLITE_CONNECTOR_TYPE @@ -17,6 +17,7 @@ add_source_entry(source_type=SQLITE_CONNECTOR_TYPE, entry=sqlite_source_entry) add_source_entry(source_type=POSTGRES_CONNECTOR_TYPE, entry=postgres_source_entry) add_source_entry(source_type=SNOWFLAKE_CONNECTOR_TYPE, entry=snowflake_source_entry) +add_source_entry(source_type=SINGLESTORE_CONNECTOR_TYPE, entry=singlestore_source_entry) add_destination_entry(destination_type=SQLITE_CONNECTOR_TYPE, entry=sqlite_destination_entry) add_destination_entry(destination_type=POSTGRES_CONNECTOR_TYPE, entry=postgres_destination_entry) diff --git a/unstructured_ingest/v2/processes/connectors/sql/singlestore.py b/unstructured_ingest/v2/processes/connectors/sql/singlestore.py index f951c6750..490f20768 100644 --- a/unstructured_ingest/v2/processes/connectors/sql/singlestore.py +++ b/unstructured_ingest/v2/processes/connectors/sql/singlestore.py @@ -91,22 +91,20 @@ class SingleStoreDownloader(SQLDownloader): connection_config: SingleStoreConnectionConfig download_config: SingleStoreDownloaderConfig connector_type: str = CONNECTOR_TYPE + values_delimiter: str = "%s" def query_db(self, file_data: FileData) -> tuple[list[tuple], list[str]]: table_name = file_data.additional_metadata["table_name"] id_column = file_data.additional_metadata["id_column"] - ids = file_data.additional_metadata["ids"] + ids = tuple(file_data.additional_metadata["ids"]) with self.connection_config.get_connection() as sqlite_connection: cursor = sqlite_connection.cursor() fields = ",".join(self.download_config.fields) if self.download_config.fields else "*" - query = "SELECT {fields} FROM {table_name} WHERE {id_column} in ({ids})".format( - fields=fields, - table_name=table_name, - id_column=id_column, - ids=",".join([str(i) for i in ids]), + query = ( + f"SELECT {fields} FROM {table_name} WHERE {id_column} IN {self.values_delimiter}" ) - logger.debug(f"running query: {query}") - cursor.execute(query) + logger.debug(f"running query: {query}\nwith values: {(ids,)}") + cursor.execute(query, (ids,)) rows = cursor.fetchall() columns = [col[0] for col in cursor.description] return rows, columns @@ -154,7 +152,7 @@ def prepare_data( singlestore_source_entry = SourceRegistryEntry( connection_config=SingleStoreConnectionConfig, indexer_config=SingleStoreIndexerConfig, - indexer=SQLIndexer, + indexer=SingleStoreIndexer, downloader_config=SingleStoreDownloaderConfig, downloader=SingleStoreDownloader, ) diff --git a/unstructured_ingest/v2/processes/connectors/sql/snowflake.py b/unstructured_ingest/v2/processes/connectors/sql/snowflake.py index 183cf5e0e..21ce8cf42 100644 --- 
a/unstructured_ingest/v2/processes/connectors/sql/snowflake.py +++ b/unstructured_ingest/v2/processes/connectors/sql/snowflake.py @@ -7,7 +7,9 @@ import pandas as pd from pydantic import Field, Secret +from unstructured_ingest.utils.data_prep import split_dataframe from unstructured_ingest.utils.dep_check import requires_dependencies +from unstructured_ingest.v2.logger import logger from unstructured_ingest.v2.processes.connector_registry import ( DestinationRegistryEntry, SourceRegistryEntry, @@ -135,15 +137,23 @@ def upload_contents(self, path: Path) -> None: df.replace({np.nan: None}, inplace=True) columns = list(df.columns) - stmt = f"INSERT INTO {self.upload_config.table_name} ({','.join(columns)}) VALUES({','.join([self.values_delimiter for x in columns])})" # noqa E501 - - for rows in pd.read_json( - path, orient="records", lines=True, chunksize=self.upload_config.batch_size - ): + stmt = "INSERT INTO {table_name} ({columns}) VALUES({values})".format( + table_name=self.upload_config.table_name, + columns=",".join(columns), + values=",".join([self.values_delimiter for _ in columns]), + ) + logger.info( + f"writing a total of {len(df)} elements via" + f" document batches to destination" + f" table named {self.upload_config.table_name}" + f" with batch size {self.upload_config.batch_size}" + ) + for rows in split_dataframe(df=df, chunk_size=self.upload_config.batch_size): with self.connection_config.get_cursor() as cursor: values = self.prepare_data(columns, tuple(rows.itertuples(index=False, name=None))) # TODO: executemany break on 'Binding data in type (list) is not supported' for val in values: + logger.debug(f"running query: {stmt}\nwith values: {val}") cursor.execute(stmt, val) diff --git a/unstructured_ingest/v2/processes/connectors/sql/sql.py b/unstructured_ingest/v2/processes/connectors/sql/sql.py index 0fe587aa6..f9591f5d9 100644 --- a/unstructured_ingest/v2/processes/connectors/sql/sql.py +++ b/unstructured_ingest/v2/processes/connectors/sql/sql.py @@ -367,7 +367,11 @@ def upload_contents(self, path: Path) -> None: self._fit_to_schema(df=df, columns=self.get_table_columns()) columns = list(df.columns) - stmt = f"INSERT INTO {self.upload_config.table_name} ({','.join(columns)}) VALUES({','.join([self.values_delimiter for x in columns])})" # noqa E501 + stmt = "INSERT INTO {table_name} ({columns}) VALUES({values})".format( + table_name=self.upload_config.table_name, + columns=",".join(columns), + values=",".join([self.values_delimiter for _ in columns]), + ) logger.info( f"writing a total of {len(df)} elements via" f" document batches to destination" @@ -384,6 +388,7 @@ def upload_contents(self, path: Path) -> None: # except Exception as e: # print(f"Error: {e}") # print(f"failed to write {len(columns)}, {len(val)}: {stmt} -> {val}") + logger.debug(f"running query: {stmt}") cursor.executemany(stmt, values) def get_table_columns(self) -> list[str]: diff --git a/unstructured_ingest/v2/processes/connectors/sql/sqlite.py b/unstructured_ingest/v2/processes/connectors/sql/sqlite.py index 4f77d5a07..393227ad2 100644 --- a/unstructured_ingest/v2/processes/connectors/sql/sqlite.py +++ b/unstructured_ingest/v2/processes/connectors/sql/sqlite.py @@ -95,6 +95,7 @@ class SQLiteDownloader(SQLDownloader): connection_config: SQLConnectionConfig download_config: SQLDownloaderConfig connector_type: str = CONNECTOR_TYPE + values_delimiter: str = "?" 
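+    # "?" is the qmark placeholder style used by sqlite3; ids are bound as parameters in query_db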
def query_db(self, file_data: FileData) -> tuple[list[tuple], list[str]]: table_name = file_data.additional_metadata["table_name"] @@ -103,14 +104,10 @@ def query_db(self, file_data: FileData) -> tuple[list[tuple], list[str]]: with self.connection_config.get_connection() as sqlite_connection: cursor = sqlite_connection.cursor() fields = ",".join(self.download_config.fields) if self.download_config.fields else "*" - query = "SELECT {fields} FROM {table_name} WHERE {id_column} in ({ids})".format( - fields=fields, - table_name=table_name, - id_column=id_column, - ids=",".join([str(i) for i in ids]), - ) - logger.debug(f"running query: {query}") - cursor.execute(query) + values = ",".join(self.values_delimiter for _ in ids) + query = f"SELECT {fields} FROM {table_name} WHERE {id_column} IN ({values})" + logger.debug(f"running query: {query}\nwith values: {ids}") + cursor.execute(query, ids) rows = cursor.fetchall() columns = [col[0] for col in cursor.description] return rows, columns @@ -157,7 +154,7 @@ def prepare_data( sqlite_source_entry = SourceRegistryEntry( connection_config=SQLiteConnectionConfig, indexer_config=SQLiteIndexerConfig, - indexer=SQLIndexer, + indexer=SQLiteIndexer, downloader_config=SQLiteDownloaderConfig, downloader=SQLiteDownloader, ) From 4e459db27b621cd62c790e6dcad3b0b85b9a517e Mon Sep 17 00:00:00 2001 From: Hubert Rutkowski <157481729+hubert-rutkowski85@users.noreply.github.com> Date: Thu, 5 Dec 2024 16:52:41 +0100 Subject: [PATCH 08/15] feat/Qdrant dest - add cloud test (#248) --- .github/workflows/e2e.yml | 2 + CHANGELOG.md | 1 + test/integration/connectors/test_qdrant.py | 55 ++++++++++++++++++++++ 3 files changed, 58 insertions(+) diff --git a/.github/workflows/e2e.yml b/.github/workflows/e2e.yml index 009a2a6fd..07e952997 100644 --- a/.github/workflows/e2e.yml +++ b/.github/workflows/e2e.yml @@ -155,6 +155,8 @@ jobs: AZURE_SEARCH_API_KEY: ${{ secrets.AZURE_SEARCH_API_KEY }} MONGODB_URI: ${{ secrets.MONGODB_URI }} MONGODB_DATABASE: ${{ secrets.MONGODB_DATABASE_NAME }} + QDRANT_API_KEY: ${{ secrets.QDRANT_API_KEY }} + QDRANT_SERVER_URL: ${{ secrets.QDRANT_SERVER_URL }} run : | source .venv/bin/activate make install-test diff --git a/CHANGELOG.md b/CHANGELOG.md index 1d24ae0d0..053a4350e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,6 +16,7 @@ * **Leverage deterministic id for uploaded content** * **Makes multiple SQL connectors (Snowflake, SingleStore, SQLite) more robust against SQL injection.** * **Optimizes memory usage of Snowflake Destination Connector.** +* **Added Qdrant Cloud integration test** ## 0.3.6 diff --git a/test/integration/connectors/test_qdrant.py b/test/integration/connectors/test_qdrant.py index 5d2a7acdb..f351ef336 100644 --- a/test/integration/connectors/test_qdrant.py +++ b/test/integration/connectors/test_qdrant.py @@ -1,4 +1,5 @@ import json +import os import uuid from contextlib import asynccontextmanager from pathlib import Path @@ -9,7 +10,16 @@ from test.integration.connectors.utils.constants import DESTINATION_TAG from test.integration.connectors.utils.docker import container_context +from test.integration.utils import requires_env from unstructured_ingest.v2.interfaces.file_data import FileData, SourceIdentifiers +from unstructured_ingest.v2.processes.connectors.qdrant.cloud import ( + CloudQdrantAccessConfig, + CloudQdrantConnectionConfig, + CloudQdrantUploader, + CloudQdrantUploaderConfig, + CloudQdrantUploadStager, + CloudQdrantUploadStagerConfig, +) from unstructured_ingest.v2.processes.connectors.qdrant.local import ( 
CONNECTOR_TYPE as LOCAL_CONNECTOR_TYPE, ) @@ -135,3 +145,48 @@ async def test_qdrant_destination_server(upload_file: Path, tmp_path: Path, dock uploader.run(path=upload_file, file_data=file_data) async with qdrant_client(connection_kwargs) as client: await validate_upload(client=client, upload_file=upload_file) + + +@pytest.mark.asyncio +@pytest.mark.tags(SERVER_CONNECTOR_TYPE, DESTINATION_TAG, "qdrant") +@requires_env("QDRANT_API_KEY", "QDRANT_SERVER_URL") +async def test_qdrant_destination_cloud(upload_file: Path, tmp_path: Path): + server_url = os.environ["QDRANT_SERVER_URL"] + api_key = os.environ["QDRANT_API_KEY"] + connection_kwargs = {"location": server_url, "api_key": api_key} + async with qdrant_client(connection_kwargs) as client: + await client.create_collection(COLLECTION_NAME, vectors_config=VECTORS_CONFIG) + AsyncQdrantClient(**connection_kwargs) + + stager = CloudQdrantUploadStager( + upload_stager_config=CloudQdrantUploadStagerConfig(), + ) + uploader = CloudQdrantUploader( + connection_config=CloudQdrantConnectionConfig( + url=server_url, + access_config=CloudQdrantAccessConfig( + api_key=api_key, + ), + ), + upload_config=CloudQdrantUploaderConfig(collection_name=COLLECTION_NAME), + ) + + file_data = FileData( + source_identifiers=SourceIdentifiers(fullpath=upload_file.name, filename=upload_file.name), + connector_type=SERVER_CONNECTOR_TYPE, + identifier="mock-file-data", + ) + + staged_upload_file = stager.run( + elements_filepath=upload_file, + file_data=file_data, + output_dir=tmp_path, + output_filename=upload_file.name, + ) + + if uploader.is_async(): + await uploader.run_async(path=staged_upload_file, file_data=file_data) + else: + uploader.run(path=staged_upload_file, file_data=file_data) + async with qdrant_client(connection_kwargs) as client: + await validate_upload(client=client, upload_file=upload_file) From 84d297f00b8fcce0a17ecbee2e00972e1ff5ab5d Mon Sep 17 00:00:00 2001 From: David Potter Date: Thu, 5 Dec 2024 09:40:02 -0800 Subject: [PATCH 09/15] feat/ duckdb destination connector (#285) --- CHANGELOG.md | 7 +- requirements/connectors/duckdb.in | 3 + requirements/connectors/duckdb.txt | 4 + setup.py | 1 + .../integration/connectors/duckdb/__init__.py | 0 .../connectors/duckdb/duckdb-schema.sql | 41 ++++++ .../connectors/duckdb/test_duckdb.py | 82 +++++++++++ .../connectors/duckdb/test_motherduck.py | 106 ++++++++++++++ .../env_setup/kafka/create-kafka-instance.sh | 2 +- unstructured_ingest/__version__.py | 2 +- .../v2/processes/connectors/__init__.py | 1 + .../processes/connectors/duckdb/__init__.py | 15 ++ .../v2/processes/connectors/duckdb/base.py | 99 +++++++++++++ .../v2/processes/connectors/duckdb/duckdb.py | 118 ++++++++++++++++ .../processes/connectors/duckdb/motherduck.py | 133 ++++++++++++++++++ 15 files changed, 611 insertions(+), 3 deletions(-) create mode 100644 requirements/connectors/duckdb.in create mode 100644 requirements/connectors/duckdb.txt create mode 100644 test/integration/connectors/duckdb/__init__.py create mode 100644 test/integration/connectors/duckdb/duckdb-schema.sql create mode 100644 test/integration/connectors/duckdb/test_duckdb.py create mode 100644 test/integration/connectors/duckdb/test_motherduck.py create mode 100644 unstructured_ingest/v2/processes/connectors/duckdb/__init__.py create mode 100644 unstructured_ingest/v2/processes/connectors/duckdb/base.py create mode 100644 unstructured_ingest/v2/processes/connectors/duckdb/duckdb.py create mode 100644 unstructured_ingest/v2/processes/connectors/duckdb/motherduck.py diff 
--git a/CHANGELOG.md b/CHANGELOG.md index 053a4350e..c97f59f54 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,4 +1,9 @@ -## 0.3.7-dev6 +## 0.3.7-dev7 + +### Features + +* **Add DuckDB destination connector** Adds support storing artifacts in a local DuckDB database. +* **Add MotherDuck destination connector** Adds support storing artifacts in MotherDuck database. ### Fixes diff --git a/requirements/connectors/duckdb.in b/requirements/connectors/duckdb.in new file mode 100644 index 000000000..a30150296 --- /dev/null +++ b/requirements/connectors/duckdb.in @@ -0,0 +1,3 @@ +-c ../common/constraints.txt + +duckdb \ No newline at end of file diff --git a/requirements/connectors/duckdb.txt b/requirements/connectors/duckdb.txt new file mode 100644 index 000000000..583fccfd2 --- /dev/null +++ b/requirements/connectors/duckdb.txt @@ -0,0 +1,4 @@ +# This file was autogenerated by uv via the following command: +# uv pip compile ./connectors/duckdb.in --output-file ./connectors/duckdb.txt --no-strip-extras --python-version 3.9 +duckdb==1.1.3 + # via -r ./connectors/duckdb.in diff --git a/setup.py b/setup.py index be911ddda..af0c34bb9 100644 --- a/setup.py +++ b/setup.py @@ -95,6 +95,7 @@ def load_requirements(file: Union[str, Path]) -> List[str]: "delta-table": load_requirements("requirements/connectors/delta-table.in"), "discord": load_requirements("requirements/connectors/discord.in"), "dropbox": load_requirements("requirements/connectors/dropbox.in"), + "duckdb": load_requirements("requirements/connectors/duckdb.in"), "elasticsearch": load_requirements("requirements/connectors/elasticsearch.in"), "gcs": load_requirements("requirements/connectors/gcs.in"), "github": load_requirements("requirements/connectors/github.in"), diff --git a/test/integration/connectors/duckdb/__init__.py b/test/integration/connectors/duckdb/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/test/integration/connectors/duckdb/duckdb-schema.sql b/test/integration/connectors/duckdb/duckdb-schema.sql new file mode 100644 index 000000000..25fa2d26e --- /dev/null +++ b/test/integration/connectors/duckdb/duckdb-schema.sql @@ -0,0 +1,41 @@ +CREATE TABLE elements ( + id VARCHAR, + element_id VARCHAR, + text TEXT, + embeddings FLOAT[], + type VARCHAR, + system VARCHAR, + layout_width DECIMAL, + layout_height DECIMAL, + points TEXT, + url TEXT, + version VARCHAR, + date_created INTEGER, + date_modified INTEGER, + date_processed DOUBLE, + permissions_data TEXT, + record_locator TEXT, + category_depth INTEGER, + parent_id VARCHAR, + attached_filename VARCHAR, + filetype VARCHAR, + last_modified TIMESTAMP, + file_directory VARCHAR, + filename VARCHAR, + languages VARCHAR[], + page_number VARCHAR, + links TEXT, + page_name VARCHAR, + link_urls VARCHAR[], + link_texts VARCHAR[], + sent_from VARCHAR[], + sent_to VARCHAR[], + subject VARCHAR, + section VARCHAR, + header_footer_type VARCHAR, + emphasized_text_contents VARCHAR[], + emphasized_text_tags VARCHAR[], + text_as_html TEXT, + regex_metadata TEXT, + detection_class_prob DECIMAL +); \ No newline at end of file diff --git a/test/integration/connectors/duckdb/test_duckdb.py b/test/integration/connectors/duckdb/test_duckdb.py new file mode 100644 index 000000000..084951bc5 --- /dev/null +++ b/test/integration/connectors/duckdb/test_duckdb.py @@ -0,0 +1,82 @@ +import tempfile +from contextlib import contextmanager +from pathlib import Path +from typing import Generator + +import duckdb +import pandas as pd +import pytest + +from 
test.integration.connectors.utils.constants import DESTINATION_TAG +from unstructured_ingest.v2.interfaces.file_data import FileData, SourceIdentifiers +from unstructured_ingest.v2.processes.connectors.duckdb.duckdb import ( + CONNECTOR_TYPE, + DuckDBConnectionConfig, + DuckDBUploader, + DuckDBUploaderConfig, + DuckDBUploadStager, +) + + +@contextmanager +def duckdbd_setup() -> Generator[Path, None, None]: + with tempfile.TemporaryDirectory() as temp_dir: + db_path = Path(temp_dir) / "temp_duck.db" + db_init_path = Path(__file__).parent / "duckdb-schema.sql" + assert db_init_path.exists() + assert db_init_path.is_file() + with duckdb.connect(database=db_path) as duckdb_connection: + with db_init_path.open("r") as f: + query = f.read() + duckdb_connection.execute(query) + duckdb_connection.close() + yield db_path + + +def validate_duckdb_destination(db_path: Path, expected_num_elements: int): + conn = None + try: + conn = duckdb.connect(db_path) + _results = conn.sql("select count(*) from elements").fetchall() + _count = _results[0][0] + assert ( + _count == expected_num_elements + ), f"dest check failed: got {_count}, expected {expected_num_elements}" + conn.close() + finally: + if conn: + conn.close() + + +@pytest.mark.tags(CONNECTOR_TYPE, DESTINATION_TAG, "duckdb") +def test_duckdb_destination(upload_file: Path): + with duckdbd_setup() as test_db_path: + with tempfile.TemporaryDirectory() as temp_dir: + file_data = FileData( + source_identifiers=SourceIdentifiers( + fullpath=upload_file.name, filename=upload_file.name + ), + connector_type=CONNECTOR_TYPE, + identifier="mock-file-data", + ) + + # deafults to default stager config + stager = DuckDBUploadStager() + stager_params = { + "elements_filepath": upload_file, + "file_data": file_data, + "output_dir": temp_dir, + "output_filename": "test_db", + } + staged_path = stager.run(**stager_params) + + connection_config = DuckDBConnectionConfig(database=str(test_db_path)) + upload_config = DuckDBUploaderConfig() + uploader = DuckDBUploader( + connection_config=connection_config, upload_config=upload_config + ) + + uploader.run(path=staged_path, file_data=file_data) + + staged_df = pd.read_json(staged_path, orient="records", lines=True) + validate_duckdb_destination(db_path=test_db_path, expected_num_elements=len(staged_df)) diff --git a/test/integration/connectors/duckdb/test_motherduck.py b/test/integration/connectors/duckdb/test_motherduck.py new file mode 100644 index 000000000..4f1ee4f28 --- /dev/null +++ b/test/integration/connectors/duckdb/test_motherduck.py @@ -0,0 +1,106 @@ +import os +import tempfile +import uuid +from contextlib import contextmanager +from pathlib import Path +from typing import Generator + +import duckdb +import pandas as pd +import pytest + +from test.integration.connectors.utils.constants import DESTINATION_TAG +from test.integration.utils import requires_env +from unstructured_ingest.v2.interfaces.file_data import FileData, SourceIdentifiers +from unstructured_ingest.v2.processes.connectors.duckdb.motherduck import ( + CONNECTOR_TYPE, + MotherDuckAccessConfig, + MotherDuckConnectionConfig, + MotherDuckUploader, + MotherDuckUploaderConfig, + MotherDuckUploadStager, +) + + +@contextmanager +def motherduck_setup(md_token: str) -> Generator[Path, None, None]: + database_name = f"test_{str(uuid.uuid4()).replace('-', '_')}" + try: + db_init_path = Path(__file__).parent / "duckdb-schema.sql" + assert db_init_path.exists() + assert db_init_path.is_file() + with duckdb.connect(f"md:?motherduck_token={md_token}") as 
md_conn: + with db_init_path.open("r") as f: + query = f.read() + md_conn.execute(f"CREATE DATABASE {database_name}") + md_conn.execute(f"USE {database_name}") + md_conn.execute(query) + md_conn.close() + yield database_name + finally: + with duckdb.connect(f"md:?motherduck_token={md_token}") as md_conn: + md_conn.execute(f"DROP DATABASE {database_name}") + md_conn.close() + + +def validate_motherduck_destination(database: str, expected_num_elements: int, md_token: str): + conn = None + try: + conn = duckdb.connect(f"md:?motherduck_token={md_token}") + conn.execute(f"USE {database}") + _results = conn.sql("select count(*) from elements").fetchall() + _count = _results[0][0] + assert ( + _count == expected_num_elements + ), f"dest check failed: got {_count}, expected {expected_num_elements}" + conn.close() + finally: + if conn: + conn.close() + + +def get_motherduck_token() -> dict: + motherduck_token = os.getenv("MOTHERDUCK_TOKEN", None) + assert motherduck_token + return motherduck_token + + +@pytest.mark.tags(CONNECTOR_TYPE, DESTINATION_TAG, "motherduck") +@requires_env("MOTHERDUCK_TOKEN") +def test_motherduck_destination(upload_file: Path): + md_token = get_motherduck_token() + with motherduck_setup(md_token) as test_database: + with tempfile.TemporaryDirectory() as temp_dir: + file_data = FileData( + source_identifiers=SourceIdentifiers( + fullpath=upload_file.name, filename=upload_file.name + ), + connector_type=CONNECTOR_TYPE, + identifier="mock-file-data", + ) + + # deafults to default stager config + stager = MotherDuckUploadStager() + stager_params = { + "elements_filepath": upload_file, + "file_data": file_data, + "output_dir": temp_dir, + "output_filename": "test_db", + } + staged_path = stager.run(**stager_params) + + access_config = MotherDuckAccessConfig(md_token=md_token) + connection_config = MotherDuckConnectionConfig( + database=test_database, access_config=access_config + ) + upload_config = MotherDuckUploaderConfig() + uploader = MotherDuckUploader( + connection_config=connection_config, upload_config=upload_config + ) + + uploader.run(path=staged_path, file_data=file_data) + + staged_df = pd.read_json(staged_path, orient="records", lines=True) + validate_motherduck_destination( + database=test_database, expected_num_elements=len(staged_df), md_token=md_token + ) diff --git a/test_e2e/env_setup/kafka/create-kafka-instance.sh b/test_e2e/env_setup/kafka/create-kafka-instance.sh index df9a0b589..a942a63ae 100755 --- a/test_e2e/env_setup/kafka/create-kafka-instance.sh +++ b/test_e2e/env_setup/kafka/create-kafka-instance.sh @@ -4,7 +4,7 @@ set -e SCRIPT_DIR=$(dirname "$(realpath "$0")") -# Create the Weaviate instance +# Create the Kafka instance docker compose version docker compose -f "$SCRIPT_DIR"/docker-compose.yml up --wait docker compose -f "$SCRIPT_DIR"/docker-compose.yml ps diff --git a/unstructured_ingest/__version__.py b/unstructured_ingest/__version__.py index 5417231d0..70d8de130 100644 --- a/unstructured_ingest/__version__.py +++ b/unstructured_ingest/__version__.py @@ -1 +1 @@ -__version__ = "0.3.7-dev6" # pragma: no cover +__version__ = "0.3.7-dev7" # pragma: no cover diff --git a/unstructured_ingest/v2/processes/connectors/__init__.py b/unstructured_ingest/v2/processes/connectors/__init__.py index 2659c4cd2..5304f7efd 100644 --- a/unstructured_ingest/v2/processes/connectors/__init__.py +++ b/unstructured_ingest/v2/processes/connectors/__init__.py @@ -1,6 +1,7 @@ from __future__ import annotations import unstructured_ingest.v2.processes.connectors.databricks # 
noqa: F401 +import unstructured_ingest.v2.processes.connectors.duckdb # noqa: F401 import unstructured_ingest.v2.processes.connectors.elasticsearch # noqa: F401 import unstructured_ingest.v2.processes.connectors.fsspec # noqa: F401 import unstructured_ingest.v2.processes.connectors.kafka # noqa: F401 diff --git a/unstructured_ingest/v2/processes/connectors/duckdb/__init__.py b/unstructured_ingest/v2/processes/connectors/duckdb/__init__.py new file mode 100644 index 000000000..d51ca8929 --- /dev/null +++ b/unstructured_ingest/v2/processes/connectors/duckdb/__init__.py @@ -0,0 +1,15 @@ +from __future__ import annotations + +from unstructured_ingest.v2.processes.connector_registry import ( + add_destination_entry, +) + +from .duckdb import CONNECTOR_TYPE as DUCKDB_CONNECTOR_TYPE +from .duckdb import duckdb_destination_entry +from .motherduck import CONNECTOR_TYPE as MOTHERDUCK_CONNECTOR_TYPE +from .motherduck import motherduck_destination_entry + +add_destination_entry(destination_type=DUCKDB_CONNECTOR_TYPE, entry=duckdb_destination_entry) +add_destination_entry( + destination_type=MOTHERDUCK_CONNECTOR_TYPE, entry=motherduck_destination_entry +) diff --git a/unstructured_ingest/v2/processes/connectors/duckdb/base.py b/unstructured_ingest/v2/processes/connectors/duckdb/base.py new file mode 100644 index 000000000..b375af4f3 --- /dev/null +++ b/unstructured_ingest/v2/processes/connectors/duckdb/base.py @@ -0,0 +1,99 @@ +import json +import uuid +from dataclasses import dataclass +from pathlib import Path +from typing import Any + +import pandas as pd + +from unstructured_ingest.v2.interfaces import FileData, UploadStager + +_COLUMNS = ( + "id", + "element_id", + "text", + "embeddings", + "type", + "system", + "layout_width", + "layout_height", + "points", + "url", + "version", + "date_created", + "date_modified", + "date_processed", + "permissions_data", + "record_locator", + "category_depth", + "parent_id", + "attached_filename", + "filetype", + "last_modified", + "file_directory", + "filename", + "languages", + "page_number", + "links", + "page_name", + "link_urls", + "link_texts", + "sent_from", + "sent_to", + "subject", + "section", + "header_footer_type", + "emphasized_text_contents", + "emphasized_text_tags", + "text_as_html", + "regex_metadata", + "detection_class_prob", +) + +# _DATE_COLUMNS = ("date_created", "date_modified", "date_processed", "last_modified") + + +@dataclass +class BaseDuckDBUploadStager(UploadStager): + + def run( + self, + elements_filepath: Path, + file_data: FileData, + output_dir: Path, + output_filename: str, + **kwargs: Any, + ) -> Path: + with open(elements_filepath) as elements_file: + elements_contents: list[dict] = json.load(elements_file) + output_path = Path(output_dir) / Path(f"{output_filename}.json") + output_path.parent.mkdir(parents=True, exist_ok=True) + + output = [] + for data in elements_contents: + metadata: dict[str, Any] = data.pop("metadata", {}) + data_source = metadata.pop("data_source", {}) + coordinates = metadata.pop("coordinates", {}) + + data.update(metadata) + data.update(data_source) + data.update(coordinates) + + data["id"] = str(uuid.uuid4()) + + # remove extraneous, not supported columns + data = {k: v for k, v in data.items() if k in _COLUMNS} + + output.append(data) + + df = pd.DataFrame.from_dict(output) + + for column in filter( + lambda x: x in df.columns, + ("version", "page_number", "regex_metadata"), + ): + df[column] = df[column].apply(str) + + with output_path.open("w") as output_file: + df.to_json(output_file, 
orient="records", lines=True) + return output_path diff --git a/unstructured_ingest/v2/processes/connectors/duckdb/duckdb.py b/unstructured_ingest/v2/processes/connectors/duckdb/duckdb.py new file mode 100644 index 000000000..3c9b29776 --- /dev/null +++ b/unstructured_ingest/v2/processes/connectors/duckdb/duckdb.py @@ -0,0 +1,118 @@ +from dataclasses import dataclass, field +from pathlib import Path +from typing import TYPE_CHECKING, Any, Callable, Optional + +import pandas as pd +from pydantic import Field, Secret + +from unstructured_ingest.error import DestinationConnectionError +from unstructured_ingest.utils.dep_check import requires_dependencies +from unstructured_ingest.v2.interfaces import ( + AccessConfig, + ConnectionConfig, + FileData, + Uploader, + UploaderConfig, + UploadStagerConfig, +) +from unstructured_ingest.v2.logger import logger +from unstructured_ingest.v2.processes.connector_registry import DestinationRegistryEntry +from unstructured_ingest.v2.processes.connectors.duckdb.base import BaseDuckDBUploadStager + +if TYPE_CHECKING: + from duckdb import DuckDBPyConnection as DuckDBConnection + +CONNECTOR_TYPE = "duckdb" + + +class DuckDBAccessConfig(AccessConfig): + pass + + +class DuckDBConnectionConfig(ConnectionConfig): + connector_type: str = Field(default=CONNECTOR_TYPE, init=False) + database: Optional[str] = Field( + default=None, + description="Database name. Path to the DuckDB .db file. If the file does " + "not exist, it will be created at the specified path.", + ) + db_schema: Optional[str] = Field( + default="main", + description="Schema name. Schema in the database where the elements table is located.", + ) + table: Optional[str] = Field( + default="elements", + description="Table name. Table name into which the elements data is inserted.", + ) + access_config: Secret[DuckDBAccessConfig] = Field( + default=DuckDBAccessConfig(), validate_default=True + ) + + def __post_init__(self): + if self.database is None: + raise ValueError( + "A DuckDB connection requires a path to a *.db or *.duckdb file " + "through the `database` argument" + ) + + +class DuckDBUploadStagerConfig(UploadStagerConfig): + pass + + +@dataclass +class DuckDBUploadStager(BaseDuckDBUploadStager): + upload_stager_config: DuckDBUploadStagerConfig = field( + default_factory=lambda: DuckDBUploadStagerConfig() + ) + + +class DuckDBUploaderConfig(UploaderConfig): + batch_size: int = Field(default=50, description="[Not-used] Number of records per batch") + + +@dataclass +class DuckDBUploader(Uploader): + connector_type: str = CONNECTOR_TYPE + upload_config: DuckDBUploaderConfig + connection_config: DuckDBConnectionConfig + + def precheck(self) -> None: + try: + cursor = self.connection().cursor() + cursor.execute("SELECT 1;") + cursor.close() + except Exception as e: + logger.error(f"failed to validate connection: {e}", exc_info=True) + raise DestinationConnectionError(f"failed to validate connection: {e}") + + @property + def connection(self) -> Callable[[], "DuckDBConnection"]: + return self._make_duckdb_connection + + @requires_dependencies(["duckdb"], extras="duckdb") + def _make_duckdb_connection(self) -> "DuckDBConnection": + import duckdb + + return duckdb.connect(self.connection_config.database) + + def upload_contents(self, path: Path) -> None: + df_elements = pd.read_json(path, orient="records", lines=True) + logger.debug(f"uploading {len(df_elements)} entries to {self.connection_config.database} ") + + with self.connection() as conn: + conn.query( + f"INSERT INTO 
{self.connection_config.db_schema}.{self.connection_config.table} BY NAME SELECT * FROM df_elements" # noqa: E501 + ) + + def run(self, path: Path, file_data: FileData, **kwargs: Any) -> None: + self.upload_contents(path=path) + + +duckdb_destination_entry = DestinationRegistryEntry( + connection_config=DuckDBConnectionConfig, + uploader=DuckDBUploader, + uploader_config=DuckDBUploaderConfig, + upload_stager=DuckDBUploadStager, + upload_stager_config=DuckDBUploadStagerConfig, +) diff --git a/unstructured_ingest/v2/processes/connectors/duckdb/motherduck.py b/unstructured_ingest/v2/processes/connectors/duckdb/motherduck.py new file mode 100644 index 000000000..d0d283502 --- /dev/null +++ b/unstructured_ingest/v2/processes/connectors/duckdb/motherduck.py @@ -0,0 +1,133 @@ +from dataclasses import dataclass, field +from pathlib import Path +from typing import TYPE_CHECKING, Any, Callable, Optional + +import pandas as pd +from pydantic import Field, Secret + +from unstructured_ingest.__version__ import __version__ as unstructured_io_ingest_version +from unstructured_ingest.error import DestinationConnectionError +from unstructured_ingest.utils.dep_check import requires_dependencies +from unstructured_ingest.v2.interfaces import ( + AccessConfig, + ConnectionConfig, + FileData, + Uploader, + UploaderConfig, + UploadStagerConfig, +) +from unstructured_ingest.v2.logger import logger +from unstructured_ingest.v2.processes.connector_registry import DestinationRegistryEntry +from unstructured_ingest.v2.processes.connectors.duckdb.base import BaseDuckDBUploadStager + +if TYPE_CHECKING: + from duckdb import DuckDBPyConnection as MotherDuckConnection + +CONNECTOR_TYPE = "motherduck" + + +class MotherDuckAccessConfig(AccessConfig): + md_token: Optional[str] = Field(default=None, description="MotherDuck token") + + +class MotherDuckConnectionConfig(ConnectionConfig): + connector_type: str = Field(default=CONNECTOR_TYPE, init=False) + database: Optional[str] = Field( + default=None, + description="Database name. Name of the MotherDuck database.", + ) + db_schema: Optional[str] = Field( + default="main", + description="Schema name. Schema in the database where the elements table is located.", + ) + table: Optional[str] = Field( + default="elements", + description="Table name. 
Table name into which the elements data is inserted.", + ) + access_config: Secret[MotherDuckAccessConfig] = Field( + default=MotherDuckAccessConfig(), validate_default=True + ) + + def __post_init__(self): + if self.database is None: + raise ValueError( + "A MotherDuck connection requires a database (string) to be passed " + "through the `database` argument" + ) + if self.access_config.md_token is None: + raise ValueError( + "A MotherDuck connection requires a md_token (MotherDuck token) to be passed " + "using MotherDuckAccessConfig through the `access_config` argument" + ) + + +class MotherDuckUploadStagerConfig(UploadStagerConfig): + pass + + +@dataclass +class MotherDuckUploadStager(BaseDuckDBUploadStager): + upload_stager_config: MotherDuckUploadStagerConfig = field( + default_factory=lambda: MotherDuckUploadStagerConfig() + ) + + +class MotherDuckUploaderConfig(UploaderConfig): + batch_size: int = Field(default=50, description="[Not-used] Number of records per batch") + + +@dataclass +class MotherDuckUploader(Uploader): + connector_type: str = CONNECTOR_TYPE + upload_config: MotherDuckUploaderConfig + connection_config: MotherDuckConnectionConfig + + def precheck(self) -> None: + try: + cursor = self.connection().cursor() + cursor.execute("SELECT 1;") + cursor.close() + except Exception as e: + logger.error(f"failed to validate connection: {e}", exc_info=True) + raise DestinationConnectionError(f"failed to validate connection: {e}") + + @property + def connection(self) -> Callable[[], "MotherDuckConnection"]: + return self._make_motherduck_connection + + @requires_dependencies(["duckdb"], extras="duckdb") + def _make_motherduck_connection(self) -> "MotherDuckConnection": + import duckdb + + access_config = self.connection_config.access_config.get_secret_value() + conn = duckdb.connect( + f"md:?motherduck_token={access_config.md_token}", + config={ + "custom_user_agent": f"unstructured-io-ingest/{unstructured_io_ingest_version}" + }, + ) + + conn.sql(f"USE {self.connection_config.database}") + + return conn + + def upload_contents(self, path: Path) -> None: + df_elements = pd.read_json(path, orient="records", lines=True) + logger.debug(f"uploading {len(df_elements)} entries to {self.connection_config.database} ") + + with self.connection() as conn: + conn.query( + f"INSERT INTO {self.connection_config.db_schema}.{self.connection_config.table} BY NAME SELECT * FROM df_elements" # noqa: E501 + ) + + def run(self, path: Path, file_data: FileData, **kwargs: Any) -> None: + self.upload_contents(path=path) + + +motherduck_destination_entry = DestinationRegistryEntry( + connection_config=MotherDuckConnectionConfig, + uploader=MotherDuckUploader, + uploader_config=MotherDuckUploaderConfig, + upload_stager=MotherDuckUploadStager, + upload_stager_config=MotherDuckUploadStagerConfig, +) From 8c8643ec9d7745580b1d529012428bb03415f412 Mon Sep 17 00:00:00 2001 From: mpolomdeepsense <124889668+mpolomdeepsense@users.noreply.github.com> Date: Fri, 6 Dec 2024 11:44:38 +0100 Subject: [PATCH 10/15] =?UTF-8?q?=F0=9F=94=80=20fix:=20DS-328=20Snowflake?= =?UTF-8?q?=20Downloader=20error=20(#287)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Fix Snowflake downloader * Changelog and version update: Fix Snowflake downloader * Replace Snowflake source connector inheritance with SQL classes * Comment on snowflake dependency name * Get rid of snowflake postgres inheritance. Replaced with SQL. 
* Fix lint * Version update: Fix Snowflake downloader --- CHANGELOG.md | 6 ++ unstructured_ingest/__version__.py | 2 +- .../v2/processes/connectors/sql/snowflake.py | 66 +++++++++++++------ 3 files changed, 54 insertions(+), 20 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index c97f59f54..1deba237d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,9 @@ +## 0.3.7-dev8 + +### Fixes + +* **Fixes Snowflake source `'SnowflakeCursor' object has no attribute 'mogrify'` error** + ## 0.3.7-dev7 ### Features diff --git a/unstructured_ingest/__version__.py b/unstructured_ingest/__version__.py index 70d8de130..f7c67a947 100644 --- a/unstructured_ingest/__version__.py +++ b/unstructured_ingest/__version__.py @@ -1 +1 @@ -__version__ = "0.3.7-dev7" # pragma: no cover +__version__ = "0.3.7-dev8" # pragma: no cover diff --git a/unstructured_ingest/v2/processes/connectors/sql/snowflake.py b/unstructured_ingest/v2/processes/connectors/sql/snowflake.py index 21ce8cf42..c55b40a54 100644 --- a/unstructured_ingest/v2/processes/connectors/sql/snowflake.py +++ b/unstructured_ingest/v2/processes/connectors/sql/snowflake.py @@ -9,22 +9,24 @@ from unstructured_ingest.utils.data_prep import split_dataframe from unstructured_ingest.utils.dep_check import requires_dependencies +from unstructured_ingest.v2.interfaces.file_data import FileData from unstructured_ingest.v2.logger import logger from unstructured_ingest.v2.processes.connector_registry import ( DestinationRegistryEntry, SourceRegistryEntry, ) -from unstructured_ingest.v2.processes.connectors.sql.postgres import ( - PostgresDownloader, - PostgresDownloaderConfig, - PostgresIndexer, - PostgresIndexerConfig, - PostgresUploader, - PostgresUploaderConfig, - PostgresUploadStager, - PostgresUploadStagerConfig, +from unstructured_ingest.v2.processes.connectors.sql.sql import ( + SQLAccessConfig, + SQLConnectionConfig, + SQLDownloader, + SQLDownloaderConfig, + SQLIndexer, + SQLIndexerConfig, + SQLUploader, + SQLUploaderConfig, + SQLUploadStager, + SQLUploadStagerConfig, ) -from unstructured_ingest.v2.processes.connectors.sql.sql import SQLAccessConfig, SQLConnectionConfig if TYPE_CHECKING: from snowflake.connector import SnowflakeConnection @@ -61,6 +63,7 @@ class SnowflakeConnectionConfig(SQLConnectionConfig): connector_type: str = Field(default=CONNECTOR_TYPE, init=False) @contextmanager + # The actual snowflake module package name is: snowflake-connector-python @requires_dependencies(["snowflake"], extras="snowflake") def get_connection(self) -> Generator["SnowflakeConnection", None, None]: # https://docs.snowflake.com/en/developer-guide/python-connector/python-connector-api#label-snowflake-connector-methods-connect @@ -91,42 +94,67 @@ def get_cursor(self) -> Generator["SnowflakeCursor", None, None]: cursor.close() -class SnowflakeIndexerConfig(PostgresIndexerConfig): +class SnowflakeIndexerConfig(SQLIndexerConfig): pass @dataclass -class SnowflakeIndexer(PostgresIndexer): +class SnowflakeIndexer(SQLIndexer): connection_config: SnowflakeConnectionConfig index_config: SnowflakeIndexerConfig connector_type: str = CONNECTOR_TYPE -class SnowflakeDownloaderConfig(PostgresDownloaderConfig): +class SnowflakeDownloaderConfig(SQLDownloaderConfig): pass @dataclass -class SnowflakeDownloader(PostgresDownloader): +class SnowflakeDownloader(SQLDownloader): connection_config: SnowflakeConnectionConfig download_config: SnowflakeDownloaderConfig connector_type: str = CONNECTOR_TYPE + values_delimiter: str = "?" 
- -class SnowflakeUploadStagerConfig(PostgresUploadStagerConfig): + # The actual snowflake module package name is: snowflake-connector-python + @requires_dependencies(["snowflake"], extras="snowflake") + def query_db(self, file_data: FileData) -> tuple[list[tuple], list[str]]: + table_name = file_data.additional_metadata["table_name"] + id_column = file_data.additional_metadata["id_column"] + ids = file_data.additional_metadata["ids"] + + with self.connection_config.get_cursor() as cursor: + query = "SELECT {fields} FROM {table_name} WHERE {id_column} IN ({values})".format( + table_name=table_name, + id_column=id_column, + fields=( + ",".join(self.download_config.fields) if self.download_config.fields else "*" + ), + values=",".join([self.values_delimiter for _ in ids]), + ) + logger.debug(f"running query: {query}\nwith values: {ids}") + cursor.execute(query, ids) + rows = [ + tuple(row.values()) if isinstance(row, dict) else row for row in cursor.fetchall() + ] + columns = [col[0] for col in cursor.description] + return rows, columns + + +class SnowflakeUploadStagerConfig(SQLUploadStagerConfig): pass -class SnowflakeUploadStager(PostgresUploadStager): +class SnowflakeUploadStager(SQLUploadStager): upload_stager_config: SnowflakeUploadStagerConfig -class SnowflakeUploaderConfig(PostgresUploaderConfig): +class SnowflakeUploaderConfig(SQLUploaderConfig): pass @dataclass -class SnowflakeUploader(PostgresUploader): +class SnowflakeUploader(SQLUploader): upload_config: SnowflakeUploaderConfig = field(default_factory=SnowflakeUploaderConfig) connection_config: SnowflakeConnectionConfig connector_type: str = CONNECTOR_TYPE From afb0d95256020b553d122032418d82e9912ac9dd Mon Sep 17 00:00:00 2001 From: mateuszkuprowski <154606096+mateuszkuprowski@users.noreply.github.com> Date: Fri, 6 Dec 2024 13:56:51 +0100 Subject: [PATCH 11/15] feat: Refined box connector to actually use config JSON directly (#258) Refined box connector to actually use config JSON directly --------- Co-authored-by: Mateusz Kuprowski Co-authored-by: Michal Martyniak --- CHANGELOG.md | 6 ++++ test_e2e/src/box.sh | 20 +++++-------- unstructured_ingest/__version__.py | 2 +- .../v2/processes/connectors/fsspec/box.py | 30 +++++++++++++------ 4 files changed, 36 insertions(+), 22 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 1deba237d..abe38224f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,9 @@ +## 0.3.7-dev9 + +### Fixes + +* **Box source connector can now use raw JSON as access token instead of file path to JSON** + ## 0.3.7-dev8 ### Fixes diff --git a/test_e2e/src/box.sh b/test_e2e/src/box.sh index 441d50218..2e0a3a256 100755 --- a/test_e2e/src/box.sh +++ b/test_e2e/src/box.sh @@ -27,22 +27,18 @@ function cleanup() { } trap cleanup EXIT -if [ -z "$BOX_APP_CONFIG" ] && [ -z "$BOX_APP_CONFIG_PATH" ]; then - echo "Skipping Box ingest test because neither BOX_APP_CONFIG nor BOX_APP_CONFIG_PATH env vars are set." +if [ -z "$BOX_APP_CONFIG" ]; then + echo "Skipping Box ingest test because BOX_APP_CONFIG env var is not set." exit 8 fi -if [ -z "$BOX_APP_CONFIG_PATH" ]; then - # Create temporary service key file - BOX_APP_CONFIG_PATH=$(mktemp) - echo "$BOX_APP_CONFIG" >"$BOX_APP_CONFIG_PATH" - +# Make sure the BOX_APP_CONFIG is valid JSON +echo "Checking valid JSON in BOX_APP_CONFIG environment variable" +if ! echo "$BOX_APP_CONFIG" | jq 'keys' >/dev/null; then + echo "BOX_APP_CONFIG does not contain valid JSON. Exiting." 
+ exit 1 fi -# Make sure valid json -echo "Checking valid json at file $BOX_APP_CONFIG_PATH" -jq 'keys' <"$BOX_APP_CONFIG_PATH" - RUN_SCRIPT=${RUN_SCRIPT:-./unstructured_ingest/main.py} PYTHONPATH=${PYTHONPATH:-.} "$RUN_SCRIPT" \ box \ @@ -50,7 +46,7 @@ PYTHONPATH=${PYTHONPATH:-.} "$RUN_SCRIPT" \ --partition-by-api \ --partition-endpoint "https://api.unstructuredapp.io" \ --download-dir "$DOWNLOAD_DIR" \ - --box-app-config "$BOX_APP_CONFIG_PATH" \ + --box-app-config "$BOX_APP_CONFIG" \ --remote-url box://utic-test-ingest-fixtures \ --metadata-exclude coordinates,filename,file_directory,metadata.data_source.date_processed,metadata.last_modified,metadata.detection_class_prob,metadata.parent_id,metadata.category_depth \ --output-dir "$OUTPUT_DIR" \ diff --git a/unstructured_ingest/__version__.py b/unstructured_ingest/__version__.py index f7c67a947..8c4b9d3d4 100644 --- a/unstructured_ingest/__version__.py +++ b/unstructured_ingest/__version__.py @@ -1 +1 @@ -__version__ = "0.3.7-dev8" # pragma: no cover +__version__ = "0.3.7-dev9" # pragma: no cover diff --git a/unstructured_ingest/v2/processes/connectors/fsspec/box.py b/unstructured_ingest/v2/processes/connectors/fsspec/box.py index b8d865cd9..aa8b96a8b 100644 --- a/unstructured_ingest/v2/processes/connectors/fsspec/box.py +++ b/unstructured_ingest/v2/processes/connectors/fsspec/box.py @@ -3,10 +3,11 @@ from dataclasses import dataclass, field from pathlib import Path from time import time -from typing import Any, Generator, Optional +from typing import Annotated, Any, Generator, Optional from dateutil import parser from pydantic import Field, Secret +from pydantic.functional_validators import BeforeValidator from unstructured_ingest.utils.dep_check import requires_dependencies from unstructured_ingest.v2.interfaces import DownloadResponse, FileData, FileDataSourceMetadata @@ -23,7 +24,9 @@ FsspecIndexerConfig, FsspecUploader, FsspecUploaderConfig, + SourceConnectionError, ) +from unstructured_ingest.v2.processes.connectors.utils import conform_string_to_dict CONNECTOR_TYPE = "box" @@ -33,26 +36,35 @@ class BoxIndexerConfig(FsspecIndexerConfig): class BoxAccessConfig(FsspecAccessConfig): - box_app_config: Optional[str] = Field( - default=None, description="Path to Box app credentials as json file." + box_app_config: Annotated[dict, BeforeValidator(conform_string_to_dict)] = Field( + description="Box app credentials as a JSON string." ) class BoxConnectionConfig(FsspecConnectionConfig): supported_protocols: list[str] = field(default_factory=lambda: ["box"], init=False) - access_config: Secret[BoxAccessConfig] = Field(default=BoxAccessConfig(), validate_default=True) + access_config: Secret[BoxAccessConfig] connector_type: str = Field(default=CONNECTOR_TYPE, init=False) def get_access_config(self) -> dict[str, Any]: - # Return access_kwargs with oauth. The oauth object cannot be stored directly in the config - # because it is not serializable. 
from boxsdk import JWTAuth ac = self.access_config.get_secret_value() + settings_dict = ac.box_app_config + + # Create and authenticate the JWTAuth object + oauth = JWTAuth.from_settings_dictionary(settings_dict) + try: + oauth.authenticate_instance() + except Exception as e: + raise SourceConnectionError(f"Failed to authenticate with Box: {e}") + + if not oauth.access_token: + raise SourceConnectionError("Authentication failed: No access token generated.") + + # Prepare the access configuration with the authenticated oauth access_kwargs_with_oauth: dict[str, Any] = { - "oauth": JWTAuth.from_settings_file( - ac.box_app_config, - ), + "oauth": oauth, } access_config: dict[str, Any] = ac.model_dump() access_config.pop("box_app_config", None) From 7e9909cb51f2d2090cd626c99e437fc52a6f7a69 Mon Sep 17 00:00:00 2001 From: Filip Knefel <158048836+ds-filipknefel@users.noreply.github.com> Date: Mon, 9 Dec 2024 10:31:53 +0100 Subject: [PATCH 12/15] fix: update fsspec upload paths to work independent of OS (#291) When run on windows Path() converts slashes to backward slashes which are not correctly interpreted when passed to (non-local) fsspec filesystem. Instead of using str() use .to_posix() to mitigate this effect in fsspec code. --------- Co-authored-by: Filip Knefel --- CHANGELOG.md | 24 +++++-------------- unstructured_ingest/__version__.py | 2 +- .../v2/processes/connectors/fsspec/fsspec.py | 6 ++--- 3 files changed, 10 insertions(+), 22 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index abe38224f..290c8a902 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,21 +1,4 @@ -## 0.3.7-dev9 - -### Fixes - -* **Box source connector can now use raw JSON as access token instead of file path to JSON** - -## 0.3.7-dev8 - -### Fixes - -* **Fixes Snowflake source `'SnowflakeCursor' object has no attribute 'mogrify'` error** - -## 0.3.7-dev7 - -### Features - -* **Add DuckDB destination connector** Adds support storing artifacts in a local DuckDB database. -* **Add MotherDuck destination connector** Adds support storing artifacts in MotherDuck database. +## 0.3.7-dev10 ### Fixes @@ -25,6 +8,9 @@ * **Fixes issue with SingleStore Source Connector not being available** * **Fixes issue with SQLite Source Connector using wrong Indexer** - Caused indexer config parameter error when trying to use SQLite Source * **Fixes issue with Snowflake Destination Connector `nan` values** - `nan` values were not properly replaced with `None` +* **Fixes Snowflake source `'SnowflakeCursor' object has no attribute 'mogrify'` error** +* **Box source connector can now use raw JSON as access token instead of file path to JSON** +* **Fix fsspec upload paths to be OS independent** ### Enhancements @@ -34,6 +20,8 @@ * **Makes multiple SQL connectors (Snowflake, SingleStore, SQLite) more robust against SQL injection.** * **Optimizes memory usage of Snowflake Destination Connector.** * **Added Qdrant Cloud integration test** +* **Add DuckDB destination connector** Adds support storing artifacts in a local DuckDB database. +* **Add MotherDuck destination connector** Adds support storing artifacts in MotherDuck database. 
## 0.3.6 diff --git a/unstructured_ingest/__version__.py b/unstructured_ingest/__version__.py index 8c4b9d3d4..c649da8b8 100644 --- a/unstructured_ingest/__version__.py +++ b/unstructured_ingest/__version__.py @@ -1 +1 @@ -__version__ = "0.3.7-dev9" # pragma: no cover +__version__ = "0.3.7-dev10" # pragma: no cover diff --git a/unstructured_ingest/v2/processes/connectors/fsspec/fsspec.py b/unstructured_ingest/v2/processes/connectors/fsspec/fsspec.py index ca36fd0b2..4005ba41b 100644 --- a/unstructured_ingest/v2/processes/connectors/fsspec/fsspec.py +++ b/unstructured_ingest/v2/processes/connectors/fsspec/fsspec.py @@ -297,7 +297,7 @@ def precheck(self) -> None: **self.connection_config.get_access_config(), ) upload_path = Path(self.upload_config.path_without_protocol) / "_empty" - fs.write_bytes(path=str(upload_path), value=b"") + fs.write_bytes(path=upload_path.as_posix(), value=b"") except Exception as e: logger.error(f"failed to validate connection: {e}", exc_info=True) raise DestinationConnectionError(f"failed to validate connection: {e}") @@ -314,11 +314,11 @@ def run(self, path: Path, file_data: FileData, **kwargs: Any) -> None: path_str = str(path.resolve()) upload_path = self.get_upload_path(file_data=file_data) logger.debug(f"writing local file {path_str} to {upload_path}") - self.fs.upload(lpath=path_str, rpath=str(upload_path)) + self.fs.upload(lpath=path_str, rpath=upload_path.as_posix()) async def run_async(self, path: Path, file_data: FileData, **kwargs: Any) -> None: upload_path = self.get_upload_path(file_data=file_data) path_str = str(path.resolve()) # Odd that fsspec doesn't run exists() as async even when client support async logger.debug(f"writing local file {path_str} to {upload_path}") - self.fs.upload(lpath=path_str, rpath=str(upload_path)) + self.fs.upload(lpath=path_str, rpath=upload_path.as_posix()) From a36ffef466e3740b5145834eb673e10bf81a4868 Mon Sep 17 00:00:00 2001 From: Filip Knefel <158048836+ds-filipknefel@users.noreply.github.com> Date: Mon, 9 Dec 2024 10:48:15 +0100 Subject: [PATCH 13/15] fix: properly log elasticsearch upload errors (#289) Original error logging was never called because by default parallel_bulk re-raises exceptions and raises errors for non 2XX responses and these were not caught. We change the logic to catch, log and re-raise errors on our side. Error log is sanitized to remove the uploaded object contents from it. 
--------- Co-authored-by: Filip Knefel --- CHANGELOG.md | 4 +- unstructured_ingest/__version__.py | 2 +- .../connectors/elasticsearch/elasticsearch.py | 45 +++++++++++++------ 3 files changed, 36 insertions(+), 15 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 290c8a902..0fd5de964 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,4 +1,5 @@ -## 0.3.7-dev10 + +## 0.3.7-dev11 ### Fixes @@ -11,6 +12,7 @@ * **Fixes Snowflake source `'SnowflakeCursor' object has no attribute 'mogrify'` error** * **Box source connector can now use raw JSON as access token instead of file path to JSON** * **Fix fsspec upload paths to be OS independent** +* **Properly log elasticsearch upload errors** ### Enhancements diff --git a/unstructured_ingest/__version__.py b/unstructured_ingest/__version__.py index c649da8b8..1aeb4ecdd 100644 --- a/unstructured_ingest/__version__.py +++ b/unstructured_ingest/__version__.py @@ -1 +1 @@ -__version__ = "0.3.7-dev10" # pragma: no cover +__version__ = "0.3.7-dev11" # pragma: no cover diff --git a/unstructured_ingest/v2/processes/connectors/elasticsearch/elasticsearch.py b/unstructured_ingest/v2/processes/connectors/elasticsearch/elasticsearch.py index 6424b26e3..b5d908109 100644 --- a/unstructured_ingest/v2/processes/connectors/elasticsearch/elasticsearch.py +++ b/unstructured_ingest/v2/processes/connectors/elasticsearch/elasticsearch.py @@ -1,3 +1,4 @@ +import collections import hashlib import json import sys @@ -425,7 +426,10 @@ def delete_by_record_id(self, client, file_data: FileData) -> None: if failures := delete_resp.get("failures"): raise WriteError(f"failed to delete records: {failures}") - def run(self, path: Path, file_data: FileData, **kwargs: Any) -> None: + @requires_dependencies(["elasticsearch"], extras="elasticsearch") + def run(self, path: Path, file_data: FileData, **kwargs: Any) -> None: # type: ignore + from elasticsearch.helpers.errors import BulkIndexError + parallel_bulk = self.load_parallel_bulk() with path.open("r") as file: elements_dict = json.load(file) @@ -449,18 +453,33 @@ def run(self, path: Path, file_data: FileData, **kwargs: Any) -> None: for batch in generator_batching_wbytes( elements_dict, batch_size_limit_bytes=self.upload_config.batch_size_bytes ): - for success, info in parallel_bulk( - client=client, - actions=batch, - thread_count=self.upload_config.num_threads, - ): - if not success: - logger.error( - "upload failed for a batch in " - f"{(self.__class__.__name__).replace('Uploader', '')} " - "destination connector:", - info, - ) + try: + iterator = parallel_bulk( + client=client, + actions=batch, + thread_count=self.upload_config.num_threads, + ) + collections.deque(iterator, maxlen=0) + except BulkIndexError as e: + sanitized_errors = [ + self._sanitize_bulk_index_error(error) for error in e.errors + ] + logger.error( + f"Batch upload failed - {e} - with following errors: {sanitized_errors}" + ) + raise e + except Exception as e: + logger.error(f"Batch upload failed - {e}") + raise e + + def _sanitize_bulk_index_error(self, error: dict[str, dict]) -> dict: + """Remove data uploaded to index from the log, leave only error information. 
+ + Error structure is `{: {..., "data": }}` + """ + for error_data in error.values(): + error_data.pop("data", None) + return error elasticsearch_source_entry = SourceRegistryEntry( From 27caff3dd3fa9ea4ef0157539d6d6b6eb771aa21 Mon Sep 17 00:00:00 2001 From: Filip Knefel <158048836+ds-filipknefel@users.noreply.github.com> Date: Mon, 9 Dec 2024 10:52:21 +0100 Subject: [PATCH 14/15] chore: update weaviate example (#272) Update Weaviate connector example --------- Co-authored-by: Filip Knefel --- CHANGELOG.md | 3 ++- unstructured_ingest/__version__.py | 2 +- unstructured_ingest/v2/examples/weaviate.py | 21 ++++++++++----------- 3 files changed, 13 insertions(+), 13 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 0fd5de964..4af98c90c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,5 @@ -## 0.3.7-dev11 +## 0.3.7-dev12 ### Fixes @@ -24,6 +24,7 @@ * **Added Qdrant Cloud integration test** * **Add DuckDB destination connector** Adds support storing artifacts in a local DuckDB database. * **Add MotherDuck destination connector** Adds support storing artifacts in MotherDuck database. +* **Update weaviate v2 example** ## 0.3.6 diff --git a/unstructured_ingest/__version__.py b/unstructured_ingest/__version__.py index 1aeb4ecdd..8bd7c9660 100644 --- a/unstructured_ingest/__version__.py +++ b/unstructured_ingest/__version__.py @@ -1 +1 @@ -__version__ = "0.3.7-dev11" # pragma: no cover +__version__ = "0.3.7-dev12" # pragma: no cover diff --git a/unstructured_ingest/v2/examples/weaviate.py b/unstructured_ingest/v2/examples/weaviate.py index aedd2e3e1..f8bc08732 100644 --- a/unstructured_ingest/v2/examples/weaviate.py +++ b/unstructured_ingest/v2/examples/weaviate.py @@ -9,11 +9,11 @@ LocalDownloaderConfig, LocalIndexerConfig, ) -from unstructured_ingest.v2.processes.connectors.weaviate import ( +from unstructured_ingest.v2.processes.connectors.weaviate.local import ( CONNECTOR_TYPE, - WeaviateConnectionConfig, - WeaviateUploaderConfig, - WeaviateUploadStagerConfig, + LocalWeaviateConnectionConfig, + LocalWeaviateUploaderConfig, + LocalWeaviateUploadStagerConfig, ) from unstructured_ingest.v2.processes.embedder import EmbedderConfig from unstructured_ingest.v2.processes.partitioner import PartitionerConfig @@ -34,12 +34,11 @@ partitioner_config=PartitionerConfig(strategy="fast"), chunker_config=ChunkerConfig(chunking_strategy="by_title"), embedder_config=EmbedderConfig(embedding_provider="huggingface"), - destination_connection_config=WeaviateConnectionConfig( - host_url="http://localhost:8080", - class_name="elements", - access_config=None, - anonymous=True, + destination_connection_config=LocalWeaviateConnectionConfig( + # Connects to http://localhost:8080 + ), + stager_config=LocalWeaviateUploadStagerConfig(), + uploader_config=LocalWeaviateUploaderConfig( + collection="elements", batch_size=10, dynamic_batch=False ), - stager_config=WeaviateUploadStagerConfig(), - uploader_config=WeaviateUploaderConfig(batch_size=10), ).run() From dfdf5c77b610cf96e585bc8469a94b98158878f9 Mon Sep 17 00:00:00 2001 From: Mateusz Kuprowski Date: Mon, 9 Dec 2024 12:17:03 +0100 Subject: [PATCH 15/15] Reverted order of Chroma clients with regards to importance --- .../v2/processes/connectors/chroma.py | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/unstructured_ingest/v2/processes/connectors/chroma.py b/unstructured_ingest/v2/processes/connectors/chroma.py index 96db9b92c..aa6489963 100644 --- a/unstructured_ingest/v2/processes/connectors/chroma.py +++ 
b/unstructured_ingest/v2/processes/connectors/chroma.py @@ -135,15 +135,7 @@ def create_client(self) -> "Client": import chromadb access_config = self.connection_config.access_config.get_secret_value() - if self.connection_config.path: - return chromadb.PersistentClient( - path=self.connection_config.path, - settings=access_config.settings, - tenant=self.connection_config.tenant, - database=self.connection_config.database, - ) - - elif self.connection_config.host and self.connection_config.port: + if self.connection_config.host and self.connection_config.port: return chromadb.HttpClient( host=self.connection_config.host, port=self.connection_config.port, @@ -153,6 +145,13 @@ def create_client(self) -> "Client": tenant=self.connection_config.tenant, database=self.connection_config.database, ) + elif self.connection_config.path: + return chromadb.PersistentClient( + path=self.connection_config.path, + settings=access_config.settings, + tenant=self.connection_config.tenant, + database=self.connection_config.database, + ) else: raise ValueError("Chroma connector requires either path or host and port to be set.")
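
The final commit above swaps the branch order in create_client() so that, when both a local path and host/port are configured, the Chroma connector prefers the remote HttpClient and only falls back to the embedded PersistentClient. A minimal sketch of that precedence follows, assuming the chromadb package is installed; make_chroma_client and its arguments are illustrative stand-ins, not the connector's actual API:

    from typing import Optional

    import chromadb


    def make_chroma_client(
        host: Optional[str] = None,
        port: Optional[int] = None,
        path: Optional[str] = None,
    ):
        # Remote connection details take precedence: with host and port set,
        # a configured local path is ignored and an HTTP client is returned.
        if host and port:
            return chromadb.HttpClient(host=host, port=port)
        # Fall back to the embedded on-disk client only when no host/port is given.
        if path:
            return chromadb.PersistentClient(path=path)
        raise ValueError("either path or host and port must be set")


    # Usage sketch: both kinds of settings provided -> the HTTP client wins.
    # client = make_chroma_client(host="localhost", port=8000, path="/tmp/chroma")

The ordering matters because, under the old precedence, a config that set a local path alongside host and port silently created an on-disk database instead of connecting to the configured Chroma server.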