@@ -107,7 +107,7 @@ class MplexStream(IMuxedStream):
107
107
108
108
name : str
109
109
stream_id : StreamID
110
- # NOTE: All methods used here are part of Mplex which is a derived
110
+ # NOTE: All methods used here are part of ` Mplex` which is a derived
111
111
# class of IMuxedConn. Ignoring this type assignment should not pose
112
112
# any risk.
113
113
muxed_conn : "Mplex" # type: ignore[assignment]
@@ -117,7 +117,7 @@ class MplexStream(IMuxedStream):
117
117
rw_lock : ReadWriteLock
118
118
close_lock : trio .Lock
119
119
120
- # NOTE: dataIn is size of 8 in Go implementation.
120
+ # NOTE: ` dataIn` is size of 8 in Go implementation.
121
121
incoming_data_channel : "trio.MemoryReceiveChannel[bytes]"
122
122
123
123
event_local_closed : trio .Event
@@ -175,8 +175,8 @@ def _read_return_when_blocked(self) -> bytearray:
175
175
176
176
async def read (self , n : int | None = None ) -> bytes :
177
177
"""
178
- Read up to n bytes. Read possibly returns fewer than n bytes, if
179
- there are not enough bytes in the Mplex buffer. If n is None, read
178
+ Read up to n bytes. Read possibly returns fewer than `n` bytes, if
179
+ there are not enough bytes in the Mplex buffer. If ` n is None` , read
180
180
until EOF.
181
181
182
182
:param n: number of bytes to read
@@ -185,8 +185,8 @@ async def read(self, n: int | None = None) -> bytes:
185
185
async with self .rw_lock .read_lock ():
186
186
if n is not None and n < 0 :
187
187
raise ValueError (
188
- "the number of bytes to read n must be non-negative or "
189
- f"None to indicate read until EOF, got n={ n } "
188
+ "the number of bytes to read `n` must be non-negative or "
189
+ f"` None` to indicate read until EOF, got n={ n } "
190
190
)
191
191
if self .event_reset .is_set ():
192
192
raise MplexStreamReset
@@ -202,8 +202,8 @@ async def read(self, n: int | None = None) -> bytes:
202
202
except trio .EndOfChannel :
203
203
raise MplexStreamEOF
204
204
except trio .WouldBlock :
205
- # We know receive will be blocked here. Wait for data here with
206
- # receive and catch all kinds of errors here.
205
+ # We know ` receive` will be blocked here. Wait for data here with
206
+ # ` receive` and catch all kinds of errors here.
207
207
try :
208
208
data = await self .incoming_data_channel .receive ()
209
209
self ._buf .extend (data )
@@ -213,12 +213,12 @@ async def read(self, n: int | None = None) -> bytes:
213
213
if self .event_remote_closed .is_set ():
214
214
raise MplexStreamEOF
215
215
except trio .ClosedResourceError as error :
216
- # Probably incoming_data_channel is closed in reset when we are
217
- # waiting for receive.
216
+ # Probably ` incoming_data_channel` is closed in ` reset` when
217
+ # we are waiting for ` receive` .
218
218
if self .event_reset .is_set ():
219
219
raise MplexStreamReset
220
220
raise Exception (
221
- "incoming_data_channel is closed but stream is not reset. "
221
+ "` incoming_data_channel` is closed but stream is not reset."
222
222
"This should never happen."
223
223
) from error
224
224
self ._buf .extend (self ._read_return_when_blocked ())
@@ -256,7 +256,7 @@ async def close(self) -> None:
256
256
flag = (
257
257
HeaderTags .CloseInitiator if self .is_initiator else HeaderTags .CloseReceiver
258
258
)
259
- # TODO: Raise when muxed_conn.send_message fails and Mplex isn't shutdown.
259
+ # TODO: Raise when ` muxed_conn.send_message` fails and ` Mplex` isn't shutdown.
260
260
await self .muxed_conn .send_message (flag , None , self .stream_id )
261
261
262
262
_is_remote_closed : bool
0 commit comments