Skip to content

Commit 7d94fca

Browse files
Add new API addRecordInterceptor.
Signed-off-by: Sanghyeok An <[email protected]>
1 parent 7a6a385 commit 7d94fca

File tree

2 files changed

+26
-0
lines changed

2 files changed

+26
-0
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractMessageListenerContainer.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -475,6 +475,24 @@ public void setRecordInterceptor(@Nullable RecordInterceptor<K, V> recordInterce
475475
this.recordInterceptor = recordInterceptor;
476476
}
477477

478+
/**
479+
* Add an interceptor to be called before calling the record listener
480+
* if the {@link AbstractMessageListenerContainer} is configured with a {@link CompositeRecordInterceptor}.
481+
* If a {@link CompositeRecordInterceptor} is not configured, the {@link AbstractMessageListenerContainer}
482+
* will not add {@link RecordInterceptor}.
483+
* Does not apply to batch listeners.
484+
* @param recordInterceptor the interceptor.
485+
* @since 4.0
486+
*/
487+
public void addRecordInterceptor(RecordInterceptor<K, V> recordInterceptor) {
488+
if (this.recordInterceptor instanceof CompositeRecordInterceptor<K, V> compositeRecordInterceptor) {
489+
compositeRecordInterceptor.addRecordInterceptor(recordInterceptor);
490+
}
491+
else {
492+
this.logger.warn("Failed to add record interceptor.");
493+
}
494+
}
495+
478496
protected @Nullable BatchInterceptor<K, V> getBatchInterceptor() {
479497
return this.batchInterceptor;
480498
}

spring-kafka/src/main/java/org/springframework/kafka/listener/CompositeRecordInterceptor.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,4 +92,12 @@ public void afterRecord(ConsumerRecord<K, V> record, Consumer<K, V> consumer) {
9292
this.delegates.forEach(del -> del.afterRecord(record, consumer));
9393
}
9494

95+
/**
96+
* Add an {@link RecordInterceptor} to delegates.
97+
* @param recordInterceptor the interceptor.
98+
*/
99+
public void addRecordInterceptor(RecordInterceptor<K, V> recordInterceptor) {
100+
this.delegates.add(recordInterceptor);
101+
}
102+
95103
}

0 commit comments

Comments
 (0)