Skip to content

Commit a34e0cb

Browse files
committed
Experimental npipe:// support
Signed-off-by: Joffrey F <[email protected]>
1 parent 88811a2 commit a34e0cb

File tree

7 files changed

+332
-1
lines changed

7 files changed

+332
-1
lines changed

docker/client.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,10 @@
2727
from . import errors
2828
from .auth import auth
2929
from .unixconn import unixconn
30+
try:
31+
from .npipeconn import npipeconn
32+
except ImportError:
33+
pass
3034
from .ssladapter import ssladapter
3135
from .utils import utils, check_resource, update_headers, kwargs_from_env
3236
from .tls import TLSConfig
@@ -64,6 +68,14 @@ def __init__(self, base_url=None, version=None,
6468
self._custom_adapter = unixconn.UnixAdapter(base_url, timeout)
6569
self.mount('http+docker://', self._custom_adapter)
6670
self.base_url = 'http+docker://localunixsocket'
71+
elif base_url.startswith('npipe://'):
72+
if not constants.IS_WINDOWS_PLATFORM:
73+
raise errors.DockerException(
74+
'The npipe:// protocol is only supported on Windows'
75+
)
76+
self._custom_adapter = npipeconn.NpipeAdapter(base_url, timeout)
77+
self.mount('http+docker://', self._custom_adapter)
78+
self.base_url = 'http+docker://localnpipe'
6779
else:
6880
# Use SSLAdapter for the ability to specify SSL version
6981
if isinstance(tls, TLSConfig):

docker/constants.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
import sys
2+
13
DEFAULT_DOCKER_API_VERSION = '1.22'
24
DEFAULT_TIMEOUT_SECONDS = 60
35
STREAM_HEADER_SIZE_BYTES = 8
@@ -8,3 +10,5 @@
810
INSECURE_REGISTRY_DEPRECATION_WARNING = \
911
'The `insecure_registry` argument to {} ' \
1012
'is deprecated and non-functional. Please remove it.'
13+
14+
IS_WINDOWS_PLATFORM = (sys.platform == 'win32')

docker/npipeconn/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
from .npipeconn import NpipeAdapter # flake8: noqa

docker/npipeconn/npipeconn.py

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
import six
2+
import requests.adapters
3+
4+
from .npipesocket import NpipeSocket
5+
6+
if six.PY3:
7+
import http.client as httplib
8+
else:
9+
import httplib
10+
11+
try:
12+
import requests.packages.urllib3 as urllib3
13+
except ImportError:
14+
import urllib3
15+
16+
17+
RecentlyUsedContainer = urllib3._collections.RecentlyUsedContainer
18+
19+
20+
class NpipeHTTPConnection(httplib.HTTPConnection, object):
21+
def __init__(self, npipe_path, timeout=60):
22+
super(NpipeHTTPConnection, self).__init__(
23+
'localhost', timeout=timeout
24+
)
25+
self.npipe_path = npipe_path
26+
self.timeout = timeout
27+
28+
def connect(self):
29+
sock = NpipeSocket()
30+
sock.settimeout(self.timeout)
31+
sock.connect(self.npipe_path)
32+
self.sock = sock
33+
34+
35+
class NpipeHTTPConnectionPool(urllib3.connectionpool.HTTPConnectionPool):
36+
def __init__(self, npipe_path, timeout=60):
37+
super(NpipeHTTPConnectionPool, self).__init__(
38+
'localhost', timeout=timeout
39+
)
40+
self.npipe_path = npipe_path
41+
self.timeout = timeout
42+
43+
def _new_conn(self):
44+
return NpipeHTTPConnection(
45+
self.npipe_path, self.timeout
46+
)
47+
48+
49+
class NpipeAdapter(requests.adapters.HTTPAdapter):
50+
def __init__(self, base_url, timeout=60):
51+
self.npipe_path = base_url.replace('npipe://', '')
52+
self.timeout = timeout
53+
self.pools = RecentlyUsedContainer(
54+
10, dispose_func=lambda p: p.close()
55+
)
56+
super(NpipeAdapter, self).__init__()
57+
58+
def get_connection(self, url, proxies=None):
59+
with self.pools.lock:
60+
pool = self.pools.get(url)
61+
if pool:
62+
return pool
63+
64+
pool = NpipeHTTPConnectionPool(
65+
self.npipe_path, self.timeout
66+
)
67+
self.pools[url] = pool
68+
69+
return pool
70+
71+
def request_url(self, request, proxies):
72+
# The select_proxy utility in requests errors out when the provided URL
73+
# doesn't have a hostname, like is the case when using a UNIX socket.
74+
# Since proxies are an irrelevant notion in the case of UNIX sockets
75+
# anyway, we simply return the path URL directly.
76+
# See also: https://github.com/docker/docker-py/issues/811
77+
return request.path_url
78+
79+
def close(self):
80+
self.pools.clear()

docker/npipeconn/npipesocket.py

Lines changed: 194 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,194 @@
1+
import functools
2+
import io
3+
4+
import win32file
5+
import win32pipe
6+
7+
cSECURITY_SQOS_PRESENT = 0x100000
8+
cSECURITY_ANONYMOUS = 0
9+
cPIPE_READMODE_MESSAGE = 2
10+
11+
12+
def check_closed(f):
13+
@functools.wraps(f)
14+
def wrapped(self, *args, **kwargs):
15+
if self._closed:
16+
raise RuntimeError(
17+
'Can not reuse socket after connection was closed.'
18+
)
19+
return f(self, *args, **kwargs)
20+
return wrapped
21+
22+
23+
class NpipeSocket(object):
24+
""" Partial implementation of the socket API over windows named pipes.
25+
This implementation is only designed to be used as a client socket,
26+
and server-specific methods (bind, listen, accept...) are not
27+
implemented.
28+
"""
29+
def __init__(self, handle=None):
30+
self._timeout = win32pipe.NMPWAIT_USE_DEFAULT_WAIT
31+
self._handle = handle
32+
self._closed = False
33+
34+
def accept(self):
35+
raise NotImplementedError()
36+
37+
def bind(self, address):
38+
raise NotImplementedError()
39+
40+
def close(self):
41+
self._handle.Close()
42+
self._closed = True
43+
44+
@check_closed
45+
def connect(self, address):
46+
win32pipe.WaitNamedPipe(address, self._timeout)
47+
handle = win32file.CreateFile(
48+
address,
49+
win32file.GENERIC_READ | win32file.GENERIC_WRITE,
50+
0,
51+
None,
52+
win32file.OPEN_EXISTING,
53+
cSECURITY_ANONYMOUS | cSECURITY_SQOS_PRESENT,
54+
0
55+
)
56+
self.flags = win32pipe.GetNamedPipeInfo(handle)[0]
57+
# self.state = win32pipe.GetNamedPipeHandleState(handle)[0]
58+
59+
# if self.state & cPIPE_READMODE_MESSAGE != 0:
60+
# raise RuntimeError("message readmode pipes not supported")
61+
self._handle = handle
62+
self._address = address
63+
64+
@check_closed
65+
def connect_ex(self, address):
66+
return self.connect(address)
67+
68+
@check_closed
69+
def detach(self):
70+
self._closed = True
71+
return self._handle
72+
73+
@check_closed
74+
def dup(self):
75+
return NpipeSocket(self._handle)
76+
77+
@check_closed
78+
def fileno(self):
79+
return int(self._handle)
80+
81+
def getpeername(self):
82+
return self._address
83+
84+
def getsockname(self):
85+
return self._address
86+
87+
def getsockopt(self, level, optname, buflen=None):
88+
raise NotImplementedError()
89+
90+
def ioctl(self, control, option):
91+
raise NotImplementedError()
92+
93+
def listen(self, backlog):
94+
raise NotImplementedError()
95+
96+
def makefile(self, mode=None, bufsize=None):
97+
if mode.strip('b') != 'r':
98+
raise NotImplementedError()
99+
rawio = NpipeFileIOBase(self)
100+
if bufsize is None:
101+
bufsize = io.DEFAULT_BUFFER_SIZE
102+
return io.BufferedReader(rawio, buffer_size=bufsize)
103+
104+
@check_closed
105+
def recv(self, bufsize, flags=0):
106+
err, data = win32file.ReadFile(self._handle, bufsize)
107+
return data
108+
109+
@check_closed
110+
def recvfrom(self, bufsize, flags=0):
111+
data = self.recv(bufsize, flags)
112+
return (data, self._address)
113+
114+
@check_closed
115+
def recvfrom_into(self, buf, nbytes=0, flags=0):
116+
return self.recv_into(buf, nbytes, flags), self._address
117+
118+
@check_closed
119+
def recv_into(self, buf, nbytes=0):
120+
readbuf = buf
121+
if not isinstance(buf, memoryview):
122+
readbuf = memoryview(buf)
123+
124+
err, data = win32file.ReadFile(
125+
self._handle,
126+
readbuf[:nbytes] if nbytes else readbuf
127+
)
128+
return len(data)
129+
130+
@check_closed
131+
def send(self, string, flags=0):
132+
err, nbytes = win32file.WriteFile(self._handle, string)
133+
return nbytes
134+
135+
@check_closed
136+
def sendall(self, string, flags=0):
137+
return self.send(string, flags)
138+
139+
@check_closed
140+
def sendto(self, string, address):
141+
self.connect(address)
142+
return self.send(string)
143+
144+
def setblocking(self, flag):
145+
if flag:
146+
return self.settimeout(None)
147+
return self.settimeout(0)
148+
149+
def settimeout(self, value):
150+
if value is None:
151+
self._timeout = win32pipe.NMPWAIT_NOWAIT
152+
elif not isinstance(value, (float, int)) or value < 0:
153+
raise ValueError('Timeout value out of range')
154+
elif value == 0:
155+
self._timeout = win32pipe.NMPWAIT_USE_DEFAULT_WAIT
156+
else:
157+
self._timeout = value
158+
159+
def gettimeout(self):
160+
return self._timeout
161+
162+
def setsockopt(self, level, optname, value):
163+
raise NotImplementedError()
164+
165+
@check_closed
166+
def shutdown(self, how):
167+
return self.close()
168+
169+
170+
class NpipeFileIOBase(io.RawIOBase):
171+
def __init__(self, npipe_socket):
172+
self.sock = npipe_socket
173+
174+
def close(self):
175+
super(NpipeFileIOBase, self).close()
176+
self.sock = None
177+
178+
def fileno(self):
179+
return self.sock.fileno()
180+
181+
def isatty(self):
182+
return False
183+
184+
def readable(self):
185+
return True
186+
187+
def readinto(self, buf):
188+
return self.sock.recv_into(buf)
189+
190+
def seekable(self):
191+
return False
192+
193+
def writable(self):
194+
return False
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
import win32pipe
2+
import win32file
3+
4+
import random
5+
6+
def pipe_name():
7+
return 'testpipe{}'.format(random.randint(0, 4096))
8+
9+
def create_pipe(name):
10+
handle = win32pipe.CreateNamedPipe(
11+
'//./pipe/{}'.format(name),
12+
win32pipe.PIPE_ACCESS_DUPLEX,
13+
win32pipe.PIPE_TYPE_BYTE | win32pipe.PIPE_READMODE_BYTE | win32pipe.PIPE_WAIT,
14+
128, 4096, 4096, 300, None
15+
)
16+
return handle
17+
18+
def rw_loop(pipe):
19+
err = win32pipe.ConnectNamedPipe(pipe, None)
20+
if err != 0:
21+
raise RuntimeError('Error code: {}'.format(err))
22+
while True:
23+
err, data = win32file.ReadFile(pipe, 4096, None)
24+
if err != 0:
25+
raise RuntimeError('Error code: {}'.format(err))
26+
print('Data received: ', data, len(data))
27+
win32file.WriteFile(pipe, b'ACK', None)
28+
29+
30+
def __main__():
31+
name = pipe_name()
32+
print('Initializing pipe {}'.format(name))
33+
pipe = create_pipe(name)
34+
print('Pipe created, entering server loop.')
35+
rw_loop(pipe)
36+
37+
__main__()

docker/utils/utils.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -413,6 +413,9 @@ def parse_host(addr, platform=None, tls=False):
413413
elif addr.startswith('https://'):
414414
proto = "https"
415415
addr = addr[8:]
416+
elif addr.startswith('npipe://'):
417+
proto = 'npipe'
418+
addr = addr[8:]
416419
elif addr.startswith('fd://'):
417420
raise errors.DockerException("fd protocol is not implemented")
418421
else:
@@ -448,7 +451,7 @@ def parse_host(addr, platform=None, tls=False):
448451
else:
449452
host = addr
450453

451-
if proto == "http+unix":
454+
if proto == "http+unix" or proto == 'npipe':
452455
return "{0}://{1}".format(proto, host)
453456
return "{0}://{1}:{2}{3}".format(proto, host, port, path)
454457

0 commit comments

Comments
 (0)