@@ -21,15 +21,20 @@ trait Redeliverer {
21
21
22
22
class DefaultRedeliverer (
23
23
partition : Partition , producer : KafkaProducer [Array [Byte ], Array [Byte ]],
24
- config : KmqConfig , clients : KafkaClients )
24
+ config : KmqConfig , clients : KafkaClients , extraConfig : Option [java.util. Map [ String , Object ]] = None )
25
25
extends Redeliverer with StrictLogging {
26
26
27
27
private val SendTimeoutSeconds = 60L
28
28
29
29
private val tp = new TopicPartition (config.getMsgTopic, partition)
30
30
31
31
private val reader = {
32
- val c = clients.createConsumer(null , classOf [ByteArrayDeserializer ], classOf [ByteArrayDeserializer ])
32
+ val c = extraConfig match {
33
+ // extraConfig is not empty
34
+ case Some (cfg) => clients.createConsumer(null , classOf [ByteArrayDeserializer ], classOf [ByteArrayDeserializer ], cfg)
35
+ // extraConfig is empty
36
+ case None => clients.createConsumer(null , classOf [ByteArrayDeserializer ], classOf [ByteArrayDeserializer ])
37
+ }
33
38
c.assign(Collections .singleton(tp))
34
39
new SingleOffsetReader (tp, c)
35
40
}
@@ -133,4 +138,4 @@ private class SingleOffsetReader(tp: TopicPartition, consumer: KafkaConsumer[Arr
133
138
def close (): Unit = {
134
139
consumer.close()
135
140
}
136
- }
141
+ }
0 commit comments