Skip to content

Commit 9b150bc

Browse files
authored
Merge pull request #14 from cricket007/fix-non-existing-offsets
Fix NPE when offsets not existing on auto.offset.reset=none
2 parents e29ea61 + b30c648 commit 9b150bc

File tree

1 file changed

+21
-9
lines changed

1 file changed

+21
-9
lines changed

src/main/java/com/comcast/kafka/connect/kafka/KafkaSourceTask.java

Lines changed: 21 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -10,15 +10,18 @@
1010

1111
package com.comcast.kafka.connect.kafka;
1212

13+
import org.apache.kafka.clients.consumer.ConsumerConfig;
1314
import org.apache.kafka.clients.consumer.ConsumerRecord;
1415
import org.apache.kafka.clients.consumer.ConsumerRecords;
1516
import org.apache.kafka.clients.consumer.KafkaConsumer;
17+
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
1618
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
1719
import org.apache.kafka.common.TopicPartition;
1820
import org.apache.kafka.common.errors.WakeupException;
1921
import org.apache.kafka.common.header.Header;
2022
import org.apache.kafka.common.header.Headers;
2123
import org.apache.kafka.connect.data.Schema;
24+
import org.apache.kafka.connect.errors.ConnectException;
2225
import org.apache.kafka.connect.header.ConnectHeaders;
2326
import org.apache.kafka.connect.source.SourceTask;
2427
import org.apache.kafka.connect.source.SourceRecord;
@@ -69,13 +72,13 @@ public void start(Map<String, String> opts) {
6972
includeHeaders = sourceConnectorConfig.getBoolean(KafkaSourceConnectorConfig.INCLUDE_MESSAGE_HEADERS_CONFIG);
7073
String unknownOffsetResetPosition = sourceConnectorConfig
7174
.getString(KafkaSourceConnectorConfig.CONSUMER_AUTO_OFFSET_RESET_CONFIG);
72-
73-
// Get the leader topic partitions to work with
75+
76+
// Get the leader topic partitions to work with
7477
List<LeaderTopicPartition> leaderTopicPartitions = Arrays
7578
.asList(opts.get(KafkaSourceConnectorConfig.TASK_LEADER_TOPIC_PARTITION_CONFIG).split(",")).stream()
7679
.map(LeaderTopicPartition::fromString).collect(Collectors.toList());
77-
78-
// retrieve the existing offsets (if any) for the configured partitions
80+
81+
// retrieve the existing offsets (if any) for the configured partitions
7982
List<Map<String, String>> offsetLookupPartitions = leaderTopicPartitions.stream()
8083
.map(leaderTopicPartition -> Collections.singletonMap(TOPIC_PARTITION_KEY,
8184
leaderTopicPartition.toTopicPartitionString()))
@@ -87,7 +90,7 @@ public void start(Map<String, String> opts) {
8790
.collect(Collectors.toMap(e -> e.getKey().get(TOPIC_PARTITION_KEY), e -> (long) e.getValue().get(OFFSET_KEY)));
8891
// Set up Kafka consumer
8992
consumer = new KafkaConsumer<byte[], byte[]>(sourceConnectorConfig.getKafkaConsumerProperties());
90-
93+
9194
// Get topic partitions and offsets so we can seek() to them
9295
Map<TopicPartition, Long> topicPartitionOffsets = new HashMap<>();
9396
List<TopicPartition> topicPartitionsWithUnknownOffset = new ArrayList<>();
@@ -101,7 +104,7 @@ public void start(Map<String, String> opts) {
101104
topicPartitionsWithUnknownOffset.add(topicPartition);
102105
}
103106
}
104-
107+
105108
// Set default offsets for partitions without stored offsets
106109
if (topicPartitionsWithUnknownOffset.size() > 0) {
107110
Map<TopicPartition, Long> defaultOffsets;
@@ -115,7 +118,16 @@ public void start(Map<String, String> opts) {
115118
} else if (unknownOffsetResetPosition.equals(OffsetResetStrategy.NONE.toString().toLowerCase())) {
116119
logger.info("Will try to use existing consumer group offsets for partitions.");
117120
defaultOffsets = topicPartitionsWithUnknownOffset.stream()
118-
.collect(Collectors.toMap(Function.identity(), tp -> consumer.committed(tp).offset()));
121+
.collect(Collectors.toMap(Function.identity(), tp -> {
122+
OffsetAndMetadata committed = consumer.committed(tp);
123+
if (committed == null) {
124+
throw new ConnectException(
125+
String.format("Unable to find committed offsets for consumer group for when %s=%s",
126+
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
127+
OffsetResetStrategy.NONE.toString().toLowerCase()));
128+
}
129+
return committed.offset();
130+
}));
119131
} else {
120132
logger.warn(
121133
"Config value {}, is set to an unknown value: {}. Partitions without existing offset data will not be consumed.",
@@ -124,11 +136,11 @@ public void start(Map<String, String> opts) {
124136
}
125137
topicPartitionOffsets.putAll(defaultOffsets);
126138
}
127-
139+
128140
// List of topic partitions to assign
129141
List<TopicPartition> topicPartitionsToAssign = new ArrayList<>(topicPartitionOffsets.keySet());
130142
consumer.assign(topicPartitionsToAssign);
131-
143+
132144
// Seek to desired offset for each partition
133145
topicPartitionOffsets.forEach((key, value) -> consumer.seek(key, value));
134146
}

0 commit comments

Comments
 (0)