Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .dev/dev.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,11 @@ services:
KAFKA_CLUSTERS_0_KAFKACONNECT_0_NAME: first
KAFKA_CLUSTERS_0_KAFKACONNECT_0_ADDRESS: http://kafka-connect0:8083
KAFKA_CLUSTERS_0_KSQLDBSERVER: http://ksqldb0:8088
# Kafka Connect client settings (rate limiting for connect API scraping)
# KAFKA_KAFKACONNECTCLIENT_SCRAPECONCURRENCY: 4
# KAFKA_KAFKACONNECTCLIENT_MAXRETRIES: 5
# KAFKA_KAFKACONNECTCLIENT_RETRYBASEDELAYMS: 500
# KAFKA_KAFKACONNECTCLIENT_RETRYMAXDELAYMS: 10000
DYNAMIC_CONFIG_ENABLED: 'true'
KAFKA_CLUSTERS_0_AUDIT_TOPICAUDITENABLED: 'true'
KAFKA_CLUSTERS_0_AUDIT_CONSOLEAUDITENABLED: 'true'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,21 @@ public class RetryingKafkaConnectClient extends KafkaConnectClientApi {
private static final int MAX_RETRIES = 5;
private static final Duration RETRIES_DELAY = Duration.ofMillis(200);

// Retry policy applied to transient upstream failures; built once per client instance
// from the shared kafka-connect-client configuration (see ClustersProperties.KafkaConnect).
private final Retry transientErrorRetry;

// Constructs a Kafka Connect REST client wrapped with retry behavior.
// NOTE(review): the next two signature lines are old/new versions from a diff rendered
// without +/- markers — the post-change signature takes the extra connectConfig parameter.
public RetryingKafkaConnectClient(ClustersProperties.ConnectCluster config,
@Nullable ClustersProperties.TruststoreConfig truststoreConfig,
DataSize maxBuffSize,
Duration responseTimeout) {
Duration responseTimeout,
ClustersProperties.KafkaConnect connectConfig) {
super(new RetryingApiClient(config, truststoreConfig, maxBuffSize, responseTimeout));
// Exponential backoff: starts at retryBaseDelayMs, capped at retryMaxDelayMs,
// for at most maxRetries attempts.
this.transientErrorRetry = Retry
.backoff(connectConfig.getMaxRetries(), Duration.ofMillis(connectConfig.getRetryBaseDelayMs()))
.maxBackoff(Duration.ofMillis(connectConfig.getRetryMaxDelayMs()))
// Only retry HTTP responses that indicate a transient condition:
// 502 Bad Gateway, 503 Service Unavailable, 429 Too Many Requests.
.filter(e -> e instanceof WebClientResponseException wce
&& (wce.getStatusCode().value() == 502
|| wce.getStatusCode().value() == 503
|| wce.getStatusCode().value() == 429));
}

private static Retry conflictCodeRetry() {
Expand All @@ -69,20 +79,24 @@ private static Retry conflictCodeRetry() {
});
}

