Skip to content

Commit f193f3e

Browse files
committed
Add Oracle DB 23ai integration with secure credential handling. Implements OraDBVectorStore as an alternative vector storage backend with ChromaDB fallback support. Adds credential loading from config.yaml, comprehensive documentation, and testing tools.
1 parent b00aca4 commit f193f3e

File tree

8 files changed

+775
-13
lines changed

8 files changed

+775
-13
lines changed

agentic_rag/OraDBVectorStore.py

Lines changed: 385 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,385 @@
1+
from typing import List, Dict, Any
2+
import json
3+
import argparse
4+
from sentence_transformers import SentenceTransformer
5+
import array
6+
import oracledb
7+
import yaml
8+
import os
9+
from pathlib import Path
10+
11+
12+
class OraDBVectorStore:
    """Vector store backed by Oracle Database 23ai's native VECTOR type.

    Maintains four tables (PDF, Web, Repo, General collections), each holding
    an id, the chunk text, JSON-serialized metadata, and a VECTOR embedding.
    Embeddings are produced with the 'all-MiniLM-L12-v2' sentence-transformer.
    Credentials are read from config.yaml (ORACLE_DB_USERNAME,
    ORACLE_DB_PASSWORD, ORACLE_DB_DSN).
    """

    # One table per logical collection; all share the same schema.
    _TABLES = ("PDFCollection", "WebCollection", "RepoCollection", "GeneralCollection")

    def __init__(self, persist_directory: str = "embeddings"):
        """Initialize Oracle DB Vector Store.

        Args:
            persist_directory: Not used for Oracle DB connection but kept for
                compatibility with the ChromaDB-backed store's interface.

        Raises:
            ValueError: If the password or DSN is missing from config.yaml.
            oracledb.Error: If the database connection fails.
        """
        # Load Oracle DB credentials from config.yaml
        credentials = self._load_config()

        username = credentials.get("ORACLE_DB_USERNAME", "ADMIN")
        password = credentials.get("ORACLE_DB_PASSWORD", "")
        dsn = credentials.get("ORACLE_DB_DSN", "")

        if not password or not dsn:
            raise ValueError("Oracle DB credentials not found in config.yaml. Please set ORACLE_DB_USERNAME, ORACLE_DB_PASSWORD, and ORACLE_DB_DSN.")

        # Connect to the database; re-raise on failure so callers can fall
        # back to the ChromaDB store.
        try:
            connection = oracledb.connect(user=username, password=password, dsn=dsn)
            print("Oracle DB Connection successful!")
        except Exception as e:
            print("Oracle DB Connection failed!", e)
            raise

        self.connection = connection
        self.cursor = connection.cursor()

        # Create the collection tables if they do not already exist
        # (CREATE TABLE IF NOT EXISTS requires Oracle DB 23ai).
        for table_name in self._TABLES:
            self.cursor.execute(
                f"""CREATE TABLE IF NOT EXISTS {table_name} (
                    id VARCHAR2(4000 BYTE) PRIMARY KEY,
                    text VARCHAR2(4000 BYTE),
                    metadata VARCHAR2(4000 BYTE),
                    embedding VECTOR
                )"""
            )

        self.encoder = SentenceTransformer('all-MiniLM-L12-v2')

    def _load_config(self) -> Dict[str, str]:
        """Load configuration from config.yaml.

        Returns an empty dict (with a warning) if the file is missing or
        unreadable, so __init__ can raise a single, clear ValueError.
        """
        try:
            config_path = Path("config.yaml")
            if not config_path.exists():
                print("Warning: config.yaml not found. Using empty configuration.")
                return {}

            with open(config_path, 'r') as f:
                config = yaml.safe_load(f)
            return config if config else {}
        except Exception as e:
            print(f"Warning: Error loading config: {str(e)}")
            return {}

    def _sanitize_metadata(self, metadata: Dict) -> Dict:
        """Sanitize metadata to ensure all values are valid types for Oracle DB.

        Scalars pass through; lists and other objects are stringified; None
        becomes an empty string (the column is VARCHAR2, not nullable JSON).
        """
        sanitized = {}
        for key, value in metadata.items():
            if isinstance(value, (str, int, float, bool)):
                sanitized[key] = value
            elif isinstance(value, list):
                # Convert list to string representation
                sanitized[key] = str(value)
            elif value is None:
                # Replace None with empty string
                sanitized[key] = ""
            else:
                # Convert any other type to string
                sanitized[key] = str(value)
        return sanitized

    def _add_chunks(self, table_name: str, chunks: List[Dict[str, Any]],
                    source_id: str, truncate: bool = False) -> None:
        """Encode and insert chunks into *table_name*.

        Args:
            table_name: One of the collection tables in _TABLES.
            chunks: Dicts with "text" and "metadata" keys.
            source_id: Prefix used to build per-chunk row ids.
            truncate: When True, empty the table before inserting (the PDF
                collection is replaced wholesale; the others accumulate).
        """
        if not chunks:
            return

        texts = [chunk["text"] for chunk in chunks]
        metadatas = [self._sanitize_metadata(chunk["metadata"]) for chunk in chunks]
        ids = [f"{source_id}_{i}" for i in range(len(chunks))]

        # Encode all texts in a single batch for throughput.
        embeddings = self.encoder.encode(texts, batch_size=32, show_progress_bar=True)

        if truncate:
            self.cursor.execute(f"truncate table {table_name}")

        insert_sql = (
            f"INSERT INTO {table_name} (id, text, metadata, embedding) "
            "VALUES (:1, :2, :3, :4)"
        )
        for docid, text, metadata, embedding in zip(ids, texts, metadatas, embeddings):
            # Metadata is stored as a JSON string; the embedding is bound as a
            # float array, which python-oracledb maps to the VECTOR column.
            self.cursor.execute(
                insert_sql,
                (docid, text, json.dumps(metadata), array.array("f", embedding)),
            )

        self.connection.commit()

    def add_pdf_chunks(self, chunks: List[Dict[str, Any]], document_id: str):
        """Add chunks from a PDF document to the vector store.

        NOTE: replaces the whole PDF collection (table is truncated first).
        """
        self._add_chunks("PDFCollection", chunks, document_id, truncate=True)

    def add_web_chunks(self, chunks: List[Dict[str, Any]], source_id: str):
        """Add chunks from web content to the vector store (appends)."""
        self._add_chunks("WebCollection", chunks, source_id)

    def add_general_knowledge(self, chunks: List[Dict[str, Any]], source_id: str):
        """Add general knowledge chunks to the vector store (appends)."""
        self._add_chunks("GeneralCollection", chunks, source_id)

    def add_repo_chunks(self, chunks: List[Dict[str, Any]], document_id: str):
        """Add chunks from a repository to the vector store (appends)."""
        self._add_chunks("RepoCollection", chunks, document_id)

    def _query(self, table_name: str, query: str, n_results: int) -> List[Dict[str, Any]]:
        """Return the *n_results* nearest chunks to *query* from *table_name*.

        Results are ordered by Euclidean distance between the query embedding
        and the stored VECTOR column.
        """
        # Encode the query text into the same embedding space as the rows.
        embedding = self.encoder.encode(query, batch_size=32, show_progress_bar=True)
        query_vector = array.array("f", embedding)

        # n_results is bound into FETCH FIRST so the caller's limit is honored
        # (previously hard-coded to 10, silently ignoring the parameter).
        sql = f"""
            SELECT Id, Text, MetaData, Embedding
            FROM {table_name}
            ORDER BY VECTOR_DISTANCE(EMBEDDING, :nv, EUCLIDEAN)
            FETCH FIRST :top ROWS ONLY
        """
        self.cursor.execute(sql, {"nv": query_vector, "top": n_results})

        formatted_results = []
        for row in self.cursor.fetchall():
            formatted_results.append({
                "content": row[1],
                # Metadata may come back as a str (JSON) or an already-decoded
                # object depending on driver configuration.
                "metadata": json.loads(row[2]) if isinstance(row[2], str) else row[2],
            })
        return formatted_results

    def query_pdf_collection(self, query: str, n_results: int = 3) -> List[Dict[str, Any]]:
        """Query the PDF documents collection."""
        return self._query("PDFCollection", query, n_results)

    def query_web_collection(self, query: str, n_results: int = 3) -> List[Dict[str, Any]]:
        """Query the web documents collection."""
        return self._query("WebCollection", query, n_results)

    def query_general_collection(self, query: str, n_results: int = 3) -> List[Dict[str, Any]]:
        """Query the general knowledge collection."""
        return self._query("GeneralCollection", query, n_results)

    def query_repo_collection(self, query: str, n_results: int = 3) -> List[Dict[str, Any]]:
        """Query the repository documents collection."""
        return self._query("RepoCollection", query, n_results)
