@@ -180,73 +180,73 @@ async def _poll_cancellation(conn: AsyncConnection) -> None:
180
180
await asyncio .sleep (_POLL_TIMEOUT )
181
181
182
182
183
- async def async_receive_data_stream (
184
- conn : AsyncConnectionStream , length : int , deadline : Optional [float ]
185
- ) -> memoryview :
186
- # sock = conn.conn
187
- # sock_timeout = sock.gettimeout()
188
- timeout : Optional [Union [float , int ]]
189
- # if deadline:
190
- # # When the timeout has expired perform one final check to
191
- # # see if the socket is readable. This helps avoid spurious
192
- # # timeouts on AWS Lambda and other FaaS environments.
193
- # timeout = max(deadline - time.monotonic(), 0)
194
- # else:
195
- # timeout = sock_timeout
196
- loop = asyncio .get_running_loop ()
197
- done = loop .create_future ()
198
- conn .conn [1 ].reset (done , length )
199
- try :
200
- await asyncio .wait_for (done , timeout = None )
201
- return done .result ()
202
- # read_task = create_task(_async_receive_stream(conn, length))
203
- # tasks = [read_task, cancellation_task]
204
- # done, pending = await asyncio.wait(
205
- # tasks, timeout=None, return_when=asyncio.FIRST_COMPLETED
206
- # )
207
- # print(f"Done: {done}, pending: {pending}")
208
- # for task in pending:
209
- # task.cancel()
210
- # if pending:
211
- # await asyncio.wait(pending)
212
- # if len(done) == 0:
213
- # raise socket.timeout("timed out")
214
- # if read_task in done:
215
- # return read_task.result()
216
- # # raise _OperationCancelled("operation cancelled")
217
- finally :
218
- pass
219
- # sock.settimeout(sock_timeout)
220
-
221
-
222
-
223
- async def async_receive_data_socket (
224
- sock : Union [socket .socket , _sslConn ], length : int
225
- ) -> memoryview :
226
- sock_timeout = sock .gettimeout ()
227
- timeout = sock_timeout
228
-
229
- sock .settimeout (0.0 )
230
- loop = asyncio .get_event_loop ()
231
- try :
232
- if _HAVE_SSL and isinstance (sock , (SSLSocket , _sslConn )):
233
- return await asyncio .wait_for (
234
- _async_receive_ssl (sock , length , loop , once = True ), # type: ignore[arg-type]
235
- timeout = timeout ,
236
- )
237
- else :
238
- return await asyncio .wait_for (_async_receive (sock , length , loop ), timeout = timeout ) # type: ignore[arg-type]
239
- except asyncio .TimeoutError as err :
240
- raise socket .timeout ("timed out" ) from err
241
- finally :
242
- sock .settimeout (sock_timeout )
243
-
244
-
245
- async def _async_receive_stream (reader : asyncio .StreamReader , length : int ) -> memoryview :
246
- try :
247
- return memoryview (await reader .readexactly (length ))
248
- except asyncio .IncompleteReadError :
249
- raise OSError ("connection closed" )
183
+ # async def async_receive_data_stream(
184
+ # conn: AsyncConnectionStream, length: int, deadline: Optional[float]
185
+ # ) -> memoryview:
186
+ # # sock = conn.conn
187
+ # # sock_timeout = sock.gettimeout()
188
+ # timeout: Optional[Union[float, int]]
189
+ # # if deadline:
190
+ # # # When the timeout has expired perform one final check to
191
+ # # # see if the socket is readable. This helps avoid spurious
192
+ # # # timeouts on AWS Lambda and other FaaS environments.
193
+ # # timeout = max(deadline - time.monotonic(), 0)
194
+ # # else:
195
+ # # timeout = sock_timeout
196
+ # loop = asyncio.get_running_loop()
197
+ # done = loop.create_future()
198
+ # conn.conn[1].reset(done, length)
199
+ # try:
200
+ # await asyncio.wait_for(done, timeout=None)
201
+ # return done.result()
202
+ # # read_task = create_task(_async_receive_stream(conn, length))
203
+ # # tasks = [read_task, cancellation_task]
204
+ # # done, pending = await asyncio.wait(
205
+ # # tasks, timeout=None, return_when=asyncio.FIRST_COMPLETED
206
+ # # )
207
+ # # print(f"Done: {done}, pending: {pending}")
208
+ # # for task in pending:
209
+ # # task.cancel()
210
+ # # if pending:
211
+ # # await asyncio.wait(pending)
212
+ # # if len(done) == 0:
213
+ # # raise socket.timeout("timed out")
214
+ # # if read_task in done:
215
+ # # return read_task.result()
216
+ # # # raise _OperationCancelled("operation cancelled")
217
+ # finally:
218
+ # pass
219
+ # # sock.settimeout(sock_timeout)
220
+
221
+
222
+
223
+ # async def async_receive_data_socket(
224
+ # sock: Union[socket.socket, _sslConn], length: int
225
+ # ) -> memoryview:
226
+ # sock_timeout = sock.gettimeout()
227
+ # timeout = sock_timeout
228
+ #
229
+ # sock.settimeout(0.0)
230
+ # loop = asyncio.get_event_loop()
231
+ # try:
232
+ # if _HAVE_SSL and isinstance(sock, (SSLSocket, _sslConn)):
233
+ # return await asyncio.wait_for(
234
+ # _async_receive_ssl(sock, length, loop, once=True), # type: ignore[arg-type]
235
+ # timeout=timeout,
236
+ # )
237
+ # else:
238
+ # return await asyncio.wait_for(_async_receive(sock, length, loop), timeout=timeout) # type: ignore[arg-type]
239
+ # except asyncio.TimeoutError as err:
240
+ # raise socket.timeout("timed out") from err
241
+ # finally:
242
+ # sock.settimeout(sock_timeout)
243
+
244
+
245
+ # async def _async_receive_stream(reader: asyncio.StreamReader, length: int) -> memoryview:
246
+ # try:
247
+ # return memoryview(await reader.readexactly(length))
248
+ # except asyncio.IncompleteReadError:
249
+ # raise OSError("connection closed")
250
250
251
251
def receive_data (conn : Connection , length : int , deadline : Optional [float ]) -> memoryview :
252
252
buf = bytearray (length )
0 commit comments