Skip to content

Commit bae2c93

Browse files
authored
Update KmqClient.java
1 parent 7339ba1 commit bae2c93

File tree

1 file changed

+26
-6
lines changed

1 file changed

+26
-6
lines changed

core/src/main/java/com/softwaremill/kmq/KmqClient.java

Lines changed: 26 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -10,12 +10,11 @@
1010
import org.apache.kafka.common.serialization.Deserializer;
1111
import org.slf4j.Logger;
1212
import org.slf4j.LoggerFactory;
13+
import scala.Option;
1314

1415
import java.io.Closeable;
1516
import java.io.IOException;
16-
import java.util.ArrayList;
17-
import java.util.Collections;
18-
import java.util.List;
17+
import java.util.*;
1918
import java.util.concurrent.Future;
2019

2120
/**
@@ -50,10 +49,31 @@ public KmqClient(KmqConfig config, KafkaClients clients,
5049
this.msgPollTimeout = msgPollTimeout;
5150

5251
this.msgConsumer = clients.createConsumer(config.getMsgConsumerGroupId(), keyDeserializer, valueDeserializer);
53-
// Using the custom partitioner, each offset-partition will contain markers only from a single queue-partition.
5452
this.markerProducer = clients.createProducer(
55-
MarkerKey.MarkerKeySerializer.class, MarkerValue.MarkerValueSerializer.class,
56-
Collections.singletonMap(ProducerConfig.PARTITIONER_CLASS_CONFIG, ParititionFromMarkerKey.class));
53+
MarkerKey.MarkerKeySerializer.class, MarkerValue.MarkerValueSerializer.class,
54+
Collections.singletonMap(ProducerConfig.PARTITIONER_CLASS_CONFIG, ParititionFromMarkerKey.class));
55+
56+
LOG.info(String.format("Subscribing to topic: %s, using group id: %s", config.getMsgTopic(), config.getMsgConsumerGroupId()));
57+
msgConsumer.subscribe(Collections.singletonList(config.getMsgTopic()));
58+
}
59+
60+
public KmqClient(KmqConfig config, KafkaClients clients,
61+
Class<? extends Deserializer<K>> keyDeserializer,
62+
Class<? extends Deserializer<V>> valueDeserializer,
63+
long msgPollTimeout, Map<String, Object> extraConfig) {
64+
65+
this.config = config;
66+
this.msgPollTimeout = msgPollTimeout;
67+
68+
// Using the custom partitioner, each offset-partition will contain markers only from a single queue-partition.
69+
// Adding the PARTITIONER_CLASS_CONFIG in extraConfig map, if extraConfig is not empty
70+
this.msgConsumer = clients.createConsumer(config.getMsgConsumerGroupId(), keyDeserializer, valueDeserializer, extraConfig);
71+
extraConfig.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, ParititionFromMarkerKey.class);
72+
this.markerProducer = clients.createProducer(
73+
MarkerKey.MarkerKeySerializer.class, MarkerValue.MarkerValueSerializer.class,
74+
extraConfig);
75+
76+
5777

5878
LOG.info(String.format("Subscribing to topic: %s, using group id: %s", config.getMsgTopic(), config.getMsgConsumerGroupId()));
5979
msgConsumer.subscribe(Collections.singletonList(config.getMsgTopic()));

0 commit comments

Comments
 (0)