def main():
    """CLI entry point: load PDF/web chunk files into the Oracle DB vector
    store and/or run a similarity query against both collections."""
    arg_parser = argparse.ArgumentParser(description="Manage Oracle DB vector store")
    arg_parser.add_argument("--add", help="JSON file containing chunks to add")
    arg_parser.add_argument("--add-web", help="JSON file containing web chunks to add")
    arg_parser.add_argument("--query", help="Query to search for")
    opts = arg_parser.parse_args()

    store = OraDBVectorStore()

    if opts.add:
        with open(opts.add, 'r', encoding='utf-8') as handle:
            pdf_chunks = json.load(handle)
        store.add_pdf_chunks(pdf_chunks, document_id=opts.add)
        print(f"✓ Added {len(pdf_chunks)} PDF chunks to Oracle DB vector store")

    if opts.add_web:
        with open(opts.add_web, 'r', encoding='utf-8') as handle:
            web_chunks = json.load(handle)
        store.add_web_chunks(web_chunks, source_id=opts.add_web)
        print(f"✓ Added {len(web_chunks)} web chunks to Oracle DB vector store")

    if opts.query:
        # Search both the PDF and web collections for the same query.
        pdf_hits = store.query_pdf_collection(opts.query)
        web_hits = store.query_web_collection(opts.query)

        divider = "-" * 50

        print("\nPDF Results:")
        print(divider)
        for hit in pdf_hits:
            print(f"Content: {hit['content'][:200]}...")
            print(f"Source: {hit['metadata'].get('source', 'Unknown')}")
            print(f"Pages: {hit['metadata'].get('page_numbers', [])}")
            print(divider)

        print("\nWeb Results:")
        print(divider)
        for hit in web_hits:
            print(f"Content: {hit['content'][:200]}...")
            print(f"Source: {hit['metadata'].get('source', 'Unknown')}")
            print(f"Title: {hit['metadata'].get('title', 'Unknown')}")
            print(divider)


if __name__ == "__main__":
    main()

0 commit comments

Comments
 (0)