diff --git a/core/api.py b/core/api.py
index 2d1986d1..dcf4ba7f 100644
--- a/core/api.py
+++ b/core/api.py
@@ -35,6 +35,7 @@ from core.models.prompts import validate_prompt_overrides_with_http_exception
 from core.models.request import (
     AgentQueryRequest,
+    BatchDeleteRequest,
     BatchIngestResponse,
     CompletionQueryRequest,
     CreateGraphRequest,
@@ -1058,6 +1059,42 @@ async def delete_document(document_id: str, auth: AuthContext = Depends(verify_t
         raise HTTPException(status_code=403, detail=str(e))
 
 
+MAX_BATCH_DELETE = 100
+
+
+@app.post("/documents/batch_delete")
+@telemetry.track(operation_type="batch_delete_documents", metadata_resolver=telemetry.batch_document_delete_metadata)
+async def batch_delete_documents(request: BatchDeleteRequest, auth: AuthContext = Depends(verify_token)):
+    """
+    Batch delete documents by their IDs.
+
+    Args:
+        request: Request body containing the list of document IDs to delete.
+        auth: Authentication context.
+
+    Returns:
+        Dict: Deletion status plus counts and IDs of deleted and failed documents.
+    """
+    document_ids = request.document_ids
+    if not document_ids:
+        raise HTTPException(status_code=400, detail="No document IDs provided for deletion")
+    if len(document_ids) > MAX_BATCH_DELETE:
+        raise HTTPException(status_code=400, detail=f"Batch size exceeds maximum limit of {MAX_BATCH_DELETE}")
+
+    try:
+        success, failed = await document_service.delete_documents(document_ids, auth)
+        return {
+            "status": "partial_success" if failed else "success",
+            "deleted_count": len(success),
+            "failed_count": len(failed),
+            "deleted_ids": success,
+            "failed_ids": failed,
+        }
+    except Exception as e:
+        logger.error(f"Batch deletion failed: {e}")
+        raise HTTPException(status_code=500, detail="Batch deletion failed")
+
+
 @app.get("/documents/filename/{filename}", response_model=Document)
 async def get_document_by_filename(
     filename: str,
@@ -2050,8 +2087,7 @@ async def set_folder_rule(
             except Exception as rule_apply_error:
                 last_error = rule_apply_error
                 logger.warning(
-                    f"Metadata extraction attempt {retry_count + 1} failed: "
-                    f"{rule_apply_error}"
+                    f"Metadata extraction attempt {retry_count + 1} failed: {rule_apply_error}"
                 )
                 if retry_count == max_retries - 1:  # Last attempt
                     logger.error(f"All {max_retries} metadata extraction attempts failed")
diff --git a/core/models/request.py b/core/models/request.py
index f4220fdf..b0bc6988 100644
--- a/core/models/request.py
+++ b/core/models/request.py
@@ -143,3 +143,12 @@ class AgentQueryRequest(BaseModel):
     """Request model for agent queries"""
 
     query: str = Field(..., description="Natural language query for the Morphik agent")
+
+
+class BatchDeleteRequest(BaseModel):
+    """Request model for batch document deletion"""
+
+    document_ids: List[str] = Field(
+        ...,
+        description="List of document IDs to be deleted. Must be a list of strings.",
+    )
diff --git a/core/services/document_service.py b/core/services/document_service.py
index 1a78e5a3..a6763c04 100644
--- a/core/services/document_service.py
+++ b/core/services/document_service.py
@@ -244,7 +244,7 @@ async def retrieve_chunks(
             chunks = await self.reranker.rerank(query, chunks)
             chunks.sort(key=lambda x: x.score, reverse=True)
             chunks = chunks[:k]
-            logger.debug(f"Reranked {k*10} chunks and selected the top {k}")
+            logger.debug(f"Reranked {k * 10} chunks and selected the top {k}")
 
         # Combine multiple chunk sources if needed
         chunks = await self._combine_multi_and_regular_chunks(
@@ -852,7 +852,7 @@ async def ingest_file_content(
         if folder_name:
             try:
                 await self._ensure_folder_exists(folder_name, doc.external_id, auth)
-                logger.debug(f"Ensured folder '{folder_name}' exists " f"and contains document {doc.external_id}")
+                logger.debug(f"Ensured folder '{folder_name}' exists and contains document {doc.external_id}")
             except Exception as e:
                 logger.error(
                     f"Error during _ensure_folder_exists for doc {doc.external_id}"
@@ -1210,7 +1210,7 @@ async def store_document_with_retry():
                         current_retry_delay *= 2
                     else:
                         logger.error(
-                            f"All database connection attempts failed " f"after {max_retries} retries: {error_msg}"
+                            f"All database connection attempts failed after {max_retries} retries: {error_msg}"
                         )
                         raise Exception("Failed to store document metadata after multiple retries")
                 else:
@@ -2036,6 +2036,37 @@ def _update_metadata_and_version(
 
         history.append(entry)
 
+    async def delete_documents(
+        self,
+        document_ids: List[str],
+        auth: AuthContext,
+    ) -> tuple[list[str], list[str]]:
+        """
+        Batch delete documents and their associated data.
+
+        Args:
+            document_ids: List of document IDs to delete.
+            auth: Authentication context.
+
+        Returns:
+            tuple: A tuple of two lists:
+                - IDs of successfully deleted documents.
+                - IDs of documents that could not be deleted.
+        """
+        success = []
+        failed = []
+        for doc_id in document_ids:
+            try:
+                deleted = await self.delete_document(doc_id, auth)
+                if deleted:
+                    success.append(doc_id)
+                else:
+                    failed.append(doc_id)
+            except Exception as e:
+                logger.warning(f"Failed to delete document {doc_id}: {e}")
+                failed.append(doc_id)
+        return success, failed
+
     # ------------------------------------------------------------------
     # Helper – choose bucket per app (isolation)
     # ------------------------------------------------------------------
diff --git a/core/services/telemetry.py b/core/services/telemetry.py
index cf759c79..e872c77f 100644
--- a/core/services/telemetry.py
+++ b/core/services/telemetry.py
@@ -295,7 +295,7 @@ def export(self, spans):
                     # Use exponential backoff
                     delay = self.retry_delay * (2 ** (retries - 1))
                     self.logger.warning(
-                        f"Honeycomb trace export attempt {retries} failed: {str(e)}. " f"Retrying in {delay}s..."
+                        f"Honeycomb trace export attempt {retries} failed: {str(e)}. Retrying in {delay}s..."
                    )
                     time.sleep(delay)
                 else:
@@ -809,6 +809,18 @@ def _setup_metadata_extractors(self):
             ]
         )
 
+        self.batch_document_delete_metadata = MetadataExtractor(
+            [
+                MetadataField(
+                    "document_count",
+                    "request",
+                    transform=lambda req: len(req.document_ids) if req and hasattr(req, "document_ids") else 0,
+                ),
+                MetadataField("folder_name", "request"),
+                MetadataField("end_user_id", "request"),
+            ]
+        )
+
     def track(self, operation_type: Optional[str] = None, metadata_resolver: Optional[Callable] = None):
         """
         Decorator for tracking API operations with telemetry.
diff --git a/sdks/python/morphik/async_.py b/sdks/python/morphik/async_.py
index d33d3138..53adcfd2 100644
--- a/sdks/python/morphik/async_.py
+++ b/sdks/python/morphik/async_.py
@@ -2540,3 +2540,8 @@ async def wait_for_graph_completion(
                 raise RuntimeError(graph.error or "Graph processing failed")
             await asyncio.sleep(check_interval_seconds)
         raise TimeoutError("Timed out waiting for graph completion")
+
+    async def batch_delete_documents(self, document_ids: List[str]) -> tuple[list[str], list[str]]:
+        """Delete multiple documents by their IDs (async). Returns (deleted_ids, failed_ids)."""
+        response = await self._request("POST", "documents/batch_delete", data={"document_ids": document_ids})
+        return response["deleted_ids"], response["failed_ids"]
diff --git a/sdks/python/morphik/sync.py b/sdks/python/morphik/sync.py
index 577a97d3..fc34d33e 100644
--- a/sdks/python/morphik/sync.py
+++ b/sdks/python/morphik/sync.py
@@ -2735,3 +2735,8 @@ def wait_for_graph_completion(
                 raise RuntimeError(graph.error or "Graph processing failed")
             time.sleep(check_interval_seconds)
         raise TimeoutError("Timed out waiting for graph completion")
+
+    def batch_delete_documents(self, document_ids: List[str]) -> tuple[list[str], list[str]]:
+        """Delete multiple documents by their IDs. Returns (deleted_ids, failed_ids)."""
+        response = self._request("POST", "documents/batch_delete", data={"document_ids": document_ids})
+        return response["deleted_ids"], response["failed_ids"]
diff --git a/sdks/python/morphik/tests/test_async.py b/sdks/python/morphik/tests/test_async.py
index 1880be49..91acc6c8 100644
--- a/sdks/python/morphik/tests/test_async.py
+++ b/sdks/python/morphik/tests/test_async.py
@@ -382,3 +382,42 @@ async def test_query_with_dict_schema(self, db):
         finally:
             await db.delete_document(doc.external_id)
 
+
+    @pytest.mark.asyncio
+    async def test_batch_delete_documents(self, db):
+        """Test batch deleting multiple documents"""
+        # Given
+        doc1 = await db.ingest_text(
+            content="This is batch delete document 1",
+            filename=f"batch_del_1_{uuid.uuid4().hex[:6]}.txt",
+            metadata={"test_case": "batch_delete"},
+        )
+        doc2 = await db.ingest_text(
+            content="This is batch delete document 2",
+            filename=f"batch_del_2_{uuid.uuid4().hex[:6]}.txt",
+            metadata={"test_case": "batch_delete"},
+        )
+
+        # Then
+        assert doc1.external_id and doc2.external_id
+
+        # When
+        success, failed = await db.batch_delete_documents([doc1.external_id, doc2.external_id])
+
+        # Then
+        assert len(success) == 2
+        assert len(failed) == 0
+
+    @pytest.mark.asyncio
+    async def test_batch_delete_with_invalid_id(self, db):
+        doc = await db.ingest_text(
+            content="This is batch delete document 1",
+            filename=f"batch_del_1_{uuid.uuid4().hex[:6]}.txt",
+            metadata={"test_case": "batch_delete"},
+        )
+
+        # Include a fake ID
+        success, failed = await db.batch_delete_documents([doc.external_id, "nonexistent_id_123"])
+
+        assert doc.external_id in success
+        assert "nonexistent_id_123" in failed
diff --git a/sdks/python/morphik/tests/test_sync.py b/sdks/python/morphik/tests/test_sync.py
index 6a73b22a..94e9ea8b 100644
--- a/sdks/python/morphik/tests/test_sync.py
+++ b/sdks/python/morphik/tests/test_sync.py
@@ -369,3 +369,41 @@ def test_query_with_dict_schema(self, db):
         finally:
             db.delete_document(doc.external_id)
 
+
+    def test_batch_delete_documents(self, db):
+        """Test batch deleting multiple documents (sync)"""
+        # Given
+        doc1 = db.ingest_text(
+            content="First document to test batch delete",
+            filename=f"batch_delete_1_{uuid.uuid4().hex[:6]}.txt",
+            metadata={"test_id": "sync_batch_delete_test"},
+        )
+        doc2 = db.ingest_text(
+            content="Second document to test batch delete",
+            filename=f"batch_delete_2_{uuid.uuid4().hex[:6]}.txt",
metadata={"test_id": "sync_batch_delete_test"}, + ) + + # Then + assert doc1.external_id is not None + assert doc2.external_id is not None + + # When + success, failed = db.batch_delete_documents([doc1.external_id, doc2.external_id]) + + # Then + assert len(success) == 2 + assert len(failed) == 0 + + def test_batch_delete_with_invalid_id(self, db): + doc = db.ingest_text( + content="Valid document", + filename=f"batch_delete_valid_{uuid.uuid4().hex[:6]}.txt", + metadata={"test_id": "partial_batch_delete_test"}, + ) + + # Include a fake ID + success, failed = db.batch_delete_documents([doc.external_id, "nonexistent_id_123"]) + + assert doc.external_id in success + assert "nonexistent_id_123" in failed diff --git a/shell.py b/shell.py index ed101666..c29e036e 100644 --- a/shell.py +++ b/shell.py @@ -811,6 +811,18 @@ def list_graphs(self) -> list: graphs = self._client.list_graphs() return [graph.model_dump() for graph in graphs] if graphs else [] + def batch_delete_documents(self, document_ids: List[str]) -> dict: + """ + Delete multiple documents in a single batch. + + Args: + document_ids: List of document external_ids to delete + + Returns: + dict: Deletion result from the server + """ + return self._client.batch_delete_documents(document_ids) + def close(self): """Close the client connection""" self._client.close() @@ -899,6 +911,7 @@ def query(self, query: str, max_tokens: int = None, temperature: float = None) - print(" db.create_graph('knowledge_graph', filters={'category': 'research'})") print(" db.query('How does X relate to Y?', graph_name='knowledge_graph', include_paths=True)") print("Type help(db) for documentation.") + print(" db.batch_delete_documents(['doc_id1', 'doc_id2'])") # Start the shell shell.interact(banner="")