Skip to content

Commit ffb14f0

Browse files
BE: Topics: Validate ISR/replication upon creation (kafbat#103)
1 parent 40984bf commit ffb14f0

File tree

3 files changed

+1134
-0
lines changed

3 files changed

+1134
-0
lines changed

api/src/main/java/io/kafbat/ui/service/ReactiveAdminClient.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import io.kafbat.ui.exception.IllegalEntityStateException;
1313
import io.kafbat.ui.exception.NotFoundException;
1414
import io.kafbat.ui.exception.ValidationException;
15+
import io.kafbat.ui.service.validators.KafkaPropertiesConstraintsValidator;
1516
import io.kafbat.ui.util.KafkaVersion;
1617
import io.kafbat.ui.util.MetadataVersion;
1718
import io.kafbat.ui.util.annotation.KafkaClientInternalsDependant;
@@ -476,6 +477,9 @@ public Mono<Void> createTopic(String name,
476477
int numPartitions,
477478
@Nullable Integer replicationFactor,
478479
Map<String, String> configs) {
480+
KafkaPropertiesConstraintsValidator validator = new KafkaPropertiesConstraintsValidator(replicationFactor,
481+
configs);
482+
validator.validate();
479483
var newTopic = new NewTopic(
480484
name,
481485
Optional.of(numPartitions),
Lines changed: 217 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,217 @@
1+
package io.kafbat.ui.service.validators;
2+
3+
import java.util.ArrayList;
4+
import java.util.Arrays;
5+
import java.util.List;
6+
import java.util.Map;
7+
import java.util.Objects;
8+
import lombok.AllArgsConstructor;
9+
import lombok.extern.slf4j.Slf4j;
10+
11+
12+
@Slf4j
13+
@AllArgsConstructor
14+
public class KafkaPropertiesConstraintsValidator {
15+
private Integer replicationFactor;
16+
private Map<String, String> configs;
17+
18+
private static final String COMPACT = "compact";
19+
private static final String LOCAL_RETENTION_MS = "local.retention.ms";
20+
private static final String LOCAL_RETENTION_BYTES = "local.retention.bytes";
21+
private static final String RETENTION_MS = "retention.ms";
22+
private static final String SEGMENT_MS = "segment.ms";
23+
private static final String RETENTION_BYTES = "retention.bytes";
24+
private static final String SEGMENT_BYTES = "segment.bytes";
25+
private static final String MAX_MESSAGE_BYTES = "max.message.bytes";
26+
private static final String COMPRESSION_ZSTD_LEVEL = "compression.zstd.level";
27+
private static final String COMPRESSION_LZ4_LEVEL = "compression.lz4.level";
28+
private static final String COMPRESSION_GZIP_LEVEL = "compression.gzip.level";
29+
private static final String MIN_CLEANABLE_DIRTY_RATIO = "min.cleanable.dirty.ratio";
30+
private static final String MIN_COMPACTION_LAG_MS = "min.compaction.lag.ms";
31+
private static final String MAX_COMPACTION_LAG_MS = "max.compaction.lag.ms";
32+
private static final String DELETE_RETENTION_MS = "delete.retention.ms";
33+
34+
public void validate() {
35+
minInSyncReplicasLessThanReplicationFactorValidation();
36+
compressionConfigValueValidation();
37+
compactionConfigValuesValidation();
38+
remoteStorageConfigValuesValidation();
39+
retentionAndDeletionTimeConfigurationBasedConstraintsValidation();
40+
retentionAndDeletionMemoryConfigurationBasedConstraintsValidation();
41+
}
42+
43+
void minInSyncReplicasLessThanReplicationFactorValidation() {
44+
Integer minInSyncReplicas = configs.get("min.insync.replicas") != null
45+
? Integer.parseInt(configs.get("min.insync.replicas"))
46+
: null;
47+
48+
if (minInSyncReplicas != null && replicationFactor != null && minInSyncReplicas > replicationFactor) {
49+
throw new IllegalArgumentException(
50+
String.format("min.insync.replicas (%d) should be less than or equal to replication.factor (%d)",
51+
minInSyncReplicas, replicationFactor));
52+
}
53+
}
54+
55+
void compressionConfigValueValidation() {
56+
String compressionType = configs.get("compression.type");
57+
if (configs.get(COMPRESSION_ZSTD_LEVEL) != null && !Objects.equals(configs.get(COMPRESSION_ZSTD_LEVEL), "3")
58+
&& !"zstd".equals(compressionType)) {
59+
throw new IllegalArgumentException(
60+
String.format("compression.zstd.level (%s) should be set only when compression.type is zstd",
61+
configs.get(COMPRESSION_ZSTD_LEVEL)));
62+
}
63+
if (configs.get(COMPRESSION_LZ4_LEVEL) != null && !Objects.equals(configs.get(COMPRESSION_LZ4_LEVEL), "9")
64+
&& !"lz4".equals(compressionType)) {
65+
throw new IllegalArgumentException(
66+
String.format("compression.lz4.level (%s) should be set only when compression.type is lz4",
67+
configs.get(COMPRESSION_LZ4_LEVEL)));
68+
}
69+
if (configs.get(COMPRESSION_GZIP_LEVEL) != null && !Objects.equals(configs.get(COMPRESSION_GZIP_LEVEL), "-1")
70+
&& !"gzip".equals(compressionType)) {
71+
throw new IllegalArgumentException(
72+
String.format("compression.gzip.level (%s) should be set only when compression.type is gzip",
73+
configs.get(COMPRESSION_GZIP_LEVEL)));
74+
}
75+
}
76+
77+
void compactionConfigValuesValidation() {
78+
String cleanupPolicy = configs.get("cleanup.policy");
79+
List<String> policies = new ArrayList<>();
80+
if (cleanupPolicy != null) {
81+
policies = Arrays.asList(cleanupPolicy.split(","));
82+
}
83+
if (configs.get(MIN_CLEANABLE_DIRTY_RATIO) != null
84+
&& !Objects.equals(configs.get(MIN_CLEANABLE_DIRTY_RATIO), "0.5")
85+
&& !policies.contains(COMPACT)) {
86+
throw new IllegalArgumentException(
87+
String.format("min.cleanable.dirty.ratio (%s) should be set only when cleanup.policy is compact",
88+
configs.get(MIN_CLEANABLE_DIRTY_RATIO)));
89+
}
90+
if (configs.get(MIN_COMPACTION_LAG_MS) != null
91+
&& !Objects.equals(configs.get(MIN_COMPACTION_LAG_MS), "0")
92+
&& !policies.contains(COMPACT)) {
93+
throw new IllegalArgumentException(
94+
String.format("min.compaction.lag.ms (%s) should be set only when cleanup.policy is compact",
95+
configs.get(MIN_COMPACTION_LAG_MS)));
96+
}
97+
if (configs.get(MAX_COMPACTION_LAG_MS) != null
98+
&& !Objects.equals(configs.get(MAX_COMPACTION_LAG_MS), "9223372036854775807")
99+
&& !policies.contains(COMPACT)) {
100+
throw new IllegalArgumentException(
101+
String.format("max.compaction.lag.ms (%s) should be set only when cleanup.policy is compact",
102+
configs.get(MAX_COMPACTION_LAG_MS)));
103+
}
104+
if (configs.get(DELETE_RETENTION_MS) != null
105+
&& !Objects.equals(configs.get(DELETE_RETENTION_MS), "86400000")
106+
&& !policies.contains(COMPACT)) {
107+
throw new IllegalArgumentException(
108+
String.format("delete.retention.ms (%s) should be set only when cleanup.policy is compact",
109+
configs.get(DELETE_RETENTION_MS)));
110+
}
111+
112+
}
113+
114+
void remoteStorageConfigValuesValidation() {
115+
String remoteStorageEnabled = configs.get("remote.storage.enable");
116+
if (configs.get(LOCAL_RETENTION_MS) != null && !Objects.equals(configs.get(LOCAL_RETENTION_MS), "-2")
117+
&& !"true".equals(remoteStorageEnabled)) {
118+
throw new IllegalArgumentException(
119+
String.format("local.retention.ms (%s) should be set only when remoteStorageEnabled is true",
120+
configs.get(LOCAL_RETENTION_MS)));
121+
}
122+
if (configs.get(LOCAL_RETENTION_BYTES) != null && !Objects.equals(configs.get(LOCAL_RETENTION_BYTES), "-2")
123+
&& !"true".equals(remoteStorageEnabled)) {
124+
throw new IllegalArgumentException(
125+
String.format("local.retention.bytes (%s) should be set only when remoteStorageEnabled is true",
126+
configs.get(LOCAL_RETENTION_BYTES)));
127+
}
128+
}
129+
130+
void retentionAndDeletionTimeConfigurationBasedConstraintsValidation() {
131+
List<String> keys = new ArrayList<>();
132+
List<Long> values = new ArrayList<>();
133+
if (!Objects.equals(configs.get(RETENTION_MS), "-1")) {
134+
keys.add(RETENTION_MS);
135+
addRetentionMsToValues(values);
136+
}
137+
if (!Objects.equals(configs.get(LOCAL_RETENTION_MS), "-2") && configs.get(LOCAL_RETENTION_MS) != null) {
138+
keys.add(LOCAL_RETENTION_MS);
139+
values.add(parseLong(configs.get(LOCAL_RETENTION_MS)));
140+
}
141+
keys.add(SEGMENT_MS);
142+
if (configs.get(SEGMENT_MS) != null) {
143+
values.add(parseLong(configs.get(SEGMENT_MS)));
144+
} else {
145+
values.add(604800000L);
146+
}
147+
148+
for (int i = 0; i < values.size() - 1; i++) {
149+
Long current = values.get(i);
150+
Long next = values.get(i + 1);
151+
if (current != 0 && next != 0 && current < next) {
152+
throw new IllegalArgumentException(
153+
String.format("Invalid configuration: %s (%s) should be greater than or equal to %s (%s)",
154+
keys.get(i), current,
155+
keys.get(i + 1), next));
156+
}
157+
}
158+
159+
}
160+
161+
void addRetentionMsToValues(List<Long> values) {
162+
if (configs.get(RETENTION_MS) != null) {
163+
values.add(parseLong(configs.get(RETENTION_MS)));
164+
} else {
165+
values.add(604800000L);
166+
}
167+
}
168+
169+
void retentionAndDeletionMemoryConfigurationBasedConstraintsValidation() {
170+
171+
List<String> keys = new ArrayList<>();
172+
List<Long> values = new ArrayList<>();
173+
if (!Objects.equals(configs.get(RETENTION_BYTES), "-1") && configs.get(RETENTION_BYTES) != null) {
174+
keys.add(RETENTION_BYTES);
175+
values.add(parseLong(configs.get(RETENTION_BYTES)));
176+
}
177+
if (!Objects.equals(configs.get(LOCAL_RETENTION_BYTES), "-2") && configs.get(LOCAL_RETENTION_BYTES) != null) {
178+
keys.add(LOCAL_RETENTION_BYTES);
179+
values.add(parseLong(configs.get(LOCAL_RETENTION_BYTES)));
180+
}
181+
keys.add(SEGMENT_BYTES);
182+
if (configs.get(SEGMENT_BYTES) != null) {
183+
values.add(parseLong(configs.get(SEGMENT_BYTES)));
184+
} else {
185+
values.add(1073741824L);
186+
}
187+
keys.add(MAX_MESSAGE_BYTES);
188+
if (configs.get(MAX_MESSAGE_BYTES) != null) {
189+
values.add(parseLong(configs.get(MAX_MESSAGE_BYTES)));
190+
} else {
191+
values.add(1048588L);
192+
}
193+
194+
for (int i = 0; i < values.size() - 1; i++) {
195+
Long current = values.get(i);
196+
Long next = values.get(i + 1);
197+
if (current != 0 && next != 0 && current < next) {
198+
199+
throw new IllegalArgumentException(
200+
String.format("Invalid configuration: %s (%s) should be greater than or equal to %s (%s)",
201+
keys.get(i), current,
202+
keys.get(i + 1), next
203+
));
204+
}
205+
}
206+
207+
}
208+
209+
private static long parseLong(String value) {
210+
try {
211+
return value != null ? Long.parseLong(value) : 0L;
212+
} catch (NumberFormatException e) {
213+
return 0L;
214+
}
215+
}
216+
217+
}

0 commit comments

Comments
 (0)