Skip to content

Commit c49940e

Browse files
committed
BE: Closes #71 Messages: Show headers duplicates
1 parent 0ad8695 commit c49940e

File tree

9 files changed

+143
-17
lines changed

9 files changed

+143
-17
lines changed

api/src/main/java/io/kafbat/ui/emitter/MessageFilters.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,7 @@ private static CelCompiler createCompiler() {
123123
"timestampMs", SimpleType.INT,
124124
"keyAsText", SimpleType.STRING,
125125
"valueAsText", SimpleType.STRING,
126-
"headers", MapType.create(SimpleType.STRING, SimpleType.STRING),
126+
"headers", MapType.create(SimpleType.STRING, SimpleType.DYN),
127127
"key", SimpleType.DYN,
128128
"value", SimpleType.DYN
129129
);

api/src/main/java/io/kafbat/ui/serdes/ConsumerRecordDeserializer.java

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,10 @@
66
import java.time.Instant;
77
import java.time.OffsetDateTime;
88
import java.time.ZoneId;
9+
import java.util.ArrayList;
910
import java.util.Arrays;
1011
import java.util.HashMap;
12+
import java.util.List;
1113
import java.util.Map;
1214
import java.util.function.UnaryOperator;
1315
import lombok.RequiredArgsConstructor;
@@ -63,14 +65,16 @@ private static TimestampTypeEnum mapToTimestampType(TimestampType timestampType)
6365
}
6466

6567
private void fillHeaders(TopicMessageDTO message, ConsumerRecord<Bytes, Bytes> rec) {
66-
Map<String, String> headers = new HashMap<>();
68+
Map<String, List<String>> headersMap = new HashMap<>();
6769
rec.headers().iterator()
68-
.forEachRemaining(header ->
69-
headers.put(
70-
header.key(),
71-
header.value() != null ? new String(header.value()) : null
72-
));
73-
message.setHeaders(headers);
70+
.forEachRemaining(header -> {
71+
String key = header.key();
72+
String value = header.value() != null ? new String(header.value()) : null;
73+
headersMap.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
74+
});
75+
Map<String, Object> finalHeadersMap = new HashMap<>();
76+
headersMap.forEach((key, values) -> finalHeadersMap.put(key, values.size() == 1 ? values.get(0) : values));
77+
message.setHeaders(finalHeadersMap);
7478
}
7579

