Skip to content

Commit 2869268

Browse files
authored
Merge branch 'main' into main
2 parents eeae02b + 772d137 commit 2869268

File tree

24 files changed

+1376
-42
lines changed

24 files changed

+1376
-42
lines changed

api/build.gradle

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,15 @@ dependencies {
6262
implementation libs.netty.common
6363
implementation libs.netty.handler
6464

65+
66+
// Google Managed Service for Kafka IAM support
67+
implementation (libs.google.managed.kafka.login.handler) {
68+
exclude group: 'com.google.oauth-client', module: 'google-oauth-client'
69+
}
70+
implementation (libs.google.oauth.client) {
71+
because("CVE Fix: It is excluded above because of a vulnerability")
72+
}
73+
6574
// Annotation processors
6675
implementation libs.lombok
6776
implementation libs.mapstruct

api/src/main/java/io/kafbat/ui/emitter/MessageFilters.java

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import com.fasterxml.jackson.databind.ObjectMapper;
88
import com.google.common.collect.ImmutableCollection;
99
import com.google.common.collect.ImmutableSet;
10+
import com.google.protobuf.NullValue;
1011
import dev.cel.common.CelAbstractSyntaxTree;
1112
import dev.cel.common.CelOptions;
1213
import dev.cel.common.CelValidationException;
@@ -26,6 +27,7 @@
2627
import io.kafbat.ui.exception.CelException;
2728
import io.kafbat.ui.model.TopicMessageDTO;
2829
import java.util.HashMap;
30+
import java.util.LinkedHashMap;
2931
import java.util.Map;
3032
import java.util.Objects;
3133
import java.util.Optional;
@@ -38,11 +40,13 @@
3840
@Slf4j
3941
@UtilityClass
4042
public class MessageFilters {
43+
4144
private static final String CEL_RECORD_VAR_NAME = "record";
4245
private static final String CEL_RECORD_TYPE_NAME = TopicMessageDTO.class.getSimpleName();
4346

4447
private static final CelCompiler CEL_COMPILER = createCompiler();
4548
private static final CelRuntime CEL_RUNTIME = createRuntime();
49+
private static final Object CELL_NULL_VALUE = NullValue.NULL_VALUE;
4650

4751
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
4852

@@ -188,10 +192,34 @@ private static Object parseToJsonOrReturnAsIs(@Nullable String str) {
188192
}
189193

190194
try {
191-
return OBJECT_MAPPER.readValue(str, new TypeReference<Map<String, Object>>() {
192-
});
195+
//@formatter:off
196+
var map = OBJECT_MAPPER.readValue(str, new TypeReference<Map<String, Object>>() {});
197+
//@formatter:on
198+
return replaceCelNulls(map);
193199
} catch (JsonProcessingException e) {
194200
return str;
195201
}
196202
}
203+
204+
@SuppressWarnings("unchecked")
205+
private static Map<String, Object> replaceCelNulls(Map<String, Object> map) {
206+
var result = new LinkedHashMap<String, Object>();
207+
208+
for (var entry : map.entrySet()) {
209+
String key = entry.getKey();
210+
Object value = entry.getValue();
211+
212+
if (value == null) {
213+
result.put(key, CELL_NULL_VALUE);
214+
} else if (value instanceof Map<?, ?>) {
215+
var inner = (Map<String, Object>) value;
216+
result.put(key, replaceCelNulls(inner));
217+
} else {
218+
result.put(key, value);
219+
}
220+
}
221+
222+
return result;
223+
}
224+
197225
}

