Skip to content

Commit 00cfc5e

Browse files
committed
Ensure the heartbeat message schema is optional
A non optional schema breaks value conversion. KAFKA-207 KAFKA-208
1 parent d2e2927 commit 00cfc5e

File tree

3 files changed

+43
-11
lines changed

3 files changed

+43
-11
lines changed

src/integrationTest/java/com/mongodb/kafka/connect/MongoSourceConnectorIntegrationTest.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
import org.junit.jupiter.api.DisplayName;
4545
import org.junit.jupiter.api.Test;
4646

47+
import org.bson.BsonDocument;
4748
import org.bson.Document;
4849

4950
import com.mongodb.client.MongoCollection;
@@ -308,6 +309,33 @@ void testSourceUsesHeartbeatsForOffsets() {
308309
}
309310
}
310311

312+
@Test
313+
@DisplayName("Ensure Source heartbeats have a valid schema")
314+
void testSourceHeartbeatsHaveValidSchema() {
315+
assumeTrue(isGreaterThanFourDotZero());
316+
317+
MongoCollection<Document> coll = getAndCreateCollection();
318+
319+
String heartbeatTopic = "__HEARTBEAT_SCHEMA";
320+
321+
Properties sourceProperties = new Properties();
322+
sourceProperties.put(MongoSourceConfig.DATABASE_CONFIG, coll.getNamespace().getDatabaseName());
323+
sourceProperties.put(
324+
MongoSourceConfig.COLLECTION_CONFIG, coll.getNamespace().getCollectionName());
325+
sourceProperties.put(MongoSourceConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "1000");
326+
sourceProperties.put(MongoSourceConfig.HEARTBEAT_TOPIC_NAME_CONFIG, heartbeatTopic);
327+
sourceProperties.put("key.converter", "org.apache.kafka.connect.json.JsonConverter");
328+
sourceProperties.put("key.converter.schemas.enable", "false");
329+
sourceProperties.put("value.converter", "org.apache.kafka.connect.json.JsonConverter");
330+
sourceProperties.put("value.converter.schemas.enable", "false");
331+
332+
addSourceConnector(sourceProperties);
333+
BsonDocument heartbeat = getHeartbeat(heartbeatTopic);
334+
335+
assertTrue(heartbeat.get("key").isDocument());
336+
assertTrue(heartbeat.get("value").isNull());
337+
}
338+
311339
@Test
312340
@DisplayName("Ensure Source provides friendly error messages for invalid pipelines")
313341
void testSourceHasFriendlyErrorMessagesForInvalidPipelines() {

src/integrationTest/java/com/mongodb/kafka/connect/mongodb/MongoKafkaTestCase.java

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import org.apache.kafka.common.serialization.BytesDeserializer;
4141
import org.apache.kafka.common.serialization.Deserializer;
4242
import org.apache.kafka.common.utils.Bytes;
43+
import org.apache.kafka.connect.json.JsonDeserializer;
4344
import org.junit.jupiter.api.extension.RegisterExtension;
4445
import org.slf4j.Logger;
4546
import org.slf4j.LoggerFactory;
@@ -329,6 +330,19 @@ public void assertProducedDocs(final List<Document> docs, final MongoCollection<
329330

330331
private static final Deserializer<Bytes> BYTES_DESERIALIZER = new BytesDeserializer();
331332

333+
public BsonDocument getHeartbeat(final String topicName) {
334+
return getProduced(
335+
topicName,
336+
new JsonDeserializer(),
337+
new JsonDeserializer(),
338+
c ->
339+
BsonDocument.parse(
340+
format("{key: %s, value: %s}", c.key().textValue(), c.value().textValue())),
341+
1,
342+
1)
343+
.get(0);
344+
}
345+
332346
public List<String> getProducedStrings(final String topicName, final int expectedSize) {
333347
return getProduced(
334348
topicName,
@@ -353,16 +367,6 @@ public <T> List<T> getProduced(
353367
maxRetryCount);
354368
}
355369

356-
public List<String> getProducedKeys(final MongoCollection<?> collection, final int expectedSize) {
357-
return getProduced(
358-
collection.getNamespace().getFullName(),
359-
new MappingDeserializer<>(Bytes::toString),
360-
BYTES_DESERIALIZER,
361-
ConsumerRecord::key,
362-
expectedSize,
363-
DEFAULT_MAX_RETRIES);
364-
}
365-
366370
public static class MappingDeserializer<T> implements Deserializer<T> {
367371
private final Function<Bytes, T> mapper;
368372

src/main/java/com/mongodb/kafka/connect/source/heartbeat/HeartbeatManager.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ public Optional<SourceRecord> heartbeat() {
8585
heartbeatTopicName,
8686
Schema.STRING_SCHEMA,
8787
resumeToken,
88-
Schema.BYTES_SCHEMA,
88+
Schema.OPTIONAL_BYTES_SCHEMA,
8989
null);
9090
}
9191
return null;

0 commit comments

Comments
 (0)