// Wraps a Mono with the full retry stack: rebalance retry, 409-conflict retry,
// and (post-change) the configurable transient-error retry.
// NOTE(review): the next two lines are the old (static) and new (instance) signatures
// from a diff rendered without +/- markers; the method became non-static because it
// now reads the per-instance transientErrorRetry field.
private static <T> Mono<T> withRetryOnConflictOrRebalance(Mono<T> publisher) {
private <T> Mono<T> withRetryOnConflictOrRebalance(Mono<T> publisher) {
return publisher
.retryWhen(retryOnRebalance())
.retryWhen(conflictCodeRetry());
.retryWhen(conflictCodeRetry())
.retryWhen(transientErrorRetry);
}

// Flux variant of the retry wrapper above: rebalance retry, 409-conflict retry,
// then the configurable transient-error retry (502/503/429 with backoff).
// NOTE(review): the next two lines are old/new signatures from a diff without
// +/- markers; the method lost `static` to access the transientErrorRetry field.
private static <T> Flux<T> withRetryOnConflictOrRebalance(Flux<T> publisher) {
private <T> Flux<T> withRetryOnConflictOrRebalance(Flux<T> publisher) {
return publisher
.retryWhen(retryOnRebalance())
.retryWhen(conflictCodeRetry());
.retryWhen(conflictCodeRetry())
.retryWhen(transientErrorRetry);
}

// Lighter wrapper used where conflict (409) retries are not wanted: rebalance
// retry plus (post-change) the transient-error retry only.
// NOTE(review): the next two lines are old/new signatures from a diff without
// +/- markers; `static` was dropped to reach the transientErrorRetry field.
private static <T> Mono<T> withRetryOnRebalance(Mono<T> publisher) {
private <T> Mono<T> withRetryOnRebalance(Mono<T> publisher) {
return publisher
.retryWhen(retryOnRebalance())
.retryWhen(transientErrorRetry);
}


Expand Down
10 changes: 10 additions & 0 deletions api/src/main/java/io/kafbat/ui/config/ClustersProperties.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ public class ClustersProperties {

AdminClient adminClient = new AdminClient();

KafkaConnect kafkaConnectClient = new KafkaConnect();

Csv csv = new Csv();

Boolean messageRelativeTimestamp;
Expand All @@ -68,6 +70,14 @@ public static class AdminClient {
int describeTopicsPartitionSize = 200;
}

// Client-side settings for talking to Kafka Connect REST APIs, applied to all
// connect clusters (bound from kafka.kafka-connect-client.* properties —
// see the commented examples in application-local.yml / dev.yaml).
@Data
public static class KafkaConnect {
// Max parallel requests per connect cluster while scraping connector state.
int scrapeConcurrency = 4;
// Max retry attempts for transient errors (502/503/429).
int maxRetries = 5;
// Initial backoff delay in milliseconds for the exponential retry.
long retryBaseDelayMs = 500;
// Upper bound on the backoff delay in milliseconds.
long retryMaxDelayMs = 10000;
}

@Data
public static class Cluster {
@NotBlank(message = "field name for for cluster could not be blank")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,16 +58,19 @@ public class KafkaClusterFactory {
private final DataSize webClientMaxBuffSize;
private final Duration responseTimeout;
private final JmxMetricsRetriever jmxMetricsRetriever;
// Application-wide cluster properties; retained so connect clients can be built
// with the shared kafka-connect-client settings.
private final ClustersProperties clustersProperties;

// NOTE(review): the next three lines mix the old and new parameter lists of this
// constructor (diff rendered without +/- markers) — the post-change constructor
// additionally receives ClustersProperties.
public KafkaClusterFactory(WebclientProperties webclientProperties,
JmxMetricsRetriever jmxMetricsRetriever) {
JmxMetricsRetriever jmxMetricsRetriever,
ClustersProperties clustersProperties) {
// Fall back to built-in defaults when webclient limits are not configured.
this.webClientMaxBuffSize = Optional.ofNullable(webclientProperties.getMaxInMemoryBufferSize())
.map(DataSize::parse)
.orElse(DEFAULT_WEBCLIENT_BUFFER);
this.responseTimeout = Optional.ofNullable(webclientProperties.getResponseTimeoutMs())
.map(Duration::ofMillis)
.orElse(DEFAULT_RESPONSE_TIMEOUT);
this.jmxMetricsRetriever = jmxMetricsRetriever;
this.clustersProperties = clustersProperties;
}

public KafkaCluster create(ClustersProperties properties,
Expand Down Expand Up @@ -230,7 +233,8 @@ private ReactiveFailover<KafkaConnectClientApi> connectClient(ClustersProperties
connectCluster.toBuilder().address(url).build(),
cluster.getSsl(),
webClientMaxBuffSize,
responseTimeout
responseTimeout,
clustersProperties.getKafkaConnectClient()
),
ReactiveFailover.CONNECTION_REFUSED_EXCEPTION_FILTER,
"No alive connect instances available",
Expand Down
24 changes: 15 additions & 9 deletions api/src/main/java/io/kafbat/ui/service/KafkaConnectService.java
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ public class KafkaConnectService {
private final KafkaConfigSanitizer kafkaConfigSanitizer;
private final ClustersProperties clustersProperties;
private final StatisticsCache statisticsCache;
private final int connectScrapeConcurrency;

public KafkaConnectService(KafkaConnectMapper kafkaConnectMapper,
KafkaConfigSanitizer kafkaConfigSanitizer,
Expand All @@ -60,6 +61,7 @@ public KafkaConnectService(KafkaConnectMapper kafkaConnectMapper,
this.kafkaConfigSanitizer = kafkaConfigSanitizer;
this.clustersProperties = clustersProperties;
this.statisticsCache = statisticsCache;
this.connectScrapeConcurrency = clustersProperties.getKafkaConnectClient().getScrapeConcurrency();
}

public Flux<ConnectDTO> getConnects(KafkaCluster cluster, boolean withStats) {
Expand All @@ -69,22 +71,23 @@ public Flux<ConnectDTO> getConnects(KafkaCluster cluster, boolean withStats) {
if (withStats) {
return connectClusters.map(connects ->
Flux.fromIterable(connects).flatMap(c ->
getClusterInfo(cluster, c.getName()).map(ci -> Tuples.of(c, ci))
getClusterInfo(cluster, c.getName()).map(ci -> Tuples.of(c, ci)),
connectScrapeConcurrency
).flatMap(tuple -> (
getConnectConnectors(cluster, tuple.getT1())
.collectList()
.map(connectors ->
kafkaConnectMapper.toKafkaConnect(tuple.getT1(), connectors, tuple.getT2(), true)
)
)
), connectScrapeConcurrency
)
).orElse(Flux.fromIterable(List.of()));
} else {
return Flux.fromIterable(connectClusters.orElse(List.of()))
.flatMap(c ->
getClusterInfo(cluster, c.getName()).map(info ->
kafkaConnectMapper.toKafkaConnect(c, List.of(), info, false)
)
), connectScrapeConcurrency
);
}
}
Expand Down Expand Up @@ -128,9 +131,10 @@ public Flux<FullConnectorInfoDTO> getAllConnectors(final KafkaCluster cluster,
e.getKey()
).map(topics ->
kafkaConnectMapper.fromClient(connect, e.getValue(), topics.getTopics())
).map(i -> checkConsumerGroup(cluster, i))
).map(i -> checkConsumerGroup(cluster, i)),
connectScrapeConcurrency
)
)
), connectScrapeConcurrency
).map(kafkaConnectMapper::fullConnectorInfo)
.collectList()
.map(lst -> filterConnectors(lst, search, fts))
Expand All @@ -145,7 +149,8 @@ public Flux<KafkaConnectState> scrapeAllConnects(KafkaCluster cluster) {
return Flux.fromIterable(connectClusters.orElse(List.of())).flatMap(c ->
getClusterInfo(cluster, c.getName()).map(info ->
kafkaConnectMapper.toKafkaConnect(c, List.of(), info, false)
).onErrorResume((_) -> Mono.just(new ConnectDTO().name(c.getName())))
).onErrorResume((_) -> Mono.just(new ConnectDTO().name(c.getName()))),
connectScrapeConcurrency
).flatMap(connect ->
getConnectorsWithErrorsSuppress(cluster, connect.getName())
.onErrorResume(_ -> Mono.just(Map.of()))
Expand All @@ -158,9 +163,10 @@ public Flux<KafkaConnectState> scrapeAllConnects(KafkaCluster cluster) {
e.getKey()
).map(topics ->
kafkaConnectMapper.fromClient(connect, e.getValue(), topics.getTopics())
)
), connectScrapeConcurrency
)
).collectList().map(connectors -> kafkaConnectMapper.toScrapeState(connect, connectors))
).collectList().map(connectors -> kafkaConnectMapper.toScrapeState(connect, connectors)),
connectScrapeConcurrency
);
}

Expand Down Expand Up @@ -391,7 +397,7 @@ public Flux<FullConnectorInfoDTO> getTopicConnectors(KafkaCluster cluster, Strin
).map(i ->
checkConsumerGroup(cluster, i)
).map(kafkaConnectMapper::fullConnectorInfo).toList()
)
), connectScrapeConcurrency
).flatMap(Flux::fromIterable);
}

Expand Down
6 changes: 6 additions & 0 deletions api/src/main/resources/application-local.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,12 @@ spring:
group-filter-search-base: "ou=people,dc=planetexpress,dc=com"

kafka:
# Kafka Connect client settings (applied to all connect clusters)
# kafka-connect-client:
# scrape-concurrency: 4 # max parallel requests per connect cluster during scrape
# max-retries: 5 # max retries on transient errors (502/503/429)
# retry-base-delay-ms: 500 # initial backoff delay
# retry-max-delay-ms: 10000 # max backoff delay
clusters:
- name: local
bootstrapServers: localhost:9092
Expand Down
Loading
Loading