diff --git a/proxy/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaProxyRequestHandler.java b/proxy/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaProxyRequestHandler.java
index e2c89d614..dabac9d35 100644
--- a/proxy/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaProxyRequestHandler.java
+++ b/proxy/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaProxyRequestHandler.java
@@ -16,6 +16,9 @@
 import static com.google.common.base.Preconditions.checkArgument;
 import static io.streamnative.pulsar.handlers.kop.KafkaRequestHandler.newNode;
 
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
 import io.netty.buffer.ByteBuf;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.EventLoopGroup;
@@ -63,6 +66,7 @@
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang3.NotImplementedException;
+import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.acl.AclOperation;
@@ -204,6 +208,16 @@ public class KafkaProxyRequestHandler extends KafkaCommandDecoder {
     private AtomicInteger dummyCorrelationIdGenerator = new AtomicInteger(-1);
     private volatile boolean coordinatorNamespaceExists = false;
+    LoadingCache<String, AtomicInteger> errorCounts = CacheBuilder.newBuilder()
+            .expireAfterWrite(1, TimeUnit.MINUTES)
+            .build(new CacheLoader<>() {
+                @Override
+                public AtomicInteger load(String key) {
+                    return new AtomicInteger(0);
+                }
+            });
+
+
     public KafkaProxyRequestHandler(String id,
                                     KafkaProtocolProxyMain.PulsarAdminProvider pulsarAdmin,
                                     AuthenticationService authenticationService,
                                     AuthorizationService authorizationService,
@@ -571,6 +585,9 @@ protected void handleProduceRequest(KafkaHeaderAndRequest produceHar,
                             .stream()
                             .collect(Collectors.toMap(Function.identity(),
                                     p -> new PartitionResponse(Errors.REQUEST_TIMED_OUT)));
+                    incErrorCountAndMaybeDiscardConnectionToBroker(badError,
+                            broker.node.host(),
+                            broker.node.port());
                     resultFuture.complete(new ProduceResponse(errorsMap));
                     return null;
                 });
@@ -713,6 +730,9 @@ protected void handleProduceRequest(KafkaHeaderAndRequest produceHar,
                                     .setErrorMessage(Errors.REQUEST_TIMED_OUT.message()));
                         });
                     });
+                    incErrorCountAndMaybeDiscardConnectionToBroker(badError,
+                            kopBroker.host(),
+                            kopBroker.port());
                     return null;
                 }).whenComplete((ignore1, ignore2) -> {
                     singlePartitionRequest.close();
@@ -815,7 +835,9 @@ protected void handleFetchRequest(KafkaHeaderAndRequest fetch,
             // all the partitions are owned by one single broker,
             // we can forward the whole request to the only broker
             log.debug("forward FULL fetch of {} parts to {}", numPartitions, first);
-            grabConnectionToBroker(first.node.host(), first.node.port()).
+            final String host = first.node.host();
+            final int port = first.node.port();
+            grabConnectionToBroker(host, port).
                     forwardRequest(fetch)
                     .thenAccept(response -> {
                         resultFuture.complete(response);
@@ -823,6 +845,7 @@ protected void handleFetchRequest(KafkaHeaderAndRequest fetch,
                         log.error("bad error for FULL fetch", badError);
                         FetchResponse fetchResponse = buildFetchErrorResponse(fetchRequest,
                                 fetchData, Errors.UNKNOWN_SERVER_ERROR);
+                        incErrorCountAndMaybeDiscardConnectionToBroker(badError, host, port);
                         resultFuture.complete(fetchResponse);
                         return null;
                     });
@@ -963,6 +986,9 @@ protected void handleFetchRequest(KafkaHeaderAndRequest fetch,
                     fetchData.keySet().forEach(topicPartition ->
                             errorsConsumer.accept(topicPartition, Errors.UNKNOWN_SERVER_ERROR)
                     );
+                    incErrorCountAndMaybeDiscardConnectionToBroker(badError,
+                            kopBroker.host(),
+                            kopBroker.port());
                     return null;
                 }).whenComplete((ignore1, ignore2) -> {
                     singlePartitionRequest.close();
@@ -1237,6 +1263,7 @@ R extends AbstractResponse> void sendRequestToAllTopicOwners(
                 }).exceptionally(err -> {
                     log.error("Error sending coordinator request to {} :{}",
                             node, err);
+                    incErrorCountAndMaybeDiscardConnectionToBroker(err, node.host(), node.port());
                     responseFromBroker.complete(errorBuilder.apply(request, err));
                     return null;
                 });
@@ -1621,7 +1648,9 @@ protected void handleDeleteRecords(KafkaHeaderAndRequest deleteRecords,
             // all the partitions are owned by one single broker,
             // we can forward the whole request to the only broker
             log.debug("forward FULL DeleteRecords of {} parts to {}", numPartitions, first);
-            grabConnectionToBroker(first.node.host(), first.node.port()).
+            final String host = first.node.host();
+            final int port = first.node.port();
+            grabConnectionToBroker(host, port).
                     forwardRequest(deleteRecords)
                     .thenAccept(response -> {
                         resultFuture.complete(response);
@@ -1633,6 +1662,7 @@ protected void handleDeleteRecords(KafkaHeaderAndRequest deleteRecords,
                             .stream()
                             .collect(Collectors.toMap(Function.identity(),
                                     p -> Errors.UNKNOWN_SERVER_ERROR));
+                        incErrorCountAndMaybeDiscardConnectionToBroker(badError, host, port);
                         resultFuture.complete(KafkaResponseUtils.newDeleteRecords(errorsMap));
                         return null;
                     });
@@ -1777,6 +1807,9 @@ protected void handleDeleteRecords(KafkaHeaderAndRequest deleteRecords,
                             errorsConsumer.accept(topicPartition, Errors.UNKNOWN_SERVER_ERROR);
                         });
                     });
+                    incErrorCountAndMaybeDiscardConnectionToBroker(badError,
+                            kopBroker.host(),
+                            kopBroker.port());
                     return null;
                 }).whenComplete((ignore1, ignore2) -> {
                     singlePartitionRequest.close();
@@ -2031,6 +2064,9 @@ protected void handleListOffsetRequestV0(KafkaHeaderAndRequest listOffset,
                 }).exceptionally(err -> {
                     ListOffsetsResponseData responseData =
                             buildDummyListOffsetsResponseData(entry.getKey());
+                    incErrorCountAndMaybeDiscardConnectionToBroker(err,
+                            brokerAddress.node.host(),
+                            brokerAddress.node.port());
                     onResponse.accept(fullPartitionName, responseData);
                     return null;
                 }).whenComplete((ignore1, ignore2) -> {
@@ -2161,6 +2197,9 @@ protected void handleListOffsetRequestV1(KafkaHeaderAndRequest listOffset,
                 }).exceptionally(err -> {
                     ListOffsetsResponseData responseData =
                             buildDummyListOffsetsResponseData(topicPartition);
+                    incErrorCountAndMaybeDiscardConnectionToBroker(err,
+                            brokerAddress.node.host(),
+                            brokerAddress.node.port());
                     onResponse.accept(fullPartitionName, responseData);
                     return null;
                 }).whenComplete((ignore1, ignore2) -> {
@@ -2342,6 +2381,8 @@ R extends AbstractResponse> void handleRequestWithCoordinator(
                 }).exceptionally(err -> {
                     log.error("Error sending {} coordinator for id {} request to {} :{}",
                             coordinatorType, transactionalId, metadata.node, err);
+                    incErrorCountAndMaybeDiscardConnectionToBroker(err, metadata.node.host(),
+                            metadata.node.port());
                     resultFuture.complete(errorBuilder.apply(request, err));
                     return null;
                 });
@@ -2479,6 +2520,7 @@ R extends AbstractResponse> void sendRequestToAllCoordinators(
                 }).exceptionally(err -> {
                     log.error("Error sending {} coordinator request to {} :{}",
                             coordinatorType, node, err);
+                    incErrorCountAndMaybeDiscardConnectionToBroker(err, node.host(), node.port());
                     responseFromBroker.complete(errorBuilder.apply(request, err));
                     return null;
                 });
@@ -2719,7 +2761,39 @@ void discardConnectionToBroker(ConnectionToBroker connectionToBroker) {
         });
     }
 
-    void forgetMetadataForFailedBroker(String brokerHost, int brokerPort) {
+    void incErrorCountAndMaybeDiscardConnectionToBroker(Throwable error, String brokerHost, int brokerPort) {
+        if (error instanceof KafkaException) {
+            if (log.isDebugEnabled()) {
+                log.debug("Skipping internal Kafka exceptions for {}:{}", brokerHost, brokerPort, error);
+            }
+            return;
+        }
+
+        String connectionKey = brokerHost + ":" + brokerPort;
+        int errorCount;
+        try {
+            errorCount = errorCounts.get(connectionKey).incrementAndGet();
+        } catch (ExecutionException e) {
+            errorCount = 0;
+        }
+        if (errorCount > 10) {
+            log.info("Closing connection to {}:{} due to having {} errors/min; last error:",
+                    brokerHost, brokerPort, errorCount, error);
+            discardConnectionToBroker(connectionKey);
+        }
+    }
+
+    void discardConnectionToBroker(String connectionKey) {
+        connectionsToBrokers.compute(connectionKey, (key, existing) -> {
+            if (existing != null) {
+                existing.close();
+            }
+            return null;
+        });
+
+    }
+
+    void forgetMetadataForFailedBroker(String brokerHost, int brokerPort) {
         Collection keysToRemove = topicsLeaders
                 .entrySet()
                 .stream()
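
Reviewer note: the error-counting logic above is small enough to demonstrate in isolation. The standalone sketch below is illustrative, not part of the patch; the class and method names (BrokerErrorTracker, recordErrorAndCheck) are hypothetical, and only the Guava LoadingCache calls, the 1-minute expireAfterWrite window, and the >10 threshold mirror the patched code. It shows the behavior the patch relies on: each "host:port" key lazily loads a fresh AtomicInteger, the entry expires a minute after creation, and only the 11th error recorded within that window signals that the cached broker connection should be discarded.

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class BrokerErrorTracker {

    // Same policy as the patch: one counter per "host:port" key, expiring
    // one minute after creation, i.e. a coarse errors-per-minute rate.
    private final LoadingCache<String, AtomicInteger> errorCounts = CacheBuilder.newBuilder()
            .expireAfterWrite(1, TimeUnit.MINUTES)
            .build(new CacheLoader<>() {
                @Override
                public AtomicInteger load(String key) {
                    return new AtomicInteger(0);
                }
            });

    /**
     * Returns true once more than 10 errors have been recorded for this
     * broker in the current one-minute window, which is the point at which
     * the patch discards the pooled connection.
     */
    public boolean recordErrorAndCheck(String brokerHost, int brokerPort) {
        String connectionKey = brokerHost + ":" + brokerPort;
        int errorCount;
        try {
            errorCount = errorCounts.get(connectionKey).incrementAndGet();
        } catch (ExecutionException e) {
            errorCount = 0; // the loader above cannot fail, so this is effectively unreachable
        }
        return errorCount > 10;
    }

    public static void main(String[] args) {
        BrokerErrorTracker tracker = new BrokerErrorTracker();
        for (int i = 1; i <= 11; i++) {
            // Only the 11th error within the same minute crosses the threshold.
            System.out.printf("error #%d -> discard=%s%n",
                    i, tracker.recordErrorAndCheck("broker-1", 9092));
        }
    }
}

One behavioral caveat worth noting in review: expireAfterWrite is keyed to entry creation, and incrementAndGet mutates the cached AtomicInteger without writing to the cache, so a burst of errors straddling the expiry boundary restarts the count from zero. That seems acceptable for a coarse per-broker circuit breaker like this one.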