Skip to content

Commit e0db1c7

Browse files
steve-aom-elliottdsgrievetimtebeek
authored
Migrating Spring Kafka's SeekToCurrentErrorHandler to DefaultErrorHandler and updates method that was previously used to set the handler for AbstractKafkaListenerContainerFactory. (#900)
* Migrating Spring Kafka's `SeekToCurrentErrorHandler` to `DefaultErrorHandler` and updates method that was previously used to set the handler for `AbstractKafkaListenerContainerFactory`. * Adding license that was missed and `@DocumentExample` * Updating recipes.csv * update type tables * Helping a separate test to run on Windows * Add version suffix when there are multiple spring-kafka entries --------- Co-authored-by: David Grieve <david@moderne.io> Co-authored-by: Tim te Beek <tim@moderne.io>
1 parent dd19cb4 commit e0db1c7

File tree

8 files changed

+152
-24
lines changed

8 files changed

+152
-24
lines changed

build.gradle.kts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,8 @@ recipeDependencies {
123123
testParserClasspath("org.springframework.batch:spring-batch-infrastructure:5.+")
124124
testParserClasspath("org.springframework.batch:spring-batch-test:4.3.+")
125125

126+
testParserClasspath("org.springframework.kafka:spring-kafka:2.8.+")
127+
126128
testParserClasspath("org.springframework.boot:spring-boot-actuator:2.1.+")
127129
testParserClasspath("org.springframework.boot:spring-boot-actuator:3.0.+")
128130
testParserClasspath("org.springframework.boot:spring-boot-actuator:3.4.+")
442 Bytes
Binary file not shown.

src/main/resources/META-INF/rewrite/recipes.csv

Lines changed: 12 additions & 11 deletions
Large diffs are not rendered by default.

src/main/resources/META-INF/rewrite/spring-kafka-30.yml

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,3 +42,24 @@ recipeList:
4242
- org.openrewrite.java.ReplaceConstantWithAnotherConstant:
4343
existingFullyQualifiedConstantName: org.springframework.kafka.support.KafkaHeaders.RECEIVED_PARTITION_ID
4444
fullyQualifiedConstantName: org.springframework.kafka.support.KafkaHeaders.RECEIVED_PARTITION
45+
- org.openrewrite.java.spring.kafka.UpgradeSpringKafka_2_8_ErrorHandlers
46+
---
47+
type: specs.openrewrite.org/v1beta/recipe
48+
name: org.openrewrite.java.spring.kafka.UpgradeSpringKafka_2_8_ErrorHandlers
49+
displayName: Migrates Spring Kafka deprecated error handlers
50+
description: >-
51+
Migrate error handlers deprecated in Spring Kafka `2.8.x` to their replacements.
52+
tags:
53+
- spring
54+
- kafka
55+
recipeList:
56+
# `BiConsumer<T>` arguments to `SeekToCurrentErrorHandler` constructor also change to `ConsumerRecordRecoverer`, but values are assignable already
57+
- org.openrewrite.java.ChangeMethodName:
58+
methodPattern: org.springframework.kafka.listener.SeekToCurrentErrorHandler handle(..)
59+
newMethodName: handleRemaining
60+
- org.openrewrite.java.ChangeMethodName:
61+
methodPattern: org.springframework.kafka.config.AbstractKafkaListenerContainerFactory setErrorHandler(..)
62+
newMethodName: setCommonErrorHandler
63+
- org.openrewrite.java.ChangeType:
64+
oldFullyQualifiedTypeName: org.springframework.kafka.listener.SeekToCurrentErrorHandler
65+
newFullyQualifiedTypeName: org.springframework.kafka.listener.DefaultErrorHandler

src/test/java/org/openrewrite/java/spring/kafka/KafkaOperationsSendReturnTypeTest.java

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ public void defaults(RecipeSpec spec) {
3434
"spring-beans-5",
3535
"spring-context-5",
3636
"spring-core-5",
37-
"spring-kafka-2",
37+
"spring-kafka-2.9",
3838
"spring-messaging-5"
3939
));
4040
}
@@ -51,7 +51,7 @@ void changeKafkaOperationsSendReturnType() {
5151
import org.springframework.kafka.support.SendResult;
5252
import org.springframework.util.concurrent.ListenableFuture;
5353
import org.springframework.util.concurrent.ListenableFutureCallback;
54-
54+
5555
class Foo {
5656
void bar(KafkaOperations<String, String> kafkaOperations) {
5757
ListenableFuture<SendResult<String,String>> future = kafkaOperations.send("topic", "key", "value");
@@ -60,7 +60,7 @@ void bar(KafkaOperations<String, String> kafkaOperations) {
6060
public void onSuccess(SendResult<String, String> result) {
6161
System.out.println(result.getRecordMetadata());
6262
}
63-
63+
6464
@Override
6565
public void onFailure(Throwable ex) {
6666
System.err.println(ex.getMessage());
@@ -72,9 +72,9 @@ public void onFailure(Throwable ex) {
7272
"""
7373
import org.springframework.kafka.core.KafkaOperations;
7474
import org.springframework.kafka.support.SendResult;
75-
75+
7676
import java.util.concurrent.CompletableFuture;
77-
77+
7878
class Foo {
7979
void bar(KafkaOperations<String, String> kafkaOperations) {
8080
CompletableFuture<SendResult<String,String>> future = kafkaOperations.send("topic", "key", "value");
@@ -103,7 +103,7 @@ void changeKafkaTemplate() {
103103
import org.springframework.kafka.support.SendResult;
104104
import org.springframework.util.concurrent.ListenableFuture;
105105
import org.springframework.util.concurrent.ListenableFutureCallback;
106-
106+
107107
class Foo {
108108
void bar(KafkaTemplate<String, String> kafkaTemplate) {
109109
ListenableFuture<SendResult<String,String>> future = kafkaTemplate.send("topic", "key", "value");
@@ -112,7 +112,7 @@ void bar(KafkaTemplate<String, String> kafkaTemplate) {
112112
public void onSuccess(SendResult<String, String> result) {
113113
System.out.println(result.getRecordMetadata());
114114
}
115-
115+
116116
@Override
117117
public void onFailure(Throwable ex) {
118118
System.err.println(ex.getMessage());
@@ -124,9 +124,9 @@ public void onFailure(Throwable ex) {
124124
"""
125125
import org.springframework.kafka.core.KafkaTemplate;
126126
import org.springframework.kafka.support.SendResult;
127-
127+
128128
import java.util.concurrent.CompletableFuture;
129-
129+
130130
class Foo {
131131
void bar(KafkaTemplate<String, String> kafkaTemplate) {
132132
CompletableFuture<SendResult<String,String>> future = kafkaTemplate.send("topic", "key", "value");
@@ -153,15 +153,15 @@ void noReplacementElsewhereYet() {
153153
"""
154154
import org.springframework.util.concurrent.ListenableFuture;
155155
import org.springframework.util.concurrent.ListenableFutureCallback;
156-
156+
157157
class Foo {
158158
void bar(ListenableFuture<String> future) {
159159
future.addCallback(new ListenableFutureCallback<>() {
160160
@Override
161161
public void onSuccess(String result) {
162162
System.out.println(result);
163163
}
164-
164+
165165
@Override
166166
public void onFailure(Throwable ex) {
167167
System.err.println(ex.getMessage());
Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
/*
2+
* Copyright 2026 the original author or authors.
3+
* <p>
4+
* Licensed under the Moderne Source Available License (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
* <p>
8+
* https://docs.moderne.io/licensing/moderne-source-available-license
9+
* <p>
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.openrewrite.java.spring.kafka;
17+
18+
import org.junit.jupiter.api.Test;
19+
import org.openrewrite.DocumentExample;
20+
import org.openrewrite.InMemoryExecutionContext;
21+
import org.openrewrite.java.JavaParser;
22+
import org.openrewrite.test.RecipeSpec;
23+
import org.openrewrite.test.RewriteTest;
24+
25+
import static org.openrewrite.java.Assertions.java;
26+
27+
class UpgradeSpringKafkaErrorHandlersTest implements RewriteTest {
28+
@Override
29+
public void defaults(RecipeSpec spec) {
30+
spec
31+
.recipeFromResources("org.openrewrite.java.spring.kafka.UpgradeSpringKafka_2_8_ErrorHandlers")
32+
.parser(JavaParser.fromJavaVersion().classpathFromResources(new InMemoryExecutionContext(),
33+
"kafka-clients",
34+
"spring-beans",
35+
"spring-context",
36+
"spring-kafka-2.8"
37+
));
38+
}
39+
40+
@DocumentExample
41+
@Test
42+
void migratesSeekToCurrentErrorHandler() {
43+
rewriteRun(
44+
//language=java
45+
java(
46+
"""
47+
import java.lang.Exception;
48+
import java.util.List;
49+
50+
import org.apache.kafka.clients.consumer.Consumer;
51+
import org.apache.kafka.clients.consumer.ConsumerRecord;
52+
import org.springframework.kafka.config.AbstractKafkaListenerContainerFactory;
53+
import org.springframework.kafka.listener.MessageListenerContainer;
54+
import org.springframework.kafka.listener.SeekToCurrentErrorHandler;
55+
56+
class A {
57+
private final SeekToCurrentErrorHandler handler = new SeekToCurrentErrorHandler();
58+
59+
void method(AbstractKafkaListenerContainerFactory factory) {
60+
factory.setErrorHandler(handler);
61+
}
62+
63+
void anotherMethod(
64+
Exception exception,
65+
List<ConsumerRecord<?,?>> records,
66+
Consumer<?,?> consumer,
67+
MessageListenerContainer container
68+
) {
69+
handler.handle(exception, records, consumer, container);
70+
}
71+
}
72+
""",
73+
"""
74+
import java.lang.Exception;
75+
import java.util.List;
76+
77+
import org.apache.kafka.clients.consumer.Consumer;
78+
import org.apache.kafka.clients.consumer.ConsumerRecord;
79+
import org.springframework.kafka.config.AbstractKafkaListenerContainerFactory;
80+
import org.springframework.kafka.listener.DefaultErrorHandler;
81+
import org.springframework.kafka.listener.MessageListenerContainer;
82+
83+
class A {
84+
private final DefaultErrorHandler handler = new DefaultErrorHandler();
85+
86+
void method(AbstractKafkaListenerContainerFactory factory) {
87+
factory.setCommonErrorHandler(handler);
88+
}
89+
90+
void anotherMethod(
91+
Exception exception,
92+
List<ConsumerRecord<?,?>> records,
93+
Consumer<?,?> consumer,
94+
MessageListenerContainer container
95+
) {
96+
handler.handleRemaining(exception, records, consumer, container);
97+
}
98+
}
99+
"""
100+
)
101+
);
102+
}
103+
}

src/test/java/org/openrewrite/java/spring/search/FindSpringComponentsTest.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.openrewrite.test.RewriteTest;
2525

2626
import static org.assertj.core.api.Assertions.assertThat;
27+
import static org.openrewrite.PathUtils.separatorsToSystem;
2728
import static org.openrewrite.java.Assertions.java;
2829

2930
class FindSpringComponentsTest implements RewriteTest {
@@ -46,14 +47,14 @@ void findSpringComponents() {
4647
SpringComponentRelationships.Row::getSourceFile,
4748
SpringComponentRelationships.Row::getDependantType,
4849
SpringComponentRelationships.Row::getDependencyType)
49-
.contains("test/Config.java", "test.A", "test.B"),
50+
.contains(separatorsToSystem("test/Config.java"), "test.A", "test.B"),
5051
two ->
5152
assertThat(two)
5253
.extracting(
5354
SpringComponentRelationships.Row::getSourceFile,
5455
SpringComponentRelationships.Row::getDependantType,
5556
SpringComponentRelationships.Row::getDependencyType)
56-
.contains("test/C.java", "test.C", "test.B"))),
57+
.contains(separatorsToSystem("test/C.java"), "test.C", "test.B"))),
5758
//language=java
5859
java("package test; public class B {}"),
5960
//language=java
53.9 KB
Binary file not shown.

0 commit comments

Comments
 (0)