Skip to content

Commit 244e9f7

Browse files
committed
FIX: Python mockliveserver on Windows
1 parent dea2012 commit 244e9f7

File tree

5 files changed

+52
-52
lines changed

5 files changed

+52
-52
lines changed

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,4 +76,4 @@ warn_unused_ignores = true
7676

7777
[tool.pytest.ini_options]
7878
testpaths = ["tests"]
79-
addopts = "--asyncio-mode auto"
79+
asyncio_mode = "auto"

tests/mockliveserver/__main__.py

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
from databento.common.publishers import Dataset
1212
from databento_dbn import Schema
1313

14-
from tests.mockliveserver.controller import CommandProtocol
14+
from tests.mockliveserver.controller import Controller
1515
from tests.mockliveserver.source import ReplayProtocol
1616

1717
from .server import MockLiveServerProtocol
@@ -84,13 +84,11 @@ async def main() -> None:
8484
ip, port, *_ = server._sockets[-1].getsockname() # type: ignore [attr-defined]
8585

8686
# Create command interface for stdin
87-
await loop.connect_read_pipe(
88-
protocol_factory=lambda: CommandProtocol(
89-
api_key_table=api_key_table,
90-
file_replay_table=file_replay_table,
91-
server=server,
92-
),
93-
pipe=sys.stdin,
87+
_ = Controller(
88+
server=server,
89+
api_key_table=api_key_table,
90+
file_replay_table=file_replay_table,
91+
loop=loop,
9492
)
9593

9694
# Log Arguments

tests/mockliveserver/controller.py

Lines changed: 29 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import argparse
44
import asyncio
55
import logging
6+
import sys
67
from collections.abc import Mapping
78
from collections.abc import MutableMapping
89
from pathlib import Path
@@ -18,7 +19,7 @@
1819
logger = logging.getLogger(__name__)
1920

2021

21-
class CommandProtocol(asyncio.Protocol):
22+
class Controller:
2223
command_parser = argparse.ArgumentParser(prog="mockliveserver")
2324
subparsers = command_parser.add_subparsers(dest="command")
2425

@@ -45,47 +46,43 @@ def __init__(
4546
server: asyncio.base_events.Server,
4647
api_key_table: Mapping[str, set[str]],
4748
file_replay_table: MutableMapping[tuple[Dataset, Schema], ReplayProtocol],
49+
loop: asyncio.AbstractEventLoop,
4850
) -> None:
4951
self._server = server
5052
self._api_key_table = api_key_table
5153
self._file_replay_table = file_replay_table
52-
53-
def eof_received(self) -> bool | None:
54-
self._server.close()
55-
return super().eof_received()
56-
57-
def data_received(self, data: bytes) -> None:
58-
logger.debug("%d bytes from stdin", len(data))
59-
try:
60-
command_str = data.decode("utf-8")
61-
except Exception:
62-
logger.error("error parsing command")
63-
raise
64-
65-
for command in command_str.splitlines():
66-
params = self.command_parser.parse_args(command.split())
67-
command_func = getattr(self, f"_command_{params.command}", None)
68-
if command_func is None:
69-
raise ValueError(f"{params.command} does not have a command handler")
54+
self._loop = loop
55+
56+
self._read_task = loop.create_task(self._read_commands())
57+
58+
async def _read_commands(self) -> None:
59+
while self._server.is_serving():
60+
line = await self._loop.run_in_executor(None, sys.stdin.readline)
61+
self.data_received(line.strip())
62+
63+
def data_received(self, command_str: str) -> None:
64+
params = self.command_parser.parse_args(command_str.split())
65+
command_func = getattr(self, f"_command_{params.command}", None)
66+
if command_func is None:
67+
raise ValueError(f"{params.command} does not have a command handler")
68+
else:
69+
logger.info("received command: %s", command_str)
70+
command_params = dict(params._get_kwargs())
71+
command_params.pop("command")
72+
try:
73+
command_func(**command_params)
74+
except Exception:
75+
logger.exception("error processing command: %s", params.command)
76+
print(f"nack: {command_str}", flush=True)
7077
else:
71-
logger.info("received command: %s", command)
72-
command_params = dict(params._get_kwargs())
73-
command_params.pop("command")
74-
try:
75-
command_func(**command_params)
76-
except Exception:
77-
logger.exception("error processing command: %s", params.command)
78-
print(f"nack: {command}", flush=True)
79-
else:
80-
print(f"ack: {command}", flush=True)
81-
82-
return super().data_received(data)
78+
print(f"ack: {command_str}", flush=True)
8379

8480
def _command_close(self, *_: str) -> None:
8581
"""
8682
Close the server.
8783
"""
88-
self._server.close()
84+
self._read_task.cancel()
85+
self._loop.call_soon(self._server.close)
8986

9087
def _command_active_count(self, *_: str) -> None:
9188
"""

tests/mockliveserver/fixture.py

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44
import contextlib
55
import os
66
import pathlib
7-
import signal
87
import sys
98
from asyncio.subprocess import Process
109
from collections.abc import AsyncGenerator
@@ -70,13 +69,22 @@ def stdout(self) -> asyncio.StreamReader:
7069
return self._process.stdout
7170
raise RuntimeError("no stream reader for stdout")
7271

73-
async def _send_command(self, command: str) -> None:
72+
async def _send_command(
73+
self,
74+
command: str,
75+
timeout: float = 1.0,
76+
) -> None:
7477
if self._process.stdin is None:
7578
raise RuntimeError("cannot write command to mock live server")
7679
self._process.stdin.write(
7780
f"{command.strip()}\n".encode(),
7881
)
79-
line = await self.stdout.readline()
82+
83+
try:
84+
line = await asyncio.wait_for(self.stdout.readline(), timeout)
85+
except asyncio.TimeoutError:
86+
raise RuntimeError("timeout waiting for command acknowledgement")
87+
8088
line_str = line.decode("utf-8")
8189

8290
if line_str.startswith(f"ack: {command}"):
@@ -140,9 +148,9 @@ async def close(self) -> None:
140148

141149
def kill(self) -> None:
142150
"""
143-
Kill the mock live server by sending SIGKILL.
151+
Kill the mock live server.
144152
"""
145-
self._process.send_signal(signal.SIGKILL)
153+
self._process.kill()
146154

147155
@contextlib.contextmanager
148156
def test_context(self) -> Generator[None, None, None]:
@@ -214,6 +222,7 @@ async def fixture_mock_live_server(
214222
"--echo",
215223
echo_file.resolve(),
216224
"--verbose",
225+
executable=sys.executable,
217226
stdin=asyncio.subprocess.PIPE,
218227
stdout=asyncio.subprocess.PIPE,
219228
stderr=sys.stderr,
@@ -236,5 +245,5 @@ async def fixture_mock_live_server(
236245

237246
yield interface
238247

239-
interface.kill()
240-
await asyncio.wait_for(process.wait(), timeout=1)
248+
process.terminate()
249+
await process.wait()

tests/test_live_client.py

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55
from __future__ import annotations
66

77
import pathlib
8-
import platform
98
import random
109
import string
1110
from io import BytesIO
@@ -894,7 +893,6 @@ def test_live_add_stream_path_directory(
894893
live_client.add_stream(tmp_path)
895894

896895

897-
@pytest.mark.skipif(platform.system() == "Windows", reason="flaky on windows")
898896
async def test_live_async_iteration(
899897
live_client: client.Live,
900898
) -> None:
@@ -998,7 +996,6 @@ async def test_live_async_iteration_dropped(
998996
assert live_client._dbn_queue.empty()
999997

1000998

1001-
@pytest.mark.skipif(platform.system() == "Windows", reason="flaky on windows")
1002999
async def test_live_async_iteration_stop(
10031000
live_client: client.Live,
10041001
) -> None:
@@ -1025,7 +1022,6 @@ async def test_live_async_iteration_stop(
10251022
assert live_client._dbn_queue.empty()
10261023

10271024

1028-
@pytest.mark.skipif(platform.system() == "Windows", reason="flaky on windows")
10291025
def test_live_sync_iteration(
10301026
live_client: client.Live,
10311027
) -> None:

0 commit comments

Comments
 (0)