-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmain.py
More file actions
225 lines (191 loc) · 7.21 KB
/
main.py
File metadata and controls
225 lines (191 loc) · 7.21 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
# main.py
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from llama_index.core import VectorStoreIndex, Document
from llama_index.vector_stores.qdrant import QdrantVectorStore
from llama_index.embeddings.huggingface import HuggingFaceEmbedding
from llama_index.llms.huggingface import HuggingFaceLLM
from qdrant_client import QdrantClient
import requests
import re
import logging
import os
# Optional DSPy integration: the /feedback endpoint hands corrections to a
# FeedbackAgent when the `dspy` package is importable; otherwise the service
# degrades gracefully and only the vector-KB update runs.
try:
    from dspy.agent import FeedbackAgent
    feedback_agent = FeedbackAgent()
    DSPY_AVAILABLE = True
except ModuleNotFoundError:
    # Missing package is expected in minimal deployments — warn, don't crash.
    logging.warning("⚠️ DSPy not installed. Feedback endpoint disabled.")
    feedback_agent = None
    DSPY_AVAILABLE = False
app = FastAPI(title="Full Assignment Math Agent")

# Enable CORS so a separately-hosted frontend can call this API.
# NOTE(review): allow_origins=["*"] combined with allow_credentials=True is
# rejected by browsers per the CORS spec (credentialed requests require an
# explicit origin) — confirm the intended deployment and lock origins down
# for production.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# ----------------------------
# Console KB
# ----------------------------
# Small hand-written knowledge base. /solve first tries an exact (case- and
# whitespace-insensitive) match against these entries before any retrieval.
console_kb = [
    {"question": "Solve x^2 + 5x + 6 = 0",
     "answer": "Step1: Factor (x+2)(x+3)=0; Step2: x=-2, x=-3"},
    {"question": "Integrate x^2 dx",
     "answer": "Step1: Increase power by 1 → x^3; Step2: Divide by new power → x^3/3 + C"}
]
# The same entries are also indexed into the vector store (via kb_documents)
# so near-matches can be retrieved semantically.
kb_documents = [Document(text=f"Q: {item['question']}\nA: {item['answer']}") for item in console_kb]
# ----------------------------
# Qdrant Vector Store
# ----------------------------
# ":memory:" runs Qdrant in-process with no persistence — the collection
# (including any feedback inserted at runtime) is lost on restart.
client = QdrantClient(":memory:")
vector_store = QdrantVectorStore(client=client, collection_name="math_agent")
# ----------------------------
# Embedding + LLM
# ----------------------------
# Sentence-transformers MiniLM: small, CPU-friendly embedding model.
embed_model = HuggingFaceEmbedding(model_name="sentence-transformers/all-MiniLM-L6-v2")

# Directory for HF Accelerate to spill model weights when device memory is tight.
os.makedirs("./offload", exist_ok=True)

# GPT-Neo 125M: a very small causal LM used for step-by-step answer generation.
# NOTE(review): a 125M-parameter model is unlikely to solve non-trivial math
# reliably — presumably chosen for footprint; confirm quality expectations.
llm = HuggingFaceLLM(
    model_name="EleutherAI/gpt-neo-125M",
    tokenizer_name="EleutherAI/gpt-neo-125M",
    max_new_tokens=150,
    generate_kwargs={"temperature": 0.7},
    device_map="auto",  # let Accelerate place layers on available devices
    model_kwargs={
        "torch_dtype": "auto",
        "low_cpu_mem_usage": True,
        "use_safetensors": False,
        "offload_folder": "./offload"  # matches the directory created above
    }
)
# ----------------------------
# VectorStoreIndex
# ----------------------------
# Build the retrieval index over the seed KB documents; /feedback inserts
# additional documents into this index at runtime.
index = VectorStoreIndex.from_documents(
    kb_documents,
    vector_store=vector_store,
    embed_model=embed_model
)
# Retrieval-augmented query engine used as the second stage of /solve.
query_engine = index.as_query_engine(llm=llm)
# ----------------------------
# Helper Functions
# ----------------------------
def is_math_question(question: str) -> bool:
    """Return True when *question* contains any known math keyword (case-insensitive)."""
    lowered = question.lower()
    for keyword in ("solve", "integrate", "derivative", "probability",
                    "find", "equation", "calculate"):
        if keyword in lowered:
            return True
    return False
def validate_answer(answer: str) -> str:
    """Return *answer* unchanged if it looks like a math solution, else an apology.

    An answer "looks like" a solution when it is non-empty and contains at
    least one step marker or arithmetic symbol.
    """
    fallback = "Sorry, could not generate a valid math solution."
    if not answer:
        return fallback
    for marker in ("Step", "=", "+", "-", "*", "/", "^"):
        if marker in answer:
            return answer
    return fallback
def serper_search(query: str) -> str:
    """Search the web via the Serper.dev API for a step-by-step solution.

    Returns the first organic-result snippet, or "" when the key is missing,
    the request fails, or no results come back.
    """
    # SECURITY FIX: the API key was hard-coded in source (a leaked credential).
    # Read it from the environment instead and treat a missing key as
    # "web search unavailable" rather than crashing.
    api_key = os.environ.get("SERPER_API_KEY", "")
    if not api_key:
        logging.warning("SERPER_API_KEY not set; skipping web search.")
        return ""
    url = "https://google.serper.dev/search"
    headers = {"X-API-KEY": api_key}
    data = {"q": f"{query} step by step solution math"}
    try:
        r = requests.post(url, headers=headers, json=data, timeout=5)
        r.raise_for_status()
        result = r.json()
        if "organic" in result and len(result["organic"]) > 0:
            return result["organic"][0]["snippet"]
    except Exception as e:
        # Best-effort: web search is an optional stage of /solve.
        logging.warning(f"Serper search failed: {e}")
    return ""
