-
Notifications
You must be signed in to change notification settings - Fork 1.7k
GH-3067: Spring Kafka support multiple headers with same key. #3874
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 19 commits
c7e5e09
42010c6
dfdfb8d
865b26a
019efb7
065bedc
6aed15b
462fe25
b3f4374
34e6860
dd24248
13267dc
c9c360e
4a762bd
b5375d4
c945dd5
07a49df
732ae6a
585f356
667f76c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -179,3 +179,80 @@ MessagingMessageConverter converter() { | |
|
|
||
| If using Spring Boot, it will auto configure this converter bean into the auto-configured `KafkaTemplate`; otherwise you should add this converter to the template. | ||
|
|
||
| [[multi-value-header]] | ||
| == Support multi-value header mapping | ||
|
|
||
| Starting with 4.0, multi-value header mapping is supported, where the same logical header key appears more than once in a Kafka record. | ||
|
|
||
| By default, the `HeaderMapper` does **not** create multiple Kafka headers with the same name. | ||
| Instead, when it encounters a collection value (e.g., a `List<byte[]>`), it serializes the entire collection into **one** Kafka header whose value is a JSON array. | ||
|
|
||
| * **Producer side:** `DefaultKafkaHeaderMapper` writes the JSON bytes, while `SimpleKafkaHeaderMapper` ignore it. | ||
| * **Consumer side:** the mapper exposes the header as a single value—the **last occurrence wins**; earlier duplicates are silently discarded. | ||
artembilan marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| For example, on the producer side: | ||
| [source, java] | ||
| ---- | ||
| Message<String> message = MessageBuilder | ||
| .withPayload("test-multi-value-header") | ||
| .setHeader("test-multi-value-header", List.of("test-value1", "test-value2")) | ||
| .build(); | ||
|
|
||
| DefaultKafkaHeaderMapper headerMapper = new DefaultKafkaHeaderMapper(); | ||
| RecordHeaders mappedHeaders = new RecordHeaders(); | ||
| headerMapper.fromHeaders(message.getHeaders(), mappedHeaders); | ||
|
|
||
| ObjectMapper objectMapper = new ObjectMapper(); | ||
| RecordHeader expectedHeader = new RecordHeader("test-multi-value-header", | ||
| objectMapper.writeValueAsBytes(List.of("test-value1", "test-value2"))); | ||
| assertThat(mappedHeaders.headers(multiValueHeader1).iterator().next()).isEqualTo(expectedHeader); | ||
|
||
| ---- | ||
|
|
||
| For example, on the consumer side: | ||
| [source, java] | ||
| ---- | ||
| RecordHeaders recordHeaders = new RecordHeaders(); | ||
| recordHeaders.add("test-multi-value1", new byte[] { 0, 0, 0, 1 }); | ||
| recordHeaders.add("test-multi-value1", new byte[] { 0, 0, 0, 2 }); | ||
|
|
||
| Map<String, Object> mappedHeaders = new HashMap<>(); | ||
| headerMapper.toHeaders(recordHeaders, mappedHeaders); | ||
|
|
||
| assertThat(headersMapped.get("test-multi-value1")).isEqualTo(new byte[] { 0, 0, 0, 2 }); | ||
|
||
| ---- | ||
|
|
||
| Preserving each individual header requires explicit registration of patterns that designate the header as multi‑valued. | ||
|
|
||
| `DefaultKafkaHeaderMapper#setMultiValueHeaderPatterns(String... patterns)` accepts a list of patterns, which can be either wildcard expressions or exact header names. | ||
|
|
||
| [source, java] | ||
| ---- | ||
| DefaultKafkaHeaderMapper mapper = new DefaultKafkaHeaderMapper(); | ||
|
|
||
| // Explicit header names | ||
| mapper.setMultiValueHeaderPatterns("test-multi-value1", "test-multi-value2"); | ||
|
|
||
| // Wildcard patterns for test-multi-value1, test-multi-value2 | ||
| mapper.setMultiValueHeaderPatterns("test-multi-*"); | ||
| ---- | ||
|
|
||
| Any header whose name matches one of the supplied patterns is | ||
|
|
||
| * **Producer side:** written as separate Kafka headers, one per element. | ||
| * **Consumer side:** collected into a `List<?>` that contains the individual header values; each element is returned to the application **after the usual deserialization or type conversion performed by the configured `HeaderMapper`.** | ||
|
|
||
| [NOTE] | ||
| ==== | ||
| Regular expressions are *not* supported; only the `*` wildcard is allowed in simple patterns—supporting direct equality and forms such as: | ||
|
|
||
| - `xxx*` | ||
| - `*xxx` | ||
| - `*xxx*` | ||
| - `xxx*yyy` | ||
|
||
| ==== | ||
|
|
||
| [IMPORTANT] | ||
| ==== | ||
| All elements collected under the same multi‑value header **must be of the same Java type**. | ||
| Mixing, for example, `String` and `byte[]` values under a single header key is not supported and will lead to a conversion error. | ||
|
||
| ==== | ||
Uh oh!
There was an error while loading. Please reload this page.