diff --git a/CHANGELOG.md b/CHANGELOG.md
index 89432e986..b5b55645f 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,10 @@
+## 0.3.13-dev3
+
+### Fixes
+
+* **Added Pinecone namespace upsert test**
+
+
 ## 0.3.13-dev2
 
 ### Fixes
diff --git a/test/integration/connectors/test_pinecone.py b/test/integration/connectors/test_pinecone.py
index 3c393bcfd..92612e49e 100644
--- a/test/integration/connectors/test_pinecone.py
+++ b/test/integration/connectors/test_pinecone.py
@@ -4,7 +4,7 @@
 import re
 import time
 from pathlib import Path
-from typing import Generator
+from typing import Generator, Optional
 from uuid import uuid4
 
 import pytest
@@ -109,14 +109,25 @@ def pinecone_index() -> Generator[str, None, None]:
 
 
 def validate_pinecone_index(
-    index_name: str, expected_num_of_vectors: int, retries=30, interval=1
+    index_name: str,
+    expected_num_of_vectors: int,
+    namespace: Optional[str] = None,
+    retries=30,
+    interval=1,
 ) -> None:
-    # Because there's a delay for the index to catch up to the recent writes, add in a retry
-    pinecone = Pinecone(api_key=get_api_key())
-    index = pinecone.Index(name=index_name)
+    """
+    Validate that the index reports `expected_num_of_vectors` vectors, optionally
+    within a specific namespace, retrying because index stats can lag recent writes.
+    """
+    pinecone_client = Pinecone(api_key=get_api_key())
+    index = pinecone_client.Index(name=index_name)
+    vector_count = -1
     for i in range(retries):
-        index_stats = index.describe_index_stats()
-        vector_count = index_stats["total_vector_count"]
+        index_stats = index.describe_index_stats()
+        if namespace:
+            namespace_stats = index_stats["namespaces"].get(namespace)
+            vector_count = namespace_stats["vector_count"] if namespace_stats else 0
+        else:
+            vector_count = index_stats["total_vector_count"]
         if vector_count == expected_num_of_vectors:
             logger.info(f"expected {expected_num_of_vectors} == vector count {vector_count}")
@@ -125,9 +136,10 @@ def validate_pinecone_index(
             f"retry attempt {i}: expected {expected_num_of_vectors} != vector count {vector_count}"
         )
         time.sleep(interval)
+
     assert vector_count == expected_num_of_vectors, (
-        f"vector count from index ({vector_count}) doesn't "
-        f"match expected number: {expected_num_of_vectors}"
+        f"vector count from index (namespace={namespace}) is {vector_count}, "
+        f"expected {expected_num_of_vectors}"
     )
 
 
@@ -286,3 +298,64 @@ def test_pinecone_stager(
         stager=stager,
         tmp_dir=tmp_path,
     )
+
+
+@requires_env(API_KEY)
+@pytest.mark.asyncio
+@pytest.mark.tags(CONNECTOR_TYPE, DESTINATION_TAG)
+async def test_pinecone_namespace_write_success(
+    pinecone_index: str, upload_file: Path, temp_dir: Path
+):
+    """
+    Ensure data is written to a custom namespace successfully and that the
+    namespace is cleaned up afterward.
+ """ + test_namespace = "test_namespace_success" + + # Prepare test data + file_data = FileData( + source_identifiers=SourceIdentifiers(fullpath=upload_file.name, filename=upload_file.name), + connector_type=CONNECTOR_TYPE, + identifier="pinecone_mock_id", + ) + + connection_config = PineconeConnectionConfig( + index_name=pinecone_index, + access_config=PineconeAccessConfig(api_key=get_api_key()), + ) + stager_config = PineconeUploadStagerConfig() + stager = PineconeUploadStager(upload_stager_config=stager_config) + + new_upload_file = stager.run( + elements_filepath=upload_file, + output_dir=temp_dir, + output_filename=upload_file.name, + file_data=file_data, + ) + + upload_config = PineconeUploaderConfig(namespace=test_namespace) + uploader = PineconeUploader(connection_config=connection_config, upload_config=upload_config) + uploader.precheck() + + uploader.run(path=new_upload_file, file_data=file_data) + + with new_upload_file.open() as f: + staged_content = json.load(f) + expected_num_of_vectors = len(staged_content) + + validate_pinecone_index( + index_name=pinecone_index, + expected_num_of_vectors=expected_num_of_vectors, + namespace="test_namespace_success", # or your test_namespace variable + ) + + # --- CLEANUP --- + try: + pinecone_client = Pinecone(api_key=get_api_key()) + index = pinecone_client.Index(name=pinecone_index) + # Use deleteAll=True to remove everything in that namespace + delete_resp = index.delete(deleteAll=True, namespace=test_namespace) + logger.info(f"Cleaned up all vectors from namespace '{test_namespace}': {delete_resp}") + except Exception as e: + logger.error(f"Error cleaning up namespace '{test_namespace}': {e}") + pytest.fail(f"Test failed to clean up namespace '{test_namespace}'.") diff --git a/unstructured_ingest/__version__.py b/unstructured_ingest/__version__.py index dff5c63d1..f53758827 100644 --- a/unstructured_ingest/__version__.py +++ b/unstructured_ingest/__version__.py @@ -1 +1 @@ -__version__ = "0.3.13-dev2" # pragma: no cover +__version__ = "0.3.13-dev3" # pragma: no cover diff --git a/unstructured_ingest/v2/processes/connectors/pinecone.py b/unstructured_ingest/v2/processes/connectors/pinecone.py index fe62f97ed..76c85a5d6 100644 --- a/unstructured_ingest/v2/processes/connectors/pinecone.py +++ b/unstructured_ingest/v2/processes/connectors/pinecone.py @@ -105,7 +105,10 @@ class PineconeUploaderConfig(UploaderConfig): ) namespace: Optional[str] = Field( default=None, - description="The namespace to write to. If not specified, the default namespace is used", + description=( + "The namespace to write to. If not specified (None), the Pinecone SDK " + "will fall back to the 'default' namespace automatically." 
+        ),
     )
     record_id_key: str = Field(
         default=RECORD_ID_LABEL,
@@ -173,14 +176,17 @@ def precheck(self):
             raise DestinationConnectionError(f"failed to validate connection: {e}")
 
     def pod_delete_by_record_id(self, file_data: FileData) -> None:
+        """Delete all records tagged with this file's record id from a pod-based index."""
         logger.debug(
             f"deleting any content with metadata "
             f"{self.upload_config.record_id_key}={file_data.identifier} "
             f"from pinecone pod index"
         )
         index = self.connection_config.get_index(pool_threads=MAX_POOL_THREADS)
+
+        # Build delete kwargs; only include 'namespace' when one is configured
         delete_kwargs = {
-            "filter": {self.upload_config.record_id_key: {"$eq": file_data.identifier}}
+            "filter": {self.upload_config.record_id_key: {"$eq": file_data.identifier}},
         }
         if namespace := self.upload_config.namespace:
             delete_kwargs["namespace"] = namespace
@@ -193,25 +199,29 @@ def pod_delete_by_record_id(self, file_data: FileData) -> None:
         )
 
     def serverless_delete_by_record_id(self, file_data: FileData) -> None:
+        """Delete all records tagged with this file's record id from a serverless index."""
         logger.debug(
             f"deleting any content with metadata "
             f"{self.upload_config.record_id_key}={file_data.identifier} "
             f"from pinecone serverless index"
         )
         index = self.connection_config.get_index(pool_threads=MAX_POOL_THREADS)
+
+        # Build list kwargs; only include 'namespace' when one is configured
         list_kwargs = {"prefix": f"{file_data.identifier}#"}
+        if self.upload_config.namespace is not None:
+            list_kwargs["namespace"] = self.upload_config.namespace
+
         deleted_ids = 0
-        if namespace := self.upload_config.namespace:
-            list_kwargs["namespace"] = namespace
         for ids in index.list(**list_kwargs):
             deleted_ids += len(ids)
             delete_kwargs = {"ids": ids}
-            if namespace := self.upload_config.namespace:
-                delete_resp = delete_kwargs["namespace"] = namespace
-                # delete_resp should be an empty dict if there were no errors
-                if delete_resp:
-                    logger.error(f"failed to delete batch of ids: {delete_resp}")
-            index.delete(**delete_kwargs)
+            if self.upload_config.namespace is not None:
+                delete_kwargs["namespace"] = self.upload_config.namespace
+            delete_resp = index.delete(**delete_kwargs)
+            if delete_resp:  # an empty dict means the delete succeeded
+                logger.error(f"failed to delete batch of IDs: {delete_resp}")
+
         logger.info(
             f"deleted {deleted_ids} records with metadata "
             f"{self.upload_config.record_id_key}={file_data.identifier} "
@@ -229,26 +239,28 @@ def upsert_batches_async(self, elements_dict: list[dict]):
                 max_batch_size=self.upload_config.batch_size,
             )
         )
-        logger.info(f"split doc with {len(elements_dict)} elements into {len(chunks)} batches")
+        logger.info(f"Split doc with {len(elements_dict)} elements into {len(chunks)} batches")
         max_pool_threads = min(len(chunks), MAX_POOL_THREADS)
-        if self.upload_config.pool_threads:
-            pool_threads = min(self.upload_config.pool_threads, max_pool_threads)
-        else:
-            pool_threads = max_pool_threads
+        pool_threads = min(self.upload_config.pool_threads or max_pool_threads, max_pool_threads)
         index = self.connection_config.get_index(pool_threads=pool_threads)
+
+        # Build upsert kwargs per batch; only include 'namespace' when one is configured
+        upsert_kwargs_list = []
+        for chunk in chunks:
+            kwargs = {"vectors": chunk, "async_req": True}
+            if self.upload_config.namespace is not None:
+                kwargs["namespace"] = self.upload_config.namespace
+            upsert_kwargs_list.append(kwargs)
+
         with index:
-            upsert_kwargs = [{"vectors": chunk, "async_req": True} for chunk in chunks]
-            if namespace := self.upload_config.namespace:
-                for kwargs in upsert_kwargs:
-                    kwargs["namespace"] = namespace
-            async_results = [index.upsert(**kwarg) for kwarg in upsert_kwargs]
-            # Wait for and retrieve responses (this raises in case of error)
+            # Kick off async upserts; the .get() calls below raise if any request failed
+            async_results = [index.upsert(**kwargs) for kwargs in upsert_kwargs_list]
             try:
                 results = [async_result.get() for async_result in async_results]
             except PineconeApiException as api_error:
-                raise DestinationConnectionError(f"http error: {api_error}") from api_error
-            logger.debug(f"results: {results}")
+                raise DestinationConnectionError(f"HTTP error: {api_error}") from api_error
+            logger.debug(f"Results: {results}")
 
     def run_data(self, data: list[dict], file_data: FileData, **kwargs: Any) -> None:
         logger.info(
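
Illustrative usage, not part of the patch: a minimal sketch of how the namespace option added above is expected to be wired up, assuming the connector classes are importable from unstructured_ingest.v2.processes.connectors.pinecone as in the test; the index name, API key, and namespace values are placeholders.

    from unstructured_ingest.v2.processes.connectors.pinecone import (
        PineconeAccessConfig,
        PineconeConnectionConfig,
        PineconeUploader,
        PineconeUploaderConfig,
    )

    # Placeholder values; substitute a real index name and API key.
    connection_config = PineconeConnectionConfig(
        index_name="my-index",
        access_config=PineconeAccessConfig(api_key="MY_PINECONE_API_KEY"),
    )

    # With namespace set, upserts and deletes carry it through to Pinecone;
    # left as None, the kwarg is omitted and the default namespace is used.
    uploader = PineconeUploader(
        connection_config=connection_config,
        upload_config=PineconeUploaderConfig(namespace="my-namespace"),
    )
    uploader.precheck()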