2222import java .util .Properties ;
2323import java .util .function .Supplier ;
2424
25+ import org .apache .commons .logging .LogFactory ;
2526import org .apache .kafka .clients .consumer .Consumer ;
2627import org .apache .kafka .clients .consumer .ConsumerConfig ;
2728import org .apache .kafka .clients .consumer .KafkaConsumer ;
2829import org .apache .kafka .common .serialization .Deserializer ;
2930
31+ import org .springframework .core .log .LogAccessor ;
3032import org .springframework .lang .Nullable ;
3133import org .springframework .util .StringUtils ;
3234
5759 */
5860public class DefaultKafkaConsumerFactory <K , V > implements ConsumerFactory <K , V > {
5961
62+ private static final LogAccessor LOGGER = new LogAccessor (LogFactory .getLog (DefaultKafkaConsumerFactory .class ));
63+
6064 private final Map <String , Object > configs ;
6165
6266 private Supplier <Deserializer <K >> keyDeserializerSupplier ;
@@ -179,6 +183,7 @@ private KafkaConsumer<K, V> createConsumerWithAdjustedProperties(String groupId,
179183 : modifiedConfigs .get (ConsumerConfig .CLIENT_ID_CONFIG )) + clientIdSuffix );
180184 }
181185 if (properties != null ) {
186+ checkForUnsupportedProps (properties );
182187 properties .stringPropertyNames ()
183188 .stream ()
184189 .filter (name -> !name .equals (ConsumerConfig .CLIENT_ID_CONFIG )
@@ -188,6 +193,15 @@ private KafkaConsumer<K, V> createConsumerWithAdjustedProperties(String groupId,
188193 return createKafkaConsumer (modifiedConfigs );
189194 }
190195
196+ private void checkForUnsupportedProps (Properties properties ) {
197+ properties .forEach ((key , value ) -> {
198+ if (!(key instanceof String ) || !(value instanceof String )) {
199+ LOGGER .warn (() -> "Property override for '" + key .toString ()
200+ + "' ignored, only <String, String> properties are supported; value is a(n) " + value .getClass ());
201+ }
202+ });
203+ }
204+
191205 protected KafkaConsumer <K , V > createKafkaConsumer (Map <String , Object > configProps ) {
192206 return new KafkaConsumer <>(configProps , this .keyDeserializerSupplier .get (),
193207 this .valueDeserializerSupplier .get ());
0 commit comments