diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index 3a69b11d..1a6c0a00 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -11,7 +11,7 @@ micrometer-docs-gen = "1.0.4" micrometer-tracing = "1.6.0-RC1" protobuf = "3.25.8" pulsar = "4.1.1" -spring = "7.0.0-M9" +spring = "7.0.0-RC1" # tests assertj = "3.27.6" awaitility = "4.3.0" diff --git a/spring-pulsar/src/main/java/org/springframework/pulsar/listener/DefaultPulsarMessageListenerContainer.java b/spring-pulsar/src/main/java/org/springframework/pulsar/listener/DefaultPulsarMessageListenerContainer.java index 53949411..0e472cb4 100644 --- a/spring-pulsar/src/main/java/org/springframework/pulsar/listener/DefaultPulsarMessageListenerContainer.java +++ b/spring-pulsar/src/main/java/org/springframework/pulsar/listener/DefaultPulsarMessageListenerContainer.java @@ -77,8 +77,6 @@ import org.springframework.pulsar.transaction.PulsarTransactionUtils; import org.springframework.scheduling.SchedulingAwareRunnable; import org.springframework.transaction.TransactionDefinition; -import org.springframework.transaction.TransactionStatus; -import org.springframework.transaction.support.TransactionCallbackWithoutResult; import org.springframework.transaction.support.TransactionTemplate; import org.springframework.util.Assert; import org.springframework.util.CollectionUtils; @@ -604,14 +602,10 @@ private Observation newObservation(Message message) { private void dispatchMessageToListenerInTxn(Message message, AtomicBoolean inRetryMode) { try { requireNonNull(this.transactionTemplate, "transactionTemplate must not be null") - .execute(new TransactionCallbackWithoutResult() { - @Override - protected void doInTransactionWithoutResult(TransactionStatus status) { - RuntimeException aborted = dispatchMessageToListener(message, inRetryMode, - getTransaction()); - if (aborted != null) { - throw aborted; - } + .executeWithoutResult((status) -> { + RuntimeException aborted = dispatchMessageToListener(message, inRetryMode, getTransaction()); + if (aborted != null) { + throw aborted; } }); } diff --git a/spring-pulsar/src/main/java/org/springframework/pulsar/listener/adapter/AbstractPulsarMessageToSpringMessageAdapter.java b/spring-pulsar/src/main/java/org/springframework/pulsar/listener/adapter/AbstractPulsarMessageToSpringMessageAdapter.java index 3bff1c07..f2d380c1 100644 --- a/spring-pulsar/src/main/java/org/springframework/pulsar/listener/adapter/AbstractPulsarMessageToSpringMessageAdapter.java +++ b/spring-pulsar/src/main/java/org/springframework/pulsar/listener/adapter/AbstractPulsarMessageToSpringMessageAdapter.java @@ -132,6 +132,7 @@ protected boolean isPulsarMessageList() { return this.isPulsarMessageList; } + @SuppressWarnings("removal") public void setBeanResolver(BeanResolver beanResolver) { this.evaluationContext.setBeanResolver(beanResolver); this.evaluationContext.setTypeConverter(new StandardTypeConverter()); diff --git a/spring-pulsar/src/test/java/org/springframework/pulsar/listener/DefaultPulsarMessageListenerContainerTxnTests.java b/spring-pulsar/src/test/java/org/springframework/pulsar/listener/DefaultPulsarMessageListenerContainerTxnTests.java index 14f11860..41544f53 100644 --- a/spring-pulsar/src/test/java/org/springframework/pulsar/listener/DefaultPulsarMessageListenerContainerTxnTests.java +++ b/spring-pulsar/src/test/java/org/springframework/pulsar/listener/DefaultPulsarMessageListenerContainerTxnTests.java @@ -44,6 +44,7 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.testcontainers.containers.PulsarContainer; import org.testcontainers.junit.jupiter.Testcontainers; @@ -252,11 +253,13 @@ void recordListenerWithBatchAckNotSupported() { .withMessage("Transactional record listeners can not use batch ack mode"); } + @Disabled("Flaky test see spring-pulsar/issues/1294") @Test void batchListenerUsesBatchAckWhenSharedSub() throws Exception { batchListenerUsesProperBatchAckForSubscriptionType("batch-lstr-batch-ack", SubscriptionType.Shared); } + @Disabled("Flaky test see spring-pulsar/issues/1294") @Test void batchListenerUsesCumulativeAckWhenNotSharedSub() throws Exception { batchListenerUsesProperBatchAckForSubscriptionType("batch-lstr-cumltv-ack", SubscriptionType.Exclusive);