Skip to content

Commit a520232

Browse files
authored
Improve connection state logging (#2574)
1 parent c75b85b commit a520232

File tree

2 files changed

+26
-25
lines changed

2 files changed

+26
-25
lines changed

kafka/client_async.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -365,7 +365,7 @@ def _conn_state_change(self, node_id, sock, conn):
365365
self._connecting.remove(node_id)
366366
try:
367367
self._selector.unregister(sock)
368-
except KeyError:
368+
except (KeyError, ValueError):
369369
pass
370370

371371
if self._sensors:

kafka/conn.py

Lines changed: 25 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -319,8 +319,8 @@ def _init_sasl_mechanism(self):
319319
def _dns_lookup(self):
320320
self._gai = dns_lookup(self.host, self.port, self.afi)
321321
if not self._gai:
322-
log.error('DNS lookup failed for %s:%i (%s)',
323-
self.host, self.port, self.afi)
322+
log.error('%s: DNS lookup failed for %s:%i (%s)',
323+
self, self.host, self.port, self.afi)
324324
return False
325325
return True
326326

@@ -366,6 +366,7 @@ def connect_blocking(self, timeout=float('inf')):
366366
def connect(self):
367367
"""Attempt to connect and return ConnectionState"""
368368
if self.state is ConnectionStates.DISCONNECTED and not self.blacked_out():
369+
self.state = ConnectionStates.CONNECTING
369370
self.last_attempt = time.time()
370371
next_lookup = self._next_afi_sockaddr()
371372
if not next_lookup:
@@ -390,7 +391,6 @@ def connect(self):
390391
self._sock.setsockopt(*option)
391392

392393
self._sock.setblocking(False)
393-
self.state = ConnectionStates.CONNECTING
394394
self.config['state_change_callback'](self.node_id, self._sock, self)
395395
log.info('%s: connecting to %s:%d [%s %s]', self, self.host,
396396
self.port, self._sock_addr, AFI_NAMES[self._sock_afi])
@@ -412,20 +412,20 @@ def connect(self):
412412
log.debug('%s: established TCP connection', self)
413413

414414
if self.config['security_protocol'] in ('SSL', 'SASL_SSL'):
415-
log.debug('%s: initiating SSL handshake', self)
416415
self.state = ConnectionStates.HANDSHAKE
416+
log.debug('%s: initiating SSL handshake', self)
417417
self.config['state_change_callback'](self.node_id, self._sock, self)
418418
# _wrap_ssl can alter the connection state -- disconnects on failure
419419
self._wrap_ssl()
420420
else:
421-
log.debug('%s: checking broker Api Versions', self)
422421
self.state = ConnectionStates.API_VERSIONS_SEND
422+
log.debug('%s: checking broker Api Versions', self)
423423
self.config['state_change_callback'](self.node_id, self._sock, self)
424424

425425
# Connection failed
426426
# WSAEINVAL == 10022, but errno.WSAEINVAL is not available on non-win systems
427427
elif ret not in (errno.EINPROGRESS, errno.EALREADY, errno.EWOULDBLOCK, 10022):
428-
log.error('Connect attempt to %s returned error %s.'
428+
log.error('%s: Connect attempt returned error %s.'
429429
' Disconnecting.', self, ret)
430430
errstr = errno.errorcode.get(ret, 'UNKNOWN')
431431
self.close(Errors.KafkaConnectionError('{} {}'.format(ret, errstr)))
@@ -438,22 +438,22 @@ def connect(self):
438438
if self.state is ConnectionStates.HANDSHAKE:
439439
if self._try_handshake():
440440
log.debug('%s: completed SSL handshake.', self)
441-
log.debug('%s: checking broker Api Versions', self)
442441
self.state = ConnectionStates.API_VERSIONS_SEND
442+
log.debug('%s: checking broker Api Versions', self)
443443
self.config['state_change_callback'](self.node_id, self._sock, self)
444444

445445
if self.state in (ConnectionStates.API_VERSIONS_SEND, ConnectionStates.API_VERSIONS_RECV):
446446
if self._try_api_versions_check():
447447
# _try_api_versions_check has side-effects: possibly disconnected on socket errors
448448
if self.state in (ConnectionStates.API_VERSIONS_SEND, ConnectionStates.API_VERSIONS_RECV):
449449
if self.config['security_protocol'] in ('SASL_PLAINTEXT', 'SASL_SSL'):
450-
log.debug('%s: initiating SASL authentication', self)
451450
self.state = ConnectionStates.AUTHENTICATING
451+
log.debug('%s: initiating SASL authentication', self)
452452
self.config['state_change_callback'](self.node_id, self._sock, self)
453453
else:
454454
# security_protocol PLAINTEXT
455-
log.info('%s: Connection complete.', self)
456455
self.state = ConnectionStates.CONNECTED
456+
log.info('%s: Connection complete.', self)
457457
self._reset_reconnect_backoff()
458458
self.config['state_change_callback'](self.node_id, self._sock, self)
459459

@@ -462,8 +462,8 @@ def connect(self):
462462
if self._try_authenticate():
463463
# _try_authenticate has side-effects: possibly disconnected on socket errors
464464
if self.state is ConnectionStates.AUTHENTICATING:
465-
log.info('%s: Connection complete.', self)
466465
self.state = ConnectionStates.CONNECTED
466+
log.info('%s: Connection complete.', self)
467467
self._reset_reconnect_backoff()
468468
self.config['state_change_callback'](self.node_id, self._sock, self)
469469

@@ -472,7 +472,7 @@ def connect(self):
472472
# Connection timed out
473473
request_timeout = self.config['request_timeout_ms'] / 1000.0
474474
if time.time() > request_timeout + self.last_attempt:
475-
log.error('Connection attempt to %s timed out', self)
475+
log.error('%s: Connection attempt timed out', self)
476476
self.close(Errors.KafkaConnectionError('timeout'))
477477
return self.state
478478

@@ -531,7 +531,7 @@ def _try_handshake(self):
531531
except (SSLWantReadError, SSLWantWriteError):
532532
pass
533533
except (SSLZeroReturnError, ConnectionError, TimeoutError, SSLEOFError):
534-
log.warning('SSL connection closed by server during handshake.')
534+
log.warning('%s: SSL connection closed by server during handshake.', self)
535535
self.close(Errors.KafkaConnectionError('SSL connection closed by server during handshake'))
536536
# Other SSLErrors will be raised to user
537537

@@ -611,7 +611,7 @@ def _handle_api_versions_response(self, future, response):
611611
for api_key, min_version, max_version, *rest in response.api_versions
612612
])
613613
self._api_version = self._infer_broker_version_from_api_versions(self._api_versions)
614-
log.info('Broker version identified as %s', '.'.join(map(str, self._api_version)))
614+
log.info('%s: Broker version identified as %s', self, '.'.join(map(str, self._api_version)))
615615
future.success(self._api_version)
616616
self.connect()
617617

