-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathadaptive_extraction_api.py
More file actions
199 lines (172 loc) · 6.67 KB
/
adaptive_extraction_api.py
File metadata and controls
199 lines (172 loc) · 6.67 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
#!/usr/bin/env python3
"""
Adaptive Extraction API Service
Exposes the Python adaptive extractor via HTTP API
"""
import asyncio
import os
from typing import List, Dict, Any, Optional
import asyncpg
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
import uvicorn
from loguru import logger
from src.core.adaptive_extractor import AdaptiveExtractor
# Request/Response models
class ExtractionRequest(BaseModel):
    """Request body for POST /extract."""

    # Free-text query; the /extract handler logs a short preview of it.
    query: str
    # Search hits forwarded unchanged to the extractor (both process_query and
    # calculate_confidence). NOTE(review): the per-item schema is defined by
    # AdaptiveExtractor, not here — confirm the expected keys there.
    search_results: List[Dict[str, Any]]
    # Caller identifiers forwarded to process_query. Because the type is
    # Optional, a client may send an explicit null, which replaces the default
    # with None — confirm the extractor accepts None.
    user_id: Optional[str] = "web-user"
    agent_id: Optional[str] = "koi-interface"
class ExtractionResponse(BaseModel):
    """Response body for POST /extract.

    When extraction is not triggered, /extract sets only success,
    extraction_triggered, confidence and message; the remaining fields keep
    their defaults.
    """

    # True on every response /extract returns; failures are raised as HTTP errors.
    success: bool
    # Whether the extractor returned an extraction result for this query.
    extraction_triggered: bool
    # Score from extractor.calculate_confidence() over the request's search_results.
    confidence: float
    # Receipt identifier of the extraction run; None when extraction did not run.
    receipt_rid: Optional[str] = None
    # Number of facts / entities / relationships produced by the extraction.
    facts_extracted: int = 0
    entities_extracted: int = 0
    relationships_extracted: int = 0
    # Improvement reported by the extractor (scale is defined by AdaptiveExtractor).
    confidence_improvement: float = 0.0
    # Cost reported by the extractor for this run, in US dollars per the name.
    cost_usd: float = 0.0
    # Human-readable summary of the outcome.
    message: str = ""
