Skip to content

Commit 8fbb7e3

Browse files
garyrussellartembilan
authored andcommitted
GH-1520: Add Producer/Consumer Post Processors
Resolves #1520 To assist with Sleuth instrumentation.
1 parent 28ff099 commit 8fbb7e3

File tree

8 files changed

+271
-1
lines changed

8 files changed

+271
-1
lines changed

spring-kafka/src/main/java/org/springframework/kafka/core/ConsumerFactory.java

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616

1717
package org.springframework.kafka.core;
1818

19+
import java.util.Collections;
20+
import java.util.List;
1921
import java.util.Map;
2022
import java.util.Properties;
2123

@@ -136,6 +138,72 @@ default Deserializer<V> getValueDeserializer() {
136138
return null;
137139
}
138140

141+
/**
142+
* Remove a listener.
143+
* @param listener the listener.
144+
* @return true if removed.
145+
* @since 2.5.3
146+
*/
147+
default boolean removeListener(Listener<K, V> listener) {
148+
return false;
149+
}
150+
151+
/**
152+
* Add a listener at a specific index.
153+
* @param index the index (list position).
154+
* @param listener the listener.
155+
* @since 2.5.3
156+
*/
157+
default void addListener(int index, Listener<K, V> listener) {
158+
159+
}
160+
161+
/**
162+
* Add a listener.
163+
* @param listener the listener.
164+
* @since 2.5.3
165+
*/
166+
default void addListener(Listener<K, V> listener) {
167+
168+
}
169+
170+
/**
171+
* Get the current list of listeners.
172+
* @return the listeners.
173+
* @since 2.5.3
174+
*/
175+
default List<Listener<K, V>> getListeners() {
176+
return Collections.emptyList();
177+
}
178+
179+
/**
180+
* Add a post processor.
181+
* @param postProcessor the post processor.
182+
* @since 2.5.3
183+
*/
184+
default void addPostProcessor(ConsumerPostProcessor<K, V> postProcessor) {
185+
186+
}
187+
188+
/**
189+
* Remove a post processor.
190+
* @param postProcessor the post processor.
191+
* @return true if removed.
192+
* @since 2.5.3
193+
*/
194+
default boolean removePostProcessor(ConsumerPostProcessor<K, V> postProcessor) {
195+
return false;
196+
}
197+
198+
/**
199+
* Get the current list of post processors.
200+
* @return the post processor.
201+
* @since 2.5.3
202+
*/
203+
default List<ConsumerPostProcessor<K, V>> getPostProcessors() {
204+
return Collections.emptyList();
205+
}
206+
139207
/**
140208
* Called whenever a consumer is added or removed.
141209
*
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
/*
2+
* Copyright 2020 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.core;
18+
19+
import java.util.function.Function;
20+
21+
import org.apache.kafka.clients.consumer.Consumer;
22+
23+
/**
24+
* Called by consumer factories to perform post processing on newly created consumers.
25+
*
26+
* @param <K> the key type.
27+
* @param <V> the value type
28+
*
29+
* @author Gary Russell
30+
* @since 2.5.3
31+
*
32+
*/
33+
public interface ConsumerPostProcessor<K, V> extends Function<Consumer<K, V>, Consumer<K, V>> {
34+
35+
}

spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaConsumerFactory.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,8 @@ public class DefaultKafkaConsumerFactory<K, V> extends KafkaResourceFactory
7878

7979
private final List<Listener<K, V>> listeners = new ArrayList<>();
8080

81+
private final List<ConsumerPostProcessor<K, V>> postProcessors = new ArrayList<>();
82+
8183
private Supplier<Deserializer<K>> keyDeserializerSupplier;
8284

