diff --git a/.dev/dev.yaml b/.dev/dev.yaml index 47149ed92..bacd6774c 100644 --- a/.dev/dev.yaml +++ b/.dev/dev.yaml @@ -21,6 +21,11 @@ services: KAFKA_CLUSTERS_0_KAFKACONNECT_0_NAME: first KAFKA_CLUSTERS_0_KAFKACONNECT_0_ADDRESS: http://kafka-connect0:8083 KAFKA_CLUSTERS_0_KSQLDBSERVER: http://ksqldb0:8088 + # Kafka Connect client settings (rate limiting for connect API scraping) + # KAFKA_KAFKACONNECTCLIENT_SCRAPECONCURRENCY: 4 + # KAFKA_KAFKACONNECTCLIENT_MAXRETRIES: 5 + # KAFKA_KAFKACONNECTCLIENT_RETRYBASEDELAYMS: 500 + # KAFKA_KAFKACONNECTCLIENT_RETRYMAXDELAYMS: 10000 DYNAMIC_CONFIG_ENABLED: 'true' KAFKA_CLUSTERS_0_AUDIT_TOPICAUDITENABLED: 'true' KAFKA_CLUSTERS_0_AUDIT_CONSOLEAUDITENABLED: 'true' diff --git a/api/src/main/java/io/kafbat/ui/client/RetryingKafkaConnectClient.java b/api/src/main/java/io/kafbat/ui/client/RetryingKafkaConnectClient.java index cbe48f811..be180b35c 100644 --- a/api/src/main/java/io/kafbat/ui/client/RetryingKafkaConnectClient.java +++ b/api/src/main/java/io/kafbat/ui/client/RetryingKafkaConnectClient.java @@ -40,11 +40,21 @@ public class RetryingKafkaConnectClient extends KafkaConnectClientApi { private static final int MAX_RETRIES = 5; private static final Duration RETRIES_DELAY = Duration.ofMillis(200); + private final Retry transientErrorRetry; + public RetryingKafkaConnectClient(ClustersProperties.ConnectCluster config, @Nullable ClustersProperties.TruststoreConfig truststoreConfig, DataSize maxBuffSize, - Duration responseTimeout) { + Duration responseTimeout, + ClustersProperties.KafkaConnect connectConfig) { super(new RetryingApiClient(config, truststoreConfig, maxBuffSize, responseTimeout)); + this.transientErrorRetry = Retry + .backoff(connectConfig.getMaxRetries(), Duration.ofMillis(connectConfig.getRetryBaseDelayMs())) + .maxBackoff(Duration.ofMillis(connectConfig.getRetryMaxDelayMs())) + .filter(e -> e instanceof WebClientResponseException wce + && (wce.getStatusCode().value() == 502 + || wce.getStatusCode().value() == 503 + || wce.getStatusCode().value() == 429)); } private static Retry conflictCodeRetry() { @@ -69,20 +79,24 @@ private static Retry conflictCodeRetry() { }); } - private static Mono withRetryOnConflictOrRebalance(Mono publisher) { + private Mono withRetryOnConflictOrRebalance(Mono publisher) { return publisher .retryWhen(retryOnRebalance()) - .retryWhen(conflictCodeRetry()); + .retryWhen(conflictCodeRetry()) + .retryWhen(transientErrorRetry); } - private static Flux withRetryOnConflictOrRebalance(Flux publisher) { + private Flux withRetryOnConflictOrRebalance(Flux publisher) { return publisher .retryWhen(retryOnRebalance()) - .retryWhen(conflictCodeRetry()); + .retryWhen(conflictCodeRetry()) + .retryWhen(transientErrorRetry); } - private static Mono withRetryOnRebalance(Mono publisher) { - return publisher.retryWhen(retryOnRebalance()); + private Mono withRetryOnRebalance(Mono publisher) { + return publisher + .retryWhen(retryOnRebalance()) + .retryWhen(transientErrorRetry); } diff --git a/api/src/main/java/io/kafbat/ui/config/ClustersProperties.java b/api/src/main/java/io/kafbat/ui/config/ClustersProperties.java index da48dd2ff..f1c41437e 100644 --- a/api/src/main/java/io/kafbat/ui/config/ClustersProperties.java +++ b/api/src/main/java/io/kafbat/ui/config/ClustersProperties.java @@ -45,6 +45,8 @@ public class ClustersProperties { AdminClient adminClient = new AdminClient(); + KafkaConnect kafkaConnectClient = new KafkaConnect(); + Csv csv = new Csv(); Boolean messageRelativeTimestamp; @@ -68,6 +70,14 @@ public static class AdminClient { int describeTopicsPartitionSize = 200; } + @Data + public static class KafkaConnect { + int scrapeConcurrency = 4; + int maxRetries = 5; + long retryBaseDelayMs = 500; + long retryMaxDelayMs = 10000; + } + @Data public static class Cluster { @NotBlank(message = "field name for for cluster could not be blank") diff --git a/api/src/main/java/io/kafbat/ui/service/KafkaClusterFactory.java b/api/src/main/java/io/kafbat/ui/service/KafkaClusterFactory.java index affbdd654..c006f0848 100644 --- a/api/src/main/java/io/kafbat/ui/service/KafkaClusterFactory.java +++ b/api/src/main/java/io/kafbat/ui/service/KafkaClusterFactory.java @@ -58,9 +58,11 @@ public class KafkaClusterFactory { private final DataSize webClientMaxBuffSize; private final Duration responseTimeout; private final JmxMetricsRetriever jmxMetricsRetriever; + private final ClustersProperties clustersProperties; public KafkaClusterFactory(WebclientProperties webclientProperties, - JmxMetricsRetriever jmxMetricsRetriever) { + JmxMetricsRetriever jmxMetricsRetriever, + ClustersProperties clustersProperties) { this.webClientMaxBuffSize = Optional.ofNullable(webclientProperties.getMaxInMemoryBufferSize()) .map(DataSize::parse) .orElse(DEFAULT_WEBCLIENT_BUFFER); @@ -68,6 +70,7 @@ public KafkaClusterFactory(WebclientProperties webclientProperties, .map(Duration::ofMillis) .orElse(DEFAULT_RESPONSE_TIMEOUT); this.jmxMetricsRetriever = jmxMetricsRetriever; + this.clustersProperties = clustersProperties; } public KafkaCluster create(ClustersProperties properties, @@ -230,7 +233,8 @@ private ReactiveFailover connectClient(ClustersProperties connectCluster.toBuilder().address(url).build(), cluster.getSsl(), webClientMaxBuffSize, - responseTimeout + responseTimeout, + clustersProperties.getKafkaConnectClient() ), ReactiveFailover.CONNECTION_REFUSED_EXCEPTION_FILTER, "No alive connect instances available", diff --git a/api/src/main/java/io/kafbat/ui/service/KafkaConnectService.java b/api/src/main/java/io/kafbat/ui/service/KafkaConnectService.java index d79b008d6..7f81691ce 100644 --- a/api/src/main/java/io/kafbat/ui/service/KafkaConnectService.java +++ b/api/src/main/java/io/kafbat/ui/service/KafkaConnectService.java @@ -51,6 +51,7 @@ public class KafkaConnectService { private final KafkaConfigSanitizer kafkaConfigSanitizer; private final ClustersProperties clustersProperties; private final StatisticsCache statisticsCache; + private final int connectScrapeConcurrency; public KafkaConnectService(KafkaConnectMapper kafkaConnectMapper, KafkaConfigSanitizer kafkaConfigSanitizer, @@ -60,6 +61,7 @@ public KafkaConnectService(KafkaConnectMapper kafkaConnectMapper, this.kafkaConfigSanitizer = kafkaConfigSanitizer; this.clustersProperties = clustersProperties; this.statisticsCache = statisticsCache; + this.connectScrapeConcurrency = clustersProperties.getKafkaConnectClient().getScrapeConcurrency(); } public Flux getConnects(KafkaCluster cluster, boolean withStats) { @@ -69,14 +71,15 @@ public Flux getConnects(KafkaCluster cluster, boolean withStats) { if (withStats) { return connectClusters.map(connects -> Flux.fromIterable(connects).flatMap(c -> - getClusterInfo(cluster, c.getName()).map(ci -> Tuples.of(c, ci)) + getClusterInfo(cluster, c.getName()).map(ci -> Tuples.of(c, ci)), + connectScrapeConcurrency ).flatMap(tuple -> ( getConnectConnectors(cluster, tuple.getT1()) .collectList() .map(connectors -> kafkaConnectMapper.toKafkaConnect(tuple.getT1(), connectors, tuple.getT2(), true) ) - ) + ), connectScrapeConcurrency ) ).orElse(Flux.fromIterable(List.of())); } else { @@ -84,7 +87,7 @@ public Flux getConnects(KafkaCluster cluster, boolean withStats) { .flatMap(c -> getClusterInfo(cluster, c.getName()).map(info -> kafkaConnectMapper.toKafkaConnect(c, List.of(), info, false) - ) + ), connectScrapeConcurrency ); } } @@ -128,9 +131,10 @@ public Flux getAllConnectors(final KafkaCluster cluster, e.getKey() ).map(topics -> kafkaConnectMapper.fromClient(connect, e.getValue(), topics.getTopics()) - ).map(i -> checkConsumerGroup(cluster, i)) + ).map(i -> checkConsumerGroup(cluster, i)), + connectScrapeConcurrency ) - ) + ), connectScrapeConcurrency ).map(kafkaConnectMapper::fullConnectorInfo) .collectList() .map(lst -> filterConnectors(lst, search, fts)) @@ -145,7 +149,8 @@ public Flux scrapeAllConnects(KafkaCluster cluster) { return Flux.fromIterable(connectClusters.orElse(List.of())).flatMap(c -> getClusterInfo(cluster, c.getName()).map(info -> kafkaConnectMapper.toKafkaConnect(c, List.of(), info, false) - ).onErrorResume((_) -> Mono.just(new ConnectDTO().name(c.getName()))) + ).onErrorResume((_) -> Mono.just(new ConnectDTO().name(c.getName()))), + connectScrapeConcurrency ).flatMap(connect -> getConnectorsWithErrorsSuppress(cluster, connect.getName()) .onErrorResume(_ -> Mono.just(Map.of())) @@ -158,9 +163,10 @@ public Flux scrapeAllConnects(KafkaCluster cluster) { e.getKey() ).map(topics -> kafkaConnectMapper.fromClient(connect, e.getValue(), topics.getTopics()) - ) + ), connectScrapeConcurrency ) - ).collectList().map(connectors -> kafkaConnectMapper.toScrapeState(connect, connectors)) + ).collectList().map(connectors -> kafkaConnectMapper.toScrapeState(connect, connectors)), + connectScrapeConcurrency ); } @@ -391,7 +397,7 @@ public Flux getTopicConnectors(KafkaCluster cluster, Strin ).map(i -> checkConsumerGroup(cluster, i) ).map(kafkaConnectMapper::fullConnectorInfo).toList() - ) + ), connectScrapeConcurrency ).flatMap(Flux::fromIterable); } diff --git a/api/src/main/resources/application-local.yml b/api/src/main/resources/application-local.yml index 0c40ff079..ecadf9559 100644 --- a/api/src/main/resources/application-local.yml +++ b/api/src/main/resources/application-local.yml @@ -23,6 +23,12 @@ spring: group-filter-search-base: "ou=people,dc=planetexpress,dc=com" kafka: + # Kafka Connect client settings (applied to all connect clusters) + # kafka-connect-client: + # scrape-concurrency: 4 # max parallel requests per connect cluster during scrape + # max-retries: 5 # max retries on transient errors (502/503/429) + # retry-base-delay-ms: 500 # initial backoff delay + # retry-max-delay-ms: 10000 # max backoff delay clusters: - name: local bootstrapServers: localhost:9092 diff --git a/api/src/test/java/io/kafbat/ui/client/RetryingKafkaConnectClientTest.java b/api/src/test/java/io/kafbat/ui/client/RetryingKafkaConnectClientTest.java new file mode 100644 index 000000000..586503cd0 --- /dev/null +++ b/api/src/test/java/io/kafbat/ui/client/RetryingKafkaConnectClientTest.java @@ -0,0 +1,193 @@ +package io.kafbat.ui.client; + +import static org.assertj.core.api.Assertions.assertThat; + +import io.kafbat.ui.config.ClustersProperties; +import java.io.IOException; +import java.time.Duration; +import okhttp3.mockwebserver.MockResponse; +import okhttp3.mockwebserver.MockWebServer; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.springframework.util.unit.DataSize; +import org.springframework.web.reactive.function.client.WebClientResponseException; +import reactor.test.StepVerifier; + +class RetryingKafkaConnectClientTest { + + private final MockWebServer mockWebServer = new MockWebServer(); + + @BeforeEach + void setUp() throws IOException { + mockWebServer.start(); + } + + @AfterEach + void tearDown() throws IOException { + mockWebServer.close(); + } + + private RetryingKafkaConnectClient createClient(ClustersProperties.KafkaConnect connectConfig) { + var connectCluster = ClustersProperties.ConnectCluster.builder() + .name("test-connect") + .address(mockWebServer.url("/").toString()) + .build(); + return new RetryingKafkaConnectClient( + connectCluster, + null, + DataSize.ofMegabytes(20), + Duration.ofSeconds(5), + connectConfig + ); + } + + private ClustersProperties.KafkaConnect fastRetryConfig() { + var config = new ClustersProperties.KafkaConnect(); + config.setMaxRetries(3); + config.setRetryBaseDelayMs(50); + config.setRetryMaxDelayMs(200); + return config; + } + + @Test + void retriesOn502AndEventuallySucceeds() { + var client = createClient(fastRetryConfig()); + + // First two calls return 502, third returns 200 with valid connectors JSON + mockWebServer.enqueue(new MockResponse().setResponseCode(502).setBody("Bad Gateway")); + mockWebServer.enqueue(new MockResponse().setResponseCode(502).setBody("Bad Gateway")); + mockWebServer.enqueue(new MockResponse() + .setResponseCode(200) + .addHeader("Content-Type", "application/json") + .setBody("{}")); + + StepVerifier.create(client.getConnectors(null, null)) + .assertNext(result -> assertThat(result).isEmpty()) + .verifyComplete(); + + assertThat(mockWebServer.getRequestCount()).isEqualTo(3); + } + + @Test + void retriesOn503AndEventuallySucceeds() { + var client = createClient(fastRetryConfig()); + + mockWebServer.enqueue(new MockResponse().setResponseCode(503).setBody("Service Unavailable")); + mockWebServer.enqueue(new MockResponse() + .setResponseCode(200) + .addHeader("Content-Type", "application/json") + .setBody("{}")); + + StepVerifier.create(client.getConnectors(null, null)) + .assertNext(result -> assertThat(result).isEmpty()) + .verifyComplete(); + + assertThat(mockWebServer.getRequestCount()).isEqualTo(2); + } + + @Test + void retriesOn429AndEventuallySucceeds() { + var client = createClient(fastRetryConfig()); + + mockWebServer.enqueue(new MockResponse().setResponseCode(429).setBody("Too Many Requests")); + mockWebServer.enqueue(new MockResponse() + .setResponseCode(200) + .addHeader("Content-Type", "application/json") + .setBody("{}")); + + StepVerifier.create(client.getConnectors(null, null)) + .assertNext(result -> assertThat(result).isEmpty()) + .verifyComplete(); + + assertThat(mockWebServer.getRequestCount()).isEqualTo(2); + } + + @Test + void failsAfterMaxRetriesExhausted() { + var client = createClient(fastRetryConfig()); + + // Enqueue more 502s than maxRetries (3 retries + 1 initial = 4 attempts total) + for (int i = 0; i < 5; i++) { + mockWebServer.enqueue(new MockResponse().setResponseCode(502).setBody("Bad Gateway")); + } + + StepVerifier.create(client.getConnectors(null, null)) + .expectErrorSatisfies(e -> { + // Reactor's Retry.backoff throws Exceptions.retryExhausted which is an IllegalStateException + assertThat(e).isInstanceOf(IllegalStateException.class); + assertThat(e.getMessage()).contains("Retries exhausted"); + }) + .verify(Duration.ofSeconds(10)); + + // 1 initial + 3 retries = 4 total + assertThat(mockWebServer.getRequestCount()).isEqualTo(4); + } + + @Test + void doesNotRetryOn400() { + var client = createClient(fastRetryConfig()); + + mockWebServer.enqueue(new MockResponse().setResponseCode(400).setBody("Bad Request")); + + StepVerifier.create(client.getConnectors(null, null)) + .expectError(WebClientResponseException.BadRequest.class) + .verify(Duration.ofSeconds(5)); + + assertThat(mockWebServer.getRequestCount()).isEqualTo(1); + } + + @Test + void doesNotRetryOn404() { + var client = createClient(fastRetryConfig()); + + mockWebServer.enqueue(new MockResponse().setResponseCode(404).setBody("Not Found")); + + StepVerifier.create(client.getConnectors(null, null)) + .expectError(WebClientResponseException.NotFound.class) + .verify(Duration.ofSeconds(5)); + + assertThat(mockWebServer.getRequestCount()).isEqualTo(1); + } + + @Test + void retriesWithExponentialBackoff() { + var client = createClient(fastRetryConfig()); + + mockWebServer.enqueue(new MockResponse().setResponseCode(502).setBody("Bad Gateway")); + mockWebServer.enqueue(new MockResponse().setResponseCode(502).setBody("Bad Gateway")); + mockWebServer.enqueue(new MockResponse() + .setResponseCode(200) + .addHeader("Content-Type", "application/json") + .setBody("{}")); + + long start = System.currentTimeMillis(); + StepVerifier.create(client.getConnectors(null, null)) + .assertNext(result -> assertThat(result).isEmpty()) + .verifyComplete(); + long elapsed = System.currentTimeMillis() - start; + + // With baseDelay=50ms and 2 retries, total should be at least ~50ms (backoff applies) + assertThat(elapsed).isGreaterThanOrEqualTo(50); + } + + @Test + void getConnectorTopicsRetriesOnTransientError() { + var client = createClient(fastRetryConfig()); + + mockWebServer.enqueue(new MockResponse().setResponseCode(502).setBody("Bad Gateway")); + mockWebServer.enqueue(new MockResponse() + .setResponseCode(200) + .addHeader("Content-Type", "application/json") + .setBody("{\"test-connector\": {\"topics\": [\"topic1\"]}}")); + + StepVerifier.create(client.getConnectorTopics("test-connector")) + .assertNext(result -> { + assertThat(result).containsKey("test-connector"); + assertThat(result.get("test-connector").getTopics()).containsExactly("topic1"); + }) + .verifyComplete(); + + assertThat(mockWebServer.getRequestCount()).isEqualTo(2); + } +} diff --git a/api/src/test/java/io/kafbat/ui/config/ClustersPropertiesTest.java b/api/src/test/java/io/kafbat/ui/config/ClustersPropertiesTest.java index 81a0185d4..0f50ffb0c 100644 --- a/api/src/test/java/io/kafbat/ui/config/ClustersPropertiesTest.java +++ b/api/src/test/java/io/kafbat/ui/config/ClustersPropertiesTest.java @@ -4,7 +4,11 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy; import java.util.Collections; +import java.util.HashMap; +import java.util.Map; import org.junit.jupiter.api.Test; +import org.springframework.boot.context.properties.bind.Binder; +import org.springframework.boot.context.properties.source.MapConfigurationPropertySource; class ClustersPropertiesTest { @@ -48,4 +52,72 @@ void ifOnlyOneClusterProvidedNameIsOptionalAndSetToDefault() { .isEqualTo("Default"); } + @Test + void kafkaConnectConfigHasSensibleDefaults() { + var config = new ClustersProperties.KafkaConnect(); + assertThat(config.getScrapeConcurrency()).isEqualTo(4); + assertThat(config.getMaxRetries()).isEqualTo(5); + assertThat(config.getRetryBaseDelayMs()).isEqualTo(500L); + assertThat(config.getRetryMaxDelayMs()).isEqualTo(10000L); + } + + @Test + void kafkaConnectClientFieldIsInitializedByDefault() { + ClustersProperties properties = new ClustersProperties(); + assertThat(properties.getKafkaConnectClient()).isNotNull(); + assertThat(properties.getKafkaConnectClient().getScrapeConcurrency()).isEqualTo(4); + } + + @Test + void kafkaConnectConfigOverriddenViaYamlStyleProperties() { + Map props = new HashMap<>(); + props.put("kafka.kafka-connect-client.scrape-concurrency", "8"); + props.put("kafka.kafka-connect-client.max-retries", "10"); + props.put("kafka.kafka-connect-client.retry-base-delay-ms", "1000"); + props.put("kafka.kafka-connect-client.retry-max-delay-ms", "30000"); + + ClustersProperties result = new Binder(new MapConfigurationPropertySource(props)) + .bind("kafka", ClustersProperties.class) + .get(); + + assertThat(result.getKafkaConnectClient().getScrapeConcurrency()).isEqualTo(8); + assertThat(result.getKafkaConnectClient().getMaxRetries()).isEqualTo(10); + assertThat(result.getKafkaConnectClient().getRetryBaseDelayMs()).isEqualTo(1000L); + assertThat(result.getKafkaConnectClient().getRetryMaxDelayMs()).isEqualTo(30000L); + } + + @Test + void kafkaConnectConfigOverriddenViaEnvVarStyleProperties() { + Map props = new HashMap<>(); + props.put("kafka.kafkaconnectclient.scrapeconcurrency", "16"); + props.put("kafka.kafkaconnectclient.maxretries", "3"); + props.put("kafka.kafkaconnectclient.retrybasedelayms", "250"); + props.put("kafka.kafkaconnectclient.retrymaxdelayms", "5000"); + + ClustersProperties result = new Binder(new MapConfigurationPropertySource(props)) + .bind("kafka", ClustersProperties.class) + .get(); + + assertThat(result.getKafkaConnectClient().getScrapeConcurrency()).isEqualTo(16); + assertThat(result.getKafkaConnectClient().getMaxRetries()).isEqualTo(3); + assertThat(result.getKafkaConnectClient().getRetryBaseDelayMs()).isEqualTo(250L); + assertThat(result.getKafkaConnectClient().getRetryMaxDelayMs()).isEqualTo(5000L); + } + + @Test + void kafkaConnectConfigPartialOverrideKeepsDefaults() { + Map props = new HashMap<>(); + props.put("kafka.kafka-connect-client.scrape-concurrency", "12"); + + ClustersProperties result = new Binder(new MapConfigurationPropertySource(props)) + .bind("kafka", ClustersProperties.class) + .get(); + + assertThat(result.getKafkaConnectClient().getScrapeConcurrency()).isEqualTo(12); + // Others keep defaults + assertThat(result.getKafkaConnectClient().getMaxRetries()).isEqualTo(5); + assertThat(result.getKafkaConnectClient().getRetryBaseDelayMs()).isEqualTo(500L); + assertThat(result.getKafkaConnectClient().getRetryMaxDelayMs()).isEqualTo(10000L); + } + } diff --git a/api/src/test/java/io/kafbat/ui/service/KafkaConnectServiceConcurrencyTest.java b/api/src/test/java/io/kafbat/ui/service/KafkaConnectServiceConcurrencyTest.java new file mode 100644 index 000000000..b393bfb56 --- /dev/null +++ b/api/src/test/java/io/kafbat/ui/service/KafkaConnectServiceConcurrencyTest.java @@ -0,0 +1,202 @@ +package io.kafbat.ui.service; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import io.kafbat.ui.config.ClustersProperties; +import io.kafbat.ui.connect.model.ClusterInfo; +import io.kafbat.ui.connect.model.Connector; +import io.kafbat.ui.connect.model.ConnectorStatus; +import io.kafbat.ui.connect.model.ConnectorStatusConnector; +import io.kafbat.ui.connect.model.ConnectorTopics; +import io.kafbat.ui.connect.model.ExpandedConnector; +import io.kafbat.ui.mapper.KafkaConnectMapperImpl; +import io.kafbat.ui.model.KafkaCluster; +import io.kafbat.ui.model.Statistics; +import io.kafbat.ui.service.metrics.scrape.KafkaConnectState; +import io.kafbat.ui.service.metrics.scrape.ScrapedClusterState; +import io.kafbat.ui.util.ReactiveFailover; +import java.time.Duration; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import reactor.core.publisher.Mono; +import reactor.test.StepVerifier; + +class KafkaConnectServiceConcurrencyTest { + + private static final int CONCURRENCY = 2; + private static final int CONNECTOR_COUNT = 10; + + private KafkaConnectService service; + private final AtomicInteger concurrentCalls = new AtomicInteger(0); + private final AtomicInteger maxConcurrentCalls = new AtomicInteger(0); + + @BeforeEach + void setUp() { + concurrentCalls.set(0); + maxConcurrentCalls.set(0); + + var clustersProperties = new ClustersProperties(); + var kafkaConnectConfig = new ClustersProperties.KafkaConnect(); + kafkaConnectConfig.setScrapeConcurrency(CONCURRENCY); + clustersProperties.setKafkaConnectClient(kafkaConnectConfig); + + var statisticsCache = mock(StatisticsCache.class); + var statistics = Statistics.builder() + .connectStates(Map.of()) + .clusterState(mock(ScrapedClusterState.class)) + .build(); + when(statisticsCache.get(any())).thenReturn(statistics); + + var mapper = new KafkaConnectMapperImpl(); + KafkaConfigSanitizer sanitizer = new KafkaConfigSanitizer(true, List.of()); + + service = new KafkaConnectService(mapper, sanitizer, clustersProperties, statisticsCache); + } + + @SuppressWarnings("unchecked") + private ReactiveFailover mockConnectApi() { + var api = mock(io.kafbat.ui.connect.api.KafkaConnectClientApi.class); + + // getClusterInfo + when(api.getClusterInfo()).thenReturn(Mono.just(new ClusterInfo().version("3.0"))); + + // getConnectors - returns N connectors + Map connectors = IntStream.range(0, CONNECTOR_COUNT) + .boxed() + .collect(Collectors.toMap( + i -> "connector-" + i, + i -> new ExpandedConnector() + .info(new Connector() + .name("connector-" + i) + .type(Connector.TypeEnum.SINK) + .config(Map.of("name", "connector-" + i, "connector.class", "TestClass")) + .tasks(List.of())) + .status(new ConnectorStatus() + .name("connector-" + i) + .tasks(List.of()) + .connector(new ConnectorStatusConnector() + .state(ConnectorStatusConnector.StateEnum.RUNNING))) + )); + when(api.getConnectors(any(), any())).thenReturn(Mono.just(connectors)); + + // getConnectorTopics - track concurrency at subscription time + when(api.getConnectorTopics(anyString())).thenAnswer(invocation -> { + String name = invocation.getArgument(0); + return Mono.defer(() -> { + int current = concurrentCalls.incrementAndGet(); + maxConcurrentCalls.updateAndGet(max -> Math.max(max, current)); + return Mono.just(Map.of(name, new ConnectorTopics().topics(List.of("topic1")))) + .delayElement(Duration.ofMillis(100)) + .doOnTerminate(() -> concurrentCalls.decrementAndGet()); + }); + }); + + var failover = mock(ReactiveFailover.class); + when(failover.mono(any())).thenAnswer(invocation -> { + @SuppressWarnings("unchecked") + var fn = (java.util.function.Function>) invocation + .getArgument(0); + // Defer so fn.apply() runs at subscription time, not invocation time + return Mono.defer(() -> fn.apply(api)); + }); + when(failover.flux(any())).thenAnswer(invocation -> { + @SuppressWarnings("unchecked") + var fn = (java.util.function.Function>) invocation + .getArgument(0); + return reactor.core.publisher.Flux.defer(() -> fn.apply(api)); + }); + return failover; + } + + private KafkaCluster buildCluster( + ReactiveFailover connectApi) { + var connectCluster = ClustersProperties.ConnectCluster.builder() + .name("test-connect") + .address("http://localhost:8083") + .consumerNamePattern("connect-%s") + .build(); + var clusterProps = new ClustersProperties.Cluster(); + clusterProps.setName("test-cluster"); + clusterProps.setBootstrapServers("localhost:9092"); + clusterProps.setKafkaConnect(List.of(connectCluster)); + + return KafkaCluster.builder() + .name("test-cluster") + .bootstrapServers("localhost:9092") + .originalProperties(clusterProps) + .connectsClients(Map.of("test-connect", connectApi)) + .connectsConfigs(Map.of("test-connect", connectCluster)) + .build(); + } + + @Test + void scrapeAllConnectsLimitsConcurrency() { + var connectApi = mockConnectApi(); + var cluster = buildCluster(connectApi); + + StepVerifier.create(service.scrapeAllConnects(cluster).collectList()) + .assertNext(states -> { + assertThat(states).hasSize(1); + KafkaConnectState state = states.getFirst(); + assertThat(state.getConnectors()).hasSize(CONNECTOR_COUNT); + }) + .verifyComplete(); + + // maxConcurrentCalls should not exceed CONCURRENCY + assertThat(maxConcurrentCalls.get()) + .as("Max concurrent getConnectorTopics calls should not exceed configured concurrency (%d)", CONCURRENCY) + .isLessThanOrEqualTo(CONCURRENCY); + } + + @Test + void scrapeAllConnectsWithHigherConcurrency() { + // Reconfigure with higher concurrency + concurrentCalls.set(0); + maxConcurrentCalls.set(0); + + var clustersProperties = new ClustersProperties(); + var kafkaConnectConfig = new ClustersProperties.KafkaConnect(); + kafkaConnectConfig.setScrapeConcurrency(8); + clustersProperties.setKafkaConnectClient(kafkaConnectConfig); + + var statisticsCache = mock(StatisticsCache.class); + var statistics = Statistics.builder() + .connectStates(Map.of()) + .clusterState(mock(ScrapedClusterState.class)) + .build(); + when(statisticsCache.get(any())).thenReturn(statistics); + + service = new KafkaConnectService( + new KafkaConnectMapperImpl(), + new KafkaConfigSanitizer(true, List.of()), + clustersProperties, + statisticsCache + ); + + var connectApi = mockConnectApi(); + var cluster = buildCluster(connectApi); + + StepVerifier.create(service.scrapeAllConnects(cluster).collectList()) + .assertNext(states -> assertThat(states).hasSize(1)) + .verifyComplete(); + + // With concurrency=8 and 10 connectors, max should be at most 8 + assertThat(maxConcurrentCalls.get()) + .as("Max concurrent calls should not exceed configured concurrency (8)") + .isLessThanOrEqualTo(8); + + // But should be higher than the restrictive limit of 2 + assertThat(maxConcurrentCalls.get()) + .as("Max concurrent calls should be higher than 2 when concurrency=8") + .isGreaterThan(CONCURRENCY); + } +}