api/src/main/java/io/kafbat/ui/service/KafkaConfigSanitizer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ class KafkaConfigSanitizer {
2929
.addAll(kafkaConfigKeysToSanitize())
3030
.add(
3131
"basic.auth.user.info", /* For Schema Registry credentials */
32-
"password", "secret", "token", "key", ".*credentials.*", /* General credential patterns */
32+
"password", "secret", "token", "key", ".*credentials.*", "passphrase", /* General credential patterns */
3333
"aws.access.*", "aws.secret.*", "aws.session.*", /* AWS-related credential patterns */
3434
"connection.uri" /* mongo credential patterns */
3535
)

api/src/main/java/io/kafbat/ui/service/ReactiveAdminClient.java

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import io.kafbat.ui.exception.NotFoundException;
1414
import io.kafbat.ui.exception.ValidationException;
1515
import io.kafbat.ui.util.KafkaVersion;
16+
import io.kafbat.ui.util.MetadataVersion;
1617
import io.kafbat.ui.util.annotation.KafkaClientInternalsDependant;
1718
import java.io.Closeable;
1819
import java.time.Duration;
@@ -49,6 +50,8 @@
4950
import org.apache.kafka.clients.admin.DescribeClusterOptions;
5051
import org.apache.kafka.clients.admin.DescribeClusterResult;
5152
import org.apache.kafka.clients.admin.DescribeConfigsOptions;
53+
import org.apache.kafka.clients.admin.FeatureMetadata;
54+
import org.apache.kafka.clients.admin.FinalizedVersionRange;
5255
import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsSpec;
5356
import org.apache.kafka.clients.admin.ListOffsetsResult;
5457
import org.apache.kafka.clients.admin.ListTopicsOptions;
@@ -96,6 +99,7 @@
9699
@Slf4j
97100
@AllArgsConstructor
98101
public class ReactiveAdminClient implements Closeable {
102+
private static final String DEFAULT_UNKNOWN_VERSION = "Unknown";
99103

100104
public enum SupportedFeature {
101105
INCREMENTAL_ALTER_CONFIGS(2.3f),
@@ -114,8 +118,8 @@ public enum SupportedFeature {
114118
this.predicate = (admin, ver) -> Mono.just(ver != null && ver >= fromVersion);
115119
}
116120

117-
static Mono<Set<SupportedFeature>> forVersion(AdminClient ac, String kafkaVersionStr) {
118-
@Nullable Float kafkaVersion = KafkaVersion.parse(kafkaVersionStr).orElse(null);
121+
static Mono<Set<SupportedFeature>> forVersion(AdminClient ac, Optional<String> kafkaVersionStr) {
122+
@Nullable Float kafkaVersion = kafkaVersionStr.flatMap(KafkaVersion::parse).orElse(null);
119123
return Flux.fromArray(SupportedFeature.values())
120124
.flatMap(f -> f.predicate.apply(ac, kafkaVersion).map(enabled -> Tuples.of(f, enabled)))
121125
.filter(Tuple2::getT2)
@@ -150,18 +154,28 @@ private static Mono<ConfigRelatedInfo> extract(AdminClient ac) {
150154
.orElse(desc.getNodes().iterator().next().id());
151155
return loadBrokersConfig(ac, List.of(targetNodeId))
152156
.map(map -> map.isEmpty() ? List.<ConfigEntry>of() : map.get(targetNodeId))
153-
.flatMap(configs -> {
154-
String version = "1.0-UNKNOWN";
157+
.zipWith(toMono(ac.describeFeatures().featureMetadata()))
158+
.flatMap(tuple -> {
159+
List<ConfigEntry> configs = tuple.getT1();
160+
FeatureMetadata featureMetadata = tuple.getT2();
161+
Optional<String> version = Optional.empty();
155162
boolean topicDeletionEnabled = true;
156163
for (ConfigEntry entry : configs) {
157164
if (entry.name().contains("inter.broker.protocol.version")) {
158-
version = entry.value();
165+
version = Optional.of(entry.value());
159166
}
160167
if (entry.name().equals("delete.topic.enable")) {
161168
topicDeletionEnabled = Boolean.parseBoolean(entry.value());
162169
}
163170
}
164-
final String finalVersion = version;
171+
if (version.isEmpty()) {
172+
FinalizedVersionRange metadataVersion =
173+
featureMetadata.finalizedFeatures().get("metadata.version");
174+
if (metadataVersion != null) {
175+
version = MetadataVersion.findVersion(metadataVersion.maxVersionLevel());
176+
}
177+
}
178+
final String finalVersion = version.orElse(DEFAULT_UNKNOWN_VERSION);
165179
final boolean finalTopicDeletionEnabled = topicDeletionEnabled;
166180
return SupportedFeature.forVersion(ac, version)
167181
.map(features -> new ConfigRelatedInfo(finalVersion, features, finalTopicDeletionEnabled));
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
package io.kafbat.ui.util;
2+
3+
import java.util.Arrays;
4+
import java.util.Optional;
5+
6+
public enum MetadataVersion {
7+
IBP_3_0_IV1(1, "3.0-IV1"),
8+
IBP_3_1_IV0(2, "3.1-IV0"),
9+
IBP_3_2_IV0(3, "3.2-IV0"),
10+
IBP_3_3_IV0(4, "3.3-IV0"),
11+
IBP_3_3_IV1(5, "3.3-IV1"),
12+
IBP_3_3_IV2(6, "3.3-IV2"),
13+
IBP_3_3_IV3(7, "3.3-IV3"),
14+
IBP_3_4_IV0(8, "3.4-IV0"),
15+
IBP_3_5_IV0(9, "3.5-IV0"),
16+
IBP_3_5_IV1(10, "3.5-IV1"),
17+
IBP_3_5_IV2(11, "3.5-IV2"),
18+
IBP_3_6_IV0(12, "3.6-IV0"),
19+
IBP_3_6_IV1(13, "3.6-IV1"),
20+
IBP_3_6_IV2(14, "3.6-IV2"),
21+
IBP_3_7_IV0(15, "3.7-IV0"),
22+
IBP_3_7_IV1(16, "3.7-IV1"),
23+
IBP_3_7_IV2(17, "3.7-IV2"),
24+
IBP_3_7_IV3(18, "3.7-IV3"),
25+
IBP_3_7_IV4(19, "3.7-IV4"),
26+
IBP_3_8_IV0(20, "3.8-IV0"),
27+
IBP_3_9_IV0(21, "3.9-IV0"),
28+
IBP_4_0_IV0(22, "4.0-IV0"),
29+
IBP_4_0_IV1(23, "4.0-IV1"),
30+
IBP_4_0_IV2(24, "4.0-IV2"),
31+
IBP_4_0_IV3(25, "4.0-IV3"),
32+
IBP_4_1_IV0(26, "4.1-IV0");
33+
34+
private final int featureLevel;
35+
private final String release;
36+
37+
MetadataVersion(int featureLevel, String release) {
38+
this.featureLevel = featureLevel;
39+
this.release = release;
40+
}
41+
42+
public static Optional<String> findVersion(int featureLevel) {
43+
return Arrays.stream(values())
44+
.filter(v -> v.featureLevel == featureLevel)
45+
.findFirst().map(v -> v.release);
46+
}
47+
48+
}

api/src/test/java/io/kafbat/ui/emitter/MessageFiltersTest.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -201,6 +201,18 @@ void filterSpeedIsAtLeast5kPerSec() {
201201
assertThat(took).isLessThan(1000);
202202
assertThat(matched).isPositive();
203203
}
204+
205+
@Test
206+
void nullFiltering() {
207+
String msg = "{ \"field\": { \"inner\": null } }";
208+
209+
var f = celScriptFilter("record.value.field.inner == null");
210+
assertTrue(f.test(msg().content(msg)));
211+
212+
f = celScriptFilter("record.value.field.inner != null");
213+
assertFalse(f.test(msg().content(msg)));
214+
}
215+
204216
}
205217

206218
@Test

api/src/test/java/io/kafbat/ui/service/KafkaConfigSanitizerTest.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ void obfuscateCredentials() {
2727
assertThat(sanitizer.sanitize("main.consumer.sasl.jaas.config", "secret")).isEqualTo("******");
2828
assertThat(sanitizer.sanitize("database.password", "secret")).isEqualTo("******");
2929
assertThat(sanitizer.sanitize("basic.auth.user.info", "secret")).isEqualTo("******");
30+
assertThat(sanitizer.sanitize("private.key.passphrase", "secret")).isEqualTo("******");
3031

3132
//AWS var sanitizing
3233
assertThat(sanitizer.sanitize("aws.access.key.id", "secret")).isEqualTo("******");

0 commit comments

Comments
 (0)