|
1 | 1 | from __future__ import annotations
|
2 | 2 |
|
3 | 3 | import random
|
| 4 | +import select |
4 | 5 | import socket as stdlib_socket
|
| 6 | +import sys |
5 | 7 | from collections.abc import Awaitable, Callable
|
6 | 8 | from contextlib import suppress
|
7 | 9 | from typing import TYPE_CHECKING, TypeVar
|
@@ -343,6 +345,7 @@ def check(*, expected_readers: int, expected_writers: int) -> None:
|
343 | 345 | assert iostats.tasks_waiting_write == expected_writers
|
344 | 346 | else:
|
345 | 347 | assert iostats.backend == "kqueue"
|
| 348 | + assert iostats.monitors == 0 |
346 | 349 | assert iostats.tasks_waiting == expected_readers + expected_writers
|
347 | 350 |
|
348 | 351 | a1, b1 = stdlib_socket.socketpair()
|
@@ -381,6 +384,44 @@ def check(*, expected_readers: int, expected_writers: int) -> None:
|
381 | 384 | check(expected_readers=1, expected_writers=0)
|
382 | 385 |
|
383 | 386 |
|
| 387 | +@pytest.mark.filterwarnings("ignore:.*UnboundedQueue:trio.TrioDeprecationWarning") |
| 388 | +async def test_io_manager_kqueue_monitors_statistics() -> None: |
| 389 | + def check( |
| 390 | + *, |
| 391 | + expected_monitors: int, |
| 392 | + expected_readers: int, |
| 393 | + expected_writers: int, |
| 394 | + ) -> None: |
| 395 | + statistics = _core.current_statistics() |
| 396 | + print(statistics) |
| 397 | + iostats = statistics.io_statistics |
| 398 | + assert iostats.backend == "kqueue" |
| 399 | + assert iostats.monitors == expected_monitors |
| 400 | + assert iostats.tasks_waiting == expected_readers + expected_writers |
| 401 | + |
| 402 | + a1, b1 = stdlib_socket.socketpair() |
| 403 | + for sock in [a1, b1]: |
| 404 | + sock.setblocking(False) |
| 405 | + |
| 406 | + with a1, b1: |
| 407 | + # let the call_soon_task settle down |
| 408 | + await wait_all_tasks_blocked() |
| 409 | + |
| 410 | + if sys.platform != "win32" and sys.platform != "linux": |
| 411 | + # 1 for call_soon_task |
| 412 | + check(expected_monitors=0, expected_readers=1, expected_writers=0) |
| 413 | + |
| 414 | + with _core.monitor_kevent(a1.fileno(), select.KQ_FILTER_READ): |
| 415 | + with ( |
| 416 | + pytest.raises(_core.BusyResourceError), |
| 417 | + _core.monitor_kevent(a1.fileno(), select.KQ_FILTER_READ), |
| 418 | + ): |
| 419 | + pass # pragma: no cover |
| 420 | + check(expected_monitors=1, expected_readers=1, expected_writers=0) |
| 421 | + |
| 422 | + check(expected_monitors=0, expected_readers=1, expected_writers=0) |
| 423 | + |
| 424 | + |
384 | 425 | async def test_can_survive_unnotified_close() -> None:
|
385 | 426 | # An "unnotified" close is when the user closes an fd/socket/handle
|
386 | 427 | # directly, without calling notify_closing first. This should never happen
|
|
0 commit comments