Commit 51b56cf

Improve error handling and logging for streaming (#562)
* Enhance error handling and logging in RegularRequestHandler and improve process/thread naming in LitServer
  - Updated `_handle_error_response` to log exceptions more clearly and handle byte responses using pickle.
  - Changed process and thread names in LitServer to a more consistent format for better identification.
  - Refactored logging format in `configure_logging` to include additional context information.
* fix
* handle errors
* update
* update
* Refactor error handling in RegularRequestHandler and OpenAISpec
  - Changed `_handle_error_response` to a static method for improved clarity and usage.
  - Updated error handling in OpenAISpec to re-raise HTTPException for better error propagation.
  - Added unit tests for error handling in RegularRequestHandler to ensure proper exception raising and logging.
1 parent ba66a9d commit 51b56cf

File tree

5 files changed: +143, -84 lines

src/litserve/server.py
src/litserve/specs/openai.py
src/litserve/utils.py
tests/unit/test_request_handlers.py
tests/unit/test_specs.py

src/litserve/server.py

Lines changed: 21 additions & 9 deletions
@@ -19,6 +19,7 @@
 import logging
 import multiprocessing as mp
 import os
+import pickle
 import secrets
 import sys
 import threading
@@ -317,25 +318,36 @@ async def handle_request(self, request, request_type) -> Response:
             response, status = self.server.response_buffer.pop(uid)
 
             if status == LitAPIStatus.ERROR:
-                await self._handle_error_response(response)
+                self._handle_error_response(response)
 
             # Trigger callback
             self.server._callback_runner.trigger_event(EventTypes.ON_RESPONSE.value, litserver=self.server)
 
             return response
 
         except HTTPException as e:
-            raise e
+            raise e from None
 
         except Exception as e:
-            logger.exception(f"Error handling request: {e}")
-            raise HTTPException(status_code=500, detail="Internal server error")
+            logger.error(f"Unhandled exception: {e}", exc_info=True)
+            raise HTTPException(status_code=500, detail="Internal server error") from e
+
+    @staticmethod
+    def _handle_error_response(response):
+        """Raise HTTPException as is and rest as 500 after logging the error."""
+        try:
+            if isinstance(response, bytes):
+                response = pickle.loads(response)
+                raise HTTPException(status_code=response.status_code, detail=response.detail)
+        except Exception as e:
+            logger.debug(f"couldn't unpickle error response {e}")
 
-    async def _handle_error_response(self, response):
-        logger.error("Error in request: %s", response)
         if isinstance(response, HTTPException):
             raise response
 
+        if isinstance(response, Exception):
+            logger.error(f"Error while handling request: {response}")
+
         raise HTTPException(status_code=500, detail="Internal server error")
 
@@ -800,7 +812,7 @@ def launch_inference_worker(self, lit_api: LitAPI):
                     self.workers_setup_status,
                     self._callback_runner,
                 ),
-                name=f"lit-inference-{endpoint}_{worker_id}",
+                name="inference-worker",
            )
            process.start()
            process_list.append(process)

@@ -1363,9 +1375,9 @@ def _start_server(self, port, num_uvicorn_servers, log_level, sockets, uvicorn_w
            server = uvicorn.Server(config=uvicorn_config)
            if uvicorn_worker_type == "process":
                ctx = mp.get_context("fork")
-                w = ctx.Process(target=server.run, args=(sockets,), name=f"lit-uvicorn-{response_queue_id}")
+                w = ctx.Process(target=server.run, args=(sockets,), name=f"LitServer-{response_queue_id}")
            elif uvicorn_worker_type == "thread":
-                w = threading.Thread(target=server.run, args=(sockets,), name=f"lit-uvicorn-{response_queue_id}")
+                w = threading.Thread(target=server.run, args=(sockets,), name=f"LitServer-{response_queue_id}")
            else:
                raise ValueError("Invalid value for api_server_worker_type. Must be 'process' or 'thread'")
            w.start()
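
Taken together, the server.py changes mean that an error pickled into the response buffer by a worker is unpickled and re-raised with its original status code and detail, while anything unrecognized still surfaces as a generic 500. A minimal sketch of that behavior, mirroring the unit test added in tests/unit/test_request_handlers.py below (the 404 detail is an arbitrary illustration, not taken from this commit):

from fastapi import HTTPException

from litserve.server import RegularRequestHandler

# An HTTPException (unpickled from bytes or passed through directly) is re-raised as-is.
try:
    RegularRequestHandler._handle_error_response(HTTPException(status_code=404, detail="item not found"))
except HTTPException as exc:
    print(exc.status_code, exc.detail)  # 404 item not found

# Any other exception is logged and surfaced as a plain 500.
try:
    RegularRequestHandler._handle_error_response(RuntimeError("boom"))
except HTTPException as exc:
    print(exc.status_code, exc.detail)  # 500 Internal server error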

src/litserve/specs/openai.py

Lines changed: 102 additions & 71 deletions
@@ -327,6 +327,26 @@ async def encode_response(self, output):
     """ # noqa: E501
 
 
+def _openai_format_error(error: Exception):
+    if isinstance(error, HTTPException):
+        return "data: " + json.dumps({
+            "error": {
+                "message": error.detail,
+                "type": "internal",
+                "param": None,
+                "code": "internal_error",
+            }
+        })
+    return "data: " + json.dumps({
+        "error": {
+            "message": "Internal server error",
+            "type": "internal",
+            "param": None,
+            "code": "internal_error",
+        }
+    })
+
+
 class OpenAISpec(LitSpec):
     def __init__(
         self,
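
For reference, a rough sketch of the SSE frame this helper emits. `_openai_format_error` is module-private, so importing it directly is only for illustration here, and the 501 detail string is borrowed from the test_specs.py change further down:

from fastapi import HTTPException

from litserve.specs.openai import _openai_format_error

print(_openai_format_error(HTTPException(status_code=501, detail="test LitAPI.predict error")))
# data: {"error": {"message": "test LitAPI.predict error", "type": "internal", "param": null, "code": "internal_error"}}

# Non-HTTP exceptions fall through to the generic branch.
print(_openai_format_error(ValueError("boom")))
# data: {"error": {"message": "Internal server error", "type": "internal", "param": null, "code": "internal_error"}}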
@@ -486,82 +506,93 @@ async def chat_completion(self, request: ChatCompletionRequest, background_tasks
         return await response_task
 
     async def streaming_completion(self, request: ChatCompletionRequest, pipe_responses: List):
-        model = request.model
-        usage_info = None
-        async for streaming_response in azip(*pipe_responses):
-            choices = []
-            usage_infos = []
-            # iterate over n choices
-            for i, (response, status) in enumerate(streaming_response):
-                if status == LitAPIStatus.ERROR and isinstance(response, HTTPException):
-                    raise response
-                elif status == LitAPIStatus.ERROR:
-                    logger.error("Error in streaming response: %s", response)
-                    raise HTTPException(status_code=500)
-                encoded_response = json.loads(response)
-                logger.debug(encoded_response)
-                chat_msg = ChoiceDelta(**encoded_response)
-                usage_infos.append(UsageInfo(**encoded_response))
-                choice = ChatCompletionStreamingChoice(
-                    index=i, delta=chat_msg, system_fingerprint="", finish_reason=None
-                )
+        try:
+            model = request.model
+            usage_info = None
+            async for streaming_response in azip(*pipe_responses):
+                choices = []
+                usage_infos = []
+                # iterate over n choices
+                for i, (response, status) in enumerate(streaming_response):
+                    if status == LitAPIStatus.ERROR and isinstance(response, HTTPException):
+                        raise response
+                    elif status == LitAPIStatus.ERROR:
+                        logger.error("Error in streaming response: %s", response)
+                        raise HTTPException(status_code=500)
+                    encoded_response = json.loads(response)
+                    logger.debug(encoded_response)
+                    chat_msg = ChoiceDelta(**encoded_response)
+                    usage_infos.append(UsageInfo(**encoded_response))
+                    choice = ChatCompletionStreamingChoice(
+                        index=i, delta=chat_msg, system_fingerprint="", finish_reason=None
+                    )
 
-                choices.append(choice)
+                    choices.append(choice)
+
+                # Only use the last item from encode_response
+                usage_info = sum(usage_infos)
+                chunk = ChatCompletionChunk(model=model, choices=choices, usage=None)
+                logger.debug(chunk)
+                yield f"data: {chunk.model_dump_json(by_alias=True)}\n\n"
 
-            # Only use the last item from encode_response
-            usage_info = sum(usage_infos)
-            chunk = ChatCompletionChunk(model=model, choices=choices, usage=None)
-            logger.debug(chunk)
-            yield f"data: {chunk.model_dump_json(by_alias=True)}\n\n"
-
-        choices = [
-            ChatCompletionStreamingChoice(
-                index=i,
-                delta=ChoiceDelta(),
-                finish_reason="stop",
+            choices = [
+                ChatCompletionStreamingChoice(
+                    index=i,
+                    delta=ChoiceDelta(),
+                    finish_reason="stop",
+                )
+                for i in range(request.n)
+            ]
+            last_chunk = ChatCompletionChunk(
+                model=model,
+                choices=choices,
+                usage=usage_info,
             )
-            for i in range(request.n)
-        ]
-        last_chunk = ChatCompletionChunk(
-            model=model,
-            choices=choices,
-            usage=usage_info,
-        )
-        yield f"data: {last_chunk.model_dump_json(by_alias=True)}\n\n"
-        yield "data: [DONE]\n\n"
+            yield f"data: {last_chunk.model_dump_json(by_alias=True)}\n\n"
+            yield "data: [DONE]\n\n"
+        except Exception as e:
+            logger.error("Error in streaming response: %s", e, exc_info=True)
+            yield _openai_format_error(e)
+            return
 
     async def non_streaming_completion(self, request: ChatCompletionRequest, generator_list: List[AsyncGenerator]):
-        model = request.model
-        usage_infos = []
-        choices = []
-        # iterate over n choices
-        for i, streaming_response in enumerate(generator_list):
-            msgs = []
-            tool_calls = None
-            usage = None
-            async for response, status in streaming_response:
-                if status == LitAPIStatus.ERROR and isinstance(response, HTTPException):
-                    raise response
-                if status == LitAPIStatus.ERROR:
-                    logger.error("Error in OpenAI non-streaming response: %s", response)
-                    raise HTTPException(status_code=500)
-
-                # data from LitAPI.encode_response
-                encoded_response = json.loads(response)
-                logger.debug(encoded_response)
-                chat_msg = ChatMessage(**encoded_response)
-                usage = UsageInfo(**encoded_response)
-                usage_infos.append(usage)  # Aggregate usage info across all choices
-                msgs.append(chat_msg.content)
-                if chat_msg.tool_calls:
-                    tool_calls = chat_msg.tool_calls
-
-            content = "".join(msg for msg in msgs if msg is not None)
-            msg = {"role": "assistant", "content": content, "tool_calls": tool_calls}
-            choice = ChatCompletionResponseChoice(index=i, message=msg, finish_reason="stop")
-            choices.append(choice)
-
-        return ChatCompletionResponse(model=model, choices=choices, usage=sum(usage_infos))
+        try:
+            model = request.model
+            usage_infos = []
+            choices = []
+            # iterate over n choices
+            for i, streaming_response in enumerate(generator_list):
+                msgs = []
+                tool_calls = None
+                usage = None
+                async for response, status in streaming_response:
+                    if status == LitAPIStatus.ERROR and isinstance(response, HTTPException):
+                        raise response
+                    if status == LitAPIStatus.ERROR:
+                        logger.error("Error in OpenAI non-streaming response: %s", response)
+                        raise HTTPException(status_code=500)
+
+                    # data from LitAPI.encode_response
+                    encoded_response = json.loads(response)
+                    logger.debug(encoded_response)
+                    chat_msg = ChatMessage(**encoded_response)
+                    usage = UsageInfo(**encoded_response)
+                    usage_infos.append(usage)  # Aggregate usage info across all choices
+                    msgs.append(chat_msg.content)
+                    if chat_msg.tool_calls:
+                        tool_calls = chat_msg.tool_calls
+
+                content = "".join(msg for msg in msgs if msg is not None)
+                msg = {"role": "assistant", "content": content, "tool_calls": tool_calls}
+                choice = ChatCompletionResponseChoice(index=i, message=msg, finish_reason="stop")
+                choices.append(choice)
+
+            return ChatCompletionResponse(model=model, choices=choices, usage=sum(usage_infos))
+        except HTTPException as e:
+            raise e
+        except Exception as e:
+            logger.error("Error in non-streaming response: %s", e, exc_info=True)
+            raise HTTPException(status_code=500)
 
 
 class _AsyncOpenAISpecWrapper(_AsyncSpecWrapper):
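
Because streaming_completion now catches failures and yields an error frame instead of tearing down the HTTP stream, a consumer has to watch for that frame while reading SSE lines. A hypothetical client-side sketch, not part of this commit; the URL, port, model name, and the use of httpx are assumptions made purely for illustration:

import json

import httpx

with httpx.stream(
    "POST",
    "http://localhost:8000/v1/chat/completions",  # assumed local LitServe endpoint
    json={
        "model": "my-model",  # placeholder model name
        "messages": [{"role": "user", "content": "Hello"}],
        "stream": True,
    },
    timeout=None,
) as response:
    for line in response.iter_lines():
        if not line.startswith("data: "):
            continue
        payload = line[len("data: "):]
        if payload == "[DONE]":
            break
        event = json.loads(payload)
        if "error" in event:
            # the in-stream error frame produced by _openai_format_error
            print("server reported an error:", event["error"]["message"])
            break
        # otherwise this is a regular ChatCompletionChunk payload
        print(event["choices"][0]["delta"].get("content") or "", end="")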

src/litserve/utils.py

Lines changed: 5 additions & 1 deletion
@@ -31,6 +31,10 @@
 
 logger = logging.getLogger(__name__)
 
+_DEFAULT_LOG_FORMAT = (
+    "%(asctime)s - %(processName)s[%(process)d] - %(name)s - %(levelname)s - %(filename)s:%(lineno)d - %(message)s"
+)
+
 
 class LitAPIStatus:
     OK = "OK"

@@ -128,7 +132,7 @@ def _get_default_handler(stream, format):
 
 def configure_logging(
     level: Union[str, int] = logging.INFO,
-    format: str = "%(processName)s[%(process)d] - %(name)s - %(levelname)s - %(message)s",
+    format: str = _DEFAULT_LOG_FORMAT,
     stream: TextIO = sys.stdout,
     use_rich: bool = False,
 ):
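
With the renamed workers above ("inference-worker", "LitServer-<n>") and this richer default format, log lines now carry the emitting process, timestamp, and source location. A small sketch of how the new default would be picked up; the sample output is illustrative rather than captured from a real run:

import logging

from litserve.utils import configure_logging

configure_logging(level=logging.INFO)  # no format argument, so _DEFAULT_LOG_FORMAT applies
logging.getLogger("litserve").info("server ready")
# Roughly:
# 2025-01-01 12:00:00,000 - MainProcess[12345] - litserve - INFO - example.py:6 - server ready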

tests/unit/test_request_handlers.py

Lines changed: 13 additions & 1 deletion
@@ -17,7 +17,7 @@
 from unittest.mock import AsyncMock, MagicMock, patch
 
 import pytest
-from fastapi import Request
+from fastapi import HTTPException, Request
 
 from litserve.server import BaseRequestHandler, RegularRequestHandler
 from litserve.test_examples import SimpleLitAPI

@@ -92,3 +92,15 @@ async def test_request_handler_streaming(mock_event, mock_lit_api):
     response = await handler.handle_request(mock_request, Request)
     assert mock_server.request_queue.qsize() == 1
     assert response == "test-response"
+
+
+def test_regular_handler_error_response():
+    with pytest.raises(HTTPException) as e:
+        RegularRequestHandler._handle_error_response(HTTPException(status_code=500, detail="test error response"))
+    assert e.value.status_code == 500
+    assert e.value.detail == "test error response"
+
+    with pytest.raises(HTTPException) as e:
+        RegularRequestHandler._handle_error_response(Exception("test exception"))
+    assert e.value.status_code == 500
+    assert e.value.detail == "Internal server error"

tests/unit/test_specs.py

Lines changed: 2 additions & 2 deletions
@@ -379,13 +379,13 @@ def predict(self, prompt):
 
 @pytest.mark.asyncio
 async def test_fail_http(openai_request_data):
-    server = ls.LitServer(WrongLitAPI(), spec=ls.OpenAISpec())
+    server = ls.LitServer(WrongLitAPI(spec=ls.OpenAISpec()))
     with wrap_litserve_start(server) as server:
         async with LifespanManager(server.app) as manager, AsyncClient(
             transport=ASGITransport(app=manager.app), base_url="http://test"
         ) as ac:
             res = await ac.post("/v1/chat/completions", json=openai_request_data, timeout=10)
-            assert res.status_code == 501, "Server raises 501 error"
+            assert res.status_code == 501, f"Server raises 501 error: {res.content}"
             assert res.text == '{"detail":"test LitAPI.predict error"}'