-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathdocument_processor.py
More file actions
139 lines (124 loc) · 5.05 KB
/
document_processor.py
File metadata and controls
139 lines (124 loc) · 5.05 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
import os
import fitz # PyMuPDF
from supabase import create_client, Client
import streamlit as st
from sentence_transformers import SentenceTransformer
import numpy as np
import requests
import json
# Load environment variables from .env file
from dotenv import load_dotenv
load_dotenv()
# Initialize Supabase client
SUPABASE_URL = os.getenv("SUPABASE_URL")
SUPABASE_API_KEY = os.getenv("SUPABASE_API_KEY")
API_URL = "https://api.deepseek.com/chat/completions"
DEEPSEEK_API_KEY = os.getenv("DEEPSEEK_API_KEY")
supabase: Client = create_client(SUPABASE_URL, SUPABASE_API_KEY)
# Initialize Sentence Transformer model
model = SentenceTransformer('all-MiniLM-L6-v2')
# Function to process the uploaded document and create chunks
def process_document(file, file_type):
if file_type == "pdf":
content = extract_text_from_pdf(file)
else:
try:
content = file.read().decode("utf-8")
except UnicodeDecodeError:
try:
content = file.read().decode("latin1")
except UnicodeDecodeError as e:
st.error(f"Error decoding file: {e}")
return []
chunks = [content[i:i+500] for i in range(0, len(content), 500)]
return chunks
# Function to extract text from PDF
def extract_text_from_pdf(file):
doc = fitz.open(stream=file.read(), filetype="pdf")
text = ""
for page in doc:
text += page.get_text()
return text
# Function to generate embeddings for chunks
def generate_embeddings(chunks):
embeddings = model.encode(chunks)
return embeddings
# Function to save chunks and embeddings to Supabase
def save_chunks_to_supabase(chunks, embeddings, project_name):
try:
for chunk, embedding in zip(chunks, embeddings):
response = supabase.table("messages").insert([{
"project": project_name,
"role": "document",
"content": chunk,
"embedding": embedding.tolist()
}]).execute()
return True
except Exception as e:
st.error(f"Error saving document chunks: {e}")
return False
# File uploader for document upload
def upload_document():
uploaded_file = st.sidebar.file_uploader("Upload Document", type=["txt", "pdf", "docx"])
if uploaded_file and st.session_state["current_project"]:
file_type = uploaded_file.name.split(".")[-1]
chunks = process_document(uploaded_file, file_type)
if chunks:
embeddings = generate_embeddings(chunks)
if save_chunks_to_supabase(chunks, embeddings, st.session_state["current_project"]):
st.sidebar.success("Document uploaded and processed successfully.")
# Function to retrieve relevant chunks from Supabase
def retrieve_relevant_chunks(question, project_name):
try:
response = supabase.table("messages") \
.select("content, embedding") \
.eq("project", project_name) \
.execute()
messages = response.data
if not messages:
return []
# Generate embedding for the question
question_embedding = model.encode([question])[0]
# Calculate cosine similarity between question embedding and document embeddings
similarities = []
for msg in messages:
embedding = np.array(msg["embedding"])
#debug Check if embedding is not None
if embedding is not None:
similarity = np.dot(question_embedding, embedding) / (np.linalg.norm(question_embedding) * np.linalg.norm(embedding))
similarities.append((similarity, msg["content"]))
# Sort by similarity and return the most relevant chunks
similarities.sort(reverse=True, key=lambda x: x[0])
relevant_chunks = [content for _, content in similarities[:5]]
return relevant_chunks
except Exception as e:
st.error(f"An error occurred while retrieving relevant chunks: {e}")
return []
# Function to generate answer using DeepSeek API
def generate_answer(question, relevant_chunks):
context = [{"role": "user", "content": question}] + [{"role": "system", "content": chunk} for chunk in relevant_chunks]
try:
response = requests.post(
API_URL,
headers={
"Authorization": f"Bearer {DEEPSEEK_API_KEY}",
"Content-Type": "application/json",
"Accept": "application/json"
},
data=json.dumps({
"messages": context,
"model": "deepseek-chat",
"max_tokens": 2048
})
)
if response.status_code == 200:
response_data = response.json()
choices = response_data.get("choices", [])
if choices:
return choices[0].get("message", {}).get("content", "No response received.")
else:
return "No valid choices in response."
else:
return f"API Error: {response.status_code} - {response.text}"
except Exception as e:
return f"An error occurred while calling the API: {e}"