Skip to content

Commit dfcfecf

Browse files
authored
Merge pull request #8 from MohamedEL59/MohamedEL59-patch-1
Make the API more flexible by allowing extra configuration settings
2 parents 2720a2d + b1a7be7 commit dfcfecf

File tree

11 files changed

+145
-25
lines changed

11 files changed

+145
-25
lines changed

build.sbt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ name := "kmq"
55

66
lazy val commonSettings = Seq(
77
organization := "com.softwaremill.kmq",
8-
version := "0.2.1",
8+
version := "0.2.2",
99
scalaVersion := "2.12.4",
1010
crossScalaVersions := List(scalaVersion.value, "2.11.11"),
1111

core/src/main/java/com/softwaremill/kmq/KafkaClients.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ public <K, V> KafkaProducer<K, V> createProducer(Class<? extends Serializer<K>>
2121
return createProducer(keySerializer, valueSerializer, Collections.emptyMap());
2222
}
2323

24-
public <K, V> KafkaProducer<K, V> createProducer(Class<? extends Serializer<K>> keySerializer, Class<? extends Serializer<V>> valueSerializer,
24+
public <K, V> KafkaProducer<K, V> createProducer(Class<? extends Serializer<K>> keySerializer, Class<? extends Serializer<V>> valueSerializer,
2525
Map<String, Object> extraConfig) {
2626
Properties props = new Properties();
2727
props.put("bootstrap.servers", bootstrapServers);
@@ -39,9 +39,13 @@ public <K, V> KafkaProducer<K, V> createProducer(Class<? extends Serializer<K>>
3939
return new KafkaProducer<>(props);
4040
}
4141

42+
public <K, V> KafkaConsumer<K, V> createConsumer(String groupId, Class<? extends Deserializer<K>> keyDeserializer, Class<? extends Deserializer<V>> valueDeserializer) {
43+
return createConsumer(groupId, keyDeserializer, valueDeserializer, Collections.emptyMap());
44+
}
45+
4246
public <K, V> KafkaConsumer<K, V> createConsumer(String groupId,
4347
Class<? extends Deserializer<K>> keyDeserializer,
44-
Class<? extends Deserializer<V>> valueDeserializer) {
48+
Class<? extends Deserializer<V>> valueDeserializer, Map<String, Object> extraConfig) {
4549
Properties props = new Properties();
4650
props.put("bootstrap.servers", bootstrapServers);
4751
props.put("enable.auto.commit", "false");
@@ -51,6 +55,10 @@ public <K, V> KafkaConsumer<K, V> createConsumer(String groupId,
5155
if (groupId != null) {
5256
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
5357
}
58+
// extraConfig : configure the kafka parameters (ex: ssl, ...)
59+
for (Map.Entry<String, Object> extraCfgEntry : extraConfig.entrySet()) {
60+
props.put(extraCfgEntry.getKey(), extraCfgEntry.getValue());
61+
}
5462

5563
return new KafkaConsumer<>(props);
5664
}

core/src/main/java/com/softwaremill/kmq/KmqClient.java

Lines changed: 25 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,7 @@
1313

1414
import java.io.Closeable;
1515
import java.io.IOException;
16-
import java.util.ArrayList;
17-
import java.util.Collections;
18-
import java.util.List;
16+
import java.util.*;
1917
import java.util.concurrent.Future;
2018

2119
/**
@@ -50,10 +48,31 @@ public KmqClient(KmqConfig config, KafkaClients clients,
5048
this.msgPollTimeout = msgPollTimeout;
5149

5250
this.msgConsumer = clients.createConsumer(config.getMsgConsumerGroupId(), keyDeserializer, valueDeserializer);
53-
// Using the custom partitioner, each offset-partition will contain markers only from a single queue-partition.
5451
this.markerProducer = clients.createProducer(
55-
MarkerKey.MarkerKeySerializer.class, MarkerValue.MarkerValueSerializer.class,
56-
Collections.singletonMap(ProducerConfig.PARTITIONER_CLASS_CONFIG, ParititionFromMarkerKey.class));
52+
MarkerKey.MarkerKeySerializer.class, MarkerValue.MarkerValueSerializer.class,
53+
Collections.singletonMap(ProducerConfig.PARTITIONER_CLASS_CONFIG, ParititionFromMarkerKey.class));
54+
55+
LOG.info(String.format("Subscribing to topic: %s, using group id: %s", config.getMsgTopic(), config.getMsgConsumerGroupId()));
56+
msgConsumer.subscribe(Collections.singletonList(config.getMsgTopic()));
57+
}
58+
59+
public KmqClient(KmqConfig config, KafkaClients clients,
60+
Class<? extends Deserializer<K>> keyDeserializer,
61+
Class<? extends Deserializer<V>> valueDeserializer,
62+
long msgPollTimeout, Map<String, Object> extraConfig) {
63+
64+
this.config = config;
65+
this.msgPollTimeout = msgPollTimeout;
66+
67+
// Using the custom partitioner, each offset-partition will contain markers only from a single queue-partition.
68+
// Add the PARTITIONER_CLASS_CONFIG entry to the extraConfig map before creating the marker producer
69+
this.msgConsumer = clients.createConsumer(config.getMsgConsumerGroupId(), keyDeserializer, valueDeserializer, extraConfig);
70+
extraConfig.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, ParititionFromMarkerKey.class);
71+
this.markerProducer = clients.createProducer(
72+
MarkerKey.MarkerKeySerializer.class, MarkerValue.MarkerValueSerializer.class,
73+
extraConfig);
74+
75+
5776

5877
LOG.info(String.format("Subscribing to topic: %s, using group id: %s", config.getMsgTopic(), config.getMsgConsumerGroupId()));
5978
msgConsumer.subscribe(Collections.singletonList(config.getMsgTopic()));

core/src/main/java/com/softwaremill/kmq/RedeliveryTracker.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,10 @@
22

33
import com.softwaremill.kmq.redelivery.RedeliveryActors;
44

5+
import scala.Option;
6+
57
import java.io.Closeable;
8+
import java.util.Map;
69

710
/**
811
* Tracks which messages have been processed, and redelivers the ones which are not processed until their redelivery
@@ -12,4 +15,8 @@ public class RedeliveryTracker {
1215
public static Closeable start(KafkaClients clients, KmqConfig config) {
1316
return RedeliveryActors.start(clients, config);
1417
}
18+
19+
public static Closeable start(KafkaClients clients, KmqConfig config, Option<Map<String, Object>> extraConfig) {
20+
return RedeliveryActors.start(clients, config, extraConfig);
21+
}
1522
}

core/src/main/scala/com.softwaremill.kmq/redelivery/CommitMarkerOffsetsActor.scala

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,14 @@ import org.apache.kafka.common.serialization.ByteArrayDeserializer
1010
import scala.collection.JavaConverters._
1111
import scala.concurrent.duration._
1212

13-
class CommitMarkerOffsetsActor(markerTopic: String, clients: KafkaClients) extends Actor with StrictLogging {
13+
class CommitMarkerOffsetsActor(markerTopic: String, clients: KafkaClients, extraConfig: Option[java.util.Map[String, Object]] = None) extends Actor with StrictLogging {
1414

15-
private val consumer = clients.createConsumer(null, classOf[ByteArrayDeserializer], classOf[ByteArrayDeserializer])
15+
private val consumer = extraConfig match {
16+
// extraConfig is not empty
17+
case Some(cfg) => clients.createConsumer(null, classOf[ByteArrayDeserializer], classOf[ByteArrayDeserializer], cfg)
18+
// extraConfig is empty
19+
case None => clients.createConsumer(null, classOf[ByteArrayDeserializer], classOf[ByteArrayDeserializer])
20+
}
1621

1722
private var toCommit: Map[Partition, Offset] = Map()
1823

core/src/main/scala/com.softwaremill.kmq/redelivery/ConsumeMarkersActor.scala

Lines changed: 20 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ import org.apache.kafka.common.serialization.ByteArraySerializer
1212

1313
import scala.collection.JavaConverters._
1414

15-
class ConsumeMarkersActor(clients: KafkaClients, config: KmqConfig) extends Actor with StrictLogging {
15+
class ConsumeMarkersActor(clients: KafkaClients, config: KmqConfig, extraConfig: Option[java.util.Map[String, Object]] = None) extends Actor with StrictLogging {
1616

1717
private val OneSecond = 1000L
1818

@@ -26,11 +26,22 @@ class ConsumeMarkersActor(clients: KafkaClients, config: KmqConfig) extends Acto
2626
private var commitMarkerOffsetsActor: ActorRef = _
2727

2828
override def preStart(): Unit = {
29-
markerConsumer = clients.createConsumer(config.getRedeliveryConsumerGroupId,
30-
classOf[MarkerKey.MarkerKeyDeserializer],
31-
classOf[MarkerValue.MarkerValueDeserializer])
32-
33-
producer = clients.createProducer(classOf[ByteArraySerializer], classOf[ByteArraySerializer])
29+
markerConsumer = extraConfig match {
30+
// extraConfig is not empty
31+
case Some(cfg) => clients.createConsumer(config.getRedeliveryConsumerGroupId,
32+
classOf[MarkerKey.MarkerKeyDeserializer],
33+
classOf[MarkerValue.MarkerValueDeserializer], cfg)
34+
// extraConfig is empty
35+
case None => clients.createConsumer(config.getRedeliveryConsumerGroupId,
36+
classOf[MarkerKey.MarkerKeyDeserializer],
37+
classOf[MarkerValue.MarkerValueDeserializer])
38+
}
39+
producer = extraConfig match {
40+
// extraConfig is not empty
41+
case Some(cfg) => clients.createProducer(classOf[ByteArraySerializer], classOf[ByteArraySerializer], cfg)
42+
// extraConfig is empty
43+
case None => clients.createProducer(classOf[ByteArraySerializer], classOf[ByteArraySerializer])
44+
}
3445

3546
setupMarkerConsumer()
3647
setupOffsetCommitting()
@@ -62,7 +73,7 @@ class ConsumeMarkersActor(clients: KafkaClients, config: KmqConfig) extends Acto
6273

6374
private def partitionAssigned(p: Partition, endOffset: Offset): Unit = {
6475
val redeliverActorProps = Props(
65-
new RedeliverActor(p, new RetryingRedeliverer(new DefaultRedeliverer(p, producer, config, clients))))
76+
new RedeliverActor(p, new RetryingRedeliverer(new DefaultRedeliverer(p, producer, config, clients, extraConfig))))
6677
.withDispatcher("kmq.redeliver-dispatcher")
6778
val redeliverActor = context.actorOf(
6879
redeliverActorProps,
@@ -75,7 +86,7 @@ class ConsumeMarkersActor(clients: KafkaClients, config: KmqConfig) extends Acto
7586

7687
private def setupOffsetCommitting(): Unit = {
7788
commitMarkerOffsetsActor = context.actorOf(
78-
Props(new CommitMarkerOffsetsActor(config.getMarkerTopic, clients)),
89+
Props(new CommitMarkerOffsetsActor(config.getMarkerTopic, clients, extraConfig)),
7990
"commit-marker-offsets")
8091

8192
commitMarkerOffsetsActor ! DoCommit
@@ -170,4 +181,4 @@ case object DoCommit
170181
case class RedeliverMarkers(markers: List[MarkerKey])
171182
case object DoRedeliver
172183

173-
case object DoConsume
184+
case object DoConsume

core/src/main/scala/com.softwaremill.kmq/redelivery/Redeliverer.scala

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,15 +21,20 @@ trait Redeliverer {
2121

2222
class DefaultRedeliverer(
2323
partition: Partition, producer: KafkaProducer[Array[Byte], Array[Byte]],
24-
config: KmqConfig, clients: KafkaClients)
24+
config: KmqConfig, clients: KafkaClients, extraConfig: Option[java.util.Map[String, Object]] = None)
2525
extends Redeliverer with StrictLogging {
2626

2727
private val SendTimeoutSeconds = 60L
2828

2929
private val tp = new TopicPartition(config.getMsgTopic, partition)
3030

3131
private val reader = {
32-
val c = clients.createConsumer(null, classOf[ByteArrayDeserializer], classOf[ByteArrayDeserializer])
32+
val c = extraConfig match {
33+
// extraConfig is not empty
34+
case Some(cfg) => clients.createConsumer(null, classOf[ByteArrayDeserializer], classOf[ByteArrayDeserializer], cfg)
35+
// extraConfig is empty
36+
case None => clients.createConsumer(null, classOf[ByteArrayDeserializer], classOf[ByteArrayDeserializer])
37+
}
3338
c.assign(Collections.singleton(tp))
3439
new SingleOffsetReader(tp, c)
3540
}
@@ -133,4 +138,4 @@ private class SingleOffsetReader(tp: TopicPartition, consumer: KafkaConsumer[Arr
133138
def close(): Unit = {
134139
consumer.close()
135140
}
136-
}
141+
}

core/src/main/scala/com.softwaremill.kmq/redelivery/RedeliveryActors.scala

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,17 @@ import com.typesafe.scalalogging.StrictLogging
99
import scala.concurrent.Await
1010
import scala.concurrent.duration._
1111

12+
import scala.collection.JavaConverters._
13+
1214
object RedeliveryActors extends StrictLogging {
1315
def start(clients: KafkaClients, config: KmqConfig): Closeable = {
16+
start(clients, config, Option.empty)
17+
}
18+
19+
def start(clients: KafkaClients, config: KmqConfig, extraConfig: Option[java.util.Map[String, Object]] = None): Closeable = {
1420
val system = ActorSystem("kmq-redelivery")
1521

16-
val consumeMakersActor = system.actorOf(Props(new ConsumeMarkersActor(clients, config)), "consume-markers-actor")
22+
val consumeMakersActor = system.actorOf(Props(new ConsumeMarkersActor(clients, config, extraConfig)), "consume-markers-actor")
1723
consumeMakersActor ! DoConsume
1824

1925
logger.info("Started redelivery actors")

example-java/src/main/java/com/softwaremill/kmq/example/standalone/StandaloneProcessor.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,18 @@
33
import com.softwaremill.kmq.KmqClient;
44
import com.softwaremill.kmq.example.UncaughtExceptionHandling;
55
import org.apache.kafka.clients.consumer.ConsumerRecord;
6+
import org.apache.kafka.clients.CommonClientConfigs;
67
import org.apache.kafka.common.serialization.ByteBufferDeserializer;
8+
import org.apache.kafka.common.config.SslConfigs;
9+
710
import org.slf4j.Logger;
811
import org.slf4j.LoggerFactory;
912

1013
import java.io.IOException;
1114
import java.nio.ByteBuffer;
1215
import java.time.Clock;
1316
import java.util.Map;
17+
import java.util.HashMap;
1418
import java.util.Random;
1519
import java.util.concurrent.ConcurrentHashMap;
1620
import java.util.concurrent.ExecutorService;
@@ -24,6 +28,22 @@ class StandaloneProcessor {
2428

2529
public static void main(String[] args) throws InterruptedException, IOException {
2630
UncaughtExceptionHandling.setup();
31+
32+
/* EXAMPLE with extraConfig : SSL Encryption & SSL Authentication
33+
Map extraConfig = new HashMap();
34+
//configure the following three settings for SSL Encryption
35+
extraConfig.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
36+
extraConfig.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "/directory/kafka.client.truststore.jks");
37+
extraConfig.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "test1234");
38+
39+
// configure the following three settings for SSL Authentication
40+
extraConfig.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, "/directory/kafka.client.keystore.jks");
41+
extraConfig.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, "test1234");
42+
extraConfig.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, "test1234");
43+
44+
KmqClient<ByteBuffer, ByteBuffer> kmqClient = new KmqClient<>(KMQ_CONFIG, KAFKA_CLIENTS,
45+
ByteBufferDeserializer.class, ByteBufferDeserializer.class, 100, extraConfig);
46+
*/
2747

2848
KmqClient<ByteBuffer, ByteBuffer> kmqClient = new KmqClient<>(KMQ_CONFIG, KAFKA_CLIENTS,
2949
ByteBufferDeserializer.class, ByteBufferDeserializer.class, 100);

example-java/src/main/java/com/softwaremill/kmq/example/standalone/StandaloneRedeliveryTracker.java

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,15 @@
22

33
import com.softwaremill.kmq.RedeliveryTracker;
44
import com.softwaremill.kmq.example.UncaughtExceptionHandling;
5+
import org.apache.kafka.clients.CommonClientConfigs;
6+
import org.apache.kafka.common.config.SslConfigs;
57
import org.slf4j.Logger;
68
import org.slf4j.LoggerFactory;
79

810
import java.io.Closeable;
911
import java.io.IOException;
12+
import java.util.HashMap;
13+
import java.util.Map;
1014

1115
import static com.softwaremill.kmq.example.standalone.StandaloneConfig.*;
1216

@@ -15,6 +19,21 @@ class StandaloneRedeliveryTracker {
1519

1620
public static void main(String[] args) throws InterruptedException, IOException {
1721
UncaughtExceptionHandling.setup();
22+
23+
/* EXAMPLE with extraConfig : SSL Encryption & SSL Authentication
24+
Map extraConfig = new HashMap();
25+
//configure the following three settings for SSL Encryption
26+
extraConfig.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
27+
extraConfig.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "/directory/kafka.client.truststore.jks");
28+
extraConfig.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "test1234");
29+
30+
// configure the following three settings for SSL Authentication
31+
extraConfig.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, "/directory/kafka.client.keystore.jks");
32+
extraConfig.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, "test1234");
33+
extraConfig.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, "test1234");
34+
35+
Closeable redelivery = RedeliveryTracker.start(KAFKA_CLIENTS, KMQ_CONFIG, scala.Option.apply(extraConfig));
36+
*/
1837

1938
Closeable redelivery = RedeliveryTracker.start(KAFKA_CLIENTS, KMQ_CONFIG);
2039
LOG.info("Redelivery tracker started");
@@ -24,4 +43,4 @@ public static void main(String[] args) throws InterruptedException, IOException
2443
redelivery.close();
2544
LOG.info("Redelivery tracker stopped");
2645
}
27-
}
46+
}

0 commit comments

Comments
 (0)