@@ -321,40 +321,26 @@ def _multiplexed_buffer_helper(self, response):
321
321
walker = end
322
322
yield buf [start :end ]
323
323
324
- def _multiplexed_socket_stream_helper (self , response ):
324
+ def _multiplexed_response_stream_helper (self , response ):
325
325
"""A generator of multiplexed data blocks coming from a response
326
- socket."""
327
- socket = self ._get_raw_response_socket (response )
328
-
329
- def recvall (socket , size ):
330
- blocks = []
331
- while size > 0 :
332
- if six .PY3 :
333
- block = socket ._sock .recv (size )
334
- else :
335
- block = socket .recv (size )
336
- if not block :
337
- return None
338
-
339
- blocks .append (block )
340
- size -= len (block )
326
+ stream."""
341
327
342
- sep = bytes () if six .PY3 else str ()
343
- data = sep .join (blocks )
344
- return data
328
+ # Disable timeout on the underlying socket to prevent
329
+ # Read timed out(s) for long running processes
330
+ socket = self ._get_raw_response_socket (response )
331
+ if six .PY3 :
332
+ socket ._sock .settimeout (None )
333
+ else :
334
+ socket .settimeout (None )
345
335
346
336
while True :
347
- if six .PY3 :
348
- socket ._sock .settimeout (None )
349
- else :
350
- socket .settimeout (None )
351
- header = recvall (socket , STREAM_HEADER_SIZE_BYTES )
337
+ header = response .raw .read (STREAM_HEADER_SIZE_BYTES )
352
338
if not header :
353
339
break
354
340
_ , length = struct .unpack ('>BxxxL' , header )
355
341
if not length :
356
342
break
357
- data = recvall ( socket , length )
343
+ data = response . raw . read ( length )
358
344
if not data :
359
345
break
360
346
yield data
@@ -388,7 +374,7 @@ def stream_result():
388
374
389
375
sep = bytes () if six .PY3 else str ()
390
376
391
- return stream and self ._multiplexed_socket_stream_helper (response ) or \
377
+ return stream and self ._multiplexed_response_stream_helper (response ) or \
392
378
sep .join ([x for x in self ._multiplexed_buffer_helper (response )])
393
379
394
380
def attach_socket (self , container , params = None , ws = False ):
@@ -606,7 +592,7 @@ def execute(self, container, cmd, detach=False, stdout=True, stderr=True,
606
592
data = data , stream = stream )
607
593
self ._raise_for_status (res )
608
594
if stream :
609
- return self ._multiplexed_socket_stream_helper (res )
595
+ return self ._multiplexed_response_stream_helper (res )
610
596
elif six .PY3 :
611
597
return bytes ().join (
612
598
[x for x in self ._multiplexed_buffer_helper (res )]
@@ -776,7 +762,7 @@ def logs(self, container, stdout=True, stderr=True, stream=False,
776
762
url = self ._url ("/containers/{0}/logs" .format (container ))
777
763
res = self ._get (url , params = params , stream = stream )
778
764
if stream :
779
- return self ._multiplexed_socket_stream_helper (res )
765
+ return self ._multiplexed_response_stream_helper (res )
780
766
elif six .PY3 :
781
767
return bytes ().join (
782
768
[x for x in self ._multiplexed_buffer_helper (res )]
0 commit comments