33import logging
44import asyncio
55import uuid
6+ from requests import request , Response
7+
8+ from urllib .parse import urljoin , urlparse
9+ from typing import Dict
10+
11+ from settings import settings
612from evaluator import EvaluationParams , ExperimentStatus , execute_evaluation
713
814# Configure logging
1117
1218app = FastAPI ()
1319
# Bounded queue of pending evaluations; at most 10 may wait at once.
TASK_QUEUE: asyncio.Queue[EvaluationParams] = asyncio.Queue(maxsize=10)

# Number of evaluations that may be processed concurrently.
MAX_CONCURRENT_TASKS = 5
1624
25+
1726class RunEvaluationsRequest (BaseModel ):
1827 workspace_name : str
1928 dataset_name : str
@@ -22,10 +31,21 @@ class RunEvaluationsRequest(BaseModel):
2231 base_prompt_name : str
2332 workflow : str
2433
34+
class RunEvaluationsResponse(BaseModel):
    """Response body returned by the ``/evaluations/run`` endpoint."""

    # Status string reported for the submitted evaluation.
    status: str
    # Identifier of the task created for the evaluation.
    task_id: str
2838
39+
class HealthResponse(BaseModel):
    """Response body of the ``/health`` endpoint, one entry per dependency."""

    # Health report for the eLLMental API ("status" and "message" keys).
    ellmental: Dict[str, str]
    # Health report for the OPIK service ("status" and "message" keys).
    opik: Dict[str, str]
43+
44+
def get_domain(url: str) -> str:
    """Return the host part of *url*, without credentials or port.

    Args:
        url: An absolute URL such as ``"https://user:pw@example.com:8080/x"``.

    Returns:
        The host exactly as written in the URL (case preserved), with any
        ``user:password@`` prefix and ``:port`` suffix removed. Bracketed
        IPv6 literals are returned without the brackets. An empty string is
        returned when the URL has no network location.
    """
    authority = urlparse(url).netloc
    # Strip userinfo first: splitting on ":" before doing so would return the
    # username instead of the host for "user:password@host" authorities.
    host_and_port = authority.rpartition("@")[2]
    if host_and_port.startswith("["):
        # Bracketed IPv6 literal, e.g. "[::1]:8080" -> "::1".
        return host_and_port[1:].partition("]")[0]
    return host_and_port.partition(":")[0]
47+
48+
2949async def process_queue ():
3050 """Background task to process queued evaluations"""
3151 while True :
@@ -40,12 +60,58 @@ async def process_queue():
4060 finally :
4161 TASK_QUEUE .task_done ()
4262
63+
@app.on_event("startup")
async def startup_event():
    """Spawn the background workers that drain the evaluation queue.

    Starts ``MAX_CONCURRENT_TASKS`` tasks, each running ``process_queue()``.
    """
    # The event loop only keeps weak references to tasks, so a task whose
    # handle is discarded may be garbage-collected mid-execution. Keep strong
    # references on the application state for the lifetime of the app.
    app.state.queue_workers = [
        asyncio.create_task(process_queue()) for _ in range(MAX_CONCURRENT_TASKS)
    ]
4869
70+
def _probe_service(base_url: str, path: str, label: str, timeout: int) -> Dict[str, str]:
    """Issue a blocking GET against ``base_url`` + ``path`` and report its health.

    Args:
        base_url: Base URL of the dependency being checked.
        path: Health-check path joined onto ``base_url``.
        label: Human-readable service name used in the healthy message.
        timeout: Timeout in seconds passed to the HTTP request.

    Returns:
        A dict with ``"status"`` (``"healthy"`` or ``"unhealthy"``) and
        ``"message"``. For unhealthy results the message is the response body
        or exception text with the service's host replaced by ``"***"``.
    """
    report = {"status": "healthy", "message": f"{label} is healthy"}
    try:
        reply: Response = request(
            method="GET", url=urljoin(base_url, path), timeout=timeout
        )
        failure = None if reply.status_code == 200 else str(reply.text)
    except Exception as exc:
        # Any failure to reach the dependency is reported, not propagated.
        failure = str(exc)

    if failure is not None:
        report["status"] = "unhealthy"
        domain = get_domain(base_url)
        # str.replace("", "***") would insert "***" between every character,
        # so only mask when a host was actually extracted.
        report["message"] = failure.replace(domain, "***") if domain else failure
    return report


@app.get("/health", response_model=HealthResponse)
async def health(timeout: int = 5):
    """Report the health of the eLLMental API and OPIK dependencies.

    Args:
        timeout: Per-request timeout in seconds for each dependency probe.

    Returns:
        A ``HealthResponse`` when both dependencies answer with HTTP 200.

    Raises:
        HTTPException: 503 with the per-service reports as ``detail`` when at
            least one dependency is unhealthy.
    """
    # The probes use the blocking `requests` client, so run them in worker
    # threads (concurrently) to avoid stalling the event loop.
    ellm_health, opik_health = await asyncio.gather(
        asyncio.to_thread(
            _probe_service, settings.ELLMENTAL_API_URL, "/health", "eLLMental", timeout
        ),
        asyncio.to_thread(
            _probe_service, settings.OPIK_URL_OVERRIDE, "/is-alive/ping", "OPIK", timeout
        ),
    )

    response = {"ellmental": ellm_health, "opik": opik_health}

    if all(report["status"] == "healthy" for report in response.values()):
        return HealthResponse(**response)
    raise HTTPException(status_code=503, detail=response)
113+
114+
49115@app .post ("/evaluations/run" , response_model = RunEvaluationsResponse )
50116async def run_evaluation (input : RunEvaluationsRequest , req : Request ):
51117 try :
@@ -67,11 +133,17 @@ async def run_evaluation(input: RunEvaluationsRequest, req: Request):
67133 TASK_QUEUE .put_nowait (evaluation_params )
68134 logger .info ("Evaluation task added to queue" )
69135 except asyncio .QueueFull :
70- logger .error (f"Queue is full. Evaluation task not added to the queue: { evaluation_params } " )
71- raise HTTPException (status_code = 503 , detail = "Server is currently at maximum capacity. Please try again later." )
72-
73- return RunEvaluationsResponse (status = ExperimentStatus .RUNNING .value , task_id = task_id )
136+ logger .error (
137+ f"Queue is full. Evaluation task not added to the queue: { evaluation_params } "
138+ )
139+ raise HTTPException (
140+ status_code = 503 ,
141+ detail = "Server is currently at maximum capacity. Please try again later." ,
142+ )
143+
144+ return RunEvaluationsResponse (
145+ status = ExperimentStatus .RUNNING .value , task_id = task_id
146+ )
74147 except Exception as e :
75148 logger .error (f"Error processing request: { str (e )} " )
76- raise HTTPException (status_code = 500 , detail = str (e ))
77-
149+ raise HTTPException (status_code = 500 , detail = str (e ))
0 commit comments