-
Notifications
You must be signed in to change notification settings - Fork 1.7k
GH-3067: Spring Kafka support multiple headers with same key. #3874
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 1 commit
c7e5e09
42010c6
dfdfb8d
865b26a
019efb7
065bedc
6aed15b
462fe25
b3f4374
34e6860
dd24248
13267dc
c9c360e
4a762bd
b5375d4
c945dd5
07a49df
732ae6a
585f356
667f76c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,129 @@ | ||
| /* | ||
| * Copyright 2017-2025 the original author or authors. | ||
| * | ||
| * Licensed under the Apache License, Version 2.0 (the "License"); | ||
| * you may not use this file except in compliance with the License. | ||
| * You may obtain a copy of the License at | ||
| * | ||
| * https://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| package org.springframework.kafka.support; | ||
|
|
||
| import java.util.ArrayList; | ||
| import java.util.HashSet; | ||
| import java.util.List; | ||
| import java.util.Map; | ||
| import java.util.Set; | ||
|
|
||
| import org.apache.kafka.common.header.Header; | ||
|
|
||
| import org.springframework.kafka.retrytopic.RetryTopicHeaders; | ||
|
|
||
| /** | ||
| * Extended Header Mapper based on {@link DefaultKafkaHeaderMapper}. | ||
| * This Header Mapper manages header values as a list, | ||
| * except for certain reserved headers. | ||
| * Other behaviors are identical to {@link DefaultKafkaHeaderMapper}. | ||
| * | ||
| * @author Sanghyeok An | ||
| * | ||
| * @since 4.0.0 | ||
| * | ||
| */ | ||
| public class MultiValueKafkaHeaderMapper extends DefaultKafkaHeaderMapper { | ||
|
||
|
|
||
| private final List<String> defaultSingleValueHeaderList = List.of( | ||
| KafkaHeaders.PREFIX, | ||
| KafkaHeaders.RECEIVED, | ||
| KafkaHeaders.TOPIC, | ||
| KafkaHeaders.KEY, | ||
| KafkaHeaders.PARTITION, | ||
| KafkaHeaders.OFFSET, | ||
| KafkaHeaders.RAW_DATA, | ||
| KafkaHeaders.RECORD_METADATA, | ||
| KafkaHeaders.ACKNOWLEDGMENT, | ||
| KafkaHeaders.CONSUMER, | ||
| KafkaHeaders.RECEIVED_TOPIC, | ||
| KafkaHeaders.RECEIVED_KEY, | ||
| KafkaHeaders.RECEIVED_PARTITION, | ||
| KafkaHeaders.TIMESTAMP_TYPE, | ||
| KafkaHeaders.TIMESTAMP, | ||
| KafkaHeaders.RECEIVED_TIMESTAMP, | ||
| KafkaHeaders.NATIVE_HEADERS, | ||
| KafkaHeaders.BATCH_CONVERTED_HEADERS, | ||
| KafkaHeaders.CORRELATION_ID, | ||
| KafkaHeaders.REPLY_TOPIC, | ||
| KafkaHeaders.REPLY_PARTITION, | ||
| KafkaHeaders.DLT_EXCEPTION_FQCN, | ||
| KafkaHeaders.DLT_EXCEPTION_CAUSE_FQCN, | ||
| KafkaHeaders.DLT_EXCEPTION_STACKTRACE, | ||
| KafkaHeaders.DLT_EXCEPTION_MESSAGE, | ||
| KafkaHeaders.DLT_KEY_EXCEPTION_STACKTRACE, | ||
| KafkaHeaders.DLT_KEY_EXCEPTION_MESSAGE, | ||
| KafkaHeaders.DLT_KEY_EXCEPTION_FQCN, | ||
| KafkaHeaders.DLT_ORIGINAL_TOPIC, | ||
| KafkaHeaders.DLT_ORIGINAL_PARTITION, | ||
| KafkaHeaders.DLT_ORIGINAL_OFFSET, | ||
| KafkaHeaders.DLT_ORIGINAL_CONSUMER_GROUP, | ||
| KafkaHeaders.DLT_ORIGINAL_TIMESTAMP, | ||
| KafkaHeaders.DLT_ORIGINAL_TIMESTAMP_TYPE, | ||
| KafkaHeaders.GROUP_ID, | ||
| KafkaHeaders.DELIVERY_ATTEMPT, | ||
| KafkaHeaders.EXCEPTION_FQCN, | ||
| KafkaHeaders.EXCEPTION_CAUSE_FQCN, | ||
| KafkaHeaders.EXCEPTION_STACKTRACE, | ||
| KafkaHeaders.EXCEPTION_MESSAGE, | ||
| KafkaHeaders.KEY_EXCEPTION_STACKTRACE, | ||
| KafkaHeaders.KEY_EXCEPTION_MESSAGE, | ||
| KafkaHeaders.KEY_EXCEPTION_FQCN, | ||
| KafkaHeaders.ORIGINAL_TOPIC, | ||
| KafkaHeaders.ORIGINAL_PARTITION, | ||
| KafkaHeaders.ORIGINAL_OFFSET, | ||
| KafkaHeaders.ORIGINAL_TIMESTAMP, | ||
| KafkaHeaders.ORIGINAL_TIMESTAMP_TYPE, | ||
| KafkaHeaders.CONVERSION_FAILURES, | ||
| KafkaHeaders.LISTENER_INFO, | ||
| RetryTopicHeaders.DEFAULT_HEADER_ATTEMPTS, | ||
| RetryTopicHeaders.DEFAULT_HEADER_ORIGINAL_TIMESTAMP, | ||
| RetryTopicHeaders.DEFAULT_HEADER_BACKOFF_TIMESTAMP); | ||
|
|
||
| private final Set<String> singleValueHeaders = new HashSet<>(this.defaultSingleValueHeaderList); | ||
|
|
||
| /** | ||
| * Adds headers that the {@link MultiValueKafkaHeaderMapper} should handle as single values. | ||
| * @param headerName the header name. | ||
| */ | ||
| public void addSingleValueHeader(String headerName) { | ||
| this.singleValueHeaders.add(headerName); | ||
| } | ||
|
|
||
| @Override | ||
| protected void handleHeader(String headerName, Header header, Map<String, Object> headers) { | ||
| if (this.singleValueHeaders.contains(headerName)) { | ||
| headers.put(headerName, headerValueToAddIn(header)); | ||
| } | ||
| else { | ||
| Object values = headers.getOrDefault(headerName, new ArrayList<>()); | ||
|
|
||
| if (values instanceof List) { | ||
| @SuppressWarnings("unchecked") | ||
| List<Object> castedValues = (List<Object>) values; | ||
| castedValues.add(headerValueToAddIn(header)); | ||
| headers.put(headerName, castedValues); | ||
| } | ||
| else { | ||
| headers.put(headerName, headerValueToAddIn(header)); | ||
| } | ||
|
|
||
| } | ||
|
|
||
| } | ||
|
|
||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is mapping from Kafka headers, So, I believe this name is better in this context:
fromUserHeader.Since standard headers have been already mapped before.