
Commit 082a35b

feat: adding qdrant vector db
# Conflicts:
#   core/api.py
1 parent bbeae2c commit 082a35b

File tree

12 files changed: +303 -35 lines changed

.gitignore

Lines changed: 2 additions & 0 deletions

@@ -42,3 +42,5 @@ ee/ui-component/.next
 
 ui-component/notebook-storage/notebooks.json
 ee/ui-component/package-lock.json
+
+morphik.dev.toml

core/api.py

Lines changed: 1 addition & 2 deletions

@@ -2050,8 +2050,7 @@ async def set_folder_rule(
             except Exception as rule_apply_error:
                 last_error = rule_apply_error
                 logger.warning(
-                    f"Metadata extraction attempt {retry_count + 1} failed: "
-                    f"{rule_apply_error}"
+                    f"Metadata extraction attempt {retry_count + 1} failed: {rule_apply_error}"
                 )
                 if retry_count == max_retries - 1:  # Last attempt
                     logger.error(f"All {max_retries} metadata extraction attempts failed")

core/config.py

Lines changed: 12 additions & 12 deletions

@@ -101,11 +101,13 @@ class Settings(BaseSettings):
     S3_BUCKET: Optional[str] = None
 
     # Vector store configuration
-    VECTOR_STORE_PROVIDER: Literal["pgvector"]
+    VECTOR_STORE_PROVIDER: Literal["pgvector", "qdrant"]
     VECTOR_STORE_DATABASE_NAME: Optional[str] = None
+    QDRANT_HOST: Optional[str] = None
+    QDRANT_PORT: int = 6333
+    QDRANT_HTTPS: bool = False
 
     # Colpali configuration
-    ENABLE_COLPALI: bool
     # Colpali embedding mode: off, local, or api
     COLPALI_MODE: Literal["off", "local", "api"] = "local"
 
@@ -139,7 +141,8 @@ def get_settings() -> Settings:
     load_dotenv(override=True)
 
     # Load config.toml
-    with open("morphik.toml", "rb") as f:
+    cfg_path = os.environ.get("MORPHIK_CONFIG_PATH", "morphik.toml")
+    with open(cfg_path, "rb") as f:
         config = tomli.load(f)
 
     em = "'{missing_value}' needed if '{field}' is set to '{value}'"
@@ -281,14 +284,12 @@ def get_settings() -> Settings:
         raise ValueError(f"Unknown storage provider selected: '{prov}'")
 
     # load vector store config
-    vector_store_config = {"VECTOR_STORE_PROVIDER": config["vector_store"]["provider"]}
-    if vector_store_config["VECTOR_STORE_PROVIDER"] != "pgvector":
-        prov = vector_store_config["VECTOR_STORE_PROVIDER"]
-        raise ValueError(f"Unknown vector store provider selected: '{prov}'")
-
-    if "POSTGRES_URI" not in os.environ:
-        msg = em.format(missing_value="POSTGRES_URI", field="vector_store.provider", value="pgvector")
-        raise ValueError(msg)
+    vector_store_config = {
+        "VECTOR_STORE_PROVIDER": config["vector_store"]["provider"],
+        "QDRANT_HOST": config["vector_store"]["qdrant_host"],
+        "QDRANT_PORT": config["vector_store"]["qdrant_port"],
+        "QDRANT_HTTPS": config["vector_store"]["qdrant_https"],
+    }
 
     # load rules config
     rules_config = {
@@ -303,7 +304,6 @@ def get_settings() -> Settings:
 
     # load morphik config
     morphik_config = {
-        "ENABLE_COLPALI": config["morphik"]["enable_colpali"],
         "COLPALI_MODE": config["morphik"].get("colpali_mode", "local"),
         "MODE": config["morphik"].get("mode", "cloud"),  # Default to "cloud" mode
         # API domain for core server
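
For reference, here is a minimal sketch (not part of the commit) of the [vector_store] keys the updated loader reads from morphik.toml. The key names mirror the diff above; the host/port/https values and the use of tomli.loads on an inline string are illustrative only. Note that the new loader indexes qdrant_host, qdrant_port and qdrant_https unconditionally, so those keys need to be present even when provider is "pgvector".

# Sketch only: parse an illustrative [vector_store] section the way get_settings() does.
import tomli

SAMPLE_MORPHIK_TOML = """
[vector_store]
provider = "qdrant"
qdrant_host = "localhost"   # illustrative value
qdrant_port = 6333
qdrant_https = false
"""

config = tomli.loads(SAMPLE_MORPHIK_TOML)
vector_store_config = {
    "VECTOR_STORE_PROVIDER": config["vector_store"]["provider"],
    "QDRANT_HOST": config["vector_store"]["qdrant_host"],
    "QDRANT_PORT": config["vector_store"]["qdrant_port"],
    "QDRANT_HTTPS": config["vector_store"]["qdrant_https"],
}
print(vector_store_config)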

core/services/document_service.py

Lines changed: 2 additions & 2 deletions

@@ -244,7 +244,7 @@ async def retrieve_chunks(
            chunks = await self.reranker.rerank(query, chunks)
            chunks.sort(key=lambda x: x.score, reverse=True)
            chunks = chunks[:k]
-            logger.debug(f"Reranked {k*10} chunks and selected the top {k}")
+            logger.debug(f"Reranked {k * 10} chunks and selected the top {k}")
 
        # Combine multiple chunk sources if needed
        chunks = await self._combine_multi_and_regular_chunks(
@@ -1210,7 +1210,7 @@ async def store_document_with_retry():
                current_retry_delay *= 2
            else:
                logger.error(
-                    f"All database connection attempts failed " f"after {max_retries} retries: {error_msg}"
+                    f"All database connection attempts failed after {max_retries} retries: {error_msg}"
                )
                raise Exception("Failed to store document metadata after multiple retries")
        else:

core/vector_store/__init__.py

Lines changed: 18 additions & 0 deletions

@@ -0,0 +1,18 @@
+from core.config import Settings
+from .base_vector_store import BaseVectorStore
+from .pgvector_store import PGVectorStore
+from .qdrant_store import QdrantVectorStore
+
+
+def vector_store_factory(settings: Settings) -> BaseVectorStore:
+    prov = settings.VECTOR_STORE_PROVIDER
+    if prov == "pgvector":
+        if not settings.POSTGRES_URI:
+            raise ValueError("PostgreSQL URI is required for pgvector store")
+        return PGVectorStore(uri=settings.POSTGRES_URI)
+    elif prov == "qdrant":
+        if not settings.QDRANT_HOST:
+            raise ValueError("Qdrant host is required for qdrant store")
+        return QdrantVectorStore(host=settings.QDRANT_HOST, port=settings.QDRANT_PORT, https=settings.QDRANT_HTTPS)
+    else:
+        raise ValueError(f"Unknown vector store provider selected: '{prov}'")

core/vector_store/pgvector_store.py

Lines changed: 0 additions & 1 deletion

@@ -201,7 +201,6 @@ async def initialize(self):
 
        # Continue with the rest of the initialization
        async with self.engine.begin() as conn:
-
            # Check if vector_embeddings table exists
            check_table_sql = """
                SELECT EXISTS (

core/vector_store/qdrant_store.py

Lines changed: 193 additions & 0 deletions

@@ -0,0 +1,193 @@
+import json
+import logging
+from typing import List, Literal, Optional, Tuple, cast
+import uuid
+
+from qdrant_client import AsyncQdrantClient
+from qdrant_client.models import models
+
+from core.models.chunk import DocumentChunk
+
+from .base_vector_store import BaseVectorStore
+
+logger = logging.getLogger(__name__)
+QDRANT_COLLECTION_NAME = "vector_embeddings"
+
+
+def _to_point_id(doc_id: str, chunk_number: int):
+    return str(uuid.uuid5(uuid.NAMESPACE_DNS, f"{chunk_number}.{doc_id}.internal"))
+
+
+def _get_qdrant_distance(metric: Literal["cosine", "dotProduct"]) -> models.Distance:
+    match metric:
+        case "cosine":
+            return models.Distance.COSINE
+        case "dotProduct":
+            return models.Distance.DOT
+
+
+class QdrantVectorStore(BaseVectorStore):
+    def __init__(self, host: str, port: int, https: bool) -> None:
+        from core.config import get_settings
+
+        settings = get_settings()
+
+        self.dimensions = settings.VECTOR_DIMENSIONS
+        self.collection_name = QDRANT_COLLECTION_NAME
+        self.distance = _get_qdrant_distance(settings.EMBEDDING_SIMILARITY_METRIC)
+        self.client = AsyncQdrantClient(
+            host=host,
+            port=port,
+            https=https,
+        )
+
+    async def _create_collection(self):
+        return await self.client.create_collection(
+            collection_name=self.collection_name,
+            vectors_config=models.VectorParams(
+                size=self.dimensions,
+                distance=self.distance,
+                on_disk=True,
+            ),
+            quantization_config=models.ScalarQuantization(
+                scalar=models.ScalarQuantizationConfig(
+                    type=models.ScalarType.INT8,
+                    always_ram=True,
+                ),
+            ),
+        )
+
+    async def _check_collection_vector_size(self):
+        collection = await self.client.get_collection(self.collection_name)
+        params = collection.config.params
+        assert params.vectors is not None
+        vectors = cast(models.VectorParams, params.vectors)
+        if vectors.size != self.dimensions:
+            msg = f"Vector collection changed from {vectors.size} to {self.dimensions}. This requires recreating tables and will delete all existing vector data."
+            logger.error(msg)
+            raise ValueError(msg)
+        return True
+
+    async def initialize(self):
+        logger.info("Initialize qdrant vector collection")
+        try:
+            if not await self.client.collection_exists(self.collection_name):
+                logger.info("Detected no collection exists. Creating qdrant collection")
+                await self._create_collection()
+            else:
+                await self._check_collection_vector_size()
+
+            await self.client.create_payload_index(
+                self.collection_name,
+                "document_id",
+                models.PayloadSchemaType.UUID,
+            )
+            return True
+        except Exception as e:
+            logger.error(f"Error initializing Qdrant store: {str(e)}")
+            return False
+
+    async def store_embeddings(self, chunks: List[DocumentChunk]) -> Tuple[bool, List[str]]:
+        try:
+            batch = [
+                models.PointStruct(
+                    id=_to_point_id(chunk.document_id, chunk.chunk_number),
+                    vector=cast(List[float], chunk.embedding),
+                    payload={
+                        "document_id": chunk.document_id,
+                        "chunk_number": chunk.chunk_number,
+                        "content": chunk.content,
+                        "metadata": json.dumps(chunk.metadata) if chunk.metadata is not None else "{}",
+                    },
+                )
+                for chunk in chunks
+            ]
+            await self.client.upsert(collection_name=self.collection_name, points=batch)
+            return True, [cast(str, p.id) for p in batch]
+        except Exception as e:
+            logger.error(f"Error storing embeddings: {str(e)}")
+            return False, []
+
+    async def query_similar(
+        self,
+        query_embedding: List[float],
+        k: int,
+        doc_ids: Optional[List[str]] = None,
+    ) -> List[DocumentChunk]:
+        try:
+            query = None
+            if doc_ids is not None:
+                query = models.Filter(
+                    must=models.FieldCondition(
+                        key="document_id",
+                        match=models.MatchAny(any=doc_ids),
+                    ),
+                )
+
+            resp = await self.client.query_points(
+                self.collection_name,
+                query=query_embedding,
+                limit=k,
+                query_filter=query,
+                with_payload=True,
+            )
+            return [
+                DocumentChunk(
+                    document_id=p.payload["document_id"],
+                    chunk_number=p.payload["chunk_number"],
+                    content=p.payload["content"],
+                    embedding=[],
+                    metadata=json.loads(p.payload["metadata"]),
+                    score=p.score,
+                )
+                for p in resp.points
+                if p.payload is not None
+            ]
+        except Exception as e:
+            logger.error(f"Error querying similar chunks: {str(e)}")
+            return []
+
+    async def get_chunks_by_id(
+        self,
+        chunk_identifiers: List[Tuple[str, int]],
+    ) -> List[DocumentChunk]:
+        try:
+            if not chunk_identifiers:
+                return []
+
+            ids = [_to_point_id(doc_id, chunk_number) for (doc_id, chunk_number) in chunk_identifiers]
+            resp = await self.client.retrieve(
+                self.collection_name,
+                ids=ids,
+            )
+            return [
+                DocumentChunk(
+                    document_id=p.payload["document_id"],
+                    chunk_number=p.payload["chunk_number"],
+                    content=p.payload["content"],
+                    embedding=[],
+                    metadata=json.loads(p.payload["metadata"]),
+                    score=0,
+                )
+                for p in resp
+                if p.payload is not None
+            ]
+        except Exception as e:
+            logger.error(f"Error retrieving chunks by ID: {str(e)}")
+            return []
+
+    async def delete_chunks_by_document_id(self, document_id: str) -> bool:
+        try:
+            await self.client.delete(
+                self.collection_name,
+                points_selector=models.Filter(
+                    must=models.FieldCondition(
+                        key="document_id",
+                        match=models.MatchValue(value=document_id),
+                    ),
+                ),
+            )
+            return True
+        except Exception as e:
+            logger.error(f"Error deleting chunks for document {document_id}: {str(e)}")
+            return False
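
A minimal store-then-query sketch for the new store (not part of the commit). It assumes a Qdrant instance is reachable on localhost:6333, that get_settings() resolves a morphik.toml whose VECTOR_DIMENSIONS matches the embedding length used here (1536 is a placeholder), and that DocumentChunk accepts the fields shown in the constructor calls above. A UUID-style document_id is used because initialize() creates a UUID payload index on that field.

import asyncio
import uuid

from core.models.chunk import DocumentChunk
from core.vector_store.qdrant_store import QdrantVectorStore


async def main():
    store = QdrantVectorStore(host="localhost", port=6333, https=False)
    await store.initialize()

    chunk = DocumentChunk(
        document_id=str(uuid.uuid4()),   # UUID string, matching the payload index type
        chunk_number=0,
        content="hello qdrant",
        embedding=[0.1] * 1536,          # assumption: must match settings.VECTOR_DIMENSIONS
        metadata={"source": "example"},
        score=0,
    )
    stored, point_ids = await store.store_embeddings([chunk])
    print("stored:", stored, point_ids)

    hits = await store.query_similar([0.1] * 1536, k=1)
    print("top hit:", hits[0].document_id if hits else None)


asyncio.run(main())

Because _to_point_id derives a uuid5 from (chunk_number, document_id), re-ingesting the same chunk upserts the same point, and get_chunks_by_id can recompute point IDs without any lookup.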

core/workers/ingestion_worker.py

Lines changed: 8 additions & 7 deletions

@@ -25,7 +25,7 @@
 from core.storage.local_storage import LocalStorage
 from core.storage.s3_storage import S3Storage
 from core.vector_store.multi_vector_store import MultiVectorStore
-from core.vector_store.pgvector_store import PGVectorStore
+from core.vector_store import vector_store_factory
 
 # Enterprise routing helpers
 from ee.db_router import get_database_for_app, get_vector_store_for_app
@@ -71,7 +71,7 @@ async def get_document_with_retry(document_service, document_id, auth, max_retri
    try:
        doc = await document_service.db.get_document(document_id, auth)
        if doc:
-            logger.debug(f"Successfully retrieved document {document_id} on attempt {attempt+1}")
+            logger.debug(f"Successfully retrieved document {document_id} on attempt {attempt + 1}")
            return doc
 
        # Document not found but no exception raised
@@ -221,7 +221,7 @@ async def process_ingestion_job(
        file_content = file_content.read()
        download_time = time.time() - download_start
        phase_times["download_file"] = download_time
-        logger.info(f"File download took {download_time:.2f}s for {len(file_content)/1024/1024:.2f}MB")
+        logger.info(f"File download took {download_time:.2f}s for {len(file_content) / 1024 / 1024:.2f}MB")
 
        # 4. Parse file to text
        parse_start = time.time()
@@ -417,9 +417,10 @@ async def process_ingestion_job(
                # Only process if it's an image chunk - pass the image content to the rule
                if chunk_obj.metadata.get("is_image", False):
                    # Get metadata *and* the potentially modified chunk
-                    chunk_rule_metadata, processed_chunk = (
-                        await document_service.rules_processor.process_chunk_rules(chunk_obj, image_rules)
-                    )
+                    (
+                        chunk_rule_metadata,
+                        processed_chunk,
+                    ) = await document_service.rules_processor.process_chunk_rules(chunk_obj, image_rules)
                    processed_chunks_multivector.append(processed_chunk)
                    # Aggregate the metadata extracted from this chunk
                    aggregated_chunk_metadata.update(chunk_rule_metadata)
@@ -602,7 +603,7 @@ async def startup(ctx):
 
    # Initialize vector store
    logger.info("Initializing primary vector store...")
-    vector_store = PGVectorStore(uri=settings.POSTGRES_URI)
+    vector_store = vector_store_factory(settings)
    success = await vector_store.initialize()
    if success:
        logger.info("Primary vector store initialization successful")
