Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 35 additions & 0 deletions .github/workflows/maven.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
# This workflow will build a Java project with Maven, and cache/restore any dependencies to improve the workflow execution time
# For more information see: https://docs.github.com/en/actions/automating-builds-and-tests/building-and-testing-java-with-maven

# This workflow uses actions that are not certified by GitHub.
# They are provided by a third-party and are governed by
# separate terms of service, privacy policy, and support
# documentation.

name: Java CI with Maven

on:
push:
branches: [ "main" ]
pull_request:
branches: [ "main" ]

jobs:
build:

runs-on: ubuntu-latest

steps:
- uses: actions/checkout@v4
- name: Set up JDK 17
uses: actions/setup-java@v4
with:
java-version: '17'
distribution: 'temurin'
cache: maven
- name: Build with Maven
run: mvn -B package --file pom.xml

# Optional: Uploads the full dependency graph to GitHub to improve the quality of Dependabot alerts this repository can receive
- name: Update dependency graph
uses: advanced-security/maven-dependency-submission-action@571e99aab1055c2e71a1e2309b9691de18d6b7d6
Original file line number Diff line number Diff line change
Expand Up @@ -1203,14 +1203,28 @@ public void abortTransaction() throws ProducerFencedException {

@Override
public void registerMetricForSubscription(KafkaMetric kafkaMetric) {
//TODO - INVESTIGATE IF WE ARE MISSING SOMETHING
this.delegate.registerMetricForSubscription(kafkaMetric);
LOGGER.trace(() -> toString() + " registerMetricForSubscription(" + kafkaMetric + ")");
try {
this.delegate.registerMetricForSubscription(kafkaMetric);
}
catch (RuntimeException e) {
LOGGER.error(e, () -> "Metric registration failed: " + this);
this.producerFailed = e;
throw e;
}
}

@Override
public void unregisterMetricFromSubscription(KafkaMetric kafkaMetric) {
//TODO - INVESTIGATE IF WE ARE MISSING SOMETHING
this.delegate.unregisterMetricFromSubscription(kafkaMetric);
LOGGER.trace(() -> toString() + " unregisterMetricFromSubscription(" + kafkaMetric + ")");
try {
this.delegate.unregisterMetricFromSubscription(kafkaMetric);
}
catch (RuntimeException e) {
LOGGER.error(e, () -> "Metric unregistration failed: " + this);
this.producerFailed = e;
throw e;
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.time.Duration;

import org.apache.kafka.clients.producer.Producer;
import org.jspecify.annotations.NonNull;

import org.springframework.transaction.support.ResourceHolderSupport;
import org.springframework.util.Assert;
Expand Down Expand Up @@ -52,6 +53,7 @@ public KafkaResourceHolder(Producer<K, V> producer, Duration closeTimeout) {
this.closeTimeout = closeTimeout;
}

@NonNull
public Producer<K, V> getProducer() {
return this.producer;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ public void setBeanName(String name) {
* Return the bean name.
* @return the bean name.
*/
// TODO: work on this @Nullable
@NonNull
public String getBeanName() {
return this.beanName;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.stream.Collectors;

import org.apache.commons.logging.LogFactory;
import org.jspecify.annotations.NonNull;

import org.springframework.context.Lifecycle;
import org.springframework.core.log.LogAccessor;
Expand Down Expand Up @@ -78,6 +79,7 @@ public ContainerGroup(String name, MessageListenerContainer... containers) {
* Return the group name.
* @return the name.
*/
@NonNull
public String getName() {
return this.name;
}
Expand All @@ -86,6 +88,7 @@ public String getName() {
* Return the listener ids of the containers in this group.
* @return the listener ids.
*/
@NonNull
public Collection<String> getListenerIds() {
return this.containers.stream()
.map(container -> container.getListenerId())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,12 @@ public void setSeekAfterError(boolean seekAfterError) {
this.seekAfterError = seekAfterError;
}

/**
* Return the delivery attempt for the given topic/partition/offset.
* @param topicPartitionOffset the topic/partition/offset.
* @return the delivery attempt.
* @since 2.5
*/
@Override
public int deliveryAttempt(TopicPartitionOffset topicPartitionOffset) {
return this.failureTracker.deliveryAttempt(topicPartitionOffset);
Expand All @@ -175,6 +181,12 @@ protected FailedRecordTracker getFailureTracker() {
return this.failureTracker;
}

/**
* Clear the thread-local state maintained by the failure tracker.
* This method should be called when the thread is being returned to a pool
* to prevent memory leaks from thread-local storage.
* @since 2.3.1
*/
public void clearThreadState() {
this.failureTracker.clearThreadState();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,38 +39,74 @@ public ConsumerRecordMetadata(RecordMetadata delegate, TimestampType timestampTy
this.timestampType = timestampType;
}

/**
* Return true if the offset is valid.
* @return true if the offset is valid.
*/
public boolean hasOffset() {
return this.delegate.hasOffset();
}

/**
* Return the offset of the record in the topic partition.
* @return the offset.
*/
public long offset() {
return this.delegate.offset();
}

/**
* Return true if the timestamp is valid.
* @return true if the timestamp is valid.
*/
public boolean hasTimestamp() {
return this.delegate.hasTimestamp();
}

/**
* Return the timestamp of the record.
* @return the timestamp.
*/
public long timestamp() {
return this.delegate.timestamp();
}

/**
* Return the size of the serialized, uncompressed key in bytes.
* @return the size of the serialized key.
*/
public int serializedKeySize() {
return this.delegate.serializedKeySize();
}

/**
* Return the size of the serialized, uncompressed value in bytes.
* @return the size of the serialized value.
*/
public int serializedValueSize() {
return this.delegate.serializedValueSize();
}

/**
* Return the topic name the record was appended to.
* @return the topic name.
*/
public String topic() {
return this.delegate.topic();
}

/**
* Return the partition the record was sent to.
* @return the partition.
*/
public int partition() {
return this.delegate.partition();
}

/**
* Return the timestamp type for this record.
* @return the timestamp type.
*/
public TimestampType timestampType() {
return this.timestampType;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,53 @@
/**
* Package for retryable topic handling.
* <h2>Retry Topic Infrastructure</h2>
*
* <p>This package provides comprehensive support for implementing retry patterns with Kafka,
* featuring automatic retry topic management and dead-letter topic (DLT) handling.
*
* <h3>Core Concepts</h3>
*
* <p><b>Retry Topics:</b> When message processing fails, messages are automatically sent to
* retry topics with configurable back-off delays, allowing for progressive retry attempts
* without blocking the main consumer thread.
*
* <p><b>Dead-Letter Topics (DLT):</b> Messages that exhaust all retry attempts are routed to
* a dead-letter topic for manual inspection, reprocessing, or logging.
*
* <h3>Key Components</h3>
*
* <ul>
* <li>{@link org.springframework.kafka.retrytopic.RetryTopicConfiguration RetryTopicConfiguration} -
* Main configuration container for retry behavior</li>
* <li>{@link org.springframework.kafka.retrytopic.RetryTopicConfigurationBuilder RetryTopicConfigurationBuilder} -
* Fluent API for building retry configurations</li>
* <li>{@link org.springframework.kafka.retrytopic.DestinationTopic DestinationTopic} -
* Represents a single retry or DLT destination</li>
* <li>{@link org.springframework.kafka.retrytopic.DestinationTopicResolver DestinationTopicResolver} -
* Resolves which destination topic to use for failed messages</li>
* <li>{@link org.springframework.kafka.retrytopic.DeadLetterPublishingRecovererFactory DeadLetterPublishingRecovererFactory} -
* Creates recoverers that publish failed messages to appropriate destinations</li>
* <li>{@link org.springframework.kafka.retrytopic.DltStrategy DltStrategy} -
* Defines strategies for DLT handling (always send, only on certain exceptions, etc.)</li>
* </ul>
*
* <h3>Typical Usage</h3>
*
* <p>Configure retry topics using the {@code @RetryableTopic} annotation on listener methods,
* or programmatically via {@link org.springframework.kafka.retrytopic.RetryTopicConfigurationBuilder}.
* The framework automatically creates the necessary retry topic infrastructure and routes
* failed messages through the configured retry chain.
*
* <h3>Back-off Configuration</h3>
*
* <p>The package supports various back-off strategies:
* <ul>
* <li>Fixed delay between retries</li>
* <li>Exponential back-off with configurable multiplier</li>
* <li>Maximum retry attempts</li>
* <li>Custom back-off policies</li>
* </ul>
*
* @since 2.7
*/
@org.jspecify.annotations.NullMarked
package org.springframework.kafka.retrytopic;
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.lang.reflect.Method;
import java.util.Arrays;

import org.jspecify.annotations.NonNull;
import org.jspecify.annotations.Nullable;

import org.springframework.beans.factory.BeanCurrentlyInCreationException;
Expand Down Expand Up @@ -80,6 +81,7 @@ public EndpointHandlerMethod(Object bean, Method method) {
* Return the method.
* @return the method.
*/
@NonNull
public Method getMethod() {
if (this.beanOrClass instanceof Class<?> clazz) {
return forClass(clazz);
Expand All @@ -100,6 +102,7 @@ public Method getMethod() {
* @return the name.
* @since 2.8
*/
@NonNull
public String getMethodName() {
Assert.state(this.methodName != null, "Unexpected call to getMethodName()");
return this.methodName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.lang.reflect.Method;
import java.util.List;

import org.jspecify.annotations.NonNull;
import org.jspecify.annotations.Nullable;

/**
Expand Down Expand Up @@ -52,6 +53,7 @@ public EndpointHandlerMultiMethod(Object bean, @Nullable Method defaultMethod, L
* Return the method list.
* @return the method list.
*/
@NonNull
public List<Method> getMethods() {
return this.methods;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,7 @@ public abstract class KafkaHeaders {
* Consumer group that failed to consumer a record published to another topic.
* @since 2.8
*/
public static final String ORIGINAL_CONSUMER_GROUP = PREFIX + "dlt-original-consumer-group";
public static final String ORIGINAL_CONSUMER_GROUP = PREFIX + "original-consumer-group";

/**
* Original timestamp for a record published to another topic.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.jspecify.annotations.NonNull;

/**
* Result for a {@link java.util.concurrent.CompletableFuture} after a send.
Expand All @@ -39,10 +40,12 @@ public SendResult(ProducerRecord<K, V> producerRecord, RecordMetadata recordMeta
this.recordMetadata = recordMetadata;
}

@NonNull
public ProducerRecord<K, V> getProducerRecord() {
return this.producerRecord;
}

@NonNull
public RecordMetadata getRecordMetadata() {
return this.recordMetadata;
}
Expand Down