Skip to content

Commit be9b6d8

Browse files
committed
Defensively handle the heartbeat manager
Also ensure its reset on restart. KAFKA-161
1 parent 0e60679 commit be9b6d8

File tree

1 file changed

+5
-1
lines changed

1 file changed

+5
-1
lines changed

src/main/java/com/mongodb/kafka/connect/source/MongoSourceTask.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,7 @@ public void start(final Map<String, String> props) {
165165
throw new ConnectException("Failed to start new task", e);
166166
}
167167

168+
heartbeatManager = null;
168169
createPartitionMap(sourceConfig, true);
169170

170171
mongoClient =
@@ -210,7 +211,10 @@ public List<SourceRecord> poll() {
210211
if (!sourceRecords.isEmpty()) {
211212
return sourceRecords;
212213
}
213-
return heartbeatManager.heartbeat().map(Collections::singletonList).orElse(null);
214+
if (heartbeatManager != null) {
215+
return heartbeatManager.heartbeat().map(Collections::singletonList).orElse(null);
216+
}
217+
return null;
214218
} else {
215219
BsonDocument changeStreamDocument = next.get();
216220

0 commit comments

Comments
 (0)