diff --git a/kafka/conn.py b/kafka/conn.py old mode 100644 new mode 100755 index 64445fab0..4a0d64a28 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -326,6 +326,9 @@ def _dns_lookup(self): return True def _next_afi_sockaddr(self): + if self.config["socks5_proxy"] and Socks5Wrapper.use_remote_lookup(self.config["socks5_proxy"]): + return (socket.AF_UNSPEC, (self.host, self.port)) + if not self._gai: if not self._dns_lookup(): return @@ -379,6 +382,7 @@ def connect(self): self._sock_afi, self._sock_addr = next_lookup try: if self.config["socks5_proxy"] is not None: + log.debug('%s: initializing Socks5 proxy at %s', self, self.config["socks5_proxy"]) self._socks5_proxy = Socks5Wrapper(self.config["socks5_proxy"], self.afi) self._sock = self._socks5_proxy.socket(self._sock_afi, socket.SOCK_STREAM) else: @@ -864,6 +868,8 @@ def connection_delay(self): if self.disconnected() or self.connecting(): if len(self._gai) > 0: return 0 + elif self.config["socks5_proxy"] and Socks5Wrapper.use_remote_lookup(self.config["socks5_proxy"]): + return 0 else: time_waited = time.time() - self.last_attempt return max(self._reconnect_backoff - time_waited, 0) * 1000 @@ -964,6 +970,7 @@ def close(self, error=None): # the socket fd from selectors cleanly. sock = self._sock self._sock = None + self._socks5_proxy = None # drop lock before state change callback and processing futures self.config['state_change_callback'](self.node_id, sock, self) diff --git a/kafka/socks5_wrapper.py b/kafka/socks5_wrapper.py old mode 100644 new mode 100755 index 18bea7c8d..6715f2093 --- a/kafka/socks5_wrapper.py +++ b/kafka/socks5_wrapper.py @@ -64,6 +64,15 @@ def dns_lookup(cls, host, port, afi=socket.AF_UNSPEC): log.warning("DNS lookup failed for proxy %s:%d, %r", host, port, ex) return [] + @classmethod + def use_remote_lookup(cls, proxy_url): + if proxy_url is None: + return False + return urlparse(proxy_url).scheme == 'socks5h' + + def _use_remote_lookup(self): + return self._proxy_url.scheme == 'socks5h' + def socket(self, family, sock_type): """Open and record a socket. @@ -187,7 +196,10 @@ def connect_ex(self, addr): return errno.ECONNREFUSED if self._state == ProxyConnectionStates.REQUEST_SUBMIT: - if self._target_afi == socket.AF_INET: + if self._use_remote_lookup(): + addr_type = 3 + addr_len = len(addr[0]) + elif self._target_afi == socket.AF_INET: addr_type = 1 addr_len = 4 elif self._target_afi == socket.AF_INET6: @@ -200,14 +212,28 @@ def connect_ex(self, addr): return errno.ECONNREFUSED self._buffer_out = struct.pack( - "!bbbb{}sh".format(addr_len), + "!bbbb", 5, # version 1, # command: connect 0, # reserved - addr_type, # 1 for ipv4, 4 for ipv6 address - socket.inet_pton(self._target_afi, addr[0]), # either 4 or 16 bytes of actual address - addr[1], # port + addr_type, # 1 for ipv4, 4 for ipv6 address, 3 for domain name ) + # Addr format depends on type + if addr_type == 3: + # len + domain name (no null terminator) + self._buffer_out += struct.pack( + "!b{}s".format(addr_len), + addr_len, + addr[0].encode('ascii'), + ) + else: + # either 4 (type 1) or 16 (type 4) bytes of actual address + self._buffer_out += struct.pack( + "!{}s".format(addr_len), + socket.inet_pton(self._target_afi, addr[0]), + ) + self._buffer_out += struct.pack("!H", addr[1]) # port + self._state = ProxyConnectionStates.REQUESTING if self._state == ProxyConnectionStates.REQUESTING: