-
Notifications
You must be signed in to change notification settings - Fork 1.7k
GH-4097: Fix SmartMessageConverter support in batch listeners #4121
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please, take a look into this article: https://cbea.ms/git-commit/.
The point is to say as much as possible about why? in the commit message, rather then what?.
Your comment on the PR and in the issue is a great foundation about what to put into commit message.
Side note: the first commit message becomes PR title and description.
Therefore, no need in extra work editing it.
And in the long run no one would think about PRs and issue. We would just look into commit history to determine what is going on.
This way I believe just with a single comprehensive commit message we shot a lot of birds and save some time from typing and reviewing.
With the current state I have to read everything to understand what is going.
Please, think about this while we are reviewing your contribution.
Thank you!
|
Alright, will do. Moving forward I will keep this in mind and commit
with clear and concise messages.
Thank you for informing me, I apologize for any inconvenience caused
on my end.
Message ID: ***@***.***
… .com>
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you for contribution!
I think we need to dedicate more time for reverse engineering to see what is going on and how it can be fixed.
...in/java/org/springframework/kafka/listener/adapter/BatchMessagingMessageListenerAdapter.java
Outdated
Show resolved
Hide resolved
...in/java/org/springframework/kafka/listener/adapter/BatchMessagingMessageListenerAdapter.java
Outdated
Show resolved
Hide resolved
...rc/main/java/org/springframework/kafka/support/converter/BatchMessagingMessageConverter.java
Outdated
Show resolved
Hide resolved
...rc/main/java/org/springframework/kafka/support/converter/BatchMessagingMessageConverter.java
Outdated
Show resolved
Hide resolved
...-kafka/src/test/java/org/springframework/kafka/listener/BatchSmartMessageConverterTests.java
Outdated
Show resolved
Hide resolved
...-kafka/src/test/java/org/springframework/kafka/listener/BatchSmartMessageConverterTests.java
Outdated
Show resolved
Hide resolved
...-kafka/src/test/java/org/springframework/kafka/listener/BatchSmartMessageConverterTests.java
Outdated
Show resolved
Hide resolved
...-kafka/src/test/java/org/springframework/kafka/listener/BatchSmartMessageConverterTests.java
Outdated
Show resolved
Hide resolved
...-kafka/src/test/java/org/springframework/kafka/listener/BatchSmartMessageConverterTests.java
Outdated
Show resolved
Hide resolved
|
Thanks for the review @artembilan! I'll make those formatting fixes.
Regarding the core logic - you're absolutely right to question this. Let me
investigate if MessagingMessageConverter already handles the
SmartMessageConverter internally when we call toMessage().
I'll update the PR with my findings and the necessary changes.
Thank you.
… Message ID: <spring-projects/spring-kafka/pull/4121/review/3369915950@
github.com>
|
a41485a to
5a43d31
Compare
|
Hi @artembilan, I've addressed all your feedback in the latest commit. The changes include:
All tests pass and the fix is ready for review. Thanks for the detailed feedback! |
|
Thank you for an update! One note though: no need to squash commits all the time. |
|
Alright, thank you for letting me know. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Error: eckstyle] [ERROR] /home/runner/work/spring-kafka/spring-kafka/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/BatchMessagingMessageListenerAdapter.java:146: Line has leading space characters; indentation should be performed with tabs only. [RegexpSinglelineJava]
> Task :spring-kafka:checkstyleMain
Error: eckstyle] [ERROR] /home/runner/work/spring-kafka/spring-kafka/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/BatchMessagingMessageListenerAdapter.java:161: Line has leading space characters; indentation should be performed with tabs only. [RegexpSinglelineJava]
Error: eckstyle] [ERROR] /home/runner/work/spring-kafka/spring-kafka/spring-kafka/src/main/java/org/springframework/kafka/support/converter/BatchMessagingMessageConverter.java:228: Line has leading space characters; indentation should be performed with tabs only. [RegexpSinglelineJava]
Error: eckstyle] [ERROR] /home/runner/work/spring-kafka/spring-kafka/spring-kafka/src/main/java/org/springframework/kafka/support/converter/BatchMessagingMessageConverter.java:243: Line has leading space characters; indentation should be performed with tabs only. [RegexpSinglelineJava]
Error: eckstyle] [ERROR] /home/runner/work/spring-kafka/spring-kafka/spring-kafka/src/main/java/org/springframework/kafka/support/converter/BatchMessagingMessageConverter.java:244: Line has leading space characters; indentation should be performed with tabs only. [RegexpSinglelineJava]
Please, run gradlew check locally before pushing to PR.
Thanks
BatchMessagingMessageConverter was missing setMessagingConverter() method that exists in MessagingMessageConverter, causing SmartMessageConverter configured via @KafkaListener(contentTypeConverter) to be ignored in batch listeners. This inconsistency between regular and batch listeners leads to ClassCastException when byte[] values aren't converted to the expected String type, breaking the contract that SmartMessageConverter should work the same way regardless of listener type. The fix ensures SmartMessageConverter propagation works consistently by: - Adding setMessagingConverter() to BatchMessagingMessageConverter that delegates to underlying MessagingMessageConverter - Overriding setMessagingConverter() in BatchMessagingMessageListenerAdapter to propagate the converter to batch converter - Maintaining the same SmartMessageConverter behavior between regular and batch listeners Fixes spring-projectsGH-4097 Signed-off-by: Jujuwryy <[email protected]>
|
All done. Builds were successful. Thank you. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please, add your real name to the @author tag of all the affected classes.
...in/java/org/springframework/kafka/listener/adapter/BatchMessagingMessageListenerAdapter.java
Outdated
Show resolved
Hide resolved
...rc/main/java/org/springframework/kafka/support/converter/BatchMessagingMessageConverter.java
Outdated
Show resolved
Hide resolved
...-kafka/src/test/java/org/springframework/kafka/listener/BatchSmartMessageConverterTests.java
Show resolved
Hide resolved
...-kafka/src/test/java/org/springframework/kafka/listener/BatchSmartMessageConverterTests.java
Outdated
Show resolved
Hide resolved
|
Please, don't squash commits. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The annotations in commit messages have to be back-ticked like code snippets to avoid GH user invitation.
You can use -s Git command option to add Singed-of-by automatically into each commit.
…ntation Integration testing revealed the root cause of spring-projectsGH-4097. When Spring processes a `@KafkaListener` with `contentTypeConverter` and `batch="true"`, the framework: 1. Calls `setBatchMessageConverter()` on the adapter 2. This internally calls `setMessageConverter()` which sets `converterSet=true` 3. Spring then tries to apply `contentTypeConverter` by calling `setMessagingConverter()` 4. The parent's validation `Assert.isTrue(!this.converterSet, ...)` blocks this The unit test didn't catch this because it bypassed the adapter and Spring framework integration entirely. Changes: - `BatchMessagingMessageListenerAdapter.setMessagingConverter()`: Override now directly applies `SmartMessageConverter` to batch converter (which propagates to record converter) without calling super, bypassing the validation that doesn't apply to the batch listener workflow - `BatchSmartMessageConverterTests`: Replaced unit test with full integration test using `@SpringJUnitConfig`, `@EmbeddedKafka`, `ConcurrentKafkaListenerContainerFactory`, and `@KafkaListener` to verify the complete framework flow - Added minimal `ByteArrayToStringConverter` (24 lines) for testing as no existing Spring Framework converter provides simple byte[] to String conversion needed for this test scenario All tests pass and checkstyle validation successful. Signed-off-by: Jujuwryy <[email protected]>
|
Apologies for the force push - I needed to amend the commit to add the DCO sign-off and use backticks around annotations to avoid GitHub mentions. The latest commit does include a small change to the "setMessagingConverter" method in the BatchMessagingMessageListenerAdapter.java. Thank you. |
...in/java/org/springframework/kafka/listener/adapter/BatchMessagingMessageListenerAdapter.java
Outdated
Show resolved
Hide resolved
...in/java/org/springframework/kafka/listener/adapter/BatchMessagingMessageListenerAdapter.java
Outdated
Show resolved
Hide resolved
...-kafka/src/test/java/org/springframework/kafka/listener/BatchSmartMessageConverterTests.java
Show resolved
Hide resolved
|
Hello @artembilan, I tested the setMessagingConverter() implementation both with and without calling super.setMessagingConverter(messageConverter). The Issue with Calling Super: "text Root Cause: setBatchMessageConverter() internally calls setMessageConverter(), which sets converterSet = true in the parent class When Spring later processes @KafkaListener(contentTypeConverter = "...") and calls setMessagingConverter(), the parent's validation Assert.isTrue(!this.converterSet) blocks it This creates a timing conflict specific to batch listeners due to their different initialization flow Current Solution: We bypass the parent's restrictive validation We directly propagate the converter to the batch converter The batch converter then properly delegates to the record converter via its own setMessagingConverter() method This approach respects the different initialization sequence required for batch listeners Regarding the Test Converter: I've also incorporated your previous feedback by: Using pattern matching for instanceof where applicable. The current implementation ensures that @KafkaListener(contentTypeConverter = "...") works consistently for both record and batch listeners without triggering the framework's validation conflicts. Kindly let me know if i should re - commit the code. |
- Use pattern matching for instanceof in setMessagingConverter() to avoid explicit casting - Fix constructor parameter indentation to use tabs only (not mixed spaces) - Address checkstyle violations per reviewer feedback These changes improve code readability without affecting functionality. Signed-off-by: Jujuwryy <[email protected]>
...in/java/org/springframework/kafka/listener/adapter/BatchMessagingMessageListenerAdapter.java
Outdated
Show resolved
Hide resolved
...in/java/org/springframework/kafka/listener/adapter/BatchMessagingMessageListenerAdapter.java
Outdated
Show resolved
Hide resolved
...in/java/org/springframework/kafka/listener/adapter/BatchMessagingMessageListenerAdapter.java
Outdated
Show resolved
Hide resolved
...rc/main/java/org/springframework/kafka/support/converter/BatchMessagingMessageConverter.java
Outdated
Show resolved
Hide resolved
- Fix toMessagingMessage() parameter indentation in BatchMessagingMessageListenerAdapter - Fix toMessage() parameter indentation in BatchMessagingMessageConverter - Use single tab indentation consistently per Spring code style Signed-off-by: Jujuwryy <[email protected]>
|
Hi @artembilan, hope you're doing well! Just wanted to gently bump this PR. Happy to address any feedback or make changes if needed. No rush—just making sure it's still on your radar. Thanks for all your work on this project. |
|
@Jujuwryy We will get this merged early next week. Thanks! |
|
Thank you! Looking forward to it. Let me know if you need anything from my end. |
| * properly propagated to the batch converter, which will then propagate it to the | ||
| * record converter for message conversion in batch listeners. | ||
| * <p> | ||
| * This override does not call the parent implementation because the parent's validation |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Well.
I see there is a case in this adapter where we do call a super class and its per-record conversion capabilities:
for (ConsumerRecord<K, V> cRecord : records) {
messages.add(toMessagingMessage(cRecord, acknowledgment, consumer));
}
Don't we need to address that somehow as well?
I mean if we don't call here super to setup SmartMessageConverter into the target MessagingMessageConverter, the per-record conversion for messages list might not work well.
What do I miss, please?
| new ConcurrentKafkaListenerContainerFactory<>(); | ||
| factory.setConsumerFactory(consumerFactory(embeddedKafka)); | ||
| factory.setBatchListener(true); | ||
| // Set up batch converter with record converter - framework will propagate SmartMessageConverter |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Honestly, this is not OK.
The listener container factory is shared and can be used in different @KafkaListener declarations.
One of them may have contentTypeConverter, another does not, but both of them would end up having it.
Or even worse: one wants one contentTypeConverter, another - different, but all of them are going to have only one and not clear which.
I'm not sure what we are trying to fix here, but I feel like we are making it even more broken with mutating shared BatchMessagingMessageConverter.
|
Proposed Solution:
This should ensure: Each listener has an independent converter instance I'll add comprehensive tests verifying that multiple listeners with different converters work independently. |
I think that is good codding task with a lot of interesting obstacles. exists. Exactly avoid a configuration in different places. We need to look making |
|
Thank you for the guidance. I'll simplify this:
This removes the paradox of choice—users can rely on the annotation configuration, and the framework handles propagation. |
|
I would like to avoid cloning at all. |
|
Thank you for the guidance—it's been very helpful in clarifying the right approach. I'm planning to remove all cloning logic and update the implementation to use the same Assert.isTrue(!this.converterSet, ...) validation pattern as the parent class. Here's my proposed approach for setMessagingConverter() in BatchMessagingMessageListenerAdapter:
I'll also update the BatchMessagingMessageConverter default constructor to always create a MessagingMessageConverter by default, so users can rely on the annotation path without needing to set the converter on the factory. This approach should:
I'll implement these changes and add comprehensive tests, including verification that multiple listeners with different contentTypeConverter values work independently. Does this approach sound better? |
|
Yes, your last proposal sounds right in my head. If that is not too much for you to comprehend, please, implement as you feel and we will go from there. Thank you! |
|
Hahahahaha. Let's hope that this proposal moves in the right direction😂 . I initially tried bypassing the parent's validation because: I saw that setBatchMessageConverter() calls setMessageConverter(), which sets converterSet = true. And my test set a converter on the factory. So when the annotation tried to call setMessagingConverter(), the validation failed. I thought the validation was blocking batch listeners. But the validation is working as intended — it prevents configuration conflicts. The issue was my test setup (factory setter + annotation), not the validation. Which is why I appreciate your continuous reviews and comments. It clarifies my thought process. Ill work on it and have a push in a little while. Thank you. |
Fixes spring-projectsGH-4097 Problem: When using `@KafkaListener(contentTypeConverter = "...")` with a batch listener (`batch = "true"`), the SmartMessageConverter was not being applied during message conversion, resulting in ClassCastException when trying to process messages that required conversion (e.g., byte[] to String). Root Cause: The issue stemmed from two problems: 1. `BatchMessagingMessageConverter` default constructor created an instance without a `RecordMessageConverter` (`this(null)`), preventing proper per-record conversion within batch processing. 2. `BatchMessagingMessageListenerAdapter` did not override `setMessagingConverter()` to propagate the SmartMessageConverter from the annotation configuration to the batch converter's record converter. Additionally, the initial approach of bypassing the parent's validation in `setMessagingConverter()` conflicted with the framework's design to prevent the "paradox of choice" - users should configure converters either via factory setter OR annotation attribute, not both. Solution: 1. Changed `BatchMessagingMessageConverter` default constructor to always create a `MessagingMessageConverter` by default (`this(new MessagingMessageConverter())`). This ensures batch converters always have a record converter for per-record conversion within batches, and enables annotation-only configuration without requiring users to set converters on the factory. 2. Added `setMessagingConverter()` method to `BatchMessagingMessageConverter` that propagates the SmartMessageConverter to its internal record converter when it's an instance of `MessagingMessageConverter`. 3. Overrode `setMessagingConverter()` in `BatchMessagingMessageListenerAdapter` to: - Call `super.setMessagingConverter(messageConverter)` first, which applies the same validation as the parent class (`Assert.isTrue(!this.converterSet)`) to prevent configuration conflicts between factory setter and annotation. - Propagate the SmartMessageConverter to the batch converter's record converter for proper message conversion in batch processing. This approach: - Respects the framework's validation to prevent the "paradox of choice" - Ensures both batch and per-record conversion paths work correctly - Allows users to configure converters via annotation without needing to set converters on the factory (annotation-only configuration) - Avoids the complexity and potential issues of cloning converters - Works for multiple listeners with different contentTypeConverter values Testing: Added comprehensive integration test `BatchSmartMessageConverterTests` that: - Verifies SmartMessageConverter works with batch listeners using `@KafkaListener(contentTypeConverter = "...")` - Tests multiple listeners with different converters to ensure isolation - Uses `ConcurrentKafkaListenerContainerFactory` with batch mode to verify annotation attributes propagate correctly to the record converter Signed-off-by: Jujuwryy <[email protected]>
|
Updated changes are committed. Thank you. |
| * properly propagated to the batch converter's record converter for message conversion | ||
| * in batch listeners. | ||
| * <p> | ||
| * Uses the same validation as the parent class to prevent the paradox of choice: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The "paradox of choice" is a bit abrupt to be used in the docs.
That is something we can say when we fight for the code in review, but that is not what suppose to go to the official technical documentation.
It is better to get rid off of this paragraph altogether, or at least say something like:
This method cannot be called after setBatchMessageConverter() as it causes a mutation of the internal batchMessageConverter.
Instead, the SmartMessageConverter has to be provided on the external BatchMessageConverter.
Might not the best my English, but this is a gist what I'd like to see as method Javadoc.
| } | ||
|
|
||
| /** | ||
| * Set a spring-messaging {@link SmartMessageConverter} to convert the record value to |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The "spring-messaging" phrase is redundant here.
It is obvious from the {@link SmartMessageConverter} what class are we talking about.
And for some people who think about Spring from the whole Spring Boot auto-configuration as a single entity it is confusing what is a messaging if we deal with Apache Kafka in this context.
|
|
||
| /** | ||
| * Integration tests for SmartMessageConverter support in batch listeners. | ||
| * Reproduces and verifies the fix for the issue described in GH-4097. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What if I'm about to add new tests about smart conversion with the batch in the future?
Should I start a new test class since you have locked this into that specific issue?
I didn't mean to offend you.
Just questioning the reasoning behind issues numbers in the constantly evolving code.
😄
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Haha, good point. I've removed the GH-4097 reference so the test class isn't locked to a specific issue. Future SmartMessageConverter tests can use this class without issue. Thanks for the catch.
| this.template.send("smartBatchTopic", "world".getBytes()); | ||
|
|
||
| assertThat(listener.latch.await(10, TimeUnit.SECONDS)).isTrue(); | ||
| assertThat(listener.received).hasSize(2).containsExactly("hello", "world"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
According to the containsExactly() API, the hasSize() is redundant.
| listener2.reset(1); | ||
|
|
||
| this.template.send("smartBatchTopic", "foo".getBytes()); | ||
| this.template.send("smartBatchTopic2", "bar".getBytes()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I prefer to have a neutral open source code, without foo/bar.
I understand that there is:
Cultural significance:
Their presence is a part of programming culture and history, signaling a common understanding among developers.
But that does not mean that our language should be not literary-correct to emphasize our diversion from the rest of the world.
Please, consider to use other words.
The listener1Data, listener2Data might be OK.
Thanks
- Improve Javadoc clarity with technical explanation for method constraints - Remove redundant phrase from BatchMessagingMessageConverter documentation - Generalize test class description for future extensibility - Clean up test assertions by removing redundant size checks - Use descriptive variable names in test data Signed-off-by: Jujuwryy <[email protected]>
|
Update code has been committed. Thank you. |
| * This method cannot be called after {@link #setBatchMessageConverter(BatchMessageConverter) | ||
| * setBatchMessageConverter()} as it would cause a mutation of the internal | ||
| * batchMessageConverter. Instead, the SmartMessageConverter has to be provided on the | ||
| * external BatchMessageConverter. Since {@link BatchMessagingMessageConverter} now |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please, revise this Javadoc for code snippets.
I believe that setBatchMessageConverter() link is a bit off.
The batchMessageConverter as to be link, as well as SmartMessageConverter and BatchMessageConverter.
- Add proper links for SmartMessageConverter and BatchMessageConverter - Use code formatting for batchMessageConverter field reference - Simplify setBatchMessageConverter method link format Signed-off-by: Jujuwryy <[email protected]>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM.
Will merge when PR build is green.
Thank you!
|
Thank you for the thorough review and guidance throughout the process! |
|
Thank you for contribution; looking forward for more! |
Fixes #4097
Problem: SmartMessageConverter was ignored in batch listeners, causing ClassCastException
Root Cause: BatchMessagingMessageConverter lacked setMessagingConverter() method
Solution:
Testing: All existing tests pass, new tests verify the fix works
Note: This is my first contribution to Spring Kafka. Happy to make any requested changes!