Commit bd6394c

iliax authored

Polling timeouts made configurable (#3513)

1. Polling timeouts made configurable
2. Polling-related classes moved to emitter package

Co-authored-by: iliax <[email protected]>

1 parent e2dc12d commit bd6394c

17 files changed: +184 -69 lines

kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/ClustersProperties.java

Lines changed: 9 additions & 0 deletions

@@ -27,6 +27,8 @@ public class ClustersProperties {
 
   String internalTopicPrefix;
 
+  PollingProperties polling = new PollingProperties();
+
   @Data
   public static class Cluster {
     String name;
@@ -49,6 +51,13 @@ public static class Cluster {
     TruststoreConfig ssl;
   }
 
+  @Data
+  public static class PollingProperties {
+    Integer pollTimeoutMs;
+    Integer partitionPollTimeout;
+    Integer noDataEmptyPolls;
+  }
+
   @Data
   @ToString(exclude = "password")
   public static class MetricsConfigData {
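
The new knobs sit at the top level of ClustersProperties, next to the per-cluster list, so they apply to all clusters and bind like any other Spring property (presumably kafka.polling.pollTimeoutMs and friends, assuming the kafka prefix this class is bound to). A minimal sketch of what each field controls, using the Lombok-generated accessors from the diff; the values are illustrative, not the defaults:

import com.provectus.kafka.ui.config.ClustersProperties;

class PollingConfigSketch {
  public static void main(String[] args) {
    ClustersProperties props = new ClustersProperties();
    // timeout for a single consumer.poll() inside the emitters (default: 1000 ms)
    props.getPolling().setPollTimeoutMs(2_000);
    // per-partition poll timeout used by the backward emitter
    // (when unset it is derived from pollTimeoutMs; see PollingSettings below)
    props.getPolling().setPartitionPollTimeout(500);
    // consecutive empty polls after which a range counts as fully read (default: 3)
    props.getPolling().setNoDataEmptyPolls(5);
  }
}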

kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/AbstractEmitter.java

Lines changed: 5 additions & 12 deletions

@@ -4,7 +4,6 @@
 import com.provectus.kafka.ui.model.TopicMessageEventDTO;
 import com.provectus.kafka.ui.model.TopicMessagePhaseDTO;
 import com.provectus.kafka.ui.serdes.ConsumerRecordDeserializer;
-import com.provectus.kafka.ui.util.PollingThrottler;
 import java.time.Duration;
 import java.time.Instant;
 import org.apache.kafka.clients.consumer.Consumer;
@@ -14,27 +13,21 @@
 import reactor.core.publisher.FluxSink;
 
 public abstract class AbstractEmitter {
-  private static final Duration DEFAULT_POLL_TIMEOUT_MS = Duration.ofMillis(1000L);
-
-  // In some situations it is hard to say whether records range (between two offsets) was fully polled.
-  // This happens when we have holes in records sequences that is usual case for compact topics or
-  // topics with transactional writes. In such cases if you want to poll all records between offsets X and Y
-  // there is no guarantee that you will ever see record with offset Y.
-  // To workaround this we can assume that after N consecutive empty polls all target messages were read.
-  public static final int NO_MORE_DATA_EMPTY_POLLS_COUNT = 3;
 
   private final ConsumerRecordDeserializer recordDeserializer;
   private final ConsumingStats consumingStats = new ConsumingStats();
   private final PollingThrottler throttler;
+  protected final PollingSettings pollingSettings;
 
-  protected AbstractEmitter(ConsumerRecordDeserializer recordDeserializer, PollingThrottler throttler) {
+  protected AbstractEmitter(ConsumerRecordDeserializer recordDeserializer, PollingSettings pollingSettings) {
     this.recordDeserializer = recordDeserializer;
-    this.throttler = throttler;
+    this.pollingSettings = pollingSettings;
+    this.throttler = pollingSettings.getPollingThrottler();
   }
 
   protected ConsumerRecords<Bytes, Bytes> poll(
       FluxSink<TopicMessageEventDTO> sink, Consumer<Bytes, Bytes> consumer) {
-    return poll(sink, consumer, DEFAULT_POLL_TIMEOUT_MS);
+    return poll(sink, consumer, pollingSettings.getPollTimeout());
   }
 
   protected ConsumerRecords<Bytes, Bytes> poll(

kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/BackwardRecordEmitter.java

Lines changed: 10 additions & 14 deletions

@@ -3,15 +3,12 @@
 import com.provectus.kafka.ui.model.ConsumerPosition;
 import com.provectus.kafka.ui.model.TopicMessageEventDTO;
 import com.provectus.kafka.ui.serdes.ConsumerRecordDeserializer;
-import com.provectus.kafka.ui.util.PollingThrottler;
-import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
 import java.util.TreeMap;
 import java.util.function.Supplier;
-import java.util.stream.Collectors;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -26,8 +23,6 @@ public class BackwardRecordEmitter
     extends AbstractEmitter
     implements java.util.function.Consumer<FluxSink<TopicMessageEventDTO>> {
 
-  private static final Duration POLL_TIMEOUT = Duration.ofMillis(200);
-
   private final Supplier<KafkaConsumer<Bytes, Bytes>> consumerSupplier;
   private final ConsumerPosition consumerPosition;
   private final int messagesPerPage;
@@ -37,8 +32,8 @@ public BackwardRecordEmitter(
       ConsumerPosition consumerPosition,
       int messagesPerPage,
       ConsumerRecordDeserializer recordDeserializer,
-      PollingThrottler throttler) {
-    super(recordDeserializer, throttler);
+      PollingSettings pollingSettings) {
+    super(recordDeserializer, pollingSettings);
     this.consumerPosition = consumerPosition;
     this.messagesPerPage = messagesPerPage;
     this.consumerSupplier = consumerSupplier;
@@ -109,17 +104,18 @@ private List<ConsumerRecord<Bytes, Bytes>> partitionPollIteration(
 
     var recordsToSend = new ArrayList<ConsumerRecord<Bytes, Bytes>>();
 
-    // we use empty polls counting to verify that partition was fully read
-    for (int emptyPolls = 0; recordsToSend.size() < desiredMsgsToPoll && emptyPolls < NO_MORE_DATA_EMPTY_POLLS_COUNT;) {
-      var polledRecords = poll(sink, consumer, POLL_TIMEOUT);
-      log.debug("{} records polled from {}", polledRecords.count(), tp);
+    EmptyPollsCounter emptyPolls = pollingSettings.createEmptyPollsCounter();
+    while (!sink.isCancelled()
+        && recordsToSend.size() < desiredMsgsToPoll
+        && !emptyPolls.noDataEmptyPollsReached()) {
+      var polledRecords = poll(sink, consumer, pollingSettings.getPartitionPollTimeout());
+      emptyPolls.count(polledRecords);
 
-      // counting sequential empty polls
-      emptyPolls = polledRecords.isEmpty() ? emptyPolls + 1 : 0;
+      log.debug("{} records polled from {}", polledRecords.count(), tp);
 
       var filteredRecords = polledRecords.records(tp).stream()
           .filter(r -> r.offset() < toOffset)
-          .collect(Collectors.toList());
+          .toList();
 
       if (!polledRecords.isEmpty() && filteredRecords.isEmpty()) {
         // we already read all messages in target offsets interval

kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/EmptyPollsCounter.java

Lines changed: 28 additions & 0 deletions

@@ -0,0 +1,28 @@
+package com.provectus.kafka.ui.emitter;
+
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+
+// In some situations it is hard to say whether records range (between two offsets) was fully polled.
+// This happens when we have holes in records sequences that is usual case for compact topics or
+// topics with transactional writes. In such cases if you want to poll all records between offsets X and Y
+// there is no guarantee that you will ever see record with offset Y.
+// To workaround this we can assume that after N consecutive empty polls all target messages were read.
+public class EmptyPollsCounter {
+
+  private final int maxEmptyPolls;
+
+  private int emptyPolls = 0;
+
+  EmptyPollsCounter(int maxEmptyPolls) {
+    this.maxEmptyPolls = maxEmptyPolls;
+  }
+
+  public void count(ConsumerRecords<?, ?> polled) {
+    emptyPolls = polled.isEmpty() ? emptyPolls + 1 : 0;
+  }
+
+  public boolean noDataEmptyPollsReached() {
+    return emptyPolls >= maxEmptyPolls;
+  }
+
+}
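
EmptyPollsCounter's constructor is package-private, so callers obtain instances via PollingSettings.createEmptyPollsCounter(). A minimal sketch of the loop shape the emitters use, assuming the consumer has already been assigned and seeked by the caller:

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.utils.Bytes;

class RangeDrainSketch {
  // Polls until maxEmptyPolls consecutive polls return nothing, which the
  // class comment above treats as "all target messages were read".
  static void drain(Consumer<Bytes, Bytes> consumer, PollingSettings settings) {
    EmptyPollsCounter emptyPolls = settings.createEmptyPollsCounter();
    while (!emptyPolls.noDataEmptyPollsReached()) {
      ConsumerRecords<Bytes, Bytes> polled = consumer.poll(settings.getPollTimeout());
      emptyPolls.count(polled); // resets to 0 on data, increments on an empty poll
      polled.forEach(r -> System.out.println(r.offset())); // stand-in for real record handling
    }
  }
}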

kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/ForwardRecordEmitter.java

Lines changed: 6 additions & 7 deletions

@@ -3,7 +3,6 @@
 import com.provectus.kafka.ui.model.ConsumerPosition;
 import com.provectus.kafka.ui.model.TopicMessageEventDTO;
 import com.provectus.kafka.ui.serdes.ConsumerRecordDeserializer;
-import com.provectus.kafka.ui.util.PollingThrottler;
 import java.util.function.Supplier;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -25,8 +24,8 @@ public ForwardRecordEmitter(
       Supplier<KafkaConsumer<Bytes, Bytes>> consumerSupplier,
       ConsumerPosition position,
       ConsumerRecordDeserializer recordDeserializer,
-      PollingThrottler throttler) {
-    super(recordDeserializer, throttler);
+      PollingSettings pollingSettings) {
+    super(recordDeserializer, pollingSettings);
     this.position = position;
     this.consumerSupplier = consumerSupplier;
   }
@@ -39,16 +38,16 @@ public void accept(FluxSink<TopicMessageEventDTO> sink) {
       var seekOperations = SeekOperations.create(consumer, position);
       seekOperations.assignAndSeekNonEmptyPartitions();
 
-      // we use empty polls counting to verify that topic was fully read
-      int emptyPolls = 0;
+      EmptyPollsCounter emptyPolls = pollingSettings.createEmptyPollsCounter();
       while (!sink.isCancelled()
           && !seekOperations.assignedPartitionsFullyPolled()
-          && emptyPolls < NO_MORE_DATA_EMPTY_POLLS_COUNT) {
+          && !emptyPolls.noDataEmptyPollsReached()) {
 
         sendPhase(sink, "Polling");
         ConsumerRecords<Bytes, Bytes> records = poll(sink, consumer);
+        emptyPolls.count(records);
+
         log.debug("{} records polled", records.count());
-        emptyPolls = records.isEmpty() ? emptyPolls + 1 : 0;
 
         for (ConsumerRecord<Bytes, Bytes> msg : records) {
           if (!sink.isCancelled()) {

kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/PollingSettings.java

Lines changed: 79 additions & 0 deletions

@@ -0,0 +1,79 @@
+package com.provectus.kafka.ui.emitter;
+
+import com.provectus.kafka.ui.config.ClustersProperties;
+import java.time.Duration;
+import java.util.Optional;
+import java.util.function.Supplier;
+
+public class PollingSettings {
+
+  private static final Duration DEFAULT_POLL_TIMEOUT = Duration.ofMillis(1_000);
+  private static final Duration DEFAULT_PARTITION_POLL_TIMEOUT = Duration.ofMillis(200);
+  private static final int DEFAULT_NO_DATA_EMPTY_POLLS = 3;
+
+  private final Duration pollTimeout;
+  private final Duration partitionPollTimeout;
+  private final int notDataEmptyPolls; //see EmptyPollsCounter docs
+
+  private final Supplier<PollingThrottler> throttlerSupplier;
+
+  public static PollingSettings create(ClustersProperties.Cluster cluster,
+                                       ClustersProperties clustersProperties) {
+    var pollingProps = Optional.ofNullable(clustersProperties.getPolling())
+        .orElseGet(ClustersProperties.PollingProperties::new);
+
+    var pollTimeout = pollingProps.getPollTimeoutMs() != null
+        ? Duration.ofMillis(pollingProps.getPollTimeoutMs())
+        : DEFAULT_POLL_TIMEOUT;
+
+    var partitionPollTimeout = pollingProps.getPartitionPollTimeout() != null
+        ? Duration.ofMillis(pollingProps.getPartitionPollTimeout())
+        : Duration.ofMillis(pollTimeout.toMillis() / 5);
+
+    int noDataEmptyPolls = pollingProps.getNoDataEmptyPolls() != null
+        ? pollingProps.getNoDataEmptyPolls()
+        : DEFAULT_NO_DATA_EMPTY_POLLS;
+
+    return new PollingSettings(
+        pollTimeout,
+        partitionPollTimeout,
+        noDataEmptyPolls,
+        PollingThrottler.throttlerSupplier(cluster)
+    );
+  }
+
+  public static PollingSettings createDefault() {
+    return new PollingSettings(
+        DEFAULT_POLL_TIMEOUT,
+        DEFAULT_PARTITION_POLL_TIMEOUT,
+        DEFAULT_NO_DATA_EMPTY_POLLS,
+        PollingThrottler::noop
+    );
+  }
+
+  private PollingSettings(Duration pollTimeout,
+                          Duration partitionPollTimeout,
+                          int notDataEmptyPolls,
+                          Supplier<PollingThrottler> throttlerSupplier) {
+    this.pollTimeout = pollTimeout;
+    this.partitionPollTimeout = partitionPollTimeout;
+    this.notDataEmptyPolls = notDataEmptyPolls;
+    this.throttlerSupplier = throttlerSupplier;
+  }
+
+  public EmptyPollsCounter createEmptyPollsCounter() {
+    return new EmptyPollsCounter(notDataEmptyPolls);
+  }
+
+  public Duration getPollTimeout() {
+    return pollTimeout;
+  }
+
+  public Duration getPartitionPollTimeout() {
+    return partitionPollTimeout;
+  }
+
+  public PollingThrottler getPollingThrottler() {
+    return throttlerSupplier.get();
+  }
+}
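
One subtlety in create(): an explicit partitionPollTimeout always wins, but when it is absent the value is derived as pollTimeout / 5 rather than the fixed 200 ms, so raising pollTimeoutMs also stretches the per-partition timeout. A quick sketch of that arithmetic (assuming PollingThrottler.throttlerSupplier tolerates a bare Cluster instance with no throttle rate configured):

import com.provectus.kafka.ui.config.ClustersProperties;

class PollingSettingsSketch {
  public static void main(String[] args) {
    var props = new ClustersProperties();
    props.getPolling().setPollTimeoutMs(5_000); // partitionPollTimeout left unset

    var settings = PollingSettings.create(new ClustersProperties.Cluster(), props);
    System.out.println(settings.getPollTimeout());          // PT5S (5000 ms, as configured)
    System.out.println(settings.getPartitionPollTimeout()); // PT1S (5000 / 5 = 1000 ms fallback)
  }
}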

kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/PollingThrottler.java renamed to kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/PollingThrottler.java

Lines changed: 2 additions & 1 deletion

@@ -1,8 +1,9 @@
-package com.provectus.kafka.ui.util;
+package com.provectus.kafka.ui.emitter;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.util.concurrent.RateLimiter;
 import com.provectus.kafka.ui.config.ClustersProperties;
+import com.provectus.kafka.ui.util.ConsumerRecordsUtil;
 import java.util.function.Supplier;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.ConsumerRecords;

kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/ResultSizeLimiter.java renamed to kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/ResultSizeLimiter.java

Lines changed: 1 addition & 1 deletion

@@ -1,4 +1,4 @@
-package com.provectus.kafka.ui.util;
+package com.provectus.kafka.ui.emitter;
 
 import com.provectus.kafka.ui.model.TopicMessageEventDTO;
 import java.util.concurrent.atomic.AtomicInteger;

kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/TailingEmitter.java

Lines changed: 2 additions & 3 deletions

@@ -3,7 +3,6 @@
 import com.provectus.kafka.ui.model.ConsumerPosition;
 import com.provectus.kafka.ui.model.TopicMessageEventDTO;
 import com.provectus.kafka.ui.serdes.ConsumerRecordDeserializer;
-import com.provectus.kafka.ui.util.PollingThrottler;
 import java.util.HashMap;
 import java.util.function.Supplier;
 import lombok.extern.slf4j.Slf4j;
@@ -22,8 +21,8 @@ public class TailingEmitter extends AbstractEmitter
   public TailingEmitter(Supplier<KafkaConsumer<Bytes, Bytes>> consumerSupplier,
                         ConsumerPosition consumerPosition,
                         ConsumerRecordDeserializer recordDeserializer,
-                        PollingThrottler throttler) {
-    super(recordDeserializer, throttler);
+                        PollingSettings pollingSettings) {
+    super(recordDeserializer, pollingSettings);
     this.consumerSupplier = consumerSupplier;
     this.consumerPosition = consumerPosition;
   }

kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/KafkaCluster.java

Lines changed: 2 additions & 3 deletions

@@ -2,14 +2,13 @@
 
 import com.provectus.kafka.ui.config.ClustersProperties;
 import com.provectus.kafka.ui.connect.api.KafkaConnectClientApi;
+import com.provectus.kafka.ui.emitter.PollingSettings;
 import com.provectus.kafka.ui.service.ksql.KsqlApiClient;
 import com.provectus.kafka.ui.service.masking.DataMasking;
 import com.provectus.kafka.ui.sr.api.KafkaSrClientApi;
-import com.provectus.kafka.ui.util.PollingThrottler;
 import com.provectus.kafka.ui.util.ReactiveFailover;
 import java.util.Map;
 import java.util.Properties;
-import java.util.function.Supplier;
 import lombok.AccessLevel;
 import lombok.AllArgsConstructor;
 import lombok.Builder;
@@ -28,7 +27,7 @@ public class KafkaCluster {
   private final boolean readOnly;
   private final MetricsConfig metricsConfig;
   private final DataMasking masking;
-  private final Supplier<PollingThrottler> throttler;
+  private final PollingSettings pollingSettings;
   private final ReactiveFailover<KafkaSrClientApi> schemaRegistryClient;
   private final Map<String, ReactiveFailover<KafkaConnectClientApi>> connectsClients;
   private final ReactiveFailover<KsqlApiClient> ksqlClient;
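
For callers of KafkaCluster the change is mechanical: the cluster now exposes a ready-made PollingSettings instead of a Supplier<PollingThrottler>. A sketch of emitter construction after this commit (the getPollingSettings() accessor is assumed from Lombok, and the "before" comment paraphrases the old supplier-based wiring; the position, deserializer and consumer supplier are built elsewhere, as before):

import com.provectus.kafka.ui.emitter.ForwardRecordEmitter;
import com.provectus.kafka.ui.model.ConsumerPosition;
import com.provectus.kafka.ui.model.KafkaCluster;
import com.provectus.kafka.ui.serdes.ConsumerRecordDeserializer;
import java.util.function.Supplier;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.utils.Bytes;

class EmitterWiringSketch {
  static ForwardRecordEmitter forwardEmitter(KafkaCluster cluster,
                                             Supplier<KafkaConsumer<Bytes, Bytes>> consumerSupplier,
                                             ConsumerPosition position,
                                             ConsumerRecordDeserializer deserializer) {
    // before: the last argument was a PollingThrottler obtained from the cluster's supplier
    return new ForwardRecordEmitter(consumerSupplier, position, deserializer,
        cluster.getPollingSettings());
  }
}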