# FastAPI application object; the title, description and version below are
# what the generated OpenAPI docs display.
app = FastAPI(
    version="1.0.0",
    title="KOI Adaptive Extraction API",
    description="Adaptive knowledge extraction service for KOI pipeline",
)
# CORS middleware.
# Allowed origins come from CORS_ALLOW_ORIGINS (comma-separated, e.g.
# "https://a.example,https://b.example"). When the variable is unset or blank,
# the original allow-all ("*") behaviour is kept.
# NOTE(review): the FastAPI/Starlette CORS docs state that origins, methods and
# headers should be listed explicitly (not "*") when allow_credentials=True.
# Set CORS_ALLOW_ORIGINS in deployments, and confirm whether credentialed
# cross-origin requests are actually needed.
app.add_middleware(
    CORSMiddleware,
    allow_origins=[
        origin.strip()
        for origin in os.getenv("CORS_ALLOW_ORIGINS", "*").split(",")
        if origin.strip()
    ] or ["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# Process-wide service handles. Both stay None until init_services() fills
# them at startup; the request handlers check them before use.
extractor = db_pool = None
async def init_services():
    """Initialize database and extractor.

    Creates an asyncpg pool from ``POSTGRES_URL`` (falling back to a local
    development DSN) and an ``AdaptiveExtractor`` using ``OPENAI_API_KEY``.
    It stores them in the module-level ``db_pool`` and ``extractor`` globals.
    A missing API key is only logged as a warning; it does not abort startup.

    Raises:
        Exception: any failure is logged with its traceback and re-raised so
            startup aborts. If the pool had already been created, it is
            closed and ``db_pool`` is reset to ``None`` first, so a failed
            startup does not leave an open pool behind.
    """
    global extractor, db_pool
    pool = None
    try:
        # Connect to database.
        # NOTE(review): the fallback DSN embeds default credentials; it is
        # only suitable for local development.
        postgres_url = os.getenv(
            "POSTGRES_URL", "postgresql://postgres:postgres@localhost:5433/eliza"
        )
        pool = await asyncpg.create_pool(postgres_url)
        db_pool = pool
        logger.info("✅ Database connection established")

        # Initialize extractor
        openai_key = os.getenv("OPENAI_API_KEY")
        if not openai_key:
            logger.warning("⚠️ OPENAI_API_KEY not set - extraction will fail")
        extractor = AdaptiveExtractor(
            db_pool=pool,
            llm_api_key=openai_key,
        )
        logger.info("✅ Adaptive extractor initialized")
    except Exception as e:
        # logger.exception records the traceback, which logger.error dropped.
        logger.exception(f"❌ Failed to initialize services: {e}")
        if pool is not None:
            # Release the connections opened above before propagating.
            db_pool = None
            await pool.close()
        raise
@app.on_event("startup")
async def startup_event():
    """Run init_services() when the application starts.

    Any exception it raises propagates, so a failed initialization fails startup.
    NOTE(review): on_event is deprecated in recent FastAPI releases in favour
    of lifespan handlers.
    """
    return await init_services()
@app.on_event("shutdown")
async def shutdown_event():
    """Close the database pool on shutdown, if one was created."""
    if not db_pool:
        return
    await db_pool.close()
@app.get("/health")
async def health_check():
    """Health check endpoint.

    Reports whether startup populated the extractor and the database pool.
    NOTE(review): "status" is always "healthy", even if either readiness flag
    is False; consumers should inspect the *_ready fields.
    """
    extractor_ready = extractor is not None
    database_ready = db_pool is not None
    return dict(
        status="healthy",
        service="adaptive-extraction-api",
        extractor_ready=extractor_ready,
        database_ready=database_ready,
    )
@app.post("/extract", response_model=ExtractionResponse)
async def extract_knowledge(request: ExtractionRequest):
    """
    Trigger adaptive knowledge extraction for a query.

    The request's query, search results and caller ids are passed to
    ``extractor.process_query``. If it returns an extraction result, the
    response carries the receipt RID, the extracted fact/entity/relationship
    counts, the confidence improvement and the cost. Otherwise the response
    reports that extraction was not triggered. In both cases ``confidence``
    is ``extractor.calculate_confidence`` over the request's search results.

    Raises:
        HTTPException: 503 if the extractor has not been initialized;
            500 (with the error text as detail) if extraction fails.
    """
    # Same readiness test that /health reports as "extractor_ready".
    if extractor is None:
        raise HTTPException(status_code=503, detail="Extractor not initialized")

    query = request.query
    # Only mark the logged preview as truncated when it actually was.
    preview = query if len(query) <= 50 else f"{query[:50]}..."

    try:
        logger.info(f"🔧 Processing extraction request for query: '{preview}'")
        # Process query through adaptive extractor. The enhanced search
        # results it returns are not part of this endpoint's response.
        _enhanced_results, extraction_result = await extractor.process_query(
            query=query,
            search_results=request.search_results,
            user_id=request.user_id,
            agent_id=request.agent_id,
        )
        # Computed once; both response shapes below report it.
        confidence = extractor.calculate_confidence(request.search_results)

        if extraction_result:
            facts = len(extraction_result.extracted_facts)
            entities = len(extraction_result.extracted_entities)
            response = ExtractionResponse(
                success=True,
                extraction_triggered=True,
                confidence=confidence,
                receipt_rid=extraction_result.receipt_rid,
                facts_extracted=facts,
                entities_extracted=entities,
                relationships_extracted=len(extraction_result.extracted_relationships),
                confidence_improvement=extraction_result.confidence_improvement,
                cost_usd=extraction_result.cost_usd,
                message=f"Extraction completed successfully. {facts} facts extracted.",
            )
            logger.info(f"✅ Extraction completed: {facts} facts, {entities} entities")
        else:
            response = ExtractionResponse(
                success=True,
                extraction_triggered=False,
                confidence=confidence,
                message=f"Extraction not triggered (confidence {confidence:.3f} >= {extractor.CONFIDENCE_THRESHOLD})",
            )
            logger.info(f"ℹ️ Extraction not triggered - confidence {confidence:.3f} above threshold")
        return response
    except Exception as e:
        # logger.exception keeps the traceback; "from e" preserves the cause chain.
        logger.exception(f"❌ Extraction failed: {e}")
        raise HTTPException(status_code=500, detail=f"Extraction failed: {e}") from e
@app.get("/stats")
async def get_extraction_stats():
    """Get extraction statistics for the last 24 hours.

    Returns one aggregate row from ``koi_adaptive_extractions`` with the
    count, average confidence improvement, total cost and latest extraction
    time. It also returns one row from ``koi_query_log`` with the count,
    average confidence score, and number of queries that triggered
    extraction. Each row is returned as a plain dict.

    Raises:
        HTTPException: 503 if the database pool is not initialized;
            500 (with the error text as detail) if either query fails.
    """
    # Same readiness test that /health reports as "database_ready".
    if db_pool is None:
        raise HTTPException(status_code=503, detail="Database not available")
    try:
        async with db_pool.acquire() as conn:
            # Ungrouped aggregate queries yield exactly one row, so fetchrow
            # replaces fetch()[0].
            # Recent extractions
            extraction_row = await conn.fetchrow("""
                SELECT
                    COUNT(*) as total_extractions,
                    AVG(confidence_improvement) as avg_improvement,
                    SUM(extraction_cost_usd) as total_cost,
                    MAX(extraction_timestamp) as last_extraction
                FROM koi_adaptive_extractions
                WHERE extraction_timestamp > NOW() - INTERVAL '24 hours'
            """)
            # Query stats
            query_row = await conn.fetchrow("""
                SELECT
                    COUNT(*) as total_queries,
                    AVG(confidence_score) as avg_confidence,
                    COUNT(*) FILTER (WHERE triggered_extraction) as triggered_count
                FROM koi_query_log
                WHERE timestamp > NOW() - INTERVAL '24 hours'
            """)
    except Exception as e:
        # logger.exception keeps the traceback; "from e" preserves the cause chain.
        logger.exception(f"Failed to get stats: {e}")
        raise HTTPException(status_code=500, detail=f"Stats query failed: {e}") from e

    return {
        # dict() turns each asyncpg Record into a plain mapping for the JSON response.
        "extraction_stats": dict(extraction_row) if extraction_row is not None else {},
        "query_stats": dict(query_row) if query_row is not None else {},
        # NOTE(review): a fixed label for the 24h window, not an actual timestamp.
        "timestamp": "recent_24h",
    }
if __name__ == "__main__":
    # Run the server. uvicorn needs the app as an import string (not the app
    # object) for reload=True to work.
    # NOTE(review): reload=True and binding 0.0.0.0 are development settings.
    uvicorn.run(
        "adaptive_extraction_api:app",
        log_level="info",
        reload=True,
        port=8351,
        host="0.0.0.0",
    )