-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathserver.py
More file actions
142 lines (115 loc) · 4.54 KB
/
server.py
File metadata and controls
142 lines (115 loc) · 4.54 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
import os
# 禁用 OpenCV 硬件加速(某些设备会崩溃)
os.environ["OPENCV_VIDEOIO_MSMF_ENABLE_HW_TRANSFORMS"] = "0"
import cv2
import snappy
import anyio
import asyncio
import logging
import uvicorn
from typing import Dict, Optional
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from fastapi.responses import HTMLResponse, RedirectResponse
from fastapi.staticfiles import StaticFiles
from websockets.exceptions import ConnectionClosedOK
# 初始化日志
logger = logging.getLogger(__name__)
# FastAPI 应用及静态资源挂载
app = FastAPI()
app.mount("/static", StaticFiles(directory="static"), name="static")
# 摄像头管理类
class CameraStreamer:
def __init__(self, camera_id: int, width=640, height=480, fps=30):
self.camera_id = camera_id
self.width = width
self.height = height
self.fps = fps
self.frame: Optional[bytes] = None
self.lock = asyncio.Lock()
self.subscriber_count = 0
self.cap: Optional[cv2.VideoCapture] = None
self.running = False
self._thread_stream: Optional[asyncio.Task] = None
async def start(self):
if self.running:
return
self.cap = cv2.VideoCapture(self.camera_id)
if not self.cap.isOpened():
raise RuntimeError(f"无法打开摄像头 {self.camera_id}")
self.cap.set(cv2.CAP_PROP_FRAME_WIDTH, self.width)
self.cap.set(cv2.CAP_PROP_FRAME_HEIGHT, self.height)
self.cap.set(cv2.CAP_PROP_FPS, self.fps)
logger.info(f"Camera {self.camera_id} opened: {self.cap.get(cv2.CAP_PROP_FRAME_WIDTH)}x"
f"{self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT)} @ {self.cap.get(cv2.CAP_PROP_FPS)} FPS")
self.running = True
self._thread_stream = asyncio.create_task(self._update_frame())
async def _update_frame(self):
delay = 1.0 / self.fps
while self.running:
ret, frame = await anyio.to_thread.run_sync(lambda: self.cap.read())
if ret:
async with self.lock:
self.frame = frame
else:
logger.warning(f"读取摄像头 {self.camera_id} 帧失败")
await asyncio.sleep(delay)
async def get_frame(self) -> Optional[bytes]:
async with self.lock:
if self.frame is None:
return None
ret, jpeg = cv2.imencode(".jpg", self.frame, [int(cv2.IMWRITE_JPEG_QUALITY), 60])
return jpeg.tobytes() if ret else None
async def stop(self):
self.running = False
if self._thread_stream:
await self._thread_stream
if self.cap:
self.cap.release()
self.cap = None
logger.info(f"Camera {self.camera_id} released")
# 全局摄像头实例缓存
camera_streamers: Dict[int, CameraStreamer] = {}
camera_streamers_lock = asyncio.Lock()
@app.get("/", response_class=HTMLResponse)
async def index():
return RedirectResponse("/static/index.html")
@app.websocket("/ws/camera")
async def websocket_camera(websocket: WebSocket, camera_id: int = 0, width: int = 640, height: int = 480,
fps: int = 30):
await websocket.accept()
async with camera_streamers_lock:
streamer = camera_streamers.get(camera_id)
if not streamer:
streamer = CameraStreamer(camera_id, width, height, fps)
try:
await streamer.start()
except RuntimeError as e:
await websocket.send_text(f"ERROR: {e}")
await websocket.close()
return
camera_streamers[camera_id] = streamer
streamer.subscriber_count += 1
try:
delay = 1.0 / fps
while True:
frame = await streamer.get_frame()
if frame:
await websocket.send_bytes(snappy.compress(frame))
await asyncio.sleep(delay)
except (WebSocketDisconnect, ConnectionClosedOK):
logger.info(f"WebSocket 关闭 (camera_id={camera_id})")
except Exception:
logger.exception(f"WebSocket 错误 (camera_id={camera_id})")
finally:
async with camera_streamers_lock:
streamer.subscriber_count -= 1
if streamer.subscriber_count <= 0:
await streamer.stop()
camera_streamers.pop(camera_id, None)
if __name__ == "__main__":
logging.basicConfig(
level=logging.INFO,
format="[%(asctime)s] %(levelname)s: %(message)s",
handlers=[logging.StreamHandler()],
)
uvicorn.run(app, host="0.0.0.0", port=8011)