-
Notifications
You must be signed in to change notification settings - Fork 82
add pipe_duplex for socket <-> websocket #65
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
b1b9266
3be5fff
23989a2
e8c255d
1b50f3c
16c20c9
a00b676
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,141 @@ | ||
| import asyncio | ||
| import socket | ||
| from typing import Optional, Protocol | ||
| from starlette.websockets import WebSocket, WebSocketDisconnect | ||
|
|
||
|
|
||
| class AsyncDuplex(Protocol): | ||
| async def read(self, n: int = -1) -> bytes: ... | ||
| async def write(self, data: bytes): ... | ||
| async def close(self): ... | ||
|
|
||
|
|
||
| async def pipe_duplex(a: AsyncDuplex, b: AsyncDuplex, label_a="A", label_b="B"): | ||
| """双向管道:a <-> b""" | ||
codeskyblue marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| task_ab = asyncio.create_task(_pipe_oneway(a, b, f"{label_a}->{label_b}")) | ||
| task_ba = asyncio.create_task(_pipe_oneway(b, a, f"{label_b}->{label_a}")) | ||
| done, pending = await asyncio.wait( | ||
| [task_ab, task_ba], | ||
| return_when=asyncio.FIRST_COMPLETED, | ||
| ) | ||
| for t in pending: | ||
| t.cancel() | ||
codeskyblue marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| if pending: | ||
| await asyncio.gather(*pending, return_exceptions=True) | ||
|
|
||
|
|
||
| async def _pipe_oneway(src: AsyncDuplex, dst: AsyncDuplex, name: str): | ||
| try: | ||
| while True: | ||
| data = await src.read(4096) | ||
| if not data: | ||
| break | ||
| await dst.write(data) | ||
| except asyncio.CancelledError: | ||
| pass | ||
codeskyblue marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| except Exception as e: | ||
| print(f"[{name}] error:", e) | ||
codeskyblue marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| finally: | ||
| await dst.close() | ||
|
|
||
| class RWSocketDuplex: | ||
codeskyblue marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| def __init__(self, rsock: socket.socket, wsock: socket.socket, loop=None): | ||
| self.rsock = rsock | ||
| self.wsock = wsock | ||
| self._same = rsock is wsock | ||
| self.loop = loop or asyncio.get_running_loop() | ||
| self._closed = False | ||
|
|
||
| self.rsock.setblocking(False) | ||
| if not self._same: | ||
| self.wsock.setblocking(False) | ||
|
|
||
| async def read(self, n: int = 4096) -> bytes: | ||
| if self._closed: | ||
| return b'' | ||
| try: | ||
| data = await self.loop.sock_recv(self.rsock, n) | ||
| if not data: | ||
| await self.close() | ||
| return b'' | ||
| return data | ||
| except (ConnectionResetError, OSError): | ||
| await self.close() | ||
| return b'' | ||
|
|
||
| async def write(self, data: bytes): | ||
| if not data or self._closed: | ||
| return | ||
| try: | ||
| await self.loop.sock_sendall(self.wsock, data) | ||
| except (ConnectionResetError, OSError): | ||
| await self.close() | ||
|
|
||
| async def close(self): | ||
| if self._closed: | ||
| return | ||
| self._closed = True | ||
| try: | ||
| self.rsock.close() | ||
| except Exception: | ||
| pass | ||
| if not self._same: | ||
| try: | ||
| self.wsock.close() | ||
| except Exception: | ||
| pass | ||
|
|
||
| def is_closed(self): | ||
| return self._closed | ||
|
|
||
| class SocketDuplex(RWSocketDuplex): | ||
| """封装 socket.socket 为 AsyncDuplex 接口""" | ||
| def __init__(self, sock: socket.socket, loop: Optional[asyncio.AbstractEventLoop] = None): | ||
| super().__init__(sock, sock, loop) | ||
|
|
||
|
|
||
| class WebSocketDuplex: | ||
| """将 starlette.websockets.WebSocket 封装为 AsyncDuplex""" | ||
| def __init__(self, ws: WebSocket): | ||
| self.ws = ws | ||
| self._closed = False | ||
|
|
||
| async def read(self, n: int = -1) -> bytes: | ||
| """读取二进制消息,如果是文本则自动转 bytes""" | ||
| if self._closed: | ||
| return b'' | ||
| try: | ||
| msg = await self.ws.receive() | ||
| except WebSocketDisconnect: | ||
| self._closed = True | ||
| return b'' | ||
| except Exception: | ||
| self._closed = True | ||
| return b'' | ||
|
Comment on lines
+112
to
+114
|
||
|
|
||
| if msg["type"] == "websocket.disconnect": | ||
| self._closed = True | ||
| return b'' | ||
| elif msg["type"] == "websocket.receive": | ||
| data = msg.get("bytes") | ||
| if data is not None: | ||
| return data | ||
| text = msg.get("text") | ||
| return text.encode("utf-8") if text else b'' | ||
| return b'' | ||
|
|
||
| async def write(self, data: bytes): | ||
| if self._closed: | ||
| return | ||
| try: | ||
| await self.ws.send_bytes(data) | ||
| except Exception: | ||
| self._closed = True | ||
|
Comment on lines
+132
to
+133
|
||
|
|
||
| async def close(self): | ||
| if not self._closed: | ||
| self._closed = True | ||
| try: | ||
| await self.ws.close() | ||
| except Exception: | ||
| pass | ||
|
Comment on lines
+140
to
+141
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,72 @@ | ||
| import logging | ||
| from pathlib import Path | ||
| import socket | ||
| from adbutils import AdbConnection, AdbDevice, AdbError, Network | ||
| from fastapi import WebSocket | ||
| from retry import retry | ||
|
|
||
| logger = logging.getLogger(__name__) | ||
|
|
||
|
|
||
| class ScrcpyServer3: | ||
| VERSION = "3.3.3" | ||
|
|
||
| def __init__(self, device: AdbDevice): | ||
| self._device = device | ||
| self._shell_conn: AdbConnection | ||
| self._video_sock: socket.socket | ||
| self._control_sock: socket.socket | ||
|
|
||
| self._shell_conn = self._start_scrcpy3() | ||
| self._video_sock = self._connect_scrcpy(dummy_byte=True) | ||
| self._control_sock = self._connect_scrcpy() | ||
|
|
||
| def _start_scrcpy3(self): | ||
| device = self._device | ||
| jar_path = Path(__file__).parent.joinpath(f'../binaries/scrcpy-server-v{self.VERSION}.jar') | ||
| device.sync.push(jar_path, '/data/local/tmp/scrcpy_server.jar', check=True) | ||
| logger.info(f'{jar_path.name} pushed to device') | ||
|
|
||
| # 构建启动 scrcpy 服务器的命令 | ||
| cmds = [ | ||
| 'CLASSPATH=/data/local/tmp/scrcpy_server.jar', | ||
| 'app_process', '/', | ||
| f'com.genymobile.scrcpy.Server', self.VERSION, | ||
| 'log_level=info', 'max_size=1024', 'max_fps=30', | ||
| 'video_bit_rate=8000000', 'tunnel_forward=true', | ||
| 'send_frame_meta=true', | ||
| f'control=true', | ||
| 'audio=false', 'show_touches=false', 'stay_awake=false', | ||
| 'power_off_on_close=false', 'clipboard_autosync=false' | ||
| ] | ||
| conn = device.shell(cmds, stream=True) | ||
| logger.debug("scrcpy output: %s", conn.conn.recv(100)) | ||
| return conn | ||
|
|
||
| @retry(exceptions=AdbError, tries=20, delay=0.1) | ||
| def _connect_scrcpy(self, dummy_byte: bool = False) -> socket.socket: | ||
| sock = self._device.create_connection(Network.LOCAL_ABSTRACT, 'scrcpy') | ||
| if dummy_byte: | ||
| received = sock.recv(1) | ||
| if not received or received != b"\x00": | ||
| raise ConnectionError("Did not receive Dummy Byte!") | ||
| logger.debug('Received Dummy Byte!') | ||
| return sock | ||
|
|
||
| def stream_to_websocket(self, ws: WebSocket): | ||
| from .pipe import RWSocketDuplex, WebSocketDuplex, AsyncDuplex, pipe_duplex | ||
| socket_duplex = RWSocketDuplex(self._video_sock, self._control_sock) | ||
| websocket_duplex = WebSocketDuplex(ws) | ||
| return pipe_duplex(socket_duplex, websocket_duplex) | ||
|
|
||
| def close(self): | ||
| self._safe_close_sock(self._control_sock) | ||
| self._safe_close_sock(self._video_sock) | ||
| self._shell_conn.close() | ||
|
|
||
| def _safe_close_sock(self, sock: socket.socket): | ||
| try: | ||
| sock.close() | ||
| except: | ||
| pass | ||
|
|
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
| @@ -1,7 +1,7 @@ | ||||||
| # prefix for /api/android/{serial}/shell | ||||||
|
|
||||||
| import logging | ||||||
| from typing import Dict, Optional | ||||||
| from typing import Dict, Optional, Union | ||||||
|
||||||
| from typing import Dict, Optional, Union | |
| from typing import Union |
Uh oh!
There was an error while loading. Please reload this page.