1
1
from __future__ import annotations
2
2
3
3
import errno
4
+ import math
4
5
import select
5
6
import sys
6
7
from contextlib import contextmanager
9
10
import attr
10
11
import outcome
11
12
12
- from .. import _core
13
+ from .. import _channel , _core
13
14
from ._run import _public
14
15
from ._wakeup_socketpair import WakeupSocketpair
15
16
16
17
if TYPE_CHECKING :
17
18
from typing_extensions import TypeAlias
18
19
19
- from .._core import Abort , RaiseCancelT , Task , UnboundedQueue
20
+ from .._core import Abort , RaiseCancelT , Task
20
21
from .._file_io import _HasFileNo
21
22
22
23
assert not TYPE_CHECKING or (sys .platform != "linux" and sys .platform != "win32" )
@@ -34,10 +35,9 @@ class _KqueueStatistics:
34
35
@attr .s (slots = True , eq = False )
35
36
class KqueueIOManager :
36
37
_kqueue : select .kqueue = attr .ib (factory = select .kqueue )
37
- # {(ident, filter): Task or UnboundedQueue}
38
- _registered : dict [tuple [int , int ], Task | UnboundedQueue [select .kevent ]] = attr .ib (
39
- factory = dict
40
- )
38
+ _registered : dict [
39
+ tuple [int , int ], Task | _channel .MemorySendChannel [select .kevent ]
40
+ ] = attr .ib (factory = dict )
41
41
_force_wakeup : WakeupSocketpair = attr .ib (factory = WakeupSocketpair )
42
42
_force_wakeup_fd : int | None = attr .ib (default = None )
43
43
@@ -94,7 +94,7 @@ def process_events(self, events: EventResult) -> None:
94
94
if isinstance (receiver , _core .Task ):
95
95
_core .reschedule (receiver , outcome .Value (event ))
96
96
else :
97
- receiver .put_nowait (event )
97
+ receiver .send_nowait (event )
98
98
99
99
# kevent registration is complicated -- e.g. aio submission can
100
100
# implicitly perform a EV_ADD, and EVFILT_PROC with NOTE_TRACK will
@@ -119,7 +119,7 @@ def current_kqueue(self) -> select.kqueue:
119
119
@_public
120
120
def monitor_kevent (
121
121
self , ident : int , filter : int
122
- ) -> Iterator [_core . UnboundedQueue [select .kevent ]]:
122
+ ) -> Iterator [_channel . MemoryRecvChannel [select .kevent ]]:
123
123
"""TODO: these are implemented, but are currently more of a sketch than
124
124
anything real. See `#26
125
125
<https://github.com/python-trio/trio/issues/26>`__.
@@ -129,11 +129,12 @@ def monitor_kevent(
129
129
raise _core .BusyResourceError (
130
130
"attempt to register multiple listeners for same ident/filter pair"
131
131
)
132
- q = _core . UnboundedQueue [select .kevent ]()
133
- self ._registered [key ] = q
132
+ send , recv = _channel . open_memory_channel [select .kevent ](math . inf )
133
+ self ._registered [key ] = send
134
134
try :
135
- yield q
135
+ yield recv
136
136
finally :
137
+ send .close ()
137
138
del self ._registered [key ]
138
139
139
140
@_public
@@ -274,8 +275,5 @@ def notify_closing(self, fd: int | _HasFileNo) -> None:
274
275
_core .reschedule (receiver , outcome .Error (exc ))
275
276
del self ._registered [key ]
276
277
else :
277
- # XX this is an interesting example of a case where being able
278
- # to close a queue would be useful...
279
- raise NotImplementedError (
280
- "can't close an fd that monitor_kevent is using"
281
- )
278
+ receiver .close ()
279
+ del self ._registered [key ]
0 commit comments