Skip to content
4 changes: 4 additions & 0 deletions fastdeploy/engine/common_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -1202,6 +1202,10 @@ def _fetch_request():
except Exception as e:
err_msg = "Error happened while insert task to engine: {}, {}.".format(e, str(traceback.format_exc()))
self.llm_logger.error(err_msg)
# Failed to connect to engine worker queue, retry after 5 seconds
if self.engine_worker_queue.is_broken():
self.llm_logger.error("Failed to connect to engine worker queue, retry after 5 seconds")

This comment was marked as outdated.

time.sleep(5)

This comment was marked as outdated.


def _get_scheduler_unhandled_request_num(self) -> int:
"""
Expand Down
4 changes: 2 additions & 2 deletions fastdeploy/envs.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,8 +174,8 @@ def _validate_split_kv_size(value: int) -> int:
"PREFILL_CONTINUOUS_REQUEST_DECODE_RESOURCES": lambda: int(
os.getenv("PREFILL_CONTINUOUS_REQUEST_DECODE_RESOURCES", "1")
),
"FD_ENABLE_E2W_TENSOR_CONVERT": lambda: int(os.getenv("FD_ENABLE_E2W_TENSOR_CONVERT", "0")),
"FD_ENGINE_TASK_QUEUE_WITH_SHM": lambda: int(os.getenv("FD_ENGINE_TASK_QUEUE_WITH_SHM", "0")),
"FD_ENABLE_E2W_TENSOR_CONVERT": lambda: int(os.getenv("FD_ENABLE_E2W_TENSOR_CONVERT", "1")),
"FD_ENGINE_TASK_QUEUE_WITH_SHM": lambda: int(os.getenv("FD_ENGINE_TASK_QUEUE_WITH_SHM", "1")),

This comment was marked as outdated.

"FD_FILL_BITMASK_BATCH": lambda: int(os.getenv("FD_FILL_BITMASK_BATCH", "4")),
"FD_ENABLE_PDL": lambda: int(os.getenv("FD_ENABLE_PDL", "1")),
"FD_ENABLE_ASYNC_LLM": lambda: int(os.getenv("FD_ENABLE_ASYNC_LLM", "0")),
Expand Down
10 changes: 10 additions & 0 deletions fastdeploy/inter_communicator/engine_worker_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -848,3 +848,13 @@ def cleanup(self):
"""
if self.manager is not None and self.is_server:
self.manager.shutdown()

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

❓ 疑问 BaseManager.connect() 的语义是「初始化/重建连接」,而非「探测连接是否存活」。若管理器端口仍在监听但内部状态已损坏,connect() 可能成功握手但后续业务请求仍会失败,导致 is_broken() 误判为正常。

另外请确认:当 FD_ENGINE_TASK_QUEUE_WITH_SHM=0(TCP 模式)时,self.manager.connect() 是否也能正确反映队列状态?建议作者补充说明两种模式下的行为一致性。

def is_broken(self):

This comment was marked as outdated.

try:
self.manager.connect()
return False
except (ConnectionRefusedError, ConnectionResetError, BrokenPipeError, EOFError, OSError):
llm_logger.error("Failed to connect to engine worker queue")
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 建议 except Exception: return False 静默吞掉所有非预期异常,无任何日志(命中 §C:except Exception:logger.error → 错误静默)。

self.manager.connect() 因非网络原因抛出异常时,此处会掩盖真实错误,使 is_broken() 误报为「队列正常」。

建议改为:except Exception as e: llm_logger.warning("is_broken: unexpected error: %s", e); return False

return True
except Exception:

This comment was marked as outdated.

return False

This comment was marked as outdated.

35 changes: 35 additions & 0 deletions fastdeploy/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -553,6 +553,12 @@ def is_port_available(host, port):
import errno
import socket

# If FD_ENGINE_TASK_QUEUE_WITH_SHM is enabled, then check the file socket is available
if envs.FD_ENGINE_TASK_QUEUE_WITH_SHM:
socket_path = f"/dev/shm/fd_task_queue_{port}.sock"
if not is_file_socket_available(socket_path):
return False

with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
try:
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
Expand All @@ -564,6 +570,35 @@ def is_port_available(host, port):
return True


def is_file_socket_available(socket_path):
"""
Check the Unix domain socket (file socket) is available.

Args:
socket_path: Path to the socket file, e.g. /dev/shm/fd_task_queue_8000.sock

Returns:
True if the socket is available (not in use), False otherwise.
"""
import errno
import os
import socket

if not os.path.exists(socket_path):
return True

# File exists, try to connect to see if someone is listening
with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as s:
try:
s.connect(socket_path)
return False
except OSError as e:
if e.errno in (errno.ECONNREFUSED, errno.ENOENT):
# Stale socket file: exists but nobody is listening
return True
return False


