
Commit cc61bf1

Simplify passing extraConfig
1 parent dfcfecf commit cc61bf1

7 files changed: +32 −60 lines changed

core/src/main/java/com/softwaremill/kmq/KafkaClients.java

Lines changed: 13 additions & 8 deletions
@@ -17,12 +17,14 @@ public KafkaClients(String bootstrapServers) {
         this.bootstrapServers = bootstrapServers;
     }
 
-    public <K, V> KafkaProducer<K, V> createProducer(Class<? extends Serializer<K>> keySerializer, Class<? extends Serializer<V>> valueSerializer) {
+    public <K, V> KafkaProducer<K, V> createProducer(Class<? extends Serializer<K>> keySerializer,
+                                                     Class<? extends Serializer<V>> valueSerializer) {
         return createProducer(keySerializer, valueSerializer, Collections.emptyMap());
     }
 
-    public <K, V> KafkaProducer<K, V> createProducer(Class<? extends Serializer<K>> keySerializer, Class<? extends Serializer<V>> valueSerializer,
-                                                     Map<String, Object> extraConfig) {
+    public <K, V> KafkaProducer<K, V> createProducer(Class<? extends Serializer<K>> keySerializer,
+                                                     Class<? extends Serializer<V>> valueSerializer,
+                                                     Map<String, Object> extraConfig) {
         Properties props = new Properties();
         props.put("bootstrap.servers", bootstrapServers);
         props.put("acks", "all");
@@ -39,13 +41,16 @@ public <K, V> KafkaProducer<K, V> createProducer(Class<? extends Serializer<K>>
         return new KafkaProducer<>(props);
     }
 
-    public <K, V> KafkaConsumer<K, V> createConsumer(String groupId, Class<? extends Deserializer<K>> keyDeserializer, Class<? extends Deserializer<V>> valueDeserializer) {
-        return createConsumer(groupId, keyDeserializer, valueDeserializer, Collections.emptyMap());
-    }
+    public <K, V> KafkaConsumer<K, V> createConsumer(String groupId,
+                                                     Class<? extends Deserializer<K>> keyDeserializer,
+                                                     Class<? extends Deserializer<V>> valueDeserializer) {
+        return createConsumer(groupId, keyDeserializer, valueDeserializer, Collections.emptyMap());
+    }
 
     public <K, V> KafkaConsumer<K, V> createConsumer(String groupId,
                                                      Class<? extends Deserializer<K>> keyDeserializer,
-                                                     Class<? extends Deserializer<V>> valueDeserializer, Map<String, Object> extraConfig) {
+                                                     Class<? extends Deserializer<V>> valueDeserializer,
+                                                     Map<String, Object> extraConfig) {
         Properties props = new Properties();
         props.put("bootstrap.servers", bootstrapServers);
         props.put("enable.auto.commit", "false");
@@ -57,7 +62,7 @@ public <K, V> KafkaConsumer<K, V> createConsumer(String groupId,
         }
         // extraConfig : configure the kafka parameters (ex: ssl, ...)
         for (Map.Entry<String, Object> extraCfgEntry : extraConfig.entrySet()) {
-            props.put(extraCfgEntry.getKey(), extraCfgEntry.getValue());
+            props.put(extraCfgEntry.getKey(), extraCfgEntry.getValue());
         }
 
         return new KafkaConsumer<>(props);
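
For context, here is a minimal sketch of how the reworked overloads might be called from application code. It is not part of the commit: the StringSerializer/StringDeserializer choices, the bootstrap address and the SSL property keys are illustrative assumptions.

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

import com.softwaremill.kmq.KafkaClients;

public class KafkaClientsExtraConfigExample {
    public static void main(String[] args) {
        KafkaClients clients = new KafkaClients("localhost:9092");

        // Extra Kafka client parameters (e.g. SSL), copied into the Properties by createProducer/createConsumer.
        Map<String, Object> extraConfig = new HashMap<>();
        extraConfig.put("security.protocol", "SSL");
        extraConfig.put("ssl.truststore.location", "/path/to/truststore.jks");
        extraConfig.put("ssl.truststore.password", "changeit");

        // The shorter overloads now simply delegate to these with Collections.emptyMap().
        KafkaProducer<String, String> producer =
                clients.createProducer(StringSerializer.class, StringSerializer.class, extraConfig);
        KafkaConsumer<String, String> consumer =
                clients.createConsumer("example-group", StringDeserializer.class, StringDeserializer.class, extraConfig);

        producer.close();
        consumer.close();
    }
}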

core/src/main/java/com/softwaremill/kmq/KmqClient.java

Lines changed: 6 additions & 18 deletions
@@ -43,17 +43,7 @@ public KmqClient(KmqConfig config, KafkaClients clients,
                      Class<? extends Deserializer<K>> keyDeserializer,
                      Class<? extends Deserializer<V>> valueDeserializer,
                      long msgPollTimeout) {
-
-        this.config = config;
-        this.msgPollTimeout = msgPollTimeout;
-
-        this.msgConsumer = clients.createConsumer(config.getMsgConsumerGroupId(), keyDeserializer, valueDeserializer);
-        this.markerProducer = clients.createProducer(
-                MarkerKey.MarkerKeySerializer.class, MarkerValue.MarkerValueSerializer.class,
-                Collections.singletonMap(ProducerConfig.PARTITIONER_CLASS_CONFIG, ParititionFromMarkerKey.class));
-
-        LOG.info(String.format("Subscribing to topic: %s, using group id: %s", config.getMsgTopic(), config.getMsgConsumerGroupId()));
-        msgConsumer.subscribe(Collections.singletonList(config.getMsgTopic()));
+        this(config, clients, keyDeserializer, valueDeserializer, msgPollTimeout, Collections.emptyMap());
     }
 
     public KmqClient(KmqConfig config, KafkaClients clients,
@@ -66,13 +56,11 @@ public KmqClient(KmqConfig config, KafkaClients clients,
 
         // Using the custom partitioner, each offset-partition will contain markers only from a single queue-partition.
         // Adding the PARTITIONER_CLASS_CONFIG in extraConfig map, if extraConfig is not empty
-        this.msgConsumer = clients.createConsumer(config.getMsgConsumerGroupId(), keyDeserializer, valueDeserializer, extraConfig);
-        extraConfig.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, ParititionFromMarkerKey.class);
-        this.markerProducer = clients.createProducer(
-                MarkerKey.MarkerKeySerializer.class, MarkerValue.MarkerValueSerializer.class,
-                extraConfig);
-
-
+        this.msgConsumer = clients.createConsumer(config.getMsgConsumerGroupId(), keyDeserializer, valueDeserializer, extraConfig);
+        extraConfig.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, ParititionFromMarkerKey.class);
+        this.markerProducer = clients.createProducer(
+                MarkerKey.MarkerKeySerializer.class, MarkerValue.MarkerValueSerializer.class,
+                extraConfig);
 
         LOG.info(String.format("Subscribing to topic: %s, using group id: %s", config.getMsgTopic(), config.getMsgConsumerGroupId()));
         msgConsumer.subscribe(Collections.singletonList(config.getMsgTopic()));
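
As an illustration of the simplified constructor chain, the sketch below builds a KmqClient with extra Kafka settings. It is not part of the commit: the String deserializers, the poll timeout value and the SSL key are assumptions, and KmqConfig construction is left to the caller. A mutable map is used because, per the diff above, the six-argument constructor also puts PARTITIONER_CLASS_CONFIG into the map it receives.

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.common.serialization.StringDeserializer;

import com.softwaremill.kmq.KafkaClients;
import com.softwaremill.kmq.KmqClient;
import com.softwaremill.kmq.KmqConfig;

public class KmqClientExtraConfigExample {

    // config and clients are assumed to be created the same way as elsewhere in the project.
    public static KmqClient<String, String> createClient(KmqConfig config, KafkaClients clients) {
        // Mutable on purpose: the constructor adds the marker producer's partitioner class to this map.
        Map<String, Object> extraConfig = new HashMap<>();
        extraConfig.put("security.protocol", "SSL"); // illustrative extra Kafka parameter

        return new KmqClient<>(config, clients,
                StringDeserializer.class, StringDeserializer.class,
                100L,          // msgPollTimeout, assumed to be in milliseconds
                extraConfig);
    }
}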

core/src/main/java/com/softwaremill/kmq/RedeliveryTracker.java

Lines changed: 3 additions & 2 deletions
@@ -5,6 +5,7 @@
 import scala.Option;
 
 import java.io.Closeable;
+import java.util.Collections;
 import java.util.Map;
 
 /**
@@ -13,10 +14,10 @@
  */
 public class RedeliveryTracker {
     public static Closeable start(KafkaClients clients, KmqConfig config) {
-        return RedeliveryActors.start(clients, config);
+        return start(clients, config, Collections.emptyMap());
     }
 
-    public static Closeable start(KafkaClients clients, KmqConfig config, Option<Map<String, Object>> extraConfig) {
+    public static Closeable start(KafkaClients clients, KmqConfig config, Map<String, Object> extraConfig) {
         return RedeliveryActors.start(clients, config, extraConfig);
     }
 }
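
A minimal sketch of starting the redelivery tracker with the new Map-based overload; the SSL entry is an illustrative assumption, and KmqConfig/KafkaClients construction is left to the caller. Callers that need no extra settings can keep using the two-argument start, which now delegates with Collections.emptyMap().

import java.io.Closeable;
import java.util.HashMap;
import java.util.Map;

import com.softwaremill.kmq.KafkaClients;
import com.softwaremill.kmq.KmqConfig;
import com.softwaremill.kmq.RedeliveryTracker;

public class RedeliveryTrackerExtraConfigExample {

    public static Closeable startTracker(KafkaClients clients, KmqConfig config) {
        Map<String, Object> extraConfig = new HashMap<>();
        extraConfig.put("security.protocol", "SSL"); // illustrative extra Kafka parameter

        // Returns a Closeable handle; closing it is expected to stop the redelivery actors.
        return RedeliveryTracker.start(clients, config, extraConfig);
    }
}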

core/src/main/scala/com.softwaremill.kmq/redelivery/CommitMarkerOffsetsActor.scala

Lines changed: 2 additions & 7 deletions
@@ -10,14 +10,9 @@ import org.apache.kafka.common.serialization.ByteArrayDeserializer
 import scala.collection.JavaConverters._
 import scala.concurrent.duration._
 
-class CommitMarkerOffsetsActor(markerTopic: String, clients: KafkaClients, extraConfig: Option[java.util.Map[String, Object]] = None) extends Actor with StrictLogging {
+class CommitMarkerOffsetsActor(markerTopic: String, clients: KafkaClients, extraConfig: java.util.Map[String, Object]) extends Actor with StrictLogging {
 
-  private val consumer = extraConfig match {
-    // extraConfig is not empty
-    case Some(cfg) => clients.createConsumer(null, classOf[ByteArrayDeserializer], classOf[ByteArrayDeserializer], cfg)
-    // extraConfig is empty
-    case None => clients.createConsumer(null, classOf[ByteArrayDeserializer], classOf[ByteArrayDeserializer])
-  }
+  private val consumer = clients.createConsumer(null, classOf[ByteArrayDeserializer], classOf[ByteArrayDeserializer], extraConfig)
 
   private var toCommit: Map[Partition, Offset] = Map()
 
core/src/main/scala/com.softwaremill.kmq/redelivery/ConsumeMarkersActor.scala

Lines changed: 4 additions & 16 deletions
@@ -12,7 +12,7 @@ import org.apache.kafka.common.serialization.ByteArraySerializer
 
 import scala.collection.JavaConverters._
 
-class ConsumeMarkersActor(clients: KafkaClients, config: KmqConfig, extraConfig: Option[java.util.Map[String, Object]] = None) extends Actor with StrictLogging {
+class ConsumeMarkersActor(clients: KafkaClients, config: KmqConfig, extraConfig: java.util.Map[String, Object]) extends Actor with StrictLogging {
 
   private val OneSecond = 1000L
 
@@ -26,22 +26,10 @@ class ConsumeMarkersActor(clients: KafkaClients, config: KmqConfig, extraConfig:
   private var commitMarkerOffsetsActor: ActorRef = _
 
   override def preStart(): Unit = {
-    markerConsumer = extraConfig match {
-      // extraConfig is not empty
-      case Some(cfg) => clients.createConsumer(config.getRedeliveryConsumerGroupId,
+    markerConsumer = clients.createConsumer(config.getRedeliveryConsumerGroupId,
       classOf[MarkerKey.MarkerKeyDeserializer],
-      classOf[MarkerValue.MarkerValueDeserializer], cfg)
-      // extraConfig is empty
-      case None => clients.createConsumer(config.getRedeliveryConsumerGroupId,
-        classOf[MarkerKey.MarkerKeyDeserializer],
-        classOf[MarkerValue.MarkerValueDeserializer])
-    }
-    producer = extraConfig match {
-      // extraConfig is not empty
-      case Some(cfg) => clients.createProducer(classOf[ByteArraySerializer], classOf[ByteArraySerializer], cfg)
-      // extraConfig is empty
-      case None => clients.createProducer(classOf[ByteArraySerializer], classOf[ByteArraySerializer])
-    }
+      classOf[MarkerValue.MarkerValueDeserializer], extraConfig)
+    producer = clients.createProducer(classOf[ByteArraySerializer], classOf[ByteArraySerializer], extraConfig)
 
     setupMarkerConsumer()
     setupOffsetCommitting()

core/src/main/scala/com.softwaremill.kmq/redelivery/Redeliverer.scala

Lines changed: 2 additions & 7 deletions
@@ -21,20 +21,15 @@ trait Redeliverer {
 
 class DefaultRedeliverer(
     partition: Partition, producer: KafkaProducer[Array[Byte], Array[Byte]],
-    config: KmqConfig, clients: KafkaClients, extraConfig: Option[java.util.Map[String, Object]] = None)
+    config: KmqConfig, clients: KafkaClients, extraConfig: java.util.Map[String, Object])
   extends Redeliverer with StrictLogging {
 
   private val SendTimeoutSeconds = 60L
 
   private val tp = new TopicPartition(config.getMsgTopic, partition)
 
   private val reader = {
-    val c = extraConfig match {
-      // extraConfig is not empty
-      case Some(cfg) => clients.createConsumer(null, classOf[ByteArrayDeserializer], classOf[ByteArrayDeserializer], cfg)
-      // extraConfig is empty
-      case None => clients.createConsumer(null, classOf[ByteArrayDeserializer], classOf[ByteArrayDeserializer])
-    }
+    val c = clients.createConsumer(null, classOf[ByteArrayDeserializer], classOf[ByteArrayDeserializer], extraConfig)
     c.assign(Collections.singleton(tp))
     new SingleOffsetReader(tp, c)
   }

core/src/main/scala/com.softwaremill.kmq/redelivery/RedeliveryActors.scala

Lines changed: 2 additions & 2 deletions
@@ -1,22 +1,22 @@
 package com.softwaremill.kmq.redelivery
 
 import java.io.Closeable
+import java.util.Collections
 
 import akka.actor.{ActorSystem, Props}
 import com.softwaremill.kmq.{KafkaClients, KmqConfig}
 import com.typesafe.scalalogging.StrictLogging
 
 import scala.concurrent.Await
 import scala.concurrent.duration._
-
 import scala.collection.JavaConverters._
 
 object RedeliveryActors extends StrictLogging {
   def start(clients: KafkaClients, config: KmqConfig): Closeable = {
     start(clients, config)
   }
 
-  def start(clients: KafkaClients, config: KmqConfig, extraConfig: Option[java.util.Map[String, Object]] = None): Closeable = {
+  def start(clients: KafkaClients, config: KmqConfig, extraConfig: java.util.Map[String, Object] = Collections.emptyMap()): Closeable = {
     val system = ActorSystem("kmq-redelivery")
 
     val consumeMakersActor = system.actorOf(Props(new ConsumeMarkersActor(clients, config, extraConfig)), "consume-markers-actor")
