Commit d61fa44

Update ConsumeMarkersActor.scala
1 parent 477d3e4 commit d61fa44


core/src/main/scala/com.softwaremill.kmq/redelivery/ConsumeMarkersActor.scala

Lines changed: 20 additions & 9 deletions
@@ -12,7 +12,7 @@ import org.apache.kafka.common.serialization.ByteArraySerializer
 
 import scala.collection.JavaConverters._
 
-class ConsumeMarkersActor(clients: KafkaClients, config: KmqConfig) extends Actor with StrictLogging {
+class ConsumeMarkersActor(clients: KafkaClients, config: KmqConfig, extraConfig: Option[java.util.Map[String, Object]] = None) extends Actor with StrictLogging {
 
   private val OneSecond = 1000L
 
@@ -26,11 +26,22 @@ class ConsumeMarkersActor(clients: KafkaClients, config: KmqConfig) extends Acto
   private var commitMarkerOffsetsActor: ActorRef = _
 
   override def preStart(): Unit = {
-    markerConsumer = clients.createConsumer(config.getRedeliveryConsumerGroupId,
-      classOf[MarkerKey.MarkerKeyDeserializer],
-      classOf[MarkerValue.MarkerValueDeserializer])
-
-    producer = clients.createProducer(classOf[ByteArraySerializer], classOf[ByteArraySerializer])
+    markerConsumer = extraConfig match {
+      // extraConfig is not empty
+      case Some(cfg) => clients.createConsumer(config.getRedeliveryConsumerGroupId,
+        classOf[MarkerKey.MarkerKeyDeserializer],
+        classOf[MarkerValue.MarkerValueDeserializer], cfg)
+      // extraConfig is empty
+      case None => clients.createConsumer(config.getRedeliveryConsumerGroupId,
+        classOf[MarkerKey.MarkerKeyDeserializer],
+        classOf[MarkerValue.MarkerValueDeserializer])
+    }
+    producer = extraConfig match {
+      // extraConfig is not empty
+      case Some(cfg) => clients.createProducer(classOf[ByteArraySerializer], classOf[ByteArraySerializer], cfg)
+      // extraConfig is empty
+      case None => clients.createProducer(classOf[ByteArraySerializer], classOf[ByteArraySerializer])
+    }
 
     setupMarkerConsumer()
     setupOffsetCommitting()
@@ -62,7 +73,7 @@ class ConsumeMarkersActor(clients: KafkaClients, config: KmqConfig) extends Acto
 
   private def partitionAssigned(p: Partition, endOffset: Offset): Unit = {
     val redeliverActorProps = Props(
-      new RedeliverActor(p, new RetryingRedeliverer(new DefaultRedeliverer(p, producer, config, clients))))
+      new RedeliverActor(p, new RetryingRedeliverer(new DefaultRedeliverer(p, producer, config, clients, extraConfig))))
       .withDispatcher("kmq.redeliver-dispatcher")
     val redeliverActor = context.actorOf(
       redeliverActorProps,
@@ -75,7 +86,7 @@ class ConsumeMarkersActor(clients: KafkaClients, config: KmqConfig) extends Acto
 
   private def setupOffsetCommitting(): Unit = {
    commitMarkerOffsetsActor = context.actorOf(
-      Props(new CommitMarkerOffsetsActor(config.getMarkerTopic, clients)),
+      Props(new CommitMarkerOffsetsActor(config.getMarkerTopic, clients, extraConfig)),
       "commit-marker-offsets")
 
     commitMarkerOffsetsActor ! DoCommit
@@ -170,4 +181,4 @@ case object DoCommit
 case class RedeliverMarkers(markers: List[MarkerKey])
 case object DoRedeliver
 
-case object DoConsume
+case object DoConsume
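
For context, below is a minimal, hypothetical usage sketch (not part of this commit) showing what the new extraConfig parameter enables: extra Kafka client properties are collected in a java.util.Map and passed as Some(...) when the actor is created, while omitting the argument keeps the previous default of None. The ConsumeMarkersExample object, the actor name, the property values, and the exact import paths for KafkaClients and KmqConfig are illustrative assumptions.

// Hypothetical sketch, not from this commit: passing extra Kafka client
// settings (e.g. SSL properties) through the new extraConfig parameter.
import java.util.{Map => JMap}

import akka.actor.{ActorSystem, Props}
import com.softwaremill.kmq.{KafkaClients, KmqConfig} // assumed package for the kmq core classes
import com.softwaremill.kmq.redelivery.ConsumeMarkersActor

import scala.collection.JavaConverters._

object ConsumeMarkersExample {
  def start(system: ActorSystem, clients: KafkaClients, config: KmqConfig): Unit = {
    // Extra properties to merge into the consumer/producer configuration;
    // the keys below are standard Kafka client settings, the values are examples.
    val extraCfg: JMap[String, Object] = Map[String, Object](
      "security.protocol" -> "SSL",
      "ssl.truststore.location" -> "/etc/kafka/truststore.jks"
    ).asJava

    // With extra config:
    system.actorOf(Props(new ConsumeMarkersActor(clients, config, Some(extraCfg))), "consume-markers")

    // Without extra config, the default None preserves the previous behaviour:
    // system.actorOf(Props(new ConsumeMarkersActor(clients, config)), "consume-markers-default")
  }
}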
