-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathapp.py
More file actions
239 lines (201 loc) · 8.86 KB
/
app.py
File metadata and controls
239 lines (201 loc) · 8.86 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
import streamlit as st
import os
import asyncio
from pathlib import Path
from typing import List
from dotenv import load_dotenv
from llama_index.core import VectorStoreIndex, SimpleDirectoryReader, StorageContext, load_index_from_storage, Document
from llama_index.llms.openai import OpenAI
from llama_index.embeddings.openai import OpenAIEmbedding
from llama_cloud_services import LlamaParse
# Load environment variables from .env (if present)
load_dotenv()
# Backend configuration (from llama_test.ipynb)
# These values are fixed and cannot be changed from the UI
LLM_MODEL = "gpt-5-nano-2025-08-07"
EMBEDDING_MODEL = "text-embedding-3-small"
TEMPERATURE = 0.1
DATA_DIR = "data"
PERSIST_DIR = "./storage"
# Configure Streamlit page
st.set_page_config(
page_title="LlamaIndex RAG Agent",
page_icon="🦙",
layout="centered"
)
# Get API keys from environment variable or Streamlit secrets
# These should be set before running the Streamlit app
openai_api_key = os.getenv('OPENAI_API_KEY') or st.secrets.get("OPENAI_API_KEY")
llama_cloud_api_key = os.getenv('LLAMA_CLOUD_API_KEY') or st.secrets.get("LLAMA_CLOUD_API_KEY")
# Initialize chat history
if "messages" not in st.session_state:
st.session_state.messages = []
# Helper function to load documents with LlamaParse
def load_documents_with_llamaparse(data_dir: str, llama_api_key: str) -> List[Document]:
"""
Load documents from data directory using LlamaParse for complex file types
and SimpleDirectoryReader for basic text files.
Supported complex file types: PDF, DOCX, PPTX, XLSX
"""
data_path = Path(data_dir)
if not data_path.exists():
return []
# File extensions that benefit from LlamaParse
llamaparse_extensions = {'.pdf', '.docx', '.pptx', '.xlsx', '.doc', '.ppt', '.xls'}
# File extensions for simple text reading
simple_extensions = {'.txt', '.md', '.csv', '.json', '.html', '.xml'}
all_files = list(data_path.glob('*'))
llamaparse_files = []
simple_files = []
for file_path in all_files:
if file_path.is_file():
ext = file_path.suffix.lower()
if ext in llamaparse_extensions:
llamaparse_files.append(str(file_path))
elif ext in simple_extensions:
simple_files.append(str(file_path))
documents = []
# Process complex files with LlamaParse
if llamaparse_files:
st.info(f"📄 Processing {len(llamaparse_files)} complex file(s) with LlamaParse: {', '.join([Path(f).name for f in llamaparse_files])}")
try:
# Configure LlamaParse with optimal settings
parser = LlamaParse(
api_key=llama_api_key,
parse_mode="parse_page_with_agent",
model="openai-gpt-4-1-mini",
high_res_ocr=True,
adaptive_long_table=True,
outlined_table_extraction=True,
output_tables_as_HTML=True,
num_workers=4,
verbose=True,
language="en"
)
# Parse files (LlamaParse handles batch processing)
# Use asyncio to run the async parse method
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
if len(llamaparse_files) == 1:
result = loop.run_until_complete(parser.aparse(llamaparse_files[0]))
results = [result]
else:
results = loop.run_until_complete(parser.aparse(llamaparse_files))
finally:
loop.close()
# Convert JobResults to LlamaIndex Documents
for result in results:
# Get markdown documents with page splitting for better chunking
llamaparse_docs = result.get_markdown_documents(split_by_page=True)
documents.extend(llamaparse_docs)
except Exception as e:
st.warning(f"LlamaParse processing failed for some files: {str(e)}")
st.info("Falling back to SimpleDirectoryReader for these files...")
# Fall back to simple reader if LlamaParse fails
simple_files.extend(llamaparse_files)
# Process simple text files with SimpleDirectoryReader
if simple_files:
st.info(f"📝 Processing {len(simple_files)} simple file(s) with SimpleDirectoryReader: {', '.join([Path(f).name for f in simple_files])}")
for file_path in simple_files:
try:
file_docs = SimpleDirectoryReader(input_files=[file_path]).load_data()
documents.extend(file_docs)
except Exception as e:
st.warning(f"Failed to load {file_path}: {str(e)}")
return documents
# Initialize query engine
@st.cache_resource
def initialize_query_engine(_openai_api_key, _llama_api_key):
"""Initialize the LlamaIndex query engine with caching"""
# Set API keys
os.environ['OPENAI_API_KEY'] = _openai_api_key
if _llama_api_key:
os.environ['LLAMA_CLOUD_API_KEY'] = _llama_api_key
# Configure models with backend configuration
llm = OpenAI(model=LLM_MODEL, temperature=TEMPERATURE)
embed_model = OpenAIEmbedding(model=EMBEDDING_MODEL)
try:
if not os.path.exists(PERSIST_DIR):
# Load documents and create index
if not os.path.exists(DATA_DIR):
os.makedirs(DATA_DIR)
return None, "Please add documents to the 'data' directory"
# Use LlamaParse if API key is available, otherwise fall back to SimpleDirectoryReader
if _llama_api_key:
st.info("Using LlamaParse for advanced document processing...")
documents = load_documents_with_llamaparse(DATA_DIR, _llama_api_key)
else:
st.info("Using SimpleDirectoryReader (LlamaParse API key not found)...")
documents = SimpleDirectoryReader(DATA_DIR).load_data()
if not documents:
return None, "No documents found in the 'data' directory"
index = VectorStoreIndex.from_documents(
documents,
llm=llm,
embed_model=embed_model
)
# Store for later
index.storage_context.persist(persist_dir=PERSIST_DIR)
status = f"✅ Index created with {len(documents)} documents"
else:
# Load existing index
storage_context = StorageContext.from_defaults(persist_dir=PERSIST_DIR)
index = load_index_from_storage(storage_context)
# Configure the loaded index with LLM and embedding models
# This ensures the query engine uses the correct models
index._llm = llm
index._embed_model = embed_model
status = "✅ Index loaded from storage"
# Create query engine
query_engine = index.as_query_engine(llm=llm, embed_model=embed_model)
return query_engine, status
except Exception as e:
return None, f"❌ Error: {str(e)}"
# Main chat interface
if not openai_api_key:
st.warning("⚠️ Please set the OPENAI_API_KEY environment variable to get started.")
st.stop()
# Display info about LlamaParse availability
if not llama_cloud_api_key:
st.info("💡 Tip: Set LLAMA_CLOUD_API_KEY to enable advanced parsing of PDFs, DOCX, and other complex documents.")
# Initialize query engine
if "query_engine" not in st.session_state:
with st.spinner("Initializing RAG agent..."):
query_engine, status = initialize_query_engine(openai_api_key, llama_cloud_api_key)
st.session_state.query_engine = query_engine
if query_engine is None:
st.error(status)
st.stop()
else:
st.success(status)
# Display chat history
for message in st.session_state.messages:
with st.chat_message(message["role"]):
st.markdown(message["content"])
# Chat input
if prompt := st.chat_input("Ask a question about your documents"):
# Display user message
with st.chat_message("user"):
st.markdown(prompt)
# Add user message to history
st.session_state.messages.append({"role": "user", "content": prompt})
# Generate response
with st.chat_message("assistant"):
with st.spinner("Thinking..."):
try:
response = st.session_state.query_engine.query(prompt)
response_text = str(response)
st.markdown(response_text)
# Add assistant response to history
st.session_state.messages.append({
"role": "assistant",
"content": response_text
})
except Exception as e:
error_msg = f"Error generating response: {str(e)}"
st.error(error_msg)
st.session_state.messages.append({
"role": "assistant",
"content": error_msg
})