Skip to content

Commit 0acf677

Browse files
committed
Migrate to service approach for file ingestion
1 parent 3bc314c commit 0acf677

File tree

6 files changed

+103
-105
lines changed

6 files changed

+103
-105
lines changed

genai/rag/ingestion_pipeline.py

Lines changed: 0 additions & 63 deletions
This file was deleted.

genai/service/ingestion_service.py

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
from typing import List
2+
from uuid import uuid4
3+
from time import perf_counter
4+
5+
from langchain_qdrant import QdrantVectorStore
6+
from langchain_core.documents import Document
7+
from langchain_community.document_loaders import PyPDFLoader
8+
from langchain_text_splitters import RecursiveCharacterTextSplitter
9+
10+
from logger import logger
11+
from metrics import file_ingested_counter, file_ingestion_duration
12+
13+
14+
def _load_document(file_path: str) -> List[Document]:
    """Read the PDF at *file_path* and return its pages as Documents."""
    # PyPDFLoader yields one Document per page of the PDF.
    return PyPDFLoader(file_path).load()
19+
20+
21+
def _chunk_documents(
    docs: List[Document],
    filename: str,
    *,
    chunk_size: int = 1000,
    chunk_overlap: int = 100
) -> List[Document]:
    """Split documents into overlapping chunks tagged with their source file.

    Args:
        docs: Documents to split (typically one per PDF page).
        filename: Original upload name recorded in each chunk's
            ``source`` metadata so results trace back to the file.
        chunk_size: Maximum characters per chunk (default 1000,
            matching the previous hard-coded value).
        chunk_overlap: Characters shared between adjacent chunks to
            preserve context across boundaries (default 100).

    Returns:
        New Document objects, one per chunk, carrying the splitter's
        metadata plus the ``source`` entry.
    """
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap
    )
    chunked_docs = text_splitter.split_documents(docs)
    # Rebuild each chunk so the original metadata is preserved and the
    # source filename is attached for later provenance lookups.
    return [
        Document(
            page_content=chunk.page_content,
            metadata={**chunk.metadata, "source": filename}
        )
        for chunk in chunked_docs
    ]
39+
40+
41+
def _store_documents(vector_store: QdrantVectorStore, docs: List[Document]):
    """Persist chunked documents in the vector store.

    Each document is assigned a freshly generated UUID4 string as its
    point id, so re-ingesting the same file creates new points rather
    than overwriting existing ones.
    """
    # Iterate docs directly rather than range(len(docs)) — same count,
    # idiomatic Python.
    ids = [str(uuid4()) for _ in docs]
    vector_store.add_documents(docs, ids=ids)
45+
46+
47+
def ingest(vector_store: QdrantVectorStore, file_path: str, filename: str):
    """Load a PDF, chunk its pages, and store the chunks in Qdrant.

    Runs the three pipeline stages in order (load -> chunk -> store),
    logging progress after each, then records the total duration and
    an ingestion count in the Prometheus metrics.
    """
    started = perf_counter()

    pages = _load_document(file_path)
    logger.info("Documents are loaded for file %s", filename)

    chunks = _chunk_documents(pages, filename)
    logger.info("Documents are chunked for file %s", filename)

    _store_documents(vector_store, chunks)
    logger.info(
        "Ingested %s chunks into Qdrant for file %s.",
        len(chunks),
        filename
    )

    elapsed = perf_counter() - started
    file_ingestion_duration.observe(elapsed)
    file_ingested_counter.inc()
    logger.info("File ingestion duration: %.2f seconds", elapsed)

genai/service/qdrant_service.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
1-
from rag.ingestion_pipeline import IngestionPipeline
1+
from langchain_qdrant import QdrantVectorStore
2+
3+
from service.ingestion_service import ingest
24
from vector_database.qdrant_vdb import QdrantVDB
35
from logger import logger
46

@@ -24,7 +26,7 @@ def file_already_uploaded(collection_name: str, filename: str) -> bool:
2426
return False
2527

2628

27-
def get_vector_store(collection_name: str):
29+
def get_vector_store(collection_name: str) -> QdrantVectorStore:
2830
"""
2931
Returns the vector store for the specified collection name.
3032
This function is used to retrieve the vector store instance for a specific
@@ -53,7 +55,5 @@ def ingest_file(collection_name: str, file_path: str, filename: str):
5355
This function is used to process and store the file content in the vector
5456
database for later retrieval.
5557
"""
56-
pipeline = IngestionPipeline(
57-
vector_store=get_vector_store(collection_name)
58-
)
59-
pipeline.ingest(file_path, filename)
58+
vector_store = get_vector_store(collection_name)
59+
ingest(vector_store, file_path, filename)

genai/tests/integration/ingestion_test.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
from pathlib import Path
22
from fpdf import FPDF
3-
from rag.ingestion_pipeline import IngestionPipeline
3+
from service.ingestion_service import ingest
44
from vector_database.qdrant_vdb import QdrantVDB
55

66

@@ -28,10 +28,11 @@ def test_real_ingestion_pipeline(tmp_path):
2828
generate_sample_pdf(pdf_path)
2929
filename = pdf_path.name
3030

31-
# Ingestion
31+
# Get vector store
3232
vector_store = qdrant.create_and_get_vector_storage(collection_name)
33-
pipeline = IngestionPipeline(vector_store=vector_store)
34-
pipeline.ingest(str(pdf_path), filename)
33+
34+
# Ingestion
35+
ingest(vector_store, str(pdf_path), filename)
3536

