|
16 | 16 | from asyncio import log |
17 | 17 | from asyncio import protocols |
18 | 18 | from asyncio import sslproto |
| 19 | +from asyncio import selector_events |
19 | 20 | from test.test_asyncio import utils as test_utils |
20 | 21 | from test.test_asyncio import functional as func_tests |
21 | 22 |
|
@@ -109,6 +110,48 @@ def test_connection_lost(self): |
109 | 110 | test_utils.run_briefly(self.loop) |
110 | 111 | self.assertIsInstance(waiter.exception(), ConnectionAbortedError) |
111 | 112 |
|
| 113 | + def test_connection_lost_when_busy(self): |
| 114 | + sock = mock.Mock() |
| 115 | + sock.fileno = mock.Mock(return_value=12345) |
| 116 | + sock.send = mock.Mock(side_effect=BrokenPipeError) |
| 117 | + |
| 118 | + # construct StreamWriter chain that contains loop dependant logic this emulates that |
| 119 | + # _make_ssl_transport() does in BaseSelectorEventLoop |
| 120 | + reader = asyncio.StreamReader(limit=2 ** 16, loop=self.loop) |
| 121 | + protocol = asyncio.StreamReaderProtocol(reader, loop=self.loop) |
| 122 | + ssl_proto = self.ssl_protocol(proto=protocol) |
| 123 | + |
| 124 | + # emulate reading decompressed data |
| 125 | + sslobj = mock.Mock() |
| 126 | + sslobj.read.side_effect = ssl.SSLWantReadError |
| 127 | + sslobj.write.side_effect = ssl.SSLWantReadError |
| 128 | + ssl_proto._sslobj = sslobj |
| 129 | + |
| 130 | + # emulate outgoing data |
| 131 | + data = b'An interesting message' |
| 132 | + |
| 133 | + outgoing = mock.Mock() |
| 134 | + outgoing.read = mock.Mock(return_value=data) |
| 135 | + outgoing.pending = len(data) |
| 136 | + ssl_proto._outgoing = outgoing |
| 137 | + |
| 138 | + # use correct socket transport to initialize the SSLProtocol |
| 139 | + selector_events._SelectorSocketTransport(self.loop, sock, ssl_proto) |
| 140 | + transport = ssl_proto._app_transport |
| 141 | + writer = asyncio.StreamWriter(transport, protocol, reader, self.loop) |
| 142 | + |
| 143 | + # Write data to the transport n times in a task that blocks the |
| 144 | + # asyncio event loop from a user perspective. |
| 145 | + async def _write_loop(n): |
| 146 | + for i in range(n): |
| 147 | + writer.write(data) |
| 148 | + await writer.drain() |
| 149 | + |
| 150 | + # The test is successful if we raise the error the next time |
| 151 | + # we try to write to the transport. |
| 152 | + with self.assertRaises(ConnectionResetError): |
| 153 | + self.loop.run_until_complete(_write_loop(2)) |
| 154 | + |
112 | 155 | def test_close_during_handshake(self): |
113 | 156 | # bpo-29743 Closing transport during handshake process leaks socket |
114 | 157 | waiter = self.loop.create_future() |
|
0 commit comments