 from fastapi import FastAPI, HTTPException
 from fastapi.responses import JSONResponse
 import uvicorn
+import httpx
+from fastapi.responses import StreamingResponse
 import logging
 
 logging.basicConfig(level=logging.INFO)
@@ -466,6 +468,68 @@ def PullModel(self, request, context):
             yield daemon_pb2.PullModelProgress(message=f"Failed to pull model: {str(e)}", percent=100, done=True, success=False)
             return
 
+async def proxy_streaming_request(request, path, port, model_name, model_id, model_manager):
+    """Forward an OpenAI-style request to the local vLLM server and relay the reply."""
+    stream = request.get("stream", False)
+    logger.info(f"Proxying request for model {model_name} on port {port}, stream={stream} for request: {request}")
+    url = f"http://localhost:{port}{path}"
+
+    try:
+        if stream:
+            async def generate_chunks():
+                async with httpx.AsyncClient() as client:
+                    async with client.stream('POST', url, json=request, timeout=None) as upstream_response:
+                        async for chunk in upstream_response.aiter_text():
+                            # Rewrite the upstream model id back to the user-facing
+                            # name and refresh the idle timer while chunks flow.
+                            if chunk.startswith("data: "):
+                                try:
+                                    data0 = json.loads(chunk[6:].strip())
+                                    data0["model"] = model_name
+                                    chunk = f"data: {json.dumps(data0)}\n\n"
+                                    model_manager.last_access[model_id] = time.time()
+                                except json.JSONDecodeError:
+                                    pass
+                            yield chunk
+            return StreamingResponse(generate_chunks(), media_type="text/event-stream")
+        else:
+            async with httpx.AsyncClient() as client:
+                vllm_response = await client.post(url, json=request, timeout=30.0)
+                rst = vllm_response.json()
+                rst['model'] = model_name
+                return JSONResponse(rst, status_code=vllm_response.status_code)
+    except Exception as e:
+        raise HTTPException(status_code=500, detail=f"vLLM API error: {str(e)}")
+
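One caveat with the rewrite loop above: aiter_text() yields chunks at whatever size the transport delivers, so a single "data: ..." event can arrive split across two chunks and slip through unmodified. A minimal sketch of a line-oriented variant that avoids this (same names as in the function above; not part of this change):

# Sketch only: aiter_lines() yields complete lines, so every
# "data: ..." event is seen whole regardless of read boundaries.
async def generate_chunks():
    async with httpx.AsyncClient() as client:
        async with client.stream('POST', url, json=request, timeout=None) as upstream_response:
            async for line in upstream_response.aiter_lines():
                if line.startswith("data: "):
                    try:
                        data0 = json.loads(line[6:])
                        data0["model"] = model_name
                        line = f"data: {json.dumps(data0)}"
                        model_manager.last_access[model_id] = time.time()
                    except json.JSONDecodeError:
                        pass  # e.g. the "[DONE]" sentinel; pass it through as-is
                yield line + "\n\n"  # re-terminate each SSE event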
+def get_model_ready(model_name, model_manager):
+    """Ensure the named model is running locally; return its port and model id."""
+    local, model_id, model_path = model_manager.is_local(model_name)
+    if not model_id:
+        raise HTTPException(status_code=400, detail="Missing or unknown model name")
+    logger.info(f"Checking if model {model_id} is running")
+    # Update last access time for model
+    model_manager.last_access[model_id] = time.time()
+    if not model_manager.is_running(model_id):
+        logger.info(f"Model {model_id} is not running, starting it")
+        if local:
+            ok, msg, port, pid = model_manager.start_model(model_id)
+            logger.info(f"Model start response: {msg}")
+            if not ok:
+                raise HTTPException(status_code=500, detail=msg)
+        else:
+            raise HTTPException(status_code=404, detail="Model not available locally")
+    port = model_manager.get_port(model_id)
+    # Validate the port before waiting on it, not after.
+    if not port:
+        raise HTTPException(status_code=500, detail="Model port not found")
+    logger.info(f"Model {model_id} is running on port {port}")
+    if not model_manager.wait_for_model(port):
+        raise HTTPException(status_code=500, detail="Model did not become ready")
+    logger.info(f"Model {model_id} is ready on port {port}")
+    return port, model_id
+
+
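wait_for_model itself is not shown in this diff; for reference, a typical implementation polls the vLLM OpenAI server's GET /health route until it answers or a deadline passes. A sketch under that assumption (timeout and interval values are illustrative):

import time
import httpx

def wait_for_model(port, timeout=120.0, interval=1.0):
    # Poll until the server reports healthy or the deadline passes.
    deadline = time.time() + timeout
    while time.time() < deadline:
        try:
            r = httpx.get(f"http://localhost:{port}/health", timeout=2.0)
            if r.status_code == 200:
                return True
        except httpx.HTTPError:
            pass  # server not accepting connections yet
        time.sleep(interval)
    return False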
+async def proxied_api(request, path, model_manager):
+    model_name = request.get("model")
+    logger.info(f"Received request @ path: {path} for model: {model_name}")
+    # pop() instead of del: a request without "model" would otherwise raise
+    # KeyError here rather than the 400 that get_model_ready returns.
+    request.pop("model", None)
+    port, model_id = get_model_ready(model_name, model_manager)
+    return await proxy_streaming_request(request, path, port, model_name, model_id, model_manager)
+
 # FastAPI OpenAI-Compatible API
 def create_api_app(model_manager):
     app = FastAPI()
@@ -479,63 +543,19 @@ def create_api_app(model_manager):
     )
 
 
-    import httpx
-
-    from fastapi.responses import StreamingResponse
-    from fastapi import BackgroundTasks
+
     @app.post("/v1/chat/completions")
-    async def chat_completions(request: dict, background_tasks: BackgroundTasks):
-        model_name = request.get("model")
-        logger.info(f"Received chat completion request for model: {model_name}")
-        local, model_id, model_path = model_manager.is_local(model_name)
-        if not model_id:
-            raise HTTPException(status_code=400, detail="Missing model name")
-        logger.info(f"Checking if model {model_id} is running")
-        # Update last access time for model
-        model_manager.last_access[model_id] = time.time()
-        if not model_manager.is_running(model_id):
-            logger.info(f"Model {model_id} is not running, starting it")
-            if local:
-                ok, msg, port, pid = model_manager.start_model(model_id)
-                logger.info(f"Model start response: {msg}")
-                if not ok:
-                    raise HTTPException(status_code=500, detail=msg)
-            else:
-                raise HTTPException(status_code=404, detail="Model not available locally")
-        port = model_manager.get_port(model_id)
-        logger.info(f"Model {model_id} is running on port {port}")
-        if model_manager.wait_for_model(port):
-            logger.info(f"Model {model_id} is ready on port {port}")
-        if not port:
-            raise HTTPException(status_code=500, detail="Model port not found")
-        stream = request.get("stream", False)
-        url = f"http://localhost:{port}/v1/chat/completions"
-        del request["model"]
-        try:
-            if stream:
-                async def generate_chunks():
-                    async with httpx.AsyncClient() as client:
-                        async with client.stream('POST', url, json=request, timeout=None) as upstream_response:
-                            async for chunk in upstream_response.aiter_text():
-                                if chunk.startswith("data: "):
-                                    try:
-                                        data0 = json.loads(chunk[6:].strip())
-                                        data0["model"] = model_name
-                                        chunk = f"data: {json.dumps(data0)}\n\n"
-                                        model_manager.last_access[model_id] = time.time()
-                                    except json.JSONDecodeError:
-                                        pass
-                                yield chunk
-                return StreamingResponse(generate_chunks(), media_type="text/event-stream")
-            else:
-                async with httpx.AsyncClient() as client:
-                    vllm_response = await client.post(url, json=request, timeout=30.0)
-                    rst = vllm_response.json()
-                    rst['model'] = model_name
-                    return JSONResponse(rst, status_code=vllm_response.status_code)
-        except Exception as e:
-            raise HTTPException(status_code=500, detail=f"vLLM API error: {str(e)}")
-
+    async def chat_completions(request: dict):
+        return await proxied_api(request, "/v1/chat/completions", model_manager)
+
+    @app.post("/v1/completions")
+    async def completions(request: dict):
+        return await proxied_api(request, "/v1/completions", model_manager)
+
+    @app.post("/v1/responses")
+    async def responses(request: dict):
+        return await proxied_api(request, "/v1/responses", model_manager)
+
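All three routes now funnel through proxied_api, so one request exercises the whole shared path. A quick smoke test against a running daemon (assuming the API listens on localhost:8000 and "my-model" is installed; both are placeholders):

import httpx

resp = httpx.post(
    "http://localhost:8000/v1/chat/completions",
    json={
        "model": "my-model",  # placeholder; use an installed model name
        "messages": [{"role": "user", "content": "Hello"}],
        "stream": False,
    },
    timeout=60.0,  # allow time for a cold model start
)
resp.raise_for_status()
print(resp.json()["model"])  # prints "my-model": the proxy restored the name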
     @app.get("/v1/models")
     async def list_models():
         logger.info("Listing available models")