Skip to content

Commit 2e87c47

Browse files
committed
Add trio message assembler.
Also uniformize code & tests with other implementations.
1 parent 8b1c1d7 commit 2e87c47

File tree

6 files changed

+1001
-156
lines changed

6 files changed

+1001
-156
lines changed

src/websockets/asyncio/messages.py

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -81,8 +81,7 @@ class Assembler:
8181
8282
"""
8383

84-
# coverage reports incorrectly: "line NN didn't jump to the function exit"
85-
def __init__( # pragma: no cover
84+
def __init__(
8685
self,
8786
high: int | None = None,
8887
low: int | None = None,
@@ -155,15 +154,15 @@ async def get(self, decode: bool | None = None) -> Data:
155154
# until get() fetches a complete message or is canceled.
156155

157156
try:
158-
# First frame
157+
# Fetch the first frame.
159158
frame = await self.frames.get(not self.closed)
160159
self.maybe_resume()
161160
assert frame.opcode is OP_TEXT or frame.opcode is OP_BINARY
162161
if decode is None:
163162
decode = frame.opcode is OP_TEXT
164163
frames = [frame]
165164

166-
# Following frames, for fragmented messages
165+
# Fetch subsequent frames for fragmented messages.
167166
while not frame.fin:
168167
try:
169168
frame = await self.frames.get(not self.closed)
@@ -230,7 +229,7 @@ async def get_iter(self, decode: bool | None = None) -> AsyncIterator[Data]:
230229
# If get_iter() raises an exception e.g. in decoder.decode(),
231230
# get_in_progress remains set and the connection becomes unusable.
232231

233-
# First frame
232+
# Yield the first frame.
234233
try:
235234
frame = await self.frames.get(not self.closed)
236235
except asyncio.CancelledError:
@@ -247,7 +246,7 @@ async def get_iter(self, decode: bool | None = None) -> AsyncIterator[Data]:
247246
# Convert to bytes when frame.data is a bytearray.
248247
yield bytes(frame.data)
249248

250-
# Following frames, for fragmented messages
249+
# Yield subsequent frames for fragmented messages.
251250
while not frame.fin:
252251
# We cannot handle asyncio.CancelledError because we don't buffer
253252
# previous fragments — we're streaming them. Canceling get_iter()
@@ -280,22 +279,22 @@ def put(self, frame: Frame) -> None:
280279

281280
def maybe_pause(self) -> None:
282281
"""Pause the writer if queue is above the high water mark."""
283-
# Skip if flow control is disabled
282+
# Skip if flow control is disabled.
284283
if self.high is None:
285284
return
286285

287-
# Check for "> high" to support high = 0
286+
# Check for "> high" to support high = 0.
288287
if len(self.frames) > self.high and not self.paused:
289288
self.paused = True
290289
self.pause()
291290

292291
def maybe_resume(self) -> None:
293292
"""Resume the writer if queue is below the low water mark."""
294-
# Skip if flow control is disabled
293+
# Skip if flow control is disabled.
295294
if self.low is None:
296295
return
297296

298-
# Check for "<= low" to support low = 0
297+
# Check for "<= low" to support low = 0.
299298
if len(self.frames) <= self.low and self.paused:
300299
self.paused = False
301300
self.resume()

src/websockets/sync/messages.py

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -165,7 +165,7 @@ def get(self, timeout: float | None = None, decode: bool | None = None) -> Data:
165165
try:
166166
deadline = Deadline(timeout)
167167

168-
# First frame
168+
# Fetch the first frame.
169169
frame = self.get_next_frame(deadline.timeout(raise_if_elapsed=False))
170170
with self.mutex:
171171
self.maybe_resume()
@@ -174,7 +174,7 @@ def get(self, timeout: float | None = None, decode: bool | None = None) -> Data:
174174
decode = frame.opcode is OP_TEXT
175175
frames = [frame]
176176

177-
# Following frames, for fragmented messages
177+
# Fetch subsequent frames for fragmented messages.
178178
while not frame.fin:
179179
try:
180180
frame = self.get_next_frame(
@@ -245,7 +245,7 @@ def get_iter(self, decode: bool | None = None) -> Iterator[Data]:
245245
# If get_iter() raises an exception e.g. in decoder.decode(),
246246
# get_in_progress remains set and the connection becomes unusable.
247247

248-
# First frame
248+
# Yield the first frame.
249249
frame = self.get_next_frame()
250250
with self.mutex:
251251
self.maybe_resume()
@@ -259,7 +259,7 @@ def get_iter(self, decode: bool | None = None) -> Iterator[Data]:
259259
# Convert to bytes when frame.data is a bytearray.
260260
yield bytes(frame.data)
261261

262-
# Following frames, for fragmented messages
262+
# Yield subsequent frames for fragmented messages.
263263
while not frame.fin:
264264
frame = self.get_next_frame()
265265
with self.mutex:
@@ -300,26 +300,26 @@ def put(self, frame: Frame) -> None:
300300

301301
def maybe_pause(self) -> None:
302302
"""Pause the writer if queue is above the high water mark."""
303-
# Skip if flow control is disabled
303+
# Skip if flow control is disabled.
304304
if self.high is None:
305305
return
306306

307307
assert self.mutex.locked()
308308

309-
# Check for "> high" to support high = 0
309+
# Check for "> high" to support high = 0.
310310
if self.frames.qsize() > self.high and not self.paused:
311311
self.paused = True
312312
self.pause()
313313

314314
def maybe_resume(self) -> None:
315315
"""Resume the writer if queue is below the low water mark."""
316-
# Skip if flow control is disabled
316+
# Skip if flow control is disabled.
317317
if self.low is None:
318318
return
319319

320320
assert self.mutex.locked()
321321

322-
# Check for "<= low" to support low = 0
322+
# Check for "<= low" to support low = 0.
323323
if self.frames.qsize() <= self.low and self.paused:
324324
self.paused = False
325325
self.resume()

0 commit comments

Comments
 (0)