@@ -621,7 +621,7 @@ def _handle_api_versions_failure(self, future, ex):
621621
# after failure connection is closed, so state should already be DISCONNECTED
622622

623623
def _handle_check_version_response(self, future, version, _response):
624-
log.info('Broker version identified as %s', '.'.join(map(str, version)))
624+
log.info('%s: Broker version identified as %s', self, '.'.join(map(str, version)))
625625
log.info('Set configuration api_version=%s to skip auto'
626626
' check_version requests on startup', version)
627627
self._api_versions = BROKER_API_VERSIONS[version]
@@ -751,7 +751,7 @@ def _send_sasl_authenticate(self, sasl_auth_bytes):
751751
request = SaslAuthenticateRequest[0](sasl_auth_bytes)
752752
self._send(request, blocking=True)
753753
else:
754-
log.debug('Sending %d raw sasl auth bytes to server', len(sasl_auth_bytes))
754+
log.debug('%s: Sending %d raw sasl auth bytes to server', self, len(sasl_auth_bytes))
755755
try:
756756
self._send_bytes_blocking(Int32.encode(len(sasl_auth_bytes)) + sasl_auth_bytes)
757757
except (ConnectionError, TimeoutError) as e:
@@ -781,7 +781,7 @@ def _recv_sasl_authenticate(self):
781781
latency_ms = (time.time() - timestamp) * 1000
782782
if self._sensors:
783783
self._sensors.request_time.record(latency_ms)
784-
log.debug('%s Response %d (%s ms): %s', self, correlation_id, latency_ms, response)
784+
log.debug('%s: Response %d (%s ms): %s', self, correlation_id, latency_ms, response)
785785

