Skip to content

Commit dd743db

Browse files
rycus86shin-
authored andcommitted
Allow cancelling the streams from other threads
Signed-off-by: Viktor Adam <[email protected]>
1 parent d310d95 commit dd743db

File tree

7 files changed

+181
-10
lines changed

7 files changed

+181
-10
lines changed

docker/api/container.py

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,8 @@
55
from .. import utils
66
from ..constants import DEFAULT_DATA_CHUNK_SIZE
77
from ..types import (
8-
ContainerConfig, EndpointConfig, HostConfig, NetworkingConfig
8+
CancellableStream, ContainerConfig, EndpointConfig, HostConfig,
9+
NetworkingConfig
910
)
1011

1112

@@ -52,10 +53,15 @@ def attach(self, container, stdout=True, stderr=True,
5253
u = self._url("/containers/{0}/attach", container)
5354
response = self._post(u, headers=headers, params=params, stream=True)
5455

55-
return self._read_from_socket(
56+
output = self._read_from_socket(
5657
response, stream, self._check_is_tty(container)
5758
)
5859

60+
if stream:
61+
return CancellableStream(output, response)
62+
else:
63+
return output
64+
5965
@utils.check_resource('container')
6066
def attach_socket(self, container, params=None, ws=False):
6167
"""
@@ -815,7 +821,12 @@ def logs(self, container, stdout=True, stderr=True, stream=False,
815821

816822
url = self._url("/containers/{0}/logs", container)
817823
res = self._get(url, params=params, stream=stream)
818-
return self._get_result(container, stream, res)
824+
output = self._get_result(container, stream, res)
825+
826+
if stream:
827+
return CancellableStream(output, res)
828+
else:
829+
return output
819830

820831
@utils.check_resource('container')
821832
def pause(self, container):

docker/api/daemon.py

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import os
22
from datetime import datetime
33

4-
from .. import auth, utils
4+
from .. import auth, types, utils
55

66

77
class DaemonApiMixin(object):
@@ -34,8 +34,7 @@ def events(self, since=None, until=None, filters=None, decode=None):
3434
the fly. False by default.
3535
3636
Returns:
37-
(generator): A blocking generator you can iterate over to retrieve
38-
events as they happen.
37+
A :py:class:`docker.types.daemon.CancellableStream` generator
3938
4039
Raises:
4140
:py:class:`docker.errors.APIError`
@@ -50,6 +49,14 @@ def events(self, since=None, until=None, filters=None, decode=None):
5049
u'status': u'start',
5150
u'time': 1423339459}
5251
...
52+
53+
or
54+
55+
>>> events = client.events()
56+
>>> for event in events:
57+
... print event
58+
>>> # and cancel from another thread
59+
>>> events.close()
5360
"""
5461

5562
if isinstance(since, datetime):
@@ -68,10 +75,10 @@ def events(self, since=None, until=None, filters=None, decode=None):
6875
}
6976
url = self._url('/events')
7077

71-
return self._stream_helper(
72-
self._get(url, params=params, stream=True, timeout=None),
73-
decode=decode
74-
)
78+
response = self._get(url, params=params, stream=True, timeout=None)
79+
stream = self._stream_helper(response, decode=decode)
80+
81+
return types.CancellableStream(stream, response)
7582

7683
def info(self):
7784
"""

docker/types/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
# flake8: noqa
22
from .containers import ContainerConfig, HostConfig, LogConfig, Ulimit
3+
from .daemon import CancellableStream
34
from .healthcheck import Healthcheck
45
from .networks import EndpointConfig, IPAMConfig, IPAMPool, NetworkingConfig
56
from .services import (

docker/types/daemon.py

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
import socket
2+
3+
try:
4+
import requests.packages.urllib3 as urllib3
5+
except ImportError:
6+
import urllib3
7+
8+
9+
class CancellableStream(object):
10+
"""
11+
Stream wrapper for real-time events, logs, etc. from the server.
12+
13+
Example:
14+
>>> events = client.events()
15+
>>> for event in events:
16+
... print event
17+
>>> # and cancel from another thread
18+
>>> events.close()
19+
"""
20+
21+
def __init__(self, stream, response):
22+
self._stream = stream
23+
self._response = response
24+
25+
def __iter__(self):
26+
return self
27+
28+
def __next__(self):
29+
try:
30+
return next(self._stream)
31+
except urllib3.exceptions.ProtocolError:
32+
raise StopIteration
33+
except socket.error:
34+
raise StopIteration
35+
36+
next = __next__
37+
38+
def close(self):
39+
"""
40+
Closes the event streaming.
41+
"""
42+
43+
if not self._response.raw.closed:
44+
# find the underlying socket object
45+
# based on api.client._get_raw_response_socket
46+
47+
sock_fp = self._response.raw._fp.fp
48+
49+
if hasattr(sock_fp, 'raw'):
50+
sock_raw = sock_fp.raw
51+
52+
if hasattr(sock_raw, 'sock'):
53+
sock = sock_raw.sock
54+
55+
elif hasattr(sock_raw, '_sock'):
56+
sock = sock_raw._sock
57+
58+
else:
59+
sock = sock_fp._sock
60+
61+
sock.shutdown(socket.SHUT_RDWR)
62+
sock.makefile().close()
63+
sock.close()

tests/integration/api_container_test.py

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
import re
33
import signal
44
import tempfile
5+
import threading
56
from datetime import datetime
67

78
import docker
@@ -880,6 +881,30 @@ def test_logs_streaming_and_follow(self):
880881

881882
assert logs == (snippet + '\n').encode(encoding='ascii')
882883

884+
def test_logs_streaming_and_follow_and_cancel(self):
885+
snippet = 'Flowering Nights (Sakuya Iyazoi)'
886+
container = self.client.create_container(
887+
BUSYBOX, 'sh -c "echo \\"{0}\\" && sleep 3"'.format(snippet)
888+
)
889+
id = container['Id']
890+
self.tmp_containers.append(id)
891+
self.client.start(id)
892+
logs = six.binary_type()
893+
894+
generator = self.client.logs(id, stream=True, follow=True)
895+
896+
exit_timer = threading.Timer(3, os._exit, args=[1])
897+
exit_timer.start()
898+
899+
threading.Timer(1, generator.close).start()
900+
901+
for chunk in generator:
902+
logs += chunk
903+
904+
exit_timer.cancel()
905+
906+
assert logs == (snippet + '\n').encode(encoding='ascii')
907+
883908
def test_logs_with_dict_instead_of_id(self):
884909
snippet = 'Flowering Nights (Sakuya Iyazoi)'
885910
container = self.client.create_container(
@@ -1226,6 +1251,29 @@ def test_attach_no_stream(self):
12261251
output = self.client.attach(container, stream=False, logs=True)
12271252
assert output == 'hello\n'.encode(encoding='ascii')
12281253

1254+
def test_attach_stream_and_cancel(self):
1255+
container = self.client.create_container(
1256+
BUSYBOX, 'sh -c "echo hello && sleep 60"',
1257+
tty=True
1258+
)
1259+
self.tmp_containers.append(container)
1260+
self.client.start(container)
1261+
output = self.client.attach(container, stream=True, logs=True)
1262+
1263+
exit_timer = threading.Timer(3, os._exit, args=[1])
1264+
exit_timer.start()
1265+
1266+
threading.Timer(1, output.close).start()
1267+
1268+
lines = []
1269+
for line in output:
1270+
lines.append(line)
1271+
1272+
exit_timer.cancel()
1273+
1274+
assert len(lines) == 1
1275+
assert lines[0] == 'hello\r\n'.encode(encoding='ascii')
1276+
12291277
def test_detach_with_default(self):
12301278
container = self.client.create_container(
12311279
BUSYBOX, 'cat',

tests/integration/client_test.py

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,10 @@
1+
import threading
12
import unittest
23

34
import docker
45

6+
from datetime import datetime, timedelta
7+
58
from ..helpers import requires_api_version
69
from .base import TEST_API_VERSION
710

@@ -27,3 +30,20 @@ def test_df(self):
2730
assert 'Containers' in data
2831
assert 'Volumes' in data
2932
assert 'Images' in data
33+
34+
35+
class CancellableEventsTest(unittest.TestCase):
36+
client = docker.from_env(version=TEST_API_VERSION)
37+
38+
def test_cancel_events(self):
39+
start = datetime.now()
40+
41+
events = self.client.events(until=start + timedelta(seconds=5))
42+
43+
cancel_thread = threading.Timer(2, events.close)
44+
cancel_thread.start()
45+
46+
for _ in events:
47+
pass
48+
49+
self.assertLess(datetime.now() - start, timedelta(seconds=3))

tests/integration/models_containers_test.py

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
1+
import os
12
import tempfile
3+
import threading
24

35
import docker
46
import pytest
@@ -141,6 +143,25 @@ def test_run_with_streamed_logs(self):
141143
assert logs[0] == b'hello\n'
142144
assert logs[1] == b'world\n'
143145

146+
def test_run_with_streamed_logs_and_cancel(self):
147+
client = docker.from_env(version=TEST_API_VERSION)
148+
out = client.containers.run(
149+
'alpine', 'sh -c "echo hello && echo world"', stream=True
150+
)
151+
152+
exit_timer = threading.Timer(3, os._exit, args=[1])
153+
exit_timer.start()
154+
155+
threading.Timer(1, out.close).start()
156+
157+
logs = [line for line in out]
158+
159+
exit_timer.cancel()
160+
161+
assert len(logs) == 2
162+
assert logs[0] == b'hello\n'
163+
assert logs[1] == b'world\n'
164+
144165
def test_get(self):
145166
client = docker.from_env(version=TEST_API_VERSION)
146167
container = client.containers.run("alpine", "sleep 300", detach=True)

0 commit comments

Comments
 (0)