Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -386,7 +386,7 @@ The `@KafkaListener` annotation now has the `filter` attribute, to override the

The `@KafkaListener` annotation now has the `info` attribute; this is used to populate the new listener container property `listenerInfo`.
This is then used to populate a `KafkaHeaders.LISTENER_INFO` header in each record which can be used in `RecordInterceptor`, `RecordFilterStrategy`, or the listener itself.
See xref:kafka/annotation-error-handling.adoc#li-header[Listener Info Header] and xref:kafka/container-props.adoc#alc-props[Abstract Listener Container Properties] for more information.
See xref:kafka/annotation-error-handling.adoc#li-header[Listener Info Header] and xref:kafka/container-props.adoc#amlc-props[AbstractMessageListenerContainer Properties] for more information.

[[x28-template]]
=== `KafkaTemplate` Changes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ Also see `idleBeforeDataMultiplier`.
|[[listenerTaskExecutor]]<<listenerTaskExecutor,`listenerTaskExecutor`>>
|`SimpleAsyncTaskExecutor`
|A task executor to run the consumer threads.
The default executor creates threads named `<name>-C-n`; with the `KafkaMessageListenerContainer`, the name is the bean name; with the `ConcurrentMessageListenerContainer` the name is the bean name suffixed with `-n` where n is incremented for each child container.
The default executor creates threads named `<name>-C-n`; with the `KafkaMessageListenerContainer`, the name is the bean name; with the `ConcurrentMessageListenerContainer` the name is the bean name suffixed with `-m` where `m` is incremented for each child container. See xref:kafka/receiving-messages/container-thread-naming.adoc#container-thread-naming[Container Thread Naming].

|[[logContainerConfig]]<<logContainerConfig,`logContainerConfig`>>
|`false`
Expand Down Expand Up @@ -239,8 +239,8 @@ Mutually exclusive; at least one must be provided; enforced by `ContainerPropert
|Deprecated since 3.2, see <<kafkaAwareTransactionManager>>, xref:kafka/transactions.adoc#transaction-synchronization[Other transaction managers].
|===

[[alc-props]]
.`AbstractListenerContainer` Properties
[[amlc-props]]
.`AbstractMessageListenerContainer` Properties
[cols="9,10,16", options="header"]
|===
| Property
Expand Down Expand Up @@ -320,10 +320,6 @@ Also see `interceptBeforeTx`.
|(read only)
|The partitions currently assigned to this container (explicitly or not).

|[[assignedPartitionsByClientId]]<<assignedPartitionsByClientId,`assignedPartitionsByClientId`>>
|(read only)
|The partitions currently assigned to this container (explicitly or not).

|[[clientIdSuffix]]<<clientIdSuffix,`clientIdSuffix`>>
|`null`
|Used by the concurrent container to give each child container's consumer a unique `client.id`.
Expand All @@ -348,10 +344,6 @@ Also see `interceptBeforeTx`.
|(read only)
|The aggregate of partitions currently assigned to this container's child `KafkaMessageListenerContainer`+++s+++ (explicitly or not).

|[[assignedPartitionsByClientId2]]<<assignedPartitionsByClientId2,`assignedPartitionsByClientId`>>
|(read only)
|The partitions currently assigned to this container's child `KafkaMessageListenerContainer`+++s+++ (explicitly or not), keyed by the child container's consumer's `client.id` property.

|[[concurrency]]<<concurrency,`concurrency`>>
|1
|The number of child `KafkaMessageListenerContainer`+++s+++ to manage.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ You can use this event to restart the container after such a condition:

[source, java]
----
if (event.getReason.equals(Reason.FENCED)) {
if (event.getReason().equals(Reason.FENCED)) {
event.getSource(MessageListenerContainer.class).start();
}
----
Expand Down Expand Up @@ -205,7 +205,7 @@ Consequently, in the preceding example, we narrow the events received based on t
Since containers created for the `@KafkaListener` support concurrency, the actual containers are named `id-n` where the `n` is a unique value for each instance to support the concurrency.
That is why we use `startsWith` in the condition.

CAUTION: If you wish to use the idle event to stop the lister container, you should not call `container.stop()` on the thread that calls the listener.
CAUTION: If you wish to use the idle event to stop the listener container, you should not call `container.stop()` on the thread that calls the listener.
Doing so causes delays and unnecessary log messages.
Instead, you should hand off the event to a different thread that can then stop the container.
Also, you should not `stop()` the container instance if it is a child container.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ The following example shows how to do so:
----
@Bean
public KafkaTemplate<String, String> myReplyingTemplate() {
return new KafkaTemplate<Integer, String>(producerFactory()) {
return new KafkaTemplate<String, String>(producerFactory()) {

@Override
public CompletableFuture<SendResult<String, String>> send(String topic, String data) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,10 +139,11 @@ The following listing shows the definition of the `ProducerListener` interface:
----
public interface ProducerListener<K, V> {

default void onSuccess(ProducerRecord<K, V> producerRecord, RecordMetadata recordMetadata);
default void onSuccess(ProducerRecord<K, V> producerRecord, RecordMetadata recordMetadata) {
}

default void onError(ProducerRecord<K, V> producerRecord, RecordMetadata recordMetadata,
Exception exception);
default void onError(ProducerRecord<K, V> producerRecord, RecordMetadata recordMetadata, Exception exception) {
}

}
----
Expand Down