6 changes: 6 additions & 0 deletions CHANGELOG.md
@@ -1,3 +1,9 @@
## 0.3.12-dev5

### Fixes

* **Added Pinecone namespace upsert test**

## 0.3.12-dev4

### Enhancements
89 changes: 81 additions & 8 deletions test/integration/connectors/test_pinecone.py
@@ -4,7 +4,7 @@
import re
import time
from pathlib import Path
from typing import Generator
from typing import Generator, Optional
from uuid import uuid4

import pytest
@@ -109,14 +109,25 @@ def pinecone_index() -> Generator[str, None, None]:


def validate_pinecone_index(
index_name: str, expected_num_of_vectors: int, retries=30, interval=1
index_name: str,
expected_num_of_vectors: int,
namespace: Optional[str] = None,
retries=30,
interval=1,
) -> None:
# Because there's a delay for the index to catch up to the recent writes, add in a retry
pinecone = Pinecone(api_key=get_api_key())
index = pinecone.Index(name=index_name)
"""
Validates that `expected_num_of_vectors` are present in a Pinecone index,
optionally in a specific namespace.
"""
pinecone_client = Pinecone(api_key=get_api_key())
index = pinecone_client.Index(name=index_name)

vector_count = -1
for i in range(retries):
index_stats = index.describe_index_stats()
if namespace:
index_stats = index.describe_index_stats(namespace=namespace)
else:
index_stats = index.describe_index_stats() # all namespaces
vector_count = index_stats["total_vector_count"]
if vector_count == expected_num_of_vectors:
logger.info(f"expected {expected_num_of_vectors} == vector count {vector_count}")
@@ -125,9 +136,10 @@ def validate_pinecone_index(
f"retry attempt {i}: expected {expected_num_of_vectors} != vector count {vector_count}"
)
time.sleep(interval)

assert vector_count == expected_num_of_vectors, (
f"vector count from index ({vector_count}) doesn't "
f"match expected number: {expected_num_of_vectors}"
f"vector count from index (namespace={namespace}) is {vector_count}, "
f"expected {expected_num_of_vectors}"
)


@@ -286,3 +298,64 @@ def test_pinecone_stager(
stager=stager,
tmp_dir=tmp_path,
)


@requires_env(API_KEY)
@pytest.mark.asyncio
@pytest.mark.tags(CONNECTOR_TYPE, DESTINATION_TAG)
async def test_pinecone_namespace_write_success(
pinecone_index: str, upload_file: Path, temp_dir: Path
):
"""
Test to ensure data is written to a custom namespace successfully and
that everything is properly cleaned up afterward.
"""
test_namespace = "test_namespace_success"

# Prepare test data
file_data = FileData(
source_identifiers=SourceIdentifiers(fullpath=upload_file.name, filename=upload_file.name),
connector_type=CONNECTOR_TYPE,
identifier="pinecone_mock_id",
)

connection_config = PineconeConnectionConfig(
index_name=pinecone_index,
access_config=PineconeAccessConfig(api_key=get_api_key()),
)
stager_config = PineconeUploadStagerConfig()
stager = PineconeUploadStager(upload_stager_config=stager_config)

new_upload_file = stager.run(
elements_filepath=upload_file,
output_dir=temp_dir,
output_filename=upload_file.name,
file_data=file_data,
)

upload_config = PineconeUploaderConfig(namespace=test_namespace)
uploader = PineconeUploader(connection_config=connection_config, upload_config=upload_config)
uploader.precheck()

uploader.run(path=new_upload_file, file_data=file_data)

with new_upload_file.open() as f:
staged_content = json.load(f)
expected_num_of_vectors = len(staged_content)

validate_pinecone_index(
index_name=pinecone_index,
expected_num_of_vectors=expected_num_of_vectors,
namespace="test_namespace_success", # or your test_namespace variable
)

# --- CLEANUP ---
try:
pinecone_client = Pinecone(api_key=get_api_key())
index = pinecone_client.Index(name=pinecone_index)
# delete_all=True removes everything in that namespace
delete_resp = index.delete(delete_all=True, namespace=test_namespace)
logger.info(f"Cleaned up all vectors from namespace '{test_namespace}': {delete_resp}")
except Exception as e:
logger.error(f"Error cleaning up namespace '{test_namespace}': {e}")
pytest.fail(f"Test failed to clean up namespace '{test_namespace}'.")
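
For illustration only, and not part of this diff: the `namespace` parameter added to `validate_pinecone_index` also makes it easy to assert that the cleanup above actually emptied the test namespace. A hedged sketch, reusing only names defined in the test:

# Sketch, not part of the change: confirm the namespace is empty after cleanup.
# validate_pinecone_index, pinecone_index, and test_namespace are all defined above;
# the helper's own retry/interval defaults apply.
validate_pinecone_index(
    index_name=pinecone_index,
    expected_num_of_vectors=0,
    namespace=test_namespace,
)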
2 changes: 1 addition & 1 deletion unstructured_ingest/__version__.py
@@ -1 +1 @@
__version__ = "0.3.12-dev4" # pragma: no cover
__version__ = "0.3.12-dev5" # pragma: no cover
78 changes: 45 additions & 33 deletions unstructured_ingest/v2/processes/connectors/pinecone.py
@@ -105,7 +105,10 @@ class PineconeUploaderConfig(UploaderConfig):
)
namespace: Optional[str] = Field(
default=None,
description="The namespace to write to. If not specified, the default namespace is used",
description=(
"The namespace to write to. If not specified (None), the Pinecone SDK "
"will fall back to the 'default' namespace automatically."
),
)
record_id_key: str = Field(
default=RECORD_ID_LABEL,
@@ -173,49 +176,56 @@ def precheck(self):
raise DestinationConnectionError(f"failed to validate connection: {e}")

def pod_delete_by_record_id(self, file_data: FileData) -> None:
"""Deletion for Pinecone Pod-based index."""
logger.debug(
f"deleting any content with metadata "
f"Deleting any content with metadata "
f"{self.upload_config.record_id_key}={file_data.identifier} "
f"from pinecone pod index"
f"from Pinecone pod index"
)
index = self.connection_config.get_index(pool_threads=MAX_POOL_THREADS)

# Build the delete_kwargs; include 'namespace' only if it's not None
delete_kwargs = {
"filter": {self.upload_config.record_id_key: {"$eq": file_data.identifier}}
"filter": {self.upload_config.record_id_key: {"$eq": file_data.identifier}},
}
if namespace := self.upload_config.namespace:
delete_kwargs["namespace"] = namespace
if self.upload_config.namespace is not None:
delete_kwargs["namespace"] = self.upload_config.namespace

resp = index.delete(**delete_kwargs)
logger.debug(
f"deleted any content with metadata "
f"Deleted any content with metadata "
f"{self.upload_config.record_id_key}={file_data.identifier} "
f"from pinecone index: {resp}"
f"from Pinecone index: {resp}"
)

def serverless_delete_by_record_id(self, file_data: FileData) -> None:
"""Deletion for Pinecone Serverless index."""
logger.debug(
f"deleting any content with metadata "
f"Deleting any content with metadata "
f"{self.upload_config.record_id_key}={file_data.identifier} "
f"from pinecone serverless index"
f"from Pinecone serverless index"
)
index = self.connection_config.get_index(pool_threads=MAX_POOL_THREADS)

# Build the list_kwargs; include 'namespace' only if it's not None
list_kwargs = {"prefix": f"{file_data.identifier}#"}
if self.upload_config.namespace is not None:
list_kwargs["namespace"] = self.upload_config.namespace

deleted_ids = 0
if namespace := self.upload_config.namespace:
list_kwargs["namespace"] = namespace
for ids in index.list(**list_kwargs):
deleted_ids += len(ids)
delete_kwargs = {"ids": ids}
if namespace := self.upload_config.namespace:
delete_resp = delete_kwargs["namespace"] = namespace
# delete_resp should be an empty dict if there were no errors
if delete_resp:
logger.error(f"failed to delete batch of ids: {delete_resp}")
index.delete(**delete_kwargs)
if self.upload_config.namespace is not None:
delete_kwargs["namespace"] = self.upload_config.namespace
delete_resp = index.delete(**delete_kwargs)
if delete_resp:
logger.error(f"Failed to delete batch of IDs: {delete_resp}")

logger.info(
f"deleted {deleted_ids} records with metadata "
f"Deleted {deleted_ids} records with metadata "
f"{self.upload_config.record_id_key}={file_data.identifier} "
f"from pinecone index"
f"from Pinecone index"
)

@requires_dependencies(["pinecone"], extras="pinecone")
Expand All @@ -229,26 +239,28 @@ def upsert_batches_async(self, elements_dict: list[dict]):
max_batch_size=self.upload_config.batch_size,
)
)
logger.info(f"split doc with {len(elements_dict)} elements into {len(chunks)} batches")
logger.info(f"Split doc with {len(elements_dict)} elements into {len(chunks)} batches")

max_pool_threads = min(len(chunks), MAX_POOL_THREADS)
if self.upload_config.pool_threads:
pool_threads = min(self.upload_config.pool_threads, max_pool_threads)
else:
pool_threads = max_pool_threads
pool_threads = min(self.upload_config.pool_threads or max_pool_threads, max_pool_threads)
index = self.connection_config.get_index(pool_threads=pool_threads)

# Build upsert_kwargs for each chunk
upsert_kwargs_list = []
for chunk in chunks:
kwargs = {"vectors": chunk, "async_req": True}
if self.upload_config.namespace is not None:
kwargs["namespace"] = self.upload_config.namespace
upsert_kwargs_list.append(kwargs)

with index:
upsert_kwargs = [{"vectors": chunk, "async_req": True} for chunk in chunks]
if namespace := self.upload_config.namespace:
for kwargs in upsert_kwargs:
kwargs["namespace"] = namespace
async_results = [index.upsert(**kwarg) for kwarg in upsert_kwargs]
# Wait for and retrieve responses (this raises in case of error)
# Execute async upserts
async_results = [index.upsert(**kwargs) for kwargs in upsert_kwargs_list]
try:
results = [async_result.get() for async_result in async_results]
except PineconeApiException as api_error:
raise DestinationConnectionError(f"http error: {api_error}") from api_error
logger.debug(f"results: {results}")
raise DestinationConnectionError(f"HTTP error: {api_error}") from api_error
logger.debug(f"Results: {results}")

def run_data(self, data: list[dict], file_data: FileData, **kwargs: Any) -> None:
logger.info(
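
The connector changes above repeat one pattern in three places (delete, list, upsert): the `namespace` key is added to the request kwargs only when the configured value is not None, so the Pinecone SDK falls back to its default namespace otherwise. A minimal standalone sketch of that pattern, assuming an already-constructed Pinecone `Index` handle; the helper name and variables below are illustrative and not taken from the connector:

from typing import Any, Optional

def build_upsert_kwargs(vectors: list[dict], namespace: Optional[str]) -> dict[str, Any]:
    # Include "namespace" only when explicitly configured; omitting the key
    # lets the Pinecone client use its default namespace.
    kwargs: dict[str, Any] = {"vectors": vectors, "async_req": True}
    if namespace is not None:
        kwargs["namespace"] = namespace
    return kwargs

# Illustrative usage against a hypothetical `index` handle and `chunks` list:
# async_results = [index.upsert(**build_upsert_kwargs(chunk, namespace)) for chunk in chunks]
# results = [r.get() for r in async_results]  # .get() raises if a request failed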