Skip to content

Commit 7339ba1

Browse files
authored
Update KafkaClients.java
1 parent 4496323 commit 7339ba1

File tree

1 file changed

+10
-2
lines changed

1 file changed

+10
-2
lines changed

core/src/main/java/com/softwaremill/kmq/KafkaClients.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ public <K, V> KafkaProducer<K, V> createProducer(Class<? extends Serializer<K>>
2121
return createProducer(keySerializer, valueSerializer, Collections.emptyMap());
2222
}
2323

24-
public <K, V> KafkaProducer<K, V> createProducer(Class<? extends Serializer<K>> keySerializer, Class<? extends Serializer<V>> valueSerializer,
24+
public <K, V> KafkaProducer<K, V> createProducer(Class<? extends Serializer<K>> keySerializer, Class<? extends Serializer<V>> valueSerializer,
2525
Map<String, Object> extraConfig) {
2626
Properties props = new Properties();
2727
props.put("bootstrap.servers", bootstrapServers);
@@ -39,9 +39,13 @@ public <K, V> KafkaProducer<K, V> createProducer(Class<? extends Serializer<K>>
3939
return new KafkaProducer<>(props);
4040
}
4141

42+
public <K, V> KafkaConsumer<K, V> createConsumer(String groupId, Class<? extends Deserializer<K>> keyDeserializer, Class<? extends Deserializer<V>> valueDeserializer) {
43+
return createConsumer(groupId, keyDeserializer, valueDeserializer, Collections.emptyMap());
44+
}
45+
4246
public <K, V> KafkaConsumer<K, V> createConsumer(String groupId,
4347
Class<? extends Deserializer<K>> keyDeserializer,
44-
Class<? extends Deserializer<V>> valueDeserializer) {
48+
Class<? extends Deserializer<V>> valueDeserializer, Map<String, Object> extraConfig) {
4549
Properties props = new Properties();
4650
props.put("bootstrap.servers", bootstrapServers);
4751
props.put("enable.auto.commit", "false");
@@ -51,6 +55,10 @@ public <K, V> KafkaConsumer<K, V> createConsumer(String groupId,
5155
if (groupId != null) {
5256
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
5357
}
58+
// extraConfig : configure the kafka parameters (ex: ssl, ...)
59+
for (Map.Entry<String, Object> extraCfgEntry : extraConfig.entrySet()) {
60+
props.put(extraCfgEntry.getKey(), extraCfgEntry.getValue());
61+
}
5462

5563
return new KafkaConsumer<>(props);
5664
}

0 commit comments

Comments
 (0)