def find_free_ports(
port_range: tuple[int, int] = (8000, 65535),
num_ports: int = 1,
Expand Down
95 changes: 10 additions & 85 deletions tests/ci_use/EB_Lite_with_adapter/test_eblite_serving.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import queue
import shutil
import signal
import socket
import subprocess
import sys
import time
Expand All @@ -30,6 +29,7 @@
sys.path.insert(0, project_root)

from ci_use.EB_Lite_with_adapter.zmq_client import LLMControlClient, LLMReqClient
from e2e.utils.serving_utils import clean_ports, is_port_open

env = os.environ.copy()

Expand Down Expand Up @@ -79,88 +79,6 @@ def zmq_control_client():
return client


def is_port_open(host: str, port: int, timeout=1.0):
"""
Check if a TCP port is open on the given host.
Returns True if connection succeeds, False otherwise.
"""
try:
with socket.create_connection((host, port), timeout):
return True
except Exception:
return False


def kill_process_on_port(port: int):
"""
Kill processes that are listening on the given port.
Uses multiple methods to ensure thorough cleanup.
"""
current_pid = os.getpid()
parent_pid = os.getppid()

# Method 1: Use lsof to find processes
try:
output = subprocess.check_output(f"lsof -i:{port} -t", shell=True).decode().strip()
for pid in output.splitlines():
pid = int(pid)
if pid in (current_pid, parent_pid):
print(f"Skip killing current process (pid={pid}) on port {port}")
continue
try:
# First try SIGTERM for graceful shutdown
os.kill(pid, signal.SIGTERM)
time.sleep(1)
# Then SIGKILL if still running
os.kill(pid, signal.SIGKILL)
print(f"Killed process on port {port}, pid={pid}")
except ProcessLookupError:
pass # Process already terminated
except subprocess.CalledProcessError:
pass

# Method 2: Use netstat and fuser as backup
try:
# Find processes using netstat and awk
cmd = f"netstat -tulpn 2>/dev/null | grep :{port} | awk '{{print $7}}' | cut -d'/' -f1"
output = subprocess.check_output(cmd, shell=True).decode().strip()
for pid in output.splitlines():
if pid and pid.isdigit():
pid = int(pid)
if pid in (current_pid, parent_pid):
continue
try:
os.kill(pid, signal.SIGKILL)
print(f"Killed process (netstat) on port {port}, pid={pid}")
except ProcessLookupError:
pass
except (subprocess.CalledProcessError, FileNotFoundError):
pass

# Method 3: Use fuser if available
try:
subprocess.run(f"fuser -k {port}/tcp", shell=True, timeout=5)
except (subprocess.TimeoutExpired, subprocess.CalledProcessError, FileNotFoundError):
pass


def clean_ports():
"""
Kill all processes occupying the ports listed in PORTS_TO_CLEAN.
"""
print(f"Cleaning ports: {PORTS_TO_CLEAN}")
for port in PORTS_TO_CLEAN:
kill_process_on_port(port)

# Double check and retry if ports are still in use
time.sleep(2)
for port in PORTS_TO_CLEAN:
if is_port_open("127.0.0.1", port, timeout=0.1):
print(f"Port {port} still in use, retrying cleanup...")
kill_process_on_port(port)
time.sleep(1)


@pytest.fixture(scope="session", autouse=True)
def setup_and_run_server():
"""
Expand All @@ -170,8 +88,15 @@ def setup_and_run_server():
- Waits for server port to open (up to 30 seconds)
- Tears down server after all tests finish
"""
# 清理/dev/shm中的临时文件
try:
subprocess.run("rm -rf /dev/shm/*", shell=True)

This comment was marked as outdated.

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 建议 rm -rf /dev/shm/* 会删除 /dev/shm所有文件,在 CI 并发场景下可能误删其他测试任务或系统服务正在使用的共享内存段,引起不相关测试失败。

建议只清理本项目的 socket 文件,与 clean_ports 的实现保持一致:
glob.glob("/dev/shm/fd_task_queue_*.sock") 逐一删除,而非全量清理。

print("Successfully cleaned up /dev/shm.")
except Exception as e:
print(f"Failed to cleanup /dev/shm: {e}")

print("Pre-test port cleanup...")
clean_ports()
clean_ports(PORTS_TO_CLEAN)

base_path = os.getenv("MODEL_PATH")
if base_path:
Expand Down Expand Up @@ -236,7 +161,7 @@ def setup_and_run_server():
print("\n===== Post-test server cleanup... =====")
try:
os.killpg(process.pid, signal.SIGTERM)
clean_ports()
clean_ports(PORTS_TO_CLEAN)
print(f"API server (pid={process.pid}) terminated")
except Exception as e:
print(f"Failed to terminate API server: {e}")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,31 +13,25 @@
# limitations under the License.

import os
import signal
import socket
import subprocess
import sys
import time
import traceback

import pytest

from fastdeploy import LLM, SamplingParams

FD_ENGINE_QUEUE_PORT = int(os.getenv("FD_ENGINE_QUEUE_PORT", 8313))
FD_CACHE_QUEUE_PORT = int(os.getenv("FD_CACHE_QUEUE_PORT", 8333))
MAX_WAIT_SECONDS = 60

current_dir = os.path.dirname(os.path.abspath(__file__))
project_root = os.path.abspath(os.path.join(current_dir, "..", ".."))
sys.path.insert(0, project_root)
from e2e.utils.serving_utils import (
FD_API_PORT,
FD_CACHE_QUEUE_PORT,
FD_ENGINE_QUEUE_PORT,
clean_ports,
)

def is_port_open(host: str, port: int, timeout=1.0):
"""
Check if a TCP port is open on the given host.
Returns True if connection succeeds, False otherwise.
"""
try:
with socket.create_connection((host, port), timeout):
return True
except Exception:
return False
MAX_WAIT_SECONDS = 60


def format_chat_prompt(messages):
Expand Down Expand Up @@ -74,35 +68,23 @@ def llm(model_path):
"""
Fixture to initialize the LLM model with a given model path
"""
try:
output = subprocess.check_output(f"lsof -i:{FD_ENGINE_QUEUE_PORT} -t", shell=True).decode().strip()
for pid in output.splitlines():
os.kill(int(pid), signal.SIGKILL)
print(f"Killed process on port {FD_ENGINE_QUEUE_PORT}, pid={pid}")
except subprocess.CalledProcessError:
pass
# Clean ports before starting the test

This comment was marked as outdated.

clean_ports()

try:
start = time.time()
llm = LLM(
model=model_path,
tensor_parallel_size=1,
port=FD_API_PORT,
engine_worker_queue_port=FD_ENGINE_QUEUE_PORT,
cache_queue_port=FD_CACHE_QUEUE_PORT,
max_model_len=32768,
quantization="wint8",
logits_processors=["LogitBiasLogitsProcessor"],
)

# Wait for the port to be open
wait_start = time.time()
while not is_port_open("127.0.0.1", FD_ENGINE_QUEUE_PORT):
if time.time() - wait_start > MAX_WAIT_SECONDS:
pytest.fail(
f"Model engine did not start within {MAX_WAIT_SECONDS} seconds on port {FD_ENGINE_QUEUE_PORT}"
)
time.sleep(1)

time.sleep(2)

This comment was marked as outdated.

This comment was marked as outdated.

print(f"Model loaded successfully from {model_path} in {time.time() - start:.2f}s.")
yield llm
except Exception:
Expand Down
59 changes: 59 additions & 0 deletions tests/e2e/utils/serving_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,60 @@ def kill_process_on_port(port: int):
pass


def kill_process_by_unix_socket(
socket_path: str,
force: bool = True,
):
"""
根据 unix socket 文件路径杀掉对应进程
cmd: ss -xlpn | grep /dev/shm/fd_task_queue_8664.sock
Args:
socket_path: 例如 /dev/shm/fd_task_queue_8664.sock
force:
True -> SIGKILL
False -> SIGTERM
Returns:
pid 或 None
"""
try:
output = subprocess.check_output(
["ss", "-xlpn"],
text=True,
)
for line in output.splitlines():
if socket_path not in line:
continue
m = re.search(r"pid=(\d+)", line)
if not m:
continue
pid = int(m.group(1))
os.kill(
pid,
signal.SIGKILL if force else signal.SIGTERM,
)
return pid
except Exception:
pass
return None


def cleanup_unix_socket(socket_path: str):
if not os.path.exists(socket_path):
return
try:
pid = kill_process_by_unix_socket(socket_path)
print(f"Killed process by unix socket: {socket_path}, pid={pid}")
except Exception as e:
print(f"Failed to kill process by unix socket: {socket_path}, error={e}")
finally:
try:
if os.path.exists(socket_path):
os.remove(socket_path)
print(f"Cleaned unix socket: {socket_path}")
except Exception:
pass


def clean_ports(ports=None):
"""
Kill all processes occupying the ports
Expand All @@ -117,6 +171,11 @@ def clean_ports(ports=None):
kill_process_on_port(port)
time.sleep(1)

# Clean unix socket, fd_task_queue_*.sock, for FD_ENGINE_TASK_QUEUE_WITH_SHM = 1
print("Cleaning unix socket")
for port in ports:
cleanup_unix_socket(f"/dev/shm/fd_task_queue_{port}.sock")


def clean(ports=None):
"""
Expand Down
Loading
Loading