Skip to content

Commit 9676feb

Browse files
author
Davanum Srinivas
committed
Fix to enable streaming container logs reliably
Started a ubuntu container that just runs "ping 8.8.8.8" and tried the sample code in https://gist.github.com/dims/c3327f633c526847c8e5 to recreate the problem mentioned in: #300 To debug the problem i printed the byte array read in recvall when reading STREAM_HEADER_SIZE_BYTES and realized that the data being read was far ahead of the actual start of the header documented in the vnd.docker.raw-stream of the docker remote api. This is possibly because the requests/urllib3 is reading ahead a bit more and we shouldn't be trying to hack the internals of those projects. So just using the documented file-like response.raw is good enough for us to get the functionality we need which is being able to read for exactly where the stream header starts. With this change i can reliably stream the logs just like "docker logs --follow". Note that we still need to access the underlying socket to set the timeout to prevent read time outs. The original fix was for client.logs() only but on further review it made sense to replace all occurances of _multiplexed_socket_stream_helper with the new method.
1 parent d3a2d90 commit 9676feb

File tree

1 file changed

+14
-28
lines changed

1 file changed

+14
-28
lines changed

docker/client.py

Lines changed: 14 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -320,40 +320,26 @@ def _multiplexed_buffer_helper(self, response):
320320
walker = end
321321
yield buf[start:end]
322322

323-
def _multiplexed_socket_stream_helper(self, response):
323+
def _multiplexed_response_stream_helper(self, response):
324324
"""A generator of multiplexed data blocks coming from a response
325-
socket."""
326-
socket = self._get_raw_response_socket(response)
327-
328-
def recvall(socket, size):
329-
blocks = []
330-
while size > 0:
331-
if six.PY3:
332-
block = socket._sock.recv(size)
333-
else:
334-
block = socket.recv(size)
335-
if not block:
336-
return None
337-
338-
blocks.append(block)
339-
size -= len(block)
325+
stream."""
340326

341-
sep = bytes() if six.PY3 else str()
342-
data = sep.join(blocks)
343-
return data
327+
# Disable timeout on the underlying socket to prevent
328+
# Read timed out(s) for long running processes
329+
socket = self._get_raw_response_socket(response)
330+
if six.PY3:
331+
socket._sock.settimeout(None)
332+
else:
333+
socket.settimeout(None)
344334

345335
while True:
346-
if six.PY3:
347-
socket._sock.settimeout(None)
348-
else:
349-
socket.settimeout(None)
350-
header = recvall(socket, STREAM_HEADER_SIZE_BYTES)
336+
header = response.raw.read(STREAM_HEADER_SIZE_BYTES)
351337
if not header:
352338
break
353339
_, length = struct.unpack('>BxxxL', header)
354340
if not length:
355341
break
356-
data = recvall(socket, length)
342+
data = response.raw.read(length)
357343
if not data:
358344
break
359345
yield data
@@ -387,7 +373,7 @@ def stream_result():
387373

388374
sep = bytes() if six.PY3 else str()
389375

390-
return stream and self._multiplexed_socket_stream_helper(response) or \
376+
return stream and self._multiplexed_response_stream_helper(response) or \
391377
sep.join([x for x in self._multiplexed_buffer_helper(response)])
392378

393379
def attach_socket(self, container, params=None, ws=False):
@@ -604,7 +590,7 @@ def execute(self, container, cmd, detach=False, stdout=True, stderr=True,
604590
data=data, stream=stream)
605591
self._raise_for_status(res)
606592
if stream:
607-
return self._multiplexed_socket_stream_helper(res)
593+
return self._multiplexed_response_stream_helper(res)
608594
elif six.PY3:
609595
return bytes().join(
610596
[x for x in self._multiplexed_buffer_helper(res)]
@@ -774,7 +760,7 @@ def logs(self, container, stdout=True, stderr=True, stream=False,
774760
url = self._url("/containers/{0}/logs".format(container))
775761
res = self._get(url, params=params, stream=stream)
776762
if stream:
777-
return self._multiplexed_socket_stream_helper(res)
763+
return self._multiplexed_response_stream_helper(res)
778764
elif six.PY3:
779765
return bytes().join(
780766
[x for x in self._multiplexed_buffer_helper(res)]

0 commit comments

Comments
 (0)