From a1fb075cd97ae3d4d52a89b977a7e4a65ac38069 Mon Sep 17 00:00:00 2001 From: Ezequiel Keimel Date: Sun, 4 May 2025 05:15:00 +0100 Subject: [PATCH] BE: Closes #71 Messages: Show headers duplicates --- .../io/kafbat/ui/emitter/MessageFilters.java | 10 +++- .../ui/serdes/ConsumerRecordDeserializer.java | 14 +++--- .../ui/serdes/ProducerRecordCreator.java | 11 +++-- .../kafbat/ui/emitter/MessageFiltersTest.java | 22 +++++---- .../ConsumerRecordDeserializerTest.java | 46 ++++++++++++++++++- .../ui/serdes/ProducerRecordCreatorTest.java | 37 +++++++++++++++ .../ui/service/MessagesServiceTest.java | 2 +- .../kafbat/ui/service/SendAndReadTests.java | 6 +-- .../main/resources/swagger/kafbat-ui-api.yaml | 12 +++-- .../Topic/Messages/Filters/InfoModal.tsx | 4 +- .../MessageContent/MessageContent.tsx | 2 +- .../__tests__/MessageContent.spec.tsx | 2 +- .../Topic/Messages/__test__/Message.spec.tsx | 2 +- .../Topic/SendMessage/SendMessage.styled.tsx | 10 ++++ .../Topics/Topic/SendMessage/SendMessage.tsx | 35 ++++++++------ 15 files changed, 169 insertions(+), 46 deletions(-) create mode 100644 api/src/test/java/io/kafbat/ui/serdes/ProducerRecordCreatorTest.java diff --git a/api/src/main/java/io/kafbat/ui/emitter/MessageFilters.java b/api/src/main/java/io/kafbat/ui/emitter/MessageFilters.java index 7472c452f..cdc8b5ee2 100644 --- a/api/src/main/java/io/kafbat/ui/emitter/MessageFilters.java +++ b/api/src/main/java/io/kafbat/ui/emitter/MessageFilters.java @@ -14,6 +14,7 @@ import dev.cel.common.CelValidationResult; import dev.cel.common.types.CelType; import dev.cel.common.types.CelTypeProvider; +import dev.cel.common.types.ListType; import dev.cel.common.types.MapType; import dev.cel.common.types.SimpleType; import dev.cel.common.types.StructType; @@ -67,9 +68,14 @@ private static boolean headersContains(TopicMessageDTO msg, String searchString) } for (final var entry : headers.entrySet()) { - if (StringUtils.contains(entry.getKey(), searchString) || StringUtils.contains(entry.getValue(), searchString)) { + if (StringUtils.contains(entry.getKey(), searchString)) { return true; } + for (final var value : entry.getValue()) { + if (StringUtils.contains(value, searchString)) { + return true; + } + } } return false; @@ -143,7 +149,7 @@ private static CelCompiler createCompiler() { "timestampMs", SimpleType.INT, "keyAsText", SimpleType.STRING, "valueAsText", SimpleType.STRING, - "headers", MapType.create(SimpleType.STRING, SimpleType.STRING), + "headers", MapType.create(SimpleType.STRING, ListType.create(SimpleType.STRING)), "key", SimpleType.DYN, "value", SimpleType.DYN ); diff --git a/api/src/main/java/io/kafbat/ui/serdes/ConsumerRecordDeserializer.java b/api/src/main/java/io/kafbat/ui/serdes/ConsumerRecordDeserializer.java index b4eb51493..2e3a57b12 100644 --- a/api/src/main/java/io/kafbat/ui/serdes/ConsumerRecordDeserializer.java +++ b/api/src/main/java/io/kafbat/ui/serdes/ConsumerRecordDeserializer.java @@ -6,8 +6,10 @@ import java.time.Instant; import java.time.OffsetDateTime; import java.time.ZoneId; +import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.function.UnaryOperator; import lombok.RequiredArgsConstructor; @@ -63,13 +65,13 @@ private static TimestampTypeEnum mapToTimestampType(TimestampType timestampType) } private void fillHeaders(TopicMessageDTO message, ConsumerRecord rec) { - Map headers = new HashMap<>(); + Map> headers = new HashMap<>(); rec.headers().iterator() - .forEachRemaining(header -> - headers.put( - header.key(), - header.value() != null ? new String(header.value()) : null - )); + .forEachRemaining(header -> { + String key = header.key(); + String value = header.value() != null ? new String(header.value()) : null; + headers.computeIfAbsent(key, k -> new ArrayList<>()).add(value); + }); message.setHeaders(headers); } diff --git a/api/src/main/java/io/kafbat/ui/serdes/ProducerRecordCreator.java b/api/src/main/java/io/kafbat/ui/serdes/ProducerRecordCreator.java index e7abbad90..0cc521fb8 100644 --- a/api/src/main/java/io/kafbat/ui/serdes/ProducerRecordCreator.java +++ b/api/src/main/java/io/kafbat/ui/serdes/ProducerRecordCreator.java @@ -1,6 +1,7 @@ package io.kafbat.ui.serdes; import io.kafbat.ui.serde.api.Serde; +import java.util.List; import java.util.Map; import javax.annotation.Nullable; import lombok.RequiredArgsConstructor; @@ -19,7 +20,7 @@ public ProducerRecord create(String topic, @Nullable Integer partition, @Nullable String key, @Nullable String value, - @Nullable Map headers) { + @Nullable Map> headers) { return new ProducerRecord<>( topic, partition, @@ -29,10 +30,14 @@ public ProducerRecord create(String topic, ); } - private Iterable
createHeaders(Map clientHeaders) { + private Iterable
createHeaders(Map> clientHeaders) { RecordHeaders headers = new RecordHeaders(); - clientHeaders.forEach((k, v) -> headers.add(new RecordHeader(k, v == null ? null : v.getBytes()))); + clientHeaders.forEach((k, values) -> values.forEach(v -> headers.add(createRecord(k, v)))); return headers; } + private RecordHeader createRecord(String key, String value) { + return new RecordHeader(key, value == null ? null : value.getBytes()); + } + } diff --git a/api/src/test/java/io/kafbat/ui/emitter/MessageFiltersTest.java b/api/src/test/java/io/kafbat/ui/emitter/MessageFiltersTest.java index fdb02c2e7..9b11d9831 100644 --- a/api/src/test/java/io/kafbat/ui/emitter/MessageFiltersTest.java +++ b/api/src/test/java/io/kafbat/ui/emitter/MessageFiltersTest.java @@ -42,11 +42,17 @@ void returnsTrueWhenStringContainedInKeyOrContentOrHeadersOrInAllThree() { ); assertTrue( - filter.test(msg().key("dfg").content("does-not-contain").headers(Map.of("abC", "value"))) + filter.test(msg().key("dfg").content("does-not-contain").headers(Map.of("abC", List.of("value")))) ); assertTrue( - filter.test(msg().key("dfg").content("does-not-contain").headers(Map.of("x1", "some abC"))) + filter.test(msg().key("dfg").content("does-not-contain").headers(Map.of("x1", List.of("some abC")))) + ); + + assertTrue( + filter.test(msg().key("dfg") + .content("does-not-contain") + .headers(Map.of("x1", List.of("does-not-contain", "some abC")))) ); } @@ -65,7 +71,7 @@ void returnsFalseOtherwise() { ); assertFalse( - filter.test(msg().key("aBc").content("AbC").headers(Map.of("abc", "value"))) + filter.test(msg().key("aBc").content("AbC").headers(Map.of("abc", List.of("value")))) ); } @@ -97,12 +103,12 @@ void canCheckOffset() { @Test void canCheckHeaders() { - var f = celScriptFilter("record.headers.size() == 2 && record.headers['k1'] == 'v1'"); - assertTrue(f.test(msg().headers(Map.of("k1", "v1", "k2", "v2")))); - assertFalse(f.test(msg().headers(Map.of("k1", "unexpected", "k2", "v2")))); + var f = celScriptFilter("record.headers.size() == 2 && record.headers['k1'] == ['v1']"); + assertTrue(f.test(msg().headers(Map.of("k1", List.of("v1"), "k2", List.of("v2"))))); + assertFalse(f.test(msg().headers(Map.of("k1", List.of("unexpected"), "k2", List.of("v2"))))); - f = celScriptFilter("record.headers.size() == 1 && !has(record.headers.k1) && record.headers['k2'] == 'v2'"); - assertTrue(f.test(msg().headers(Map.of("k2", "v2")))); + f = celScriptFilter("record.headers.size() == 1 && !has(record.headers.k1) && record.headers['k2'] == ['v2']"); + assertTrue(f.test(msg().headers(Map.of("k2", List.of("v2"))))); f = celScriptFilter("!has(record.headers) || record.headers.size() == 0"); assertTrue(f.test(msg().headers(Map.of()))); diff --git a/api/src/test/java/io/kafbat/ui/serdes/ConsumerRecordDeserializerTest.java b/api/src/test/java/io/kafbat/ui/serdes/ConsumerRecordDeserializerTest.java index 649ac203c..0bb9ec63a 100644 --- a/api/src/test/java/io/kafbat/ui/serdes/ConsumerRecordDeserializerTest.java +++ b/api/src/test/java/io/kafbat/ui/serdes/ConsumerRecordDeserializerTest.java @@ -1,13 +1,16 @@ package io.kafbat.ui.serdes; import static io.kafbat.ui.serde.api.DeserializeResult.Type.STRING; +import static org.junit.jupiter.api.Assertions.assertEquals; import static org.mockito.Mockito.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; import io.kafbat.ui.model.TopicMessageDTO; import io.kafbat.ui.serde.api.DeserializeResult; import io.kafbat.ui.serde.api.Serde; +import java.util.List; import java.util.Map; import java.util.function.UnaryOperator; import org.apache.kafka.clients.consumer.ConsumerRecord; @@ -22,9 +25,50 @@ void dataMaskingAppliedOnDeserializedMessage() { Serde.Deserializer deser = (headers, data) -> new DeserializeResult("test", STRING, Map.of()); var recordDeser = new ConsumerRecordDeserializer("test", deser, "test", deser, "test", deser, deser, maskerMock); - recordDeser.deserialize(new ConsumerRecord<>("t", 1, 1L, Bytes.wrap("t".getBytes()), Bytes.wrap("t".getBytes()))); + recordDeser.deserialize(record()); verify(maskerMock).apply(any(TopicMessageDTO.class)); } + @Test + void deserializeWithMultipleHeaderValues() { + UnaryOperator maskerMock = mock(); + when(maskerMock.apply(any(TopicMessageDTO.class))).thenAnswer(invocation -> invocation.getArgument(0)); + Serde.Deserializer deser = (headers, data) -> new DeserializeResult("test", STRING, Map.of()); + + var recordDeser = new ConsumerRecordDeserializer("test", deser, "test", deser, "test", deser, deser, maskerMock); + ConsumerRecord record = record(); + record.headers().add("headerKey", "headerValue1".getBytes()); + record.headers().add("headerKey", "headerValue2".getBytes()); + TopicMessageDTO message = recordDeser.deserialize(record); + + Map> headers = message.getHeaders(); + assertEquals(1, headers.size()); + assertEquals(List.of("headerValue1", "headerValue2"), headers.get("headerKey")); + } + + @Test + void deserializeWithMixedSingleAndMultipleHeaderValues() { + UnaryOperator maskerMock = mock(); + when(maskerMock.apply(any(TopicMessageDTO.class))).thenAnswer(invocation -> invocation.getArgument(0)); + Serde.Deserializer deser = (headers, data) -> new DeserializeResult("test", STRING, Map.of()); + + var recordDeser = new ConsumerRecordDeserializer("test", deser, "test", deser, "test", deser, deser, maskerMock); + ConsumerRecord record = record(); + record.headers().add("headerKey1", "singleValue".getBytes()); + record.headers().add("headerKey2", "multiValue1".getBytes()); + record.headers().add("headerKey2", "multiValue2".getBytes()); + TopicMessageDTO message = recordDeser.deserialize(record); + + Map> headers = message.getHeaders(); + assertEquals(1, headers.get("headerKey1").size()); + assertEquals(List.of("singleValue"), headers.get("headerKey1")); + assertEquals(2, headers.get("headerKey2").size()); + assertEquals(List.of("multiValue1", "multiValue2"), headers.get("headerKey2")); + } + + private ConsumerRecord record() { + return new ConsumerRecord<>("t", 1, 1L, Bytes.wrap("t".getBytes()), Bytes.wrap("t".getBytes())); + } + } diff --git a/api/src/test/java/io/kafbat/ui/serdes/ProducerRecordCreatorTest.java b/api/src/test/java/io/kafbat/ui/serdes/ProducerRecordCreatorTest.java new file mode 100644 index 000000000..a023beb91 --- /dev/null +++ b/api/src/test/java/io/kafbat/ui/serdes/ProducerRecordCreatorTest.java @@ -0,0 +1,37 @@ +package io.kafbat.ui.serdes; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.mockito.Mockito.mock; + +import io.kafbat.ui.serde.api.Serde; +import java.util.List; +import java.util.Map; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.header.internals.RecordHeader; +import org.junit.jupiter.api.Test; + +class ProducerRecordCreatorTest { + + @Test + void createWithHeaders() { + Serde.Serializer keySerializer = mock(Serde.Serializer.class); + Serde.Serializer valueSerializer = mock(Serde.Serializer.class); + + ProducerRecordCreator recordCreator = new ProducerRecordCreator(keySerializer, valueSerializer); + Map> headers = Map.of( + "headerKey1", List.of("headerValue1"), + "headerKey2", List.of("headerValue2", "headerValue3") + ); + ProducerRecord record = recordCreator.create("topic", 1, "key", "value", headers); + + assertNotNull(record.headers()); + assertEquals(3, record.headers().toArray().length); + assertThat(record.headers()).containsExactlyInAnyOrder( + new RecordHeader("headerKey1", "headerValue1".getBytes()), + new RecordHeader("headerKey2", "headerValue2".getBytes()), + new RecordHeader("headerKey2", "headerValue3".getBytes()) + ); + } +} diff --git a/api/src/test/java/io/kafbat/ui/service/MessagesServiceTest.java b/api/src/test/java/io/kafbat/ui/service/MessagesServiceTest.java index 1fecae247..9c6892a3c 100644 --- a/api/src/test/java/io/kafbat/ui/service/MessagesServiceTest.java +++ b/api/src/test/java/io/kafbat/ui/service/MessagesServiceTest.java @@ -170,7 +170,7 @@ void execSmartFilterTestReturnsExecutionResult() { + "&& has(record.timestampMs) && has(record.offset)") .key("1234") .value("{ \"some\" : \"value\" } ") - .headers(Map.of("h1", "hv1")) + .headers(Map.of("h1", List.of("hv1"))) .offset(12345L) .timestampMs(System.currentTimeMillis()) .partition(1); diff --git a/api/src/test/java/io/kafbat/ui/service/SendAndReadTests.java b/api/src/test/java/io/kafbat/ui/service/SendAndReadTests.java index 9e0164540..a2fb45763 100644 --- a/api/src/test/java/io/kafbat/ui/service/SendAndReadTests.java +++ b/api/src/test/java/io/kafbat/ui/service/SendAndReadTests.java @@ -412,7 +412,7 @@ void topicMessageMetadataJson() { .keySerde(SchemaRegistrySerde.name()) .content(JSON_SCHEMA_RECORD) .valueSerde(SchemaRegistrySerde.name()) - .headers(Map.of("header1", "value1")) + .headers(Map.of("header1", List.of("value1"))) ) .doAssert(polled -> { assertJsonEqual(polled.getKey(), JSON_SCHEMA_RECORD); @@ -438,9 +438,9 @@ void headerValueNullPresentTest() { .keySerde(SchemaRegistrySerde.name()) .content(JSON_SCHEMA_RECORD) .valueSerde(SchemaRegistrySerde.name()) - .headers(Collections.singletonMap("header123", null)) + .headers(Collections.singletonMap("header123", Collections.singletonList(null))) ) - .doAssert(polled -> assertThat(polled.getHeaders().get("header123")).isNull()); + .doAssert(polled -> assertThat(polled.getHeaders().get("header123")).containsExactly((String) null)); } diff --git a/contract/src/main/resources/swagger/kafbat-ui-api.yaml b/contract/src/main/resources/swagger/kafbat-ui-api.yaml index 454d78c7c..1778a7a90 100644 --- a/contract/src/main/resources/swagger/kafbat-ui-api.yaml +++ b/contract/src/main/resources/swagger/kafbat-ui-api.yaml @@ -3004,7 +3004,9 @@ components: headers: type: object additionalProperties: - type: string + type: array + items: + type: string partition: type: integer offset: @@ -3033,7 +3035,9 @@ components: headers: type: object additionalProperties: - type: string + type: array + items: + type: string content: type: string nullable: true @@ -3123,7 +3127,9 @@ components: headers: type: object additionalProperties: - type: string + type: array + items: + type: string content: type: string keyFormat: diff --git a/frontend/src/components/Topics/Topic/Messages/Filters/InfoModal.tsx b/frontend/src/components/Topics/Topic/Messages/Filters/InfoModal.tsx index bb134862f..8cf52f165 100644 --- a/frontend/src/components/Topics/Topic/Messages/Filters/InfoModal.tsx +++ b/frontend/src/components/Topics/Topic/Messages/Filters/InfoModal.tsx @@ -19,7 +19,7 @@ const InfoModal: React.FC = ({ toggleIsOpen }) => { value (json if possible) keyAsText valueAsText - header + headers partition timestampMs @@ -51,7 +51,7 @@ const InfoModal: React.FC = ({ toggleIsOpen }) => { record.headers.size() == 1 && !has(record.headers.k1) && - record.headers['k2'] == 'v2' + 'v2' in record.headers['k2'] diff --git a/frontend/src/components/Topics/Topic/Messages/MessageContent/MessageContent.tsx b/frontend/src/components/Topics/Topic/Messages/MessageContent/MessageContent.tsx index ad35399a9..02f7ef133 100644 --- a/frontend/src/components/Topics/Topic/Messages/MessageContent/MessageContent.tsx +++ b/frontend/src/components/Topics/Topic/Messages/MessageContent/MessageContent.tsx @@ -11,7 +11,7 @@ type Tab = 'key' | 'content' | 'headers'; export interface MessageContentProps { messageKey?: string; messageContent?: string; - headers?: { [key: string]: string | undefined }; + headers?: { [key: string]: string[] | undefined }; timestamp?: Date; timestampType?: TopicMessageTimestampTypeEnum; keySize?: number; diff --git a/frontend/src/components/Topics/Topic/Messages/MessageContent/__tests__/MessageContent.spec.tsx b/frontend/src/components/Topics/Topic/Messages/MessageContent/__tests__/MessageContent.spec.tsx index d76455242..1f823026f 100644 --- a/frontend/src/components/Topics/Topic/Messages/MessageContent/__tests__/MessageContent.spec.tsx +++ b/frontend/src/components/Topics/Topic/Messages/MessageContent/__tests__/MessageContent.spec.tsx @@ -17,7 +17,7 @@ const setupWrapper = (props?: Partial) => { { key: 'test-key', partition: 6, content: '{"data": "test"}', - headers: { header: 'test' }, + headers: { header: ['test'] }, }; const mockKeyFilters: PreviewFilter = { field: 'sub', diff --git a/frontend/src/components/Topics/Topic/SendMessage/SendMessage.styled.tsx b/frontend/src/components/Topics/Topic/SendMessage/SendMessage.styled.tsx index d2750abf7..9893b2090 100644 --- a/frontend/src/components/Topics/Topic/SendMessage/SendMessage.styled.tsx +++ b/frontend/src/components/Topics/Topic/SendMessage/SendMessage.styled.tsx @@ -34,3 +34,13 @@ export const FlexItem = styled.div` width: 100%; } `; +export const Headers = styled.div` + display: flex; + align-items: center; + + svg { + margin-left: 8px; + vertical-align: middle; + cursor: pointer; + } +`; diff --git a/frontend/src/components/Topics/Topic/SendMessage/SendMessage.tsx b/frontend/src/components/Topics/Topic/SendMessage/SendMessage.tsx index 4bdb981f8..665890619 100644 --- a/frontend/src/components/Topics/Topic/SendMessage/SendMessage.tsx +++ b/frontend/src/components/Topics/Topic/SendMessage/SendMessage.tsx @@ -3,8 +3,10 @@ import { useForm, Controller } from 'react-hook-form'; import { RouteParamsClusterTopic } from 'lib/paths'; import { Button } from 'components/common/Button/Button'; import Editor from 'components/common/Editor/Editor'; +import InfoIcon from 'components/common/Icons/InfoIcon'; import Select from 'components/common/Select/Select'; import Switch from 'components/common/Switch/Switch'; +import Tooltip from 'components/common/Tooltip/Tooltip'; import useAppParams from 'lib/hooks/useAppParams'; import { showAlert } from 'lib/errorHandling'; import { useSendMessage, useTopicDetails } from 'lib/hooks/api/topics'; @@ -233,22 +235,27 @@ const SendMessage: React.FC<{ closeSidebar: () => void }> = ({ -
+ Headers - ( - - )} + } + content='Header keys map to a list of values, e.g. {"k": ["v1", "v2"]}' + placement="top-start" /> -
+ + ( + + )} + />