diff --git a/api/src/main/java/io/kafbat/ui/emitter/MessageFilters.java b/api/src/main/java/io/kafbat/ui/emitter/MessageFilters.java index b008e388e..7472c452f 100644 --- a/api/src/main/java/io/kafbat/ui/emitter/MessageFilters.java +++ b/api/src/main/java/io/kafbat/ui/emitter/MessageFilters.java @@ -7,6 +7,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableCollection; import com.google.common.collect.ImmutableSet; +import com.google.protobuf.NullValue; import dev.cel.common.CelAbstractSyntaxTree; import dev.cel.common.CelOptions; import dev.cel.common.CelValidationException; @@ -26,6 +27,7 @@ import io.kafbat.ui.exception.CelException; import io.kafbat.ui.model.TopicMessageDTO; import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.Map; import java.util.Objects; import java.util.Optional; @@ -38,11 +40,13 @@ @Slf4j @UtilityClass public class MessageFilters { + private static final String CEL_RECORD_VAR_NAME = "record"; private static final String CEL_RECORD_TYPE_NAME = TopicMessageDTO.class.getSimpleName(); private static final CelCompiler CEL_COMPILER = createCompiler(); private static final CelRuntime CEL_RUNTIME = createRuntime(); + private static final Object CELL_NULL_VALUE = NullValue.NULL_VALUE; private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); @@ -188,10 +192,34 @@ private static Object parseToJsonOrReturnAsIs(@Nullable String str) { } try { - return OBJECT_MAPPER.readValue(str, new TypeReference>() { - }); + //@formatter:off + var map = OBJECT_MAPPER.readValue(str, new TypeReference>() {}); + //@formatter:on + return replaceCelNulls(map); } catch (JsonProcessingException e) { return str; } } + + @SuppressWarnings("unchecked") + private static Map replaceCelNulls(Map map) { + var result = new LinkedHashMap(); + + for (var entry : map.entrySet()) { + String key = entry.getKey(); + Object value = entry.getValue(); + + if (value == null) { + result.put(key, CELL_NULL_VALUE); + } else if (value instanceof Map) { + var inner = (Map) value; + result.put(key, replaceCelNulls(inner)); + } else { + result.put(key, value); + } + } + + return result; + } + } diff --git a/api/src/test/java/io/kafbat/ui/emitter/MessageFiltersTest.java b/api/src/test/java/io/kafbat/ui/emitter/MessageFiltersTest.java index 7aafea5ab..fdb02c2e7 100644 --- a/api/src/test/java/io/kafbat/ui/emitter/MessageFiltersTest.java +++ b/api/src/test/java/io/kafbat/ui/emitter/MessageFiltersTest.java @@ -201,6 +201,18 @@ void filterSpeedIsAtLeast5kPerSec() { assertThat(took).isLessThan(1000); assertThat(matched).isPositive(); } + + @Test + void nullFiltering() { + String msg = "{ \"field\": { \"inner\": null } }"; + + var f = celScriptFilter("record.value.field.inner == null"); + assertTrue(f.test(msg().content(msg))); + + f = celScriptFilter("record.value.field.inner != null"); + assertFalse(f.test(msg().content(msg))); + } + } @Test