|
6 | 6 | from pydantic import Field, Secret |
7 | 7 |
|
8 | 8 | from unstructured_ingest.error import DestinationConnectionError |
9 | | -from unstructured_ingest.utils.data_prep import flatten_dict, generator_batching_wbytes |
| 9 | +from unstructured_ingest.utils.data_prep import ( |
| 10 | + flatten_dict, |
| 11 | + generator_batching_wbytes, |
| 12 | +) |
10 | 13 | from unstructured_ingest.utils.dep_check import requires_dependencies |
11 | 14 | from unstructured_ingest.v2.constants import RECORD_ID_LABEL |
12 | 15 | from unstructured_ingest.v2.interfaces import ( |
@@ -148,8 +151,10 @@ def conform_dict(self, element_dict: dict, file_data: FileData) -> dict: |
148 | 151 |
|
149 | 152 | metadata[RECORD_ID_LABEL] = file_data.identifier |
150 | 153 |
|
| 154 | + # To support more optimal deletes, a prefix is suggested for each record: |
| 155 | + # https://docs.pinecone.io/guides/data/manage-rag-documents#delete-all-records-for-a-parent-document |
151 | 156 | return { |
152 | | - "id": get_enhanced_element_id(element_dict=element_dict, file_data=file_data), |
| 157 | + "id": f"{file_data.identifier}#{get_enhanced_element_id(element_dict=element_dict, file_data=file_data)}", # noqa:E501 |
153 | 158 | "values": embeddings, |
154 | 159 | "metadata": metadata, |
155 | 160 | } |
@@ -215,45 +220,28 @@ def pod_delete_by_record_id(self, file_data: FileData) -> None: |
215 | 220 | f"from pinecone index: {resp}" |
216 | 221 | ) |
217 | 222 |
|
218 | | - def delete_by_query(self, index: "PineconeIndex", query_params: dict) -> None: |
219 | | - while True: |
220 | | - query_results = index.query(**query_params) |
221 | | - matches = query_results.get("matches", []) |
222 | | - if not matches: |
223 | | - break |
224 | | - ids = [match["id"] for match in matches] |
225 | | - delete_params = {"ids": ids} |
226 | | - if namespace := self.upload_config.namespace: |
227 | | - delete_params["namespace"] = namespace |
228 | | - index.delete(**delete_params) |
229 | | - |
230 | 223 | def serverless_delete_by_record_id(self, file_data: FileData) -> None: |
231 | 224 | logger.debug( |
232 | 225 | f"deleting any content with metadata " |
233 | 226 | f"{self.upload_config.record_id_key}={file_data.identifier} " |
234 | 227 | f"from pinecone serverless index" |
235 | 228 | ) |
236 | 229 | index = self.connection_config.get_index(pool_threads=MAX_POOL_THREADS) |
237 | | - index_stats = index.describe_index_stats() |
238 | | - dimension = index_stats["dimension"] |
239 | | - total_vectors = index_stats["total_vector_count"] |
240 | | - if total_vectors == 0: |
241 | | - return |
242 | | - while total_vectors > 0: |
243 | | - top_k = min(total_vectors, MAX_QUERY_RESULTS) |
244 | | - query_params = { |
245 | | - "filter": {self.upload_config.record_id_key: {"$eq": file_data.identifier}}, |
246 | | - "vector": [0] * dimension, |
247 | | - "top_k": top_k, |
248 | | - } |
| 230 | + list_kwargs = {"prefix": f"{file_data.identifier}#"} |
| 231 | + deleted_ids = 0 |
| 232 | + if namespace := self.upload_config.namespace: |
| 233 | + list_kwargs["namespace"] = namespace |
| 234 | + for ids in index.list(**list_kwargs): |
| 235 | + deleted_ids += len(ids) |
| 236 | + delete_kwargs = {"ids": ids} |
249 | 237 | if namespace := self.upload_config.namespace: |
250 | | - query_params["namespace"] = namespace |
251 | | - self.delete_by_query(index=index, query_params=query_params) |
252 | | - index_stats = index.describe_index_stats() |
253 | | - total_vectors = index_stats["total_vector_count"] |
254 | | - |
| 238 | +                delete_kwargs["namespace"] = namespace |
| 239 | +            delete_resp = index.delete(**delete_kwargs) |
| 240 | +            # delete_resp should be an empty dict if there were no errors |
| 241 | +            if delete_resp: |
| 242 | +                logger.error(f"failed to delete batch of ids: {delete_resp}") |
255 | 243 | logger.info( |
256 | | - f"deleted {total_vectors} records with metadata " |
| 244 | + f"deleted {deleted_ids} records with metadata " |
257 | 245 | f"{self.upload_config.record_id_key}={file_data.identifier} " |
258 | 246 | f"from pinecone index" |
259 | 247 | ) |
|
0 commit comments