8385
private Supplier<Deserializer<V>> valueDeserializerSupplier;
@@ -165,15 +167,22 @@ public Deserializer<V> getValueDeserializer() {
165167
* @return the listeners.
166168
* @since 2.5
167169
*/
170+
@Override
168171
public List<Listener<K, V>> getListeners() {
169172
return Collections.unmodifiableList(this.listeners);
170173
}
171174

175+
@Override
176+
public List<ConsumerPostProcessor<K, V>> getPostProcessors() {
177+
return Collections.unmodifiableList(this.postProcessors);
178+
}
179+
172180
/**
173181
* Add a listener.
174182
* @param listener the listener.
175183
* @since 2.5
176184
*/
185+
@Override
177186
public void addListener(Listener<K, V> listener) {
178187
Assert.notNull(listener, "'listener' cannot be null");
179188
this.listeners.add(listener);
@@ -185,6 +194,7 @@ public void addListener(Listener<K, V> listener) {
185194
* @param listener the listener.
186195
* @since 2.5
187196
*/
197+
@Override
188198
public void addListener(int index, Listener<K, V> listener) {
189199
Assert.notNull(listener, "'listener' cannot be null");
190200
if (index >= this.listeners.size()) {
@@ -195,12 +205,24 @@ public void addListener(int index, Listener<K, V> listener) {
195205
}
196206
}
197207

208+
@Override
209+
public void addPostProcessor(ConsumerPostProcessor<K, V> postProcessor) {
210+
Assert.notNull(postProcessor, "'postProcessor' cannot be null");
211+
this.postProcessors.add(postProcessor);
212+
}
213+
214+
@Override
215+
public boolean removePostProcessor(ConsumerPostProcessor<K, V> postProcessor) {
216+
return this.postProcessors.remove(postProcessor);
217+
}
218+
198219
/**
199220
* Remove a listener.
200221
* @param listener the listener.
201222
* @return true if removed.
202223
* @since 2.5
203224
*/
225+
@Override
204226
public boolean removeListener(Listener<K, V> listener) {
205227
return this.listeners.remove(listener);
206228
}
@@ -301,6 +323,9 @@ protected Consumer<K, V> createKafkaConsumer(Map<String, Object> configProps) {
301323
listener.consumerAdded(id, kafkaConsumer);
302324
}
303325
}
326+
for (ConsumerPostProcessor<K, V> pp : this.postProcessors) {
327+
pp.apply(kafkaConsumer);
328+
}
304329
return kafkaConsumer;
305330
}
306331

spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaProducerFactory.java

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,8 @@ public class DefaultKafkaProducerFactory<K, V> extends KafkaResourceFactory
132132

133133
private final List<Listener<K, V>> listeners = new ArrayList<>();
134134

135+
private final List<ProducerPostProcessor<K, V>> postProcessors = new ArrayList<>();
136+
135137
private Supplier<Serializer<K>> keySerializerSupplier;
136138

137139
private Supplier<Serializer<V>> valueSerializerSupplier;
@@ -337,15 +339,22 @@ public Map<String, Object> getConfigurationProperties() {
337339
* @return the listeners.
338340
* @since 2.5
339341
*/
342+
@Override
340343
public List<Listener<K, V>> getListeners() {
341344
return Collections.unmodifiableList(this.listeners);
342345
}
343346

347+
@Override
348+
public List<ProducerPostProcessor<K, V>> getPostProcessors() {
349+
return Collections.unmodifiableList(this.postProcessors);
350+
}
351+
344352
/**
345353
* Add a listener.
346354
* @param listener the listener.
347355
* @since 2.5
348356
*/
357+
@Override
349358
public void addListener(Listener<K, V> listener) {
350359
Assert.notNull(listener, "'listener' cannot be null");
351360
this.listeners.add(listener);
@@ -357,6 +366,7 @@ public void addListener(Listener<K, V> listener) {
357366
* @param listener the listener.
358367
* @since 2.5
359368
*/
369+
@Override
360370
public void addListener(int index, Listener<K, V> listener) {
361371
Assert.notNull(listener, "'listener' cannot be null");
362372
if (index >= this.listeners.size()) {
@@ -373,10 +383,22 @@ public void addListener(int index, Listener<K, V> listener) {
373383
* @return true if removed.
374384
* @since 2.5
375385
*/
386+
@Override
376387
public boolean removeListener(Listener<K, V> listener) {
377388
return this.listeners.remove(listener);
378389
}
379390

391+
@Override
392+
public void addPostProcessor(ProducerPostProcessor<K, V> postProcessor) {
393+
Assert.notNull(postProcessor, "'postProcessor' cannot be null");
394+
this.postProcessors.add(postProcessor);
395+
}
396+
397+
@Override
398+
public boolean removePostProcessor(ProducerPostProcessor<K, V> postProcessor) {
399+
return this.postProcessors.remove(postProcessor);
400+
}
401+
380402
/**
381403
* When set to 'true', the producer will ensure that exactly one copy of each message is written in the stream.
382404
*/
@@ -642,7 +664,10 @@ private CloseSafeProducer<K, V> doCreateTxProducer(String prefix, String suffix,
642664
}
643665

644666
protected Producer<K, V> createRawProducer(Map<String, Object> rawConfigs) {
645-
return new KafkaProducer<>(rawConfigs, this.keySerializerSupplier.get(), this.valueSerializerSupplier.get());
667+
KafkaProducer<K, V> kafkaProducer =
668+
new KafkaProducer<>(rawConfigs, this.keySerializerSupplier.get(), this.valueSerializerSupplier.get());
669+
this.postProcessors.forEach(pp -> pp.apply(kafkaProducer));
670+
return kafkaProducer;
646671
}
647672

648673
@Nullable

spring-kafka/src/main/java/org/springframework/kafka/core/ProducerFactory.java

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
package org.springframework.kafka.core;
1818

1919
import java.time.Duration;
20+
import java.util.Collections;
21+
import java.util.List;
2022
import java.util.Map;
2123
import java.util.function.Supplier;
2224

@@ -168,6 +170,72 @@ default Duration getPhysicalCloseTimeout() {
168170
return DEFAULT_PHYSICAL_CLOSE_TIMEOUT;
169171
}
170172

173+
/**
174+
* Add a listener.
175+
* @param listener the listener.
176+
* @since 2.5.3
177+
*/
178+
default void addListener(Listener<K, V> listener) {
179+
180+
}
181+
182+
/**
183+
* Add a listener at a specific index.
184+
* @param index the index (list position).
185+
* @param listener the listener.
186+
* @since 2.5.3
187+
*/
188+
default void addListener(int index, Listener<K, V> listener) {
189+
190+
}
191+
192+
/**
193+
* Remove a listener.
194+
* @param listener the listener.
195+
* @return true if removed.
196+
* @since 2.5.3
197+
*/
198+
default boolean removeListener(Listener<K, V> listener) {
199+
return false;
200+
}
201+
202+
/**
203+
* Get the current list of listeners.
204+
* @return the listeners.
205+
* @since 2.5.3
206+
*/
207+
default List<Listener<K, V>> getListeners() {
208+
return Collections.emptyList();
209+
}
210+
211+
/**
212+
* Add a post processor.
213+
* @param postProcessor the post processor.
214+
* @since 2.5.3
215+
*/
216+
default void addPostProcessor(ProducerPostProcessor<K, V> postProcessor) {
217+
218+
}
219+
220+
/**
221+
* Remove a post processor.
222+
* @param postProcessor the post processor.
223+
* @return true if removed.
224+
* @since 2.5.3
225+
*/
226+
default boolean removePostProcessor(ProducerPostProcessor<K, V> postProcessor) {
227+
return false;
228+
}
229+
230+
/**
231+
* Get the current list of post processors.
232+
* @return the post processors.
233+
* @since 2.5.3
234+
*/
235+
default List<ProducerPostProcessor<K, V>> getPostProcessors() {
236+
return Collections.emptyList();
237+
}
238+
171239
/**
172240
* Called whenever a producer is added or removed.
173241
*
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
/*
2+
* Copyright 2020 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.core;
18+
19+
import java.util.function.Function;
20+
21+
import org.apache.kafka.clients.producer.Producer;
22+
23+
/**
24+
* Called by producer factories to perform post processing on newly created producers.
25+
*
26+
* @param <K> the key type.
27+
* @param <V> the value type
28+
*
29+
* @author Gary Russell
30+
* @since 2.5.3
31+
*
32+
*/
33+
public interface ProducerPostProcessor<K, V> extends Function<Producer<K, V>, Producer<K, V>> {
34+
35+
}

0 commit comments

Comments
 (0)