Skip to content

Commit 1cc78c2

Browse files
committed
allow per-request timeouts; use api_version_auto_timeout_ms
1 parent 06517ce commit 1cc78c2

File tree

2 files changed

+19
-17
lines changed

2 files changed

+19
-17
lines changed

kafka/conn.py

Lines changed: 16 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -169,7 +169,7 @@ class BrokerConnection(object):
169169
(0, 10). Default: (0, 8, 2)
170170
api_version_auto_timeout_ms (int): number of milliseconds to throw a
171171
timeout exception from the constructor when checking the broker
172-
api version. Only applies if api_version is None
172+
api version. Only applies if api_version is None. Default: 2000.
173173
selector (selectors.BaseSelector): Provide a specific selector
174174
implementation to use for I/O multiplexing.
175175
Default: selectors.DefaultSelector
@@ -215,6 +215,7 @@ class BrokerConnection(object):
215215
'ssl_password': None,
216216
'ssl_ciphers': None,
217217
'api_version': (0, 8, 2), # default to most restrictive
218+
'api_version_auto_timeout_ms': 2000,
218219
'selector': selectors.DefaultSelector,
219220
'state_change_callback': lambda node_id, sock, conn: True,
220221
'metrics': None,
@@ -543,14 +544,14 @@ def _try_api_versions_check(self):
543544
# ((0, 10), ApiVersionRequest[0]()),
544545
request = ApiVersionRequest[0]()
545546
future = Future()
546-
response = self._send(request, blocking=True)
547+
response = self._send(request, blocking=True, request_timeout_ms=self.config['api_version_auto_timeout_ms'])
547548
response.add_callback(self._handle_api_versions_response, future)
548549
response.add_errback(self._handle_api_versions_failure, future)
549550
self._api_versions_future = future
550551
elif self._check_version_idx < len(self.VERSION_CHECKS):
551552
version, request = self.VERSION_CHECKS[self._check_version_idx]
552553
future = Future()
553-
response = self._send(request, blocking=True)
554+
response = self._send(request, blocking=True, request_timeout_ms=self.config['api_version_auto_timeout_ms'])
554555
response.add_callback(self._handle_check_version_response, future, version)
555556
response.add_errback(self._handle_check_version_failure, future)
556557
self._api_versions_future = future
@@ -1038,14 +1039,14 @@ def close(self, error=None):
10381039
# drop lock before state change callback and processing futures
10391040
self.config['state_change_callback'](self.node_id, sock, self)
10401041
sock.close()
1041-
for (_correlation_id, (future, _timestamp)) in ifrs:
1042+
for (_correlation_id, (future, _timestamp, _timeout)) in ifrs:
10421043
future.failure(error)
10431044

10441045
def _can_send_recv(self):
10451046
"""Return True iff socket is ready for requests / responses"""
10461047
return self.connected() or self.initializing()
10471048

1048-
def send(self, request, blocking=True):
1049+
def send(self, request, blocking=True, request_timeout_ms=None):
10491050
"""Queue request for async network send, return Future()"""
10501051
future = Future()
10511052
if self.connecting():
@@ -1054,9 +1055,9 @@ def send(self, request, blocking=True):
10541055
return future.failure(Errors.KafkaConnectionError(str(self)))
10551056
elif not self.can_send_more():
10561057
return future.failure(Errors.TooManyInFlightRequests(str(self)))
1057-
return self._send(request, blocking=blocking)
1058+
return self._send(request, blocking=blocking, request_timeout_ms=request_timeout_ms)
10581059

1059-
def _send(self, request, blocking=True):
1060+
def _send(self, request, blocking=True, request_timeout_ms=None):
10601061
future = Future()
10611062
with self._lock:
10621063
if not self._can_send_recv():
@@ -1069,9 +1070,11 @@ def _send(self, request, blocking=True):
10691070

10701071
log.debug('%s Request %d: %s', self, correlation_id, request)
10711072
if request.expect_response():
1072-
sent_time = time.time()
10731073
assert correlation_id not in self.in_flight_requests, 'Correlation ID already in-flight!'
1074-
self.in_flight_requests[correlation_id] = (future, sent_time)
1074+
sent_time = time.time()
1075+
request_timeout_ms = request_timeout_ms or self.config['request_timeout_ms']
1076+
timeout_at = sent_time + (request_timeout_ms / 1000)
1077+
self.in_flight_requests[correlation_id] = (future, sent_time, timeout_at)
10751078
else:
10761079
future.success(None)
10771080

@@ -1161,7 +1164,7 @@ def recv(self):
11611164
for i, (correlation_id, response) in enumerate(responses):
11621165
try:
11631166
with self._lock:
1164-
(future, timestamp) = self.in_flight_requests.pop(correlation_id)
1167+
(future, timestamp, _timeout) = self.in_flight_requests.pop(correlation_id)
11651168
except KeyError:
11661169
self.close(Errors.KafkaConnectionError('Received unrecognized correlation id'))
11671170
return ()
@@ -1235,10 +1238,9 @@ def requests_timed_out(self):
12351238
def next_ifr_request_timeout_ms(self):
12361239
with self._lock:
12371240
if self.in_flight_requests:
1238-
get_timestamp = lambda v: v[1]
1239-
oldest_at = min(map(get_timestamp,
1240-
self.in_flight_requests.values()))
1241-
next_timeout = oldest_at + self.config['request_timeout_ms'] / 1000.0
1241+
get_timeout = lambda v: v[2]
1242+
next_timeout = min(map(get_timeout,
1243+
self.in_flight_requests.values()))
12421244
return max(0, (next_timeout - time.time()) * 1000)
12431245
else:
12441246
return float('inf')

test/test_conn.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -351,14 +351,14 @@ def test_requests_timed_out(conn):
351351
# No in-flight requests, not timed out
352352
assert not conn.requests_timed_out()
353353

354-
# Single request, timestamp = now (0)
355-
conn.in_flight_requests[0] = ('foo', 0)
354+
# Single request, timeout_at > now (0)
355+
conn.in_flight_requests[0] = ('foo', 0, 1)
356356
assert not conn.requests_timed_out()
357357

358358
# Add another request w/ timestamp > request_timeout ago
359359
request_timeout = conn.config['request_timeout_ms']
360360
expired_timestamp = 0 - request_timeout - 1
361-
conn.in_flight_requests[1] = ('bar', expired_timestamp)
361+
conn.in_flight_requests[1] = ('bar', 0, expired_timestamp)
362362
assert conn.requests_timed_out()
363363

364364
# Drop the expired request and we should be good to go again

0 commit comments

Comments
 (0)