7680
private void fillKey(TopicMessageDTO message, ConsumerRecord<Bytes, Bytes> rec) {

api/src/main/java/io/kafbat/ui/serdes/ProducerRecordCreator.java

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package io.kafbat.ui.serdes;
22

3+
import io.kafbat.ui.exception.ValidationException;
34
import io.kafbat.ui.serde.api.Serde;
5+
import java.util.List;
46
import java.util.Map;
57
import javax.annotation.Nullable;
68
import lombok.RequiredArgsConstructor;
@@ -19,7 +21,7 @@ public ProducerRecord<byte[], byte[]> create(String topic,
1921
@Nullable Integer partition,
2022
@Nullable String key,
2123
@Nullable String value,
22-
@Nullable Map<String, String> headers) {
24+
@Nullable Map<String, Object> headers) {
2325
return new ProducerRecord<>(
2426
topic,
2527
partition,
@@ -29,10 +31,23 @@ public ProducerRecord<byte[], byte[]> create(String topic,
2931
);
3032
}
3133

32-
private Iterable<Header> createHeaders(Map<String, String> clientHeaders) {
34+
private Iterable<Header> createHeaders(Map<String, Object> clientHeaders) {
3335
RecordHeaders headers = new RecordHeaders();
34-
clientHeaders.forEach((k, v) -> headers.add(new RecordHeader(k, v.getBytes())));
36+
clientHeaders.forEach((k, v) -> {
37+
if (v instanceof List<?> valueList) {
38+
valueList.forEach(value -> headers.add(new RecordHeader(k, valueToBytes(value))));
39+
} else {
40+
headers.add(new RecordHeader(k, valueToBytes(v)));
41+
}
42+
});
3543
return headers;
3644
}
3745

46+
private byte[] valueToBytes(Object value) {
47+
if (value instanceof List<?> || value instanceof Map<?, ?>) {
48+
throw new ValidationException("Header values can only be string or list of strings");
49+
}
50+
return value != null ? String.valueOf(value).getBytes() : null;
51+
}
52+
3853
}

api/src/test/java/io/kafbat/ui/serdes/ConsumerRecordDeserializerTest.java

Lines changed: 47 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,17 @@
11
package io.kafbat.ui.serdes;
22

33
import static io.kafbat.ui.serde.api.DeserializeResult.Type.STRING;
4+
import static org.junit.jupiter.api.Assertions.assertEquals;
5+
import static org.junit.jupiter.api.Assertions.assertTrue;
46
import static org.mockito.Mockito.any;
57
import static org.mockito.Mockito.mock;
68
import static org.mockito.Mockito.verify;
9+
import static org.mockito.Mockito.when;
710

811
import io.kafbat.ui.model.TopicMessageDTO;
912
import io.kafbat.ui.serde.api.DeserializeResult;
1013
import io.kafbat.ui.serde.api.Serde;
14+
import java.util.List;
1115
import java.util.Map;
1216
import java.util.function.UnaryOperator;
1317
import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -22,9 +26,51 @@ void dataMaskingAppliedOnDeserializedMessage() {
2226
Serde.Deserializer deser = (headers, data) -> new DeserializeResult("test", STRING, Map.of());
2327

2428
var recordDeser = new ConsumerRecordDeserializer("test", deser, "test", deser, "test", deser, deser, maskerMock);
25-
recordDeser.deserialize(new ConsumerRecord<>("t", 1, 1L, Bytes.wrap("t".getBytes()), Bytes.wrap("t".getBytes())));
29+
recordDeser.deserialize(record());
2630

2731
verify(maskerMock).apply(any(TopicMessageDTO.class));
2832
}
2933

34+
@Test
35+
void deserializeWithMultipleHeaderValues() {
36+
UnaryOperator<TopicMessageDTO> maskerMock = mock();
37+
when(maskerMock.apply(any(TopicMessageDTO.class))).thenAnswer(invocation -> invocation.getArgument(0));
38+
Serde.Deserializer deser = (headers, data) -> new DeserializeResult("test", STRING, Map.of());
39+
40+
var recordDeser = new ConsumerRecordDeserializer("test", deser, "test", deser, "test", deser, deser, maskerMock);
41+
ConsumerRecord<Bytes, Bytes> record = record();
42+
record.headers().add("headerKey", "headerValue1".getBytes());
43+
record.headers().add("headerKey", "headerValue2".getBytes());
44+
TopicMessageDTO message = recordDeser.deserialize(record);
45+
46+
Map<String, Object> headers = message.getHeaders();
47+
assertEquals(1, headers.size());
48+
assertTrue(headers.get("headerKey") instanceof List);
49+
assertEquals(List.of("headerValue1", "headerValue2"), headers.get("headerKey"));
50+
}
51+
52+
@Test
53+
void deserializeWithMixedSingleAndMultipleHeaderValues() {
54+
UnaryOperator<TopicMessageDTO> maskerMock = mock();
55+
when(maskerMock.apply(any(TopicMessageDTO.class))).thenAnswer(invocation -> invocation.getArgument(0));
56+
Serde.Deserializer deser = (headers, data) -> new DeserializeResult("test", STRING, Map.of());
57+
58+
var recordDeser = new ConsumerRecordDeserializer("test", deser, "test", deser, "test", deser, deser, maskerMock);
59+
ConsumerRecord<Bytes, Bytes> record = record();
60+
record.headers().add("headerKey1", "singleValue".getBytes());
61+
record.headers().add("headerKey2", "multiValue1".getBytes());
62+
record.headers().add("headerKey2", "multiValue2".getBytes());
63+
TopicMessageDTO message = recordDeser.deserialize(record);
64+
65+
Map<String, Object> headers = message.getHeaders();
66+
assertEquals("singleValue", headers.get("headerKey1"));
67+
assertTrue(headers.get("headerKey2") instanceof List);
68+
assertEquals(2, ((List<?>) headers.get("headerKey2")).size());
69+
assertEquals(List.of("multiValue1", "multiValue2"), headers.get("headerKey2"));
70+
}
71+
72+
private ConsumerRecord<Bytes, Bytes> record() {
73+
return new ConsumerRecord<>("t", 1, 1L, Bytes.wrap("t".getBytes()), Bytes.wrap("t".getBytes()));
74+
}
75+
3076
}
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
package io.kafbat.ui.serdes;
2+
3+
import static org.assertj.core.api.Assertions.assertThat;
4+
import static org.junit.jupiter.api.Assertions.assertEquals;
5+
import static org.junit.jupiter.api.Assertions.assertNotNull;
6+
import static org.junit.jupiter.api.Assertions.assertThrows;
7+
import static org.mockito.Mockito.mock;
8+
9+
import io.kafbat.ui.exception.ValidationException;
10+
import io.kafbat.ui.serde.api.Serde;
11+
import java.util.List;
12+
import java.util.Map;
13+
import org.apache.kafka.clients.producer.ProducerRecord;
14+
import org.apache.kafka.common.header.internals.RecordHeader;
15+
import org.junit.jupiter.api.Test;
16+
17+
class ProducerRecordCreatorTest {
18+
19+
@Test
20+
void createWithHeaders() {
21+
Serde.Serializer keySerializer = mock(Serde.Serializer.class);
22+
Serde.Serializer valueSerializer = mock(Serde.Serializer.class);
23+
24+
ProducerRecordCreator recordCreator = new ProducerRecordCreator(keySerializer, valueSerializer);
25+
Map<String, Object> headersMap = Map.of(
26+
"headerKey1", "headerValue1",
27+
"headerKey2", List.of("headerValue2", "headerValue3")
28+
);
29+
ProducerRecord<byte[], byte[]> record = recordCreator.create("topic", 1, "key", "value", headersMap);
30+
31+
assertNotNull(record.headers());
32+
assertEquals(3, record.headers().toArray().length);
33+
assertThat(record.headers()).containsExactlyInAnyOrder(
34+
new RecordHeader("headerKey1", "headerValue1".getBytes()),
35+
new RecordHeader("headerKey2", "headerValue2".getBytes()),
36+
new RecordHeader("headerKey2", "headerValue3".getBytes())
37+
);
38+
}
39+
40+
@Test
41+
void createWithInvalidHeaderValue() {
42+
Serde.Serializer keySerializer = mock(Serde.Serializer.class);
43+
Serde.Serializer valueSerializer = mock(Serde.Serializer.class);
44+
45+
ProducerRecordCreator recordCreator = new ProducerRecordCreator(keySerializer, valueSerializer);
46+
Map<String, Object> invalidHeaders = Map.of("headerKey", Map.of("invalid", "value"));
47+
48+
assertThrows(ValidationException.class, () ->
49+
recordCreator.create("topic", 1, "key", "value", invalidHeaders));
50+
}
51+
}

contract/src/main/resources/swagger/kafbat-ui-api.yaml

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2919,8 +2919,9 @@ components:
29192919
type: string
29202920
headers:
29212921
type: object
2922+
description: header values can be string or list of strings (for keys with multiple values)
29222923
additionalProperties:
2923-
type: string
2924+
type: object
29242925
partition:
29252926
type: integer
29262927
offset:
@@ -2948,8 +2949,9 @@ components:
29482949
nullable: true
29492950
headers:
29502951
type: object
2952+
description: header values can be string or list of strings (for keys with multiple values)
29512953
additionalProperties:
2952-
type: string
2954+
type: object
29532955
content:
29542956
type: string
29552957
nullable: true
@@ -3038,8 +3040,9 @@ components:
30383040
type: string
30393041
headers:
30403042
type: object
3043+
description: header values can be string or list of strings (for keys with multiple values)
30413044
additionalProperties:
3042-
type: string
3045+
type: object
30433046
content:
30443047
type: string
30453048
keyFormat:

frontend/src/components/Topics/Topic/Messages/Filters/InfoModal.tsx

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,13 @@ const InfoModal: React.FC<InfoModalProps> = ({ toggleIsOpen }) => {
5454
record.headers[&apos;k2&apos;] == &apos;v2&apos;
5555
</code>
5656
</S.ListItem>
57+
<S.ListItem>
58+
<code>
59+
&quot;v1&quot; in (type(record.headers.k1) == list ?
60+
record.headers.k1 : [record.headers.k1])
61+
</code> - in case a header can hold either a single value (string) or
62+
multiple values (list of strings)
63+
</S.ListItem>
5764
</ol>
5865
<Flexbox justifyContent="center" margin="20px 0 0 0">
5966
<Button

frontend/src/components/Topics/Topic/Messages/MessageContent/MessageContent.tsx

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ type Tab = 'key' | 'content' | 'headers';
1111
export interface MessageContentProps {
1212
messageKey?: string;
1313
messageContent?: string;
14-
headers?: { [key: string]: string | undefined };
14+
headers?: { [key: string]: string | string[] | undefined };
1515
timestamp?: Date;
1616
timestampType?: TopicMessageTimestampTypeEnum;
1717
keySize?: number;

frontend/src/components/Topics/Topic/Messages/MessageContent/__tests__/MessageContent.spec.tsx

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ const setupWrapper = (props?: Partial<MessageContentProps>) => {
1717
<MessageContent
1818
messageKey='"test-key"'
1919
messageContent='{"data": "test"}'
20-
headers={{ header: 'test' }}
20+
headers={{ header1: 'test', header2: ['value1', 'value2'] }}
2121
timestamp={new Date(0)}
2222
timestampType={TopicMessageTimestampTypeEnum.CREATE_TIME}
2323
keySerde="SchemaRegistry"

0 commit comments

Comments
 (0)