Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions Lib/asyncio/proactor_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -460,6 +460,8 @@ def _pipe_closed(self, fut):
class _ProactorDatagramTransport(_ProactorBasePipeTransport,
transports.DatagramTransport):
max_size = 256 * 1024
_header_size = 8

def __init__(self, loop, sock, protocol, address=None,
waiter=None, extra=None):
self._address = address
Expand Down Expand Up @@ -499,7 +501,7 @@ def sendto(self, data, addr=None):

# Ensure that what we buffer is immutable.
self._buffer.append((bytes(data), addr))
self._buffer_size += len(data) + 8 # include header bytes
self._buffer_size += len(data) + self._header_size

if self._write_fut is None:
# No current write operations are active, kick one off
Expand All @@ -526,7 +528,7 @@ def _loop_writing(self, fut=None):
return

data, addr = self._buffer.popleft()
self._buffer_size -= len(data)
self._buffer_size -= len(data) + self._header_size
if self._address is not None:
self._write_fut = self._loop._proactor.send(self._sock,
data)
Expand Down
7 changes: 4 additions & 3 deletions Lib/asyncio/selector_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -1212,6 +1212,7 @@ def close(self):
class _SelectorDatagramTransport(_SelectorTransport, transports.DatagramTransport):

_buffer_factory = collections.deque
_header_size = 8

def __init__(self, loop, sock, protocol, address=None,
waiter=None, extra=None):
Expand Down Expand Up @@ -1285,21 +1286,21 @@ def sendto(self, data, addr=None):

# Ensure that what we buffer is immutable.
self._buffer.append((bytes(data), addr))
self._buffer_size += len(data) + 8 # include header bytes
self._buffer_size += len(data) + self._header_size
self._maybe_pause_protocol()

def _sendto_ready(self):
while self._buffer:
data, addr = self._buffer.popleft()
self._buffer_size -= len(data)
self._buffer_size -= len(data) + self._header_size
try:
if self._extra['peername']:
self._sock.send(data)
else:
self._sock.sendto(data, addr)
except (BlockingIOError, InterruptedError):
self._buffer.appendleft((data, addr)) # Try again later.
self._buffer_size += len(data)
self._buffer_size += len(data) + self._header_size
break
except OSError as exc:
self._protocol.error_received(exc)
Expand Down
2 changes: 2 additions & 0 deletions Lib/test/test_asyncio/test_proactor_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -566,6 +566,8 @@ def test_sendto(self):
self.assertTrue(self.proactor.sendto.called)
self.proactor.sendto.assert_called_with(
self.sock, data, addr=('0.0.0.0', 1234))
self.assertFalse(transport._buffer)
self.assertEqual(0, transport._buffer_size)

def test_sendto_bytearray(self):
data = bytearray(b'data')
Expand Down
41 changes: 41 additions & 0 deletions Lib/test/test_asyncio/test_selector_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -1497,6 +1497,47 @@ def test_sendto_closing(self):
transport.sendto(b'data', (1,))
self.assertEqual(transport._conn_lost, 2)

def test_sendto_sendto_ready(self):
data = b'data'

# First queue up a buffer by having the socket block
self.sock.sendto.side_effect = BlockingIOError
transport = self.datagram_transport()
transport.sendto(data, ('0.0.0.0', 12345))
self.loop.assert_writer(7, transport._sendto_ready)
self.assertEqual(1, len(transport._buffer))
self.assertEqual(transport._buffer_size, len(data) + transport._header_size)

# Now let the socket send the buffer
self.sock.sendto.side_effect = None
transport._sendto_ready()
self.assertTrue(self.sock.sendto.called)
self.assertEqual(
self.sock.sendto.call_args[0], (data, ('0.0.0.0', 12345)))
self.assertFalse(self.loop.writers)
self.assertFalse(transport._buffer)
self.assertEqual(transport._buffer_size, 0)

def test_sendto_sendto_ready_blocked(self):
data = b'data'

# First queue up a buffer by having the socket block
self.sock.sendto.side_effect = BlockingIOError
transport = self.datagram_transport()
transport.sendto(data, ('0.0.0.0', 12345))
self.loop.assert_writer(7, transport._sendto_ready)
self.assertEqual(1, len(transport._buffer))
self.assertEqual(transport._buffer_size, len(data) + transport._header_size)

# Now try to send the buffer, it will be added to buffer again if it fails
transport._sendto_ready()
self.assertTrue(self.sock.sendto.called)
self.assertEqual(
self.sock.sendto.call_args[0], (data, ('0.0.0.0', 12345)))
self.assertTrue(self.loop.writers)
self.assertEqual(1, len(transport._buffer))
self.assertEqual(transport._buffer_size, len(data) + transport._header_size)

def test_sendto_ready(self):
data = b'data'
self.sock.sendto.return_value = len(data)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Fix :meth:`asyncio.DatagramTransport.sendto` to account for datagram header size when
data cannot be sent.
Loading