diff --git a/docs/project/changelog.rst b/docs/project/changelog.rst index b7f4f62f9..de6b0be21 100644 --- a/docs/project/changelog.rst +++ b/docs/project/changelog.rst @@ -35,6 +35,10 @@ notice. Bug fixes ......... +* Fixed ``connection.recv(timeout=0)`` in the :mod:`threading` implementation. + If a message is already received, it is returned. Previously, + :exc:`TimeoutError` was raised incorrectly. + * Wrapped errors when reading the opening handshake request or response in :exc:`~exceptions.InvalidMessage` so that :func:`~asyncio.client.connect` raises :exc:`~exceptions.InvalidHandshake` or a subclass when the opening diff --git a/src/websockets/sync/messages.py b/src/websockets/sync/messages.py index 98490797f..12e8b1623 100644 --- a/src/websockets/sync/messages.py +++ b/src/websockets/sync/messages.py @@ -79,7 +79,12 @@ def get_next_frame(self, timeout: float | None = None) -> Frame: raise EOFError("stream of frames ended") from None else: try: - frame = self.frames.get(block=True, timeout=timeout) + # Check for a frame that's already received if timeout <= 0. + # SimpleQueue.get() doesn't support negative timeout values. + if timeout is not None and timeout <= 0: + frame = self.frames.get(block=False) + else: + frame = self.frames.get(block=True, timeout=timeout) except queue.Empty: raise TimeoutError(f"timed out in {timeout:.1f}s") from None if frame is None: @@ -143,7 +148,7 @@ def get(self, timeout: float | None = None, decode: bool | None = None) -> Data: deadline = Deadline(timeout) # First frame - frame = self.get_next_frame(deadline.timeout()) + frame = self.get_next_frame(deadline.timeout(raise_if_elapsed=False)) with self.mutex: self.maybe_resume() assert frame.opcode is OP_TEXT or frame.opcode is OP_BINARY @@ -154,7 +159,9 @@ def get(self, timeout: float | None = None, decode: bool | None = None) -> Data: # Following frames, for fragmented messages while not frame.fin: try: - frame = self.get_next_frame(deadline.timeout()) + frame = self.get_next_frame( + deadline.timeout(raise_if_elapsed=False) + ) except TimeoutError: # Put frames already received back into the queue # so that future calls to get() can return them. diff --git a/tests/sync/test_messages.py b/tests/sync/test_messages.py index d22693102..0a94b4f85 100644 --- a/tests/sync/test_messages.py +++ b/tests/sync/test_messages.py @@ -198,6 +198,12 @@ def test_get_timeout_after_first_frame(self): message = self.assembler.get() self.assertEqual(message, "café") + def test_get_if_received(self): + """get returns a text message if it's already received.""" + self.assembler.put(Frame(OP_TEXT, b"caf\xc3\xa9")) + message = self.assembler.get(timeout=0) + self.assertEqual(message, "café") + # Test get_iter def test_get_iter_text_message_already_received(self):