def mcp_process(snippet: str) -> str:
    """Distill a raw web snippet into math-bearing context for the LLM prompt.

    Collapses runs of newlines, keeps only lines containing digits or math
    symbols, and joins them after a fixed prefix.
    """
    collapsed = re.sub(r"[\n]+", "\n", snippet).strip()
    kept = []
    for line in collapsed.split("\n"):
        if re.search(r"[0-9x+=\^*/]", line):
            kept.append(line)
    return "Context extracted from web: " + " ".join(kept)
# Request schema for POST /feedback.
class FeedbackModel(BaseModel):
    # The question the user originally asked.
    question: str
    # The human-corrected answer to record and insert into the KB.
    corrected_answer: str
# ----------------------------
# API Endpoints
# ----------------------------
@app.get("/")
def root():
    """Health-check endpoint; points callers at /solve."""
    return {
        "message": "Math Agent API running. Use /solve?question=... to get answers."
    }
@app.get("/solve")
def solve(question: str):
    """Answer a math question via a four-stage fallback chain.

    Stages: exact console-KB match → vector-KB retrieval → web search + MCP
    context + LLM → plain LLM. Returns {"answer": str, "source": "KB"|"Web"|"LLM"}.

    Raises:
        HTTPException(400): when the question contains no math keywords.
    """
    if not is_math_question(question):
        raise HTTPException(status_code=400, detail="Only math questions allowed.")

    answer = ""
    # BUG FIX: source is now recorded by the stage that produced the answer.
    # The old post-hoc reconstruction mislabeled vector-KB hits as "LLM" and
    # reported "Web" even when the web stage failed and the plain-LLM
    # fallback produced the answer.
    source = "LLM"

    # Stage 1: exact (case/whitespace-insensitive) console-KB lookup.
    normalized = question.strip().lower()
    for item in console_kb:
        if item["question"].strip().lower() == normalized:
            answer = item["answer"]
            source = "KB"
            break

    # Stage 2: vector-KB retrieval.
    if not answer:
        try:
            kb_response = query_engine.query(question)
            answer = str(kb_response).strip()
            # BUG FIX: the sentinel check used the capitalized needle
            # "No relevant documents" against answer.lower(), so it could
            # never match; compare in lowercase consistently.
            if (not answer
                    or "no relevant documents" in answer.lower()
                    or answer.lower() == "none"):
                answer = ""
            else:
                source = "KB"
        except Exception as e:
            logging.warning(f"Vector KB query failed: {e}")
            answer = ""

    # Stage 3: web search + MCP context + LLM.
    if not answer:
        try:
            snippet = serper_search(question)
            if snippet:
                context = mcp_process(snippet)
                prompt = f"Use the following context to solve the math problem step by step:\n{context}\nProblem: {question}"
                answer = str(llm.predict(prompt)).strip()
                if answer:
                    source = "Web"
        except Exception as e:
            logging.warning(f"Web Search + MCP + LLM failed: {e}")
            answer = ""

    # Stage 4: plain-LLM fallback.
    if not answer:
        source = "LLM"
        try:
            prompt = f"Solve this math problem step by step:\n{question}"
            answer = str(llm.predict(prompt)).strip()
        except Exception as e:
            logging.warning(f"LLM final fallback failed: {e}")
            answer = "Sorry, could not generate a solution."

    return {"answer": answer, "source": source}
@app.post("/feedback")
def feedback(feedback: FeedbackModel):
    """Record a human-corrected answer and add it to the knowledge base.

    Forwards the correction to the DSPy FeedbackAgent when available, then
    inserts a new Q/A document into the vector index so future queries can
    retrieve it.
    """
    if DSPY_AVAILABLE:
        feedback_agent.submit_feedback(
            question=feedback.question,
            proposed_answer="",
            human_corrected_answer=feedback.corrected_answer
        )
    new_doc = Document(text=f"Q: {feedback.question}\nA: {feedback.corrected_answer}")
    # BUG FIX: VectorStoreIndex has no `insert_documents` method — the old
    # call raised AttributeError on every feedback submission. The documented
    # API for adding a single Document is `insert`.
    index.insert(new_doc)
    return {"status": "Feedback recorded and KB updated"}
@app.post("/jee_bench_eval")
def jee_bench_eval(jee_questions: list):
    """Run the agent over a benchmark list of {"question", "answer"} dicts.

    Returns per-question results plus aggregate metrics. The "kb_hit" field
    is a heuristic: an answer containing "Step" is counted as a structured
    (KB-style) solution.
    """
    results = []
    for item in jee_questions:
        question = item.get("question")
        # ROBUSTNESS FIX: an entry without a "question" key yielded None,
        # which crashed inside is_math_question (AttributeError → HTTP 500).
        # Skip malformed entries instead.
        if not question:
            logging.warning("Skipping benchmark item with no 'question' field.")
            continue
        gold_answer = item.get("answer", "")
        agent_answer = solve(question)["answer"]
        results.append({
            "question": question,
            "agent_answer": agent_answer,
            "gold_answer": gold_answer,
            "kb_hit": "Step" in agent_answer
        })
    total = len(results)
    kb_hits = sum(1 for r in results if r["kb_hit"])
    metrics = {
        "total_questions": total,
        "kb_hits": kb_hits,
        # Guard against division by zero for an empty (or fully skipped) batch.
        "kb_hit_percentage": kb_hits / total * 100 if total > 0 else 0
    }
    return {"metrics": metrics, "results": results}