3637
found = qdrant.collection_contains_file(
3738
qdrant.client,

genai/tests/integration/upload_test.py

Lines changed: 7 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,30 +1,22 @@
11
import io
2-
from unittest.mock import patch, MagicMock
2+
from unittest.mock import patch
33
from fastapi.testclient import TestClient
44
from main import app
55
from service.auth_service import UserInfo, get_current_user
66

77
client = TestClient(app)
88

99

10-
@patch(
11-
"service.qdrant_service.qdrant.client.collection_exists",
12-
return_value=False
13-
)
14-
@patch("service.qdrant_service.qdrant.create_and_get_vector_storage")
15-
@patch("service.qdrant_service.IngestionPipeline")
10+
@patch("routes.routes.ingest_file")
11+
@patch("routes.routes.file_already_uploaded", return_value=False)
1612
def test_upload_file_success(
17-
mock_pipeline_class,
18-
_mock_vector_store,
19-
_mock_exists
13+
mock_file_uploaded,
14+
mock_ingest_file
2015
):
2116
# Mock the authentication by overriding the dependency
2217
mock_user = UserInfo(user_id=123, username="test_user")
2318
app.dependency_overrides[get_current_user] = lambda: mock_user
2419

25-
mock_pipeline = MagicMock()
26-
mock_pipeline_class.return_value = mock_pipeline
27-
2820
file_content = b"%PDF-1.4 dummy content"
2921
file = io.BytesIO(file_content)
3022
file.name = "test.pdf"
@@ -37,8 +29,8 @@ def test_upload_file_success(
3729
assert response.status_code == 200
3830
assert response.json() == {"message": "File processed successfully."}
3931

40-
mock_pipeline_class.assert_called_once()
41-
mock_pipeline.ingest.assert_called_once()
32+
mock_ingest_file.assert_called_once()
33+
mock_file_uploaded.assert_called_once()
4234

4335
# Clean up the dependency override
4436
app.dependency_overrides.clear()

genai/tests/unit/test_ingestion_pipeline.py

Lines changed: 18 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,38 +1,40 @@
11
from unittest.mock import MagicMock, patch
22
from uuid import UUID
3-
from rag.ingestion_pipeline import IngestionPipeline
3+
from service.ingestion_service import (
4+
ingest,
5+
_load_document,
6+
_chunk_documents,
7+
_store_documents
8+
)
49
from langchain_core.documents import Document
510

611

712
def test_load_document_returns_documents():
8-
with patch("rag.ingestion_pipeline.PyPDFLoader") as mock_loader:
13+
with patch("service.ingestion_service.PyPDFLoader") as mock_loader:
914
mock_loader.return_value.load.return_value = [
1015
Document(page_content="Test")
1116
]
12-
pipeline = IngestionPipeline(vector_store=MagicMock())
13-
docs = pipeline.load_document("fake_path.pdf")
17+
docs = _load_document("fake_path.pdf")
1418
assert isinstance(docs, list)
1519
assert isinstance(docs[0], Document)
1620
assert docs[0].page_content == "Test"
1721

1822

1923
def test_chunk_documents_returns_chunks():
20-
pipeline = IngestionPipeline(vector_store=MagicMock())
2124
dummy_doc = Document(
2225
page_content="This is a long text. " * 100,
2326
metadata={}
2427
)
25-
chunks = pipeline.chunk_documents([dummy_doc], filename="sample.pdf")
28+
chunks = _chunk_documents([dummy_doc], filename="sample.pdf")
2629
assert isinstance(chunks, list)
2730
assert all(isinstance(doc, Document) for doc in chunks)
2831
assert all(doc.metadata["source"] == "sample.pdf" for doc in chunks)
2932

3033

3134
def test_store_documents_calls_add_documents_with_uuids():
3235
mock_vector_store = MagicMock()
33-
pipeline = IngestionPipeline(vector_store=mock_vector_store)
3436
docs = [Document(page_content="Chunk", metadata={}) for _ in range(3)]
35-
pipeline.store_documents(docs)
37+
_store_documents(mock_vector_store, docs)
3638
args, kwargs = mock_vector_store.add_documents.call_args
3739
passed_docs = args[0]
3840
passed_ids = kwargs["ids"]
@@ -42,19 +44,18 @@ def test_store_documents_calls_add_documents_with_uuids():
4244

4345

4446
def test_ingest_calls_all_steps():
45-
pipeline = IngestionPipeline(vector_store=MagicMock())
46-
47-
with patch.object(pipeline, "load_document") as mock_load, \
48-
patch.object(pipeline, "chunk_documents") as mock_chunk, \
49-
patch.object(pipeline, "store_documents") as mock_store, \
50-
patch("rag.ingestion_pipeline.logger") as mock_logger, \
51-
patch("rag.ingestion_pipeline.file_ingestion_duration.observe"), \
52-
patch("rag.ingestion_pipeline.file_ingested_counter.inc"):
47+
mock_vector_store = MagicMock()
48+
with patch("service.ingestion_service._load_document") as mock_load, \
49+
patch("service.ingestion_service._chunk_documents") as mock_chunk, \
50+
patch("service.ingestion_service._store_documents") as mock_store, \
51+
patch("service.ingestion_service.logger") as mock_logger, \
52+
patch("service.ingestion_service.file_ingestion_duration.observe"), \
53+
patch("service.ingestion_service.file_ingested_counter.inc"):
5354

5455
mock_load.return_value = [Document(page_content="Doc")]
5556
mock_chunk.return_value = [Document(page_content="Chunk")]
5657

57-
pipeline.ingest("test.pdf", "testfile.pdf")
58+
ingest(mock_vector_store, "test.pdf", "testfile.pdf")
5859

5960
mock_load.assert_called_once_with("test.pdf")
6061
mock_chunk.assert_called_once()

0 commit comments

Comments
 (0)