Commit bd52fc6

bugfix/pinecone large index (#274)
* add check for count being greater than 10000
* bump changelog
* skip new int test for now

1 parent 1758214 commit bd52fc6

File tree: 4 files changed, +95 -31 lines changed
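
For context on the fix itself: Pinecone limits query `top_k` to 10,000 matches per request, which is the `MAX_QUERY_RESULTS` constant this commit introduces. The previous delete path passed `top_k=total_vector_count`, so it broke once an index grew past that limit. A minimal sketch of the clamp (illustrative names, not the connector's exact code):

```python
MAX_QUERY_RESULTS = 10000  # Pinecone's per-query ceiling on top_k

def safe_top_k(total_vector_count: int) -> int:
    # Never request more matches than Pinecone will return in a single query;
    # before this fix the delete path asked for top_k=total_vector_count.
    return min(total_vector_count, MAX_QUERY_RESULTS)
```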

CHANGELOG.md

Lines changed: 6 additions & 2 deletions
@@ -1,10 +1,14 @@
-## 0.3.5-dev2
+## 0.3.5-dev3
+
+### Enhancements
+
+* **Persist record id in dedicated LanceDB column, use it to delete previous content to prevent duplicates.**
 
 ### Fixes
 
 * **Remove client.ping() from the Elasticsearch precheck.**
-* **Persist record id in dedicated LanceDB column, use it to delete previous content to prevent duplicates.**
 * **Pinecone metadata fixes** - Fix CLI's --metadata-fields default. Always preserve record ID tracking metadata.
+* **Add check to prevent querying for more than pinecone limit when deleting records**
 
 ## 0.3.4
 
test/integration/connectors/test_pinecone.py

Lines changed: 60 additions & 9 deletions
@@ -1,4 +1,5 @@
 import json
+import math
 import os
 import re
 import time
@@ -19,6 +20,7 @@
 from unstructured_ingest.v2.logger import logger
 from unstructured_ingest.v2.processes.connectors.pinecone import (
     CONNECTOR_TYPE,
+    MAX_QUERY_RESULTS,
     PineconeAccessConfig,
     PineconeConnectionConfig,
     PineconeUploader,
@@ -118,7 +120,10 @@ def validate_pinecone_index(
             f"retry attempt {i}: expected {expected_num_of_vectors} != vector count {vector_count}"
         )
         time.sleep(interval)
-    assert vector_count == expected_num_of_vectors
+    assert vector_count == expected_num_of_vectors, (
+        f"vector count from index ({vector_count}) doesn't "
+        f"match expected number: {expected_num_of_vectors}"
+    )
 
 
 @requires_env(API_KEY)
@@ -147,10 +152,7 @@ async def test_pinecone_destination(pinecone_index: str, upload_file: Path, temp
     uploader = PineconeUploader(connection_config=connection_config, upload_config=upload_config)
     uploader.precheck()
 
-    if uploader.is_async():
-        await uploader.run_async(path=new_upload_file, file_data=file_data)
-    else:
-        uploader.run(path=new_upload_file, file_data=file_data)
+    uploader.run(path=new_upload_file, file_data=file_data)
     with new_upload_file.open() as f:
         staged_content = json.load(f)
     expected_num_of_vectors = len(staged_content)
@@ -160,10 +162,59 @@ async def test_pinecone_destination(pinecone_index: str, upload_file: Path, temp
     )
 
     # Rerun uploader and make sure no duplicates exist
-    if uploader.is_async():
-        await uploader.run_async(path=new_upload_file, file_data=file_data)
-    else:
-        uploader.run(path=new_upload_file, file_data=file_data)
+    uploader.run(path=new_upload_file, file_data=file_data)
+    logger.info("validating second upload")
+    validate_pinecone_index(
+        index_name=pinecone_index, expected_num_of_vectors=expected_num_of_vectors
+    )
+
+
+@requires_env(API_KEY)
+@pytest.mark.asyncio
+@pytest.mark.tags(CONNECTOR_TYPE, DESTINATION_TAG)
+@pytest.mark.skip(reason="TODO: get this to work")
+async def test_pinecone_destination_large_index(
+    pinecone_index: str, upload_file: Path, temp_dir: Path
+):
+    new_file = temp_dir / "large_file.json"
+    with upload_file.open() as f:
+        upload_content = json.load(f)
+
+    min_entries = math.ceil((MAX_QUERY_RESULTS * 2) / len(upload_content))
+    new_content = (upload_content * min_entries)[: (2 * MAX_QUERY_RESULTS)]
+    print(f"Creating large index content with {len(new_content)} records")
+    with new_file.open("w") as f:
+        json.dump(new_content, f)
+
+    expected_num_of_vectors = len(new_content)
+    file_data = FileData(
+        source_identifiers=SourceIdentifiers(fullpath=new_file.name, filename=new_file.name),
+        connector_type=CONNECTOR_TYPE,
+        identifier="pinecone_mock_id",
+    )
+    connection_config = PineconeConnectionConfig(
+        index_name=pinecone_index,
+        access_config=PineconeAccessConfig(api_key=get_api_key()),
+    )
+    stager_config = PineconeUploadStagerConfig()
+    stager = PineconeUploadStager(upload_stager_config=stager_config)
+    new_upload_file = stager.run(
+        elements_filepath=new_file,
+        output_dir=temp_dir,
+        output_filename=new_file.name,
+        file_data=file_data,
+    )
+
+    upload_config = PineconeUploaderConfig()
+    uploader = PineconeUploader(connection_config=connection_config, upload_config=upload_config)
+    uploader.precheck()
+
+    uploader.run(path=new_upload_file, file_data=file_data)
+    validate_pinecone_index(
+        index_name=pinecone_index, expected_num_of_vectors=expected_num_of_vectors
+    )
+    # Rerun uploader and make sure no duplicates exist
+    uploader.run(path=new_upload_file, file_data=file_data)
     logger.info("validating second upload")
     validate_pinecone_index(
         index_name=pinecone_index, expected_num_of_vectors=expected_num_of_vectors
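
The skipped large-index test sizes its payload off the same constant: it repeats the sample elements until it has at least twice `MAX_QUERY_RESULTS` records, then trims to exactly that, so a single record id spans more vectors than one capped query can return. A worked sketch of the arithmetic, assuming for illustration that the staged sample file holds 100 elements:

```python
import math

MAX_QUERY_RESULTS = 10000
upload_content = ["element"] * 100  # illustrative size; the real file's length varies

# Repeat the sample until it exceeds twice the query cap, then trim to exactly 2x,
# forcing the delete path to page through results at least twice.
min_entries = math.ceil((MAX_QUERY_RESULTS * 2) / len(upload_content))  # 200 copies
new_content = (upload_content * min_entries)[: (2 * MAX_QUERY_RESULTS)]
assert len(new_content) == 20_000
```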

unstructured_ingest/__version__.py

Lines changed: 1 addition & 1 deletion
@@ -1 +1 @@
-__version__ = "0.3.5-dev2" # pragma: no cover
+__version__ = "0.3.5-dev3" # pragma: no cover

unstructured_ingest/v2/processes/connectors/pinecone.py

Lines changed: 28 additions & 19 deletions
@@ -31,6 +31,7 @@
 MAX_PAYLOAD_SIZE = 2 * 1024 * 1024 # 2MB
 MAX_POOL_THREADS = 100
 MAX_METADATA_BYTES = 40960 # 40KB https://docs.pinecone.io/reference/quotas-and-limits#hard-limits
+MAX_QUERY_RESULTS = 10000
 
 
 class PineconeAccessConfig(AccessConfig):
@@ -214,6 +215,18 @@ def pod_delete_by_record_id(self, file_data: FileData) -> None:
             f"from pinecone index: {resp}"
         )
 
+    def delete_by_query(self, index: "PineconeIndex", query_params: dict) -> None:
+        while True:
+            query_results = index.query(**query_params)
+            matches = query_results.get("matches", [])
+            if not matches:
+                break
+            ids = [match["id"] for match in matches]
+            delete_params = {"ids": ids}
+            if namespace := self.upload_config.namespace:
+                delete_params["namespace"] = namespace
+            index.delete(**delete_params)
+
     def serverless_delete_by_record_id(self, file_data: FileData) -> None:
         logger.debug(
             f"deleting any content with metadata "
@@ -222,29 +235,25 @@ def serverless_delete_by_record_id(self, file_data: FileData) -> None:
         )
         index = self.connection_config.get_index(pool_threads=MAX_POOL_THREADS)
         index_stats = index.describe_index_stats()
+        dimension = index_stats["dimension"]
         total_vectors = index_stats["total_vector_count"]
         if total_vectors == 0:
             return
-        dimension = index_stats["dimension"]
-        query_params = {
-            "filter": {self.upload_config.record_id_key: {"$eq": file_data.identifier}},
-            "vector": [0] * dimension,
-            "top_k": total_vectors,
-        }
-        if namespace := self.upload_config.namespace:
-            query_params["namespace"] = namespace
-        while True:
-            query_results = index.query(**query_params)
-            matches = query_results.get("matches", [])
-            if not matches:
-                break
-            ids = [match["id"] for match in matches]
-            delete_params = {"ids": ids}
+        while total_vectors > 0:
+            top_k = min(total_vectors, MAX_QUERY_RESULTS)
+            query_params = {
+                "filter": {self.upload_config.record_id_key: {"$eq": file_data.identifier}},
+                "vector": [0] * dimension,
+                "top_k": top_k,
+            }
             if namespace := self.upload_config.namespace:
-                delete_params["namespace"] = namespace
-            index.delete(**delete_params)
-        logger.debug(
-            f"deleted any content with metadata "
+                query_params["namespace"] = namespace
+            self.delete_by_query(index=index, query_params=query_params)
+            index_stats = index.describe_index_stats()
+            total_vectors = index_stats["total_vector_count"]
+
+        logger.info(
+            f"deleted {total_vectors} records with metadata "
             f"{self.upload_config.record_id_key}={file_data.identifier} "
             f"from pinecone index"
         )
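
Read together, the change splits the work in two: `serverless_delete_by_record_id` refreshes the index stats and caps `top_k` at `MAX_QUERY_RESULTS` on each pass, while `delete_by_query` drains the matches for one query. A condensed, standalone sketch of the same pattern (simplified: no namespace handling, and the connector's `record_id_key`/`get_index` wiring replaced with plain arguments):

```python
MAX_QUERY_RESULTS = 10000  # Pinecone's per-query ceiling on top_k

def delete_record_vectors(index, record_id_key: str, record_id: str) -> None:
    """Delete every vector whose metadata ties it to record_id, in capped batches."""
    stats = index.describe_index_stats()
    dimension = stats["dimension"]
    total_vectors = stats["total_vector_count"]
    while total_vectors > 0:
        # Query with a zero vector purely to page through metadata matches,
        # never asking for more than Pinecone's top_k limit.
        matches = index.query(
            vector=[0] * dimension,
            top_k=min(total_vectors, MAX_QUERY_RESULTS),
            filter={record_id_key: {"$eq": record_id}},
        ).get("matches", [])
        if not matches:
            break  # other records may remain, but nothing more for this one
        index.delete(ids=[match["id"] for match in matches])
        total_vectors = index.describe_index_stats()["total_vector_count"]
```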
