Commit 89e0660

ztzg authored and jeffwidman committed
fix(core): do not allow responses to choke request and ping processing
Without this patch, only a single select event is processed per iteration of the 'ConnectionHandler' event loop.

In a scenario where the client issues a large number of async requests with a significant amplification factor, e.g. 'get_children_async' on a large node, 'select' can almost always return a "response ready" socket, because the server is often able to process, serialize and ship a new response while Kazoo processes the previous one.

That response socket often (always?) ends up at the beginning of the list returned by 'select'. As only 'select_result[0]' is processed in the loop, the client can ignore the "request ready" FD for a long time, during which no requests or pings are sent. In effect, asynchronously "browsing" a large tree of nodes can stretch that duration to the point where it exceeds the timeout, causing the client to lose its session.

This patch considers both descriptors after 'select', and also arranges for pings to be sent when it encounters an "unending" stream of responses to requests which were sent earlier.
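
The starvation described in the commit message can be reproduced outside Kazoo with a toy loop. The sketch below is not Kazoo code: `run_loop`, `check_both`, and the two socket pairs are hypothetical stand-ins for the ZooKeeper socket and the request pipe, and it relies on CPython's `select.select` returning ready objects in the order they were passed in.

```python
import select
import socket


def run_loop(iterations, check_both):
    """Run a toy event loop; return (responses_read, requests_handled)."""
    # Hypothetical stand-ins for the server socket and the request pipe.
    resp_rx, resp_tx = socket.socketpair()
    req_rx, req_tx = socket.socketpair()
    try:
        resp_tx.sendall(b"r" * 50)  # a long stream of pending responses
        req_tx.sendall(b"q" * 3)    # three requests waiting to be sent
        # Wait until both descriptors are readable before starting.
        select.select([resp_rx], [], [], 1.0)
        select.select([req_rx], [], [], 1.0)

        responses = requests = 0
        for _ in range(iterations):
            ready = select.select([resp_rx, req_rx], [], [], 1.0)[0]
            if not ready:
                break
            if check_both:
                # Patched behaviour: look at every ready descriptor.
                if resp_rx in ready:
                    resp_rx.recv(1)
                    responses += 1
                if req_rx in ready:
                    req_rx.recv(1)
                    requests += 1
            elif ready[0] is resp_rx:
                # Old behaviour: only the first ready descriptor is served.
                resp_rx.recv(1)
                responses += 1
            else:
                req_rx.recv(1)
                requests += 1
        return responses, requests
    finally:
        for sock in (resp_rx, resp_tx, req_rx, req_tx):
            sock.close()


if __name__ == "__main__":
    print("first-only:", run_loop(10, check_both=False))
    print("check-both:", run_loop(10, check_both=True))
```

Under these assumptions, the first-only variant should handle no requests while responses keep arriving, whereas the check-both variant drains the three queued requests within the first three iterations.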
1 parent b2f7a46 commit 89e0660

1 file changed: +21 -10 lines changed

kazoo/protocol/connection.py

Lines changed: 21 additions & 10 deletions
@@ -561,7 +561,6 @@ def _connect_loop(self, retry):
     def _connect_attempt(self, host, hostip, port, retry):
         client = self.client
         KazooTimeoutError = self.handler.timeout_exception
-        close_connection = False

         self._socket = None

@@ -582,13 +581,14 @@ def _connect_attempt(self, host, hostip, port, retry):
             connect_timeout = connect_timeout / 1000.0
             retry.reset()
             self.ping_outstanding.clear()
+            last_send = time.time()
             with self._socket_error_handling():
-                while not close_connection:
+                while True:
                     # Watch for something to read or send
-                    jitter_time = random.randint(0, 40) / 100.0
+                    jitter_time = random.randint(1, 40) / 100.0
+                    deadline = last_send + read_timeout / 2.0 - jitter_time
                     # Ensure our timeout is positive
-                    timeout = max([read_timeout / 2.0 - jitter_time,
-                                   jitter_time])
+                    timeout = max([deadline - time.time(), jitter_time])
                     s = self.handler.select([self._socket, self._read_sock],
                                             [], [], timeout)[0]

@@ -597,12 +597,23 @@ def _connect_attempt(self, host, hostip, port, retry):
                             self.ping_outstanding.clear()
                             raise ConnectionDropped(
                                 "outstanding heartbeat ping not received")
-                        self._send_ping(connect_timeout)
-                    elif s[0] == self._socket:
-                        response = self._read_socket(read_timeout)
-                        close_connection = response == CLOSE_RESPONSE
                     else:
-                        self._send_request(read_timeout, connect_timeout)
+                        if self._socket in s:
+                            response = self._read_socket(read_timeout)
+                            if response == CLOSE_RESPONSE:
+                                break
+                        # Check if any requests need sending before proceeding
+                        # to process more responses. Otherwise the responses
+                        # may choke out the requests. See PR#633.
+                        if self._read_sock in s:
+                            self._send_request(read_timeout, connect_timeout)
+                            # Requests act as implicit pings.
+                            last_send = time.time()
+                            continue
+
+                    if time.time() >= deadline:
+                        self._send_ping(connect_timeout)
+                        last_send = time.time()
             self.logger.info('Closing connection to %s:%s', host, port)
             client._session_callback(KeeperState.CLOSED)
             return STOP_CONNECTING
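
The deadline arithmetic introduced by this change can be checked in isolation. The helper below is a simplified sketch, not part of Kazoo: `plan_wait` and `ping_due` are hypothetical names, and the current time is passed in as `now` rather than read from `time.time()`. In the patch itself, the ping check runs after `select` returns, so it sees a later clock value than the timeout computation does.

```python
def plan_wait(last_send, read_timeout, jitter_time, now):
    """Return (deadline, select_timeout, ping_due) for one loop iteration.

    All arguments are in seconds; the names mirror the variables in the diff.
    """
    # A ping (or request) should go out roughly every read_timeout / 2.
    deadline = last_send + read_timeout / 2.0 - jitter_time
    # Never wait less than the jitter, even if the deadline has passed.
    select_timeout = max(deadline - now, jitter_time)
    # Simplification: the patch evaluates this after select() returns.
    ping_due = now >= deadline
    return deadline, select_timeout, ping_due


if __name__ == "__main__":
    # One second after the last send, with a 10 s read timeout.
    print(plan_wait(last_send=100.0, read_timeout=10.0,
                    jitter_time=0.25, now=101.0))
    # Past the deadline: the wait collapses to the jitter and a ping is due.
    print(plan_wait(last_send=100.0, read_timeout=10.0,
                    jitter_time=0.25, now=105.0))
```

Because the lower bound on the wait is `jitter_time`, a jitter of zero would allow a zero-second `select` timeout once the deadline has passed. That may be why the lower bound of `random.randint` moves from 0 to 1 in this change, although the commit message does not say so.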