786786
error_type = Errors.for_code(response.error_code)
787787
if error_type is not Errors.NoError:
@@ -792,7 +792,7 @@ def _recv_sasl_authenticate(self):
792792
return response.auth_bytes
793793
else:
794794
# unframed bytes w/ SaslHandhake v0
795-
log.debug('Received %d raw sasl auth bytes from server', nbytes)
795+
log.debug('%s: Received %d raw sasl auth bytes from server', self, nbytes)
796796
return data[4:]
797797

798798
def _sasl_authenticate(self, future):
@@ -956,7 +956,8 @@ def close(self, error=None):
956956

957957
# drop lock before state change callback and processing futures
958958
self.config['state_change_callback'](self.node_id, sock, self)
959-
sock.close()
959+
if sock:
960+
sock.close()
960961
for (_correlation_id, (future, _timestamp, _timeout)) in ifrs:
961962
future.failure(error)
962963

@@ -1002,7 +1003,7 @@ def _send(self, request, blocking=True, request_timeout_ms=None):
10021003

10031004
correlation_id = self._protocol.send_request(request)
10041005

1005-
log.debug('%s Request %d (timeout_ms %s): %s', self, correlation_id, request_timeout_ms, request)
1006+
log.debug('%s: Request %d (timeout_ms %s): %s', self, correlation_id, request_timeout_ms, request)
10061007
if request.expect_response():
10071008
assert correlation_id not in self.in_flight_requests, 'Correlation ID already in-flight!'
10081009
sent_time = time.time()
@@ -1036,7 +1037,7 @@ def send_pending_requests(self):
10361037
return True
10371038

10381039
except (ConnectionError, TimeoutError) as e:
1039-
log.exception("Error sending request data to %s", self)
1040+
log.exception("%s: Error sending request data", self)
10401041
error = Errors.KafkaConnectionError("%s: %s" % (self, e))
10411042
self.close(error=error)
10421043
return False
@@ -1069,7 +1070,7 @@ def send_pending_requests_v2(self):
10691070
return len(self._send_buffer) == 0
10701071

10711072
except (ConnectionError, TimeoutError, Exception) as e:
1072-
log.exception("Error sending request data to %s", self)
1073+
log.exception("%s: Error sending request data", self)
10731074
error = Errors.KafkaConnectionError("%s: %s" % (self, e))
10741075
self.close(error=error)
10751076
return False
@@ -1106,7 +1107,7 @@ def recv(self):
11061107
if not responses and self.requests_timed_out():
11071108
timed_out = self.timed_out_ifrs()
11081109
timeout_ms = (timed_out[0][2] - timed_out[0][1]) * 1000
1109-
log.warning('%s timed out after %s ms. Closing connection.',
1110+
log.warning('%s: timed out after %s ms. Closing connection.',
11101111
self, timeout_ms)
11111112
self.close(error=Errors.RequestTimedOutError(
11121113
'Request timed out after %s ms' %
@@ -1125,7 +1126,7 @@ def recv(self):
11251126
if self._sensors:
11261127
self._sensors.request_time.record(latency_ms)
11271128

1128-
log.debug('%s Response %d (%s ms): %s', self, correlation_id, latency_ms, response)
1129+
log.debug('%s: Response %d (%s ms): %s', self, correlation_id, latency_ms, response)
11291130
self._maybe_throttle(response)
11301131
responses[i] = (response, future)
11311132

@@ -1137,7 +1138,7 @@ def _recv(self):
11371138
err = None
11381139
with self._lock:
11391140
if not self._can_send_recv():
1140-
log.warning('%s cannot recv: socket not connected', self)
1141+
log.warning('%s: cannot recv: socket not connected', self)
11411142
return ()
11421143

11431144
while len(recvd) < self.config['sock_chunk_buffer_count']:

0 commit comments

Comments
 (0)