Skip to content
Merged
Show file tree
Hide file tree
Changes from 31 commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
8bc070d
Fix bug on windows with uvicorn when multiple workers.
FrsECM Dec 4, 2024
dff57bc
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Dec 4, 2024
af14a0d
Force socket to listen before starting server
FrsECM Dec 5, 2024
768a56f
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Dec 5, 2024
ca0c165
Fix Ctrl+C on windows
FrsECM Dec 8, 2024
e698915
Update src/litserve/server.py
aniketmaurya Dec 6, 2024
23ddf26
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Dec 8, 2024
2d231a1
Fix comments - Ctrl+C on Windows
FrsECM Dec 8, 2024
bf19f16
Update src/litserve/server.py
FrsECM Dec 8, 2024
92b9d7e
Update src/litserve/server.py
FrsECM Dec 8, 2024
9015697
Fix threading import Thread
FrsECM Dec 8, 2024
ce29f57
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Dec 8, 2024
d9c7df2
Increase test timeout => 30mn
FrsECM Dec 13, 2024
1ef2707
Fix default self._uvicorn_servers
FrsECM Jan 24, 2025
dae6f5c
Fix sockets iteration
FrsECM Jan 24, 2025
ea61b10
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Feb 11, 2025
d148a1e
No Need to catch Keyboard Interrupt on windows. just close Threads
FrsECM Feb 18, 2025
ef75962
Merge remote-tracking branch 'upstream/main' into bugfix/windows_mult…
FrsECM Feb 20, 2025
8038d7f
Update Timeout + testing CICD
FrsECM Feb 20, 2025
250c6b1
Merge branch 'main' into bugfix/windows_multiple_workers
aniketmaurya Feb 27, 2025
2d404c4
Fix timeout for gpu tests
FrsECM Feb 28, 2025
0d487d7
Merge remote-tracking branch 'upstream/main' into bugfix/windows_mult…
FrsECM Apr 8, 2025
7534794
Merge remote-tracking branch 'upstream/main' into bugfix/windows_mult…
FrsECM Apr 29, 2025
bf15d9f
Fix KeyboardInterrupt Windows
FrsECM Apr 29, 2025
c37229c
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Apr 29, 2025
8eebacf
KeyboardInterrupt Windows - MultipleWorkers
FrsECM Apr 29, 2025
a3d0d33
Merge branch 'bugfix/windows_multiple_workers' of https://github.com/…
FrsECM Apr 29, 2025
84e15b1
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Apr 29, 2025
261f866
Move pid detection to LitLoop for less intrusivity
FrsECM Apr 29, 2025
56e8718
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Apr 29, 2025
75d1dda
Revert changes in CICD
FrsECM Apr 30, 2025
8d63735
Remove lock (useless)
FrsECM Apr 30, 2025
2daba46
Apply suggestions from code review
aniketmaurya Apr 30, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions src/litserve/loops/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,10 @@
import asyncio
import inspect
import logging
import multiprocessing as mp
import os
import pickle
import signal
import sys
import time
from abc import ABC
Expand Down Expand Up @@ -212,6 +215,17 @@ def run(
class LitLoop(_BaseLoop):
def __init__(self):
    # Per-request context dict shared across the loop's hooks.
    self._context = {}
    # Pid of the parent server process, captured at construction time so a
    # worker process can signal the parent on shutdown (see kill()).
    self._server_pid = os.getpid()
    # NOTE(review): guards kill(); presumably created before workers are
    # spawned — confirm it pickles correctly under the active start method.
    self._lock = mp.Lock()

def kill(self):
    """Terminate the parent server process from a worker loop.

    Called when a worker catches ``KeyboardInterrupt``: sends ``SIGTERM`` to
    the pid recorded at construction time so the whole server shuts down.
    Safe to call even if the parent has already exited.
    """
    with self._lock:
        try:
            print(f"Stop Server Requested - Kill parent pid [{self._server_pid}] from [{os.getpid()}]")
            os.kill(self._server_pid, signal.SIGTERM)
        except ProcessLookupError:
            # POSIX: the server pid no longer exists (already terminated).
            return
        except PermissionError:
            # Windows reports an already-terminated pid as access denied.
            return

def get_batch_requests(
self,
Expand Down
7 changes: 6 additions & 1 deletion src/litserve/loops/simple_loops.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ def run_single_loop(
response_queue_id, uid, timestamp, x_enc = request_queue.get(timeout=1.0)
except (Empty, ValueError):
continue
except KeyboardInterrupt: # pragma: no cover
self.kill()
return

if (lit_api.request_timeout and lit_api.request_timeout != -1) and (
time.monotonic() - timestamp > lit_api.request_timeout
Expand Down Expand Up @@ -213,7 +216,9 @@ def run_batched_loop(
PickleableHTTPException.from_exception(e),
LitAPIStatus.ERROR,
)

except KeyboardInterrupt: # pragma: no cover
self.kill()
return
except Exception as e:
logger.exception(
"LitAPI ran into an error while processing the batched request.\n"
Expand Down
6 changes: 6 additions & 0 deletions src/litserve/loops/streaming_loops.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,9 @@ def run_streaming_loop(
PickleableHTTPException.from_exception(e),
LitAPIStatus.ERROR,
)
except KeyboardInterrupt: # pragma: no cover
self.kill()
return
except Exception as e:
logger.exception(
"LitAPI ran into an error while processing the streaming request uid=%s.\n"
Expand Down Expand Up @@ -185,6 +188,9 @@ def run_batched_streaming_loop(

for response_queue_id, uid in zip(response_queue_ids, uids):
self.put_response(transport, response_queue_id, uid, "", LitAPIStatus.FINISH_STREAMING)
except KeyboardInterrupt: # pragma: no cover
self.kill()
return

except HTTPException as e:
for response_queue_id, uid in zip(response_queue_ids, uids):
Expand Down
45 changes: 33 additions & 12 deletions src/litserve/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,12 @@
import warnings
from collections import deque
from contextlib import asynccontextmanager
from multiprocessing.context import Process
from threading import Thread
from typing import Callable, Dict, List, Optional, Sequence, Tuple, Union

import uvicorn
import uvicorn.server
from fastapi import Depends, FastAPI, HTTPException, Request, Response
from fastapi.responses import JSONResponse, StreamingResponse
from fastapi.security import APIKeyHeader
Expand Down Expand Up @@ -176,7 +179,6 @@ def __init__(
DeprecationWarning,
stacklevel=2,
)

lit_api.max_batch_size = max_batch_size
lit_api.batch_timeout = batch_timeout
if isinstance(spec, LitSpec):
Expand Down Expand Up @@ -270,6 +272,7 @@ def __init__(
self._callback_runner = CallbackRunner(callbacks)
self.use_zmq = fast_queue
self.transport_config = None
self._uvicorn_servers: List[uvicorn.Server] = []

specs = spec if spec is not None else []
self._specs = specs if isinstance(specs, Sequence) else [specs]
Expand Down Expand Up @@ -341,6 +344,7 @@ def launch_inference_worker(self, num_uvicorn_servers: int):
),
)
process.start()
print(f"Inference Worker {worker_id} - [{process.pid}]")
process_list.append(process)
return manager, process_list

Expand Down Expand Up @@ -599,20 +603,28 @@ def run(
elif api_server_worker_type is None:
api_server_worker_type = "process"

manager, litserve_workers = self.launch_inference_worker(num_api_servers)
manager, inference_workers = self.launch_inference_worker(num_api_servers)

self.verify_worker_status()
try:
servers = self._start_server(port, num_api_servers, log_level, sockets, api_server_worker_type, **kwargs)
uvicorn_workers = self._start_server(
port, num_api_servers, log_level, sockets, api_server_worker_type, **kwargs
)
print(f"Swagger UI is available at http://0.0.0.0:{port}/docs")
for s in servers:
s.join()
# On Linux, the kill signal is captured by uvicorn;
# the workers then join and raise KeyboardInterrupt, allowing the server to shut down.
for i, uw in enumerate(uvicorn_workers):
uw: Union[Process, Thread]
if isinstance(uw, Process):
print(f"Uvicorn worker {i} : [{uw.pid}]")
uw.join()
finally:
print("Shutting down LitServe")
self._transport.close()
for w in litserve_workers:
w.terminate()
w.join()
for iw in inference_workers:
iw: Process
iw.terminate()
iw.join()
manager.shutdown()

def _prepare_app_run(self, app: FastAPI):
Expand All @@ -622,16 +634,24 @@ def _prepare_app_run(self, app: FastAPI):
app.add_middleware(RequestCountMiddleware, active_counter=active_counter)

def _start_server(self, port, num_uvicorn_servers, log_level, sockets, uvicorn_worker_type, **kwargs):
servers = []
workers = []
for response_queue_id in range(num_uvicorn_servers):
self.app.response_queue_id = response_queue_id
if self.lit_spec:
self.lit_spec.response_queue_id = response_queue_id
app: FastAPI = copy.copy(self.app)

self._prepare_app_run(app)

config = uvicorn.Config(app=app, host="0.0.0.0", port=port, log_level=log_level, **kwargs)
if sys.platform == "win32" and num_uvicorn_servers > 1:
logger.debug("Enable Windows explicit socket sharing...")
# Make sure each shared socket is listening before workers start;
# this prevents subsequent [WinError 10022] (invalid argument) errors.
for sock in sockets:
sock.listen(config.backlog)
# Set config.workers so uvicorn uses the shared socket (win32)
# https://github.com/encode/uvicorn/pull/802
config.workers = num_uvicorn_servers
server = uvicorn.Server(config=config)
if uvicorn_worker_type == "process":
ctx = mp.get_context("fork")
Expand All @@ -641,8 +661,9 @@ def _start_server(self, port, num_uvicorn_servers, log_level, sockets, uvicorn_w
else:
raise ValueError("Invalid value for api_server_worker_type. Must be 'process' or 'thread'")
w.start()
servers.append(w)
return servers
workers.append(w)
self._uvicorn_servers.append(server)
return workers

def setup_auth(self):
if hasattr(self.lit_api, "authorize") and callable(self.lit_api.authorize):
Expand Down
Loading