Skip to content

Commit 477d3e4

Browse files
authored
Update CommitMarkerOffsetsActor.scala
1 parent c9d8a05 commit 477d3e4

File tree

1 file changed

+7
-2
lines changed

1 file changed

+7
-2
lines changed

core/src/main/scala/com.softwaremill.kmq/redelivery/CommitMarkerOffsetsActor.scala

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,14 @@ import org.apache.kafka.common.serialization.ByteArrayDeserializer
1010
import scala.collection.JavaConverters._
1111
import scala.concurrent.duration._
1212

13-
class CommitMarkerOffsetsActor(markerTopic: String, clients: KafkaClients) extends Actor with StrictLogging {
13+
class CommitMarkerOffsetsActor(markerTopic: String, clients: KafkaClients, extraConfig: Option[java.util.Map[String, Object]] = None) extends Actor with StrictLogging {
1414

15-
private val consumer = clients.createConsumer(null, classOf[ByteArrayDeserializer], classOf[ByteArrayDeserializer])
15+
private val consumer = extraConfig match {
16+
// extraConfig is not empty
17+
case Some(cfg) => clients.createConsumer(null, classOf[ByteArrayDeserializer], classOf[ByteArrayDeserializer], cfg
18+
// extraConfig is empty
19+
case None => clients.createConsumer(null, classOf[ByteArrayDeserializer], classOf[ByteArrayDeserializer])
20+
}
1621

1722
private var toCommit: Map[Partition, Offset] = Map()
1823

0 commit comments

Comments
 (0)