Skip to content

Commit 2f1b034

Browse files
authored
Fixed Source Connector recovery during getMore call (#79)
KAFKA-230 KAFKA-219
1 parent cb43188 commit 2f1b034

File tree

3 files changed

+122
-27
lines changed

3 files changed

+122
-27
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
### Bug Fixes
1616
- [KAFKA-218](https://jira.mongodb.org/browse/KAFKA-218) Fixed bug in LazyBsonDocument#clone ignoring any changes made once unwrapped.
1717
- [KAFKA-220](https://jira.mongodb.org/browse/KAFKA-220) Fixed bug with timestamp integer overflow.
18+
- [KAFKA-219](https://jira.mongodb.org/browse/KAFKA-219) Fixed Source Connector unable to recover from broken change stream due to event > 16MB
19+
- [KAFKA-230](https://jira.mongodb.org/browse/KAFKA-230) Fixed Source Connector recovery during getMore call
1820

1921
## 1.5.1
2022

src/integrationTest/java/com/mongodb/kafka/connect/source/MongoSourceTaskIntegrationTest.java

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import static java.util.Collections.singletonList;
2626
import static java.util.Collections.singletonMap;
2727
import static java.util.stream.Collectors.toList;
28+
import static java.util.stream.IntStream.range;
2829
import static java.util.stream.IntStream.rangeClosed;
2930
import static org.junit.jupiter.api.Assertions.assertAll;
3031
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -69,6 +70,7 @@
6970

7071
import com.mongodb.client.MongoCollection;
7172
import com.mongodb.client.MongoDatabase;
73+
import com.mongodb.client.model.Updates;
7274

7375
import com.mongodb.kafka.connect.log.LogCapture;
7476
import com.mongodb.kafka.connect.mongodb.ChangeStreamOperations.ChangeStreamOperation;
@@ -750,6 +752,49 @@ void testErrorToleranceNoneSupport() {
750752
}
751753
}
752754

755+
@Test
756+
@DisplayName("Ensure source honours error tolerance all and > 16mb change stream message")
757+
void testErrorToleranceAllSupport16MbError() {
758+
try (AutoCloseableSourceTask task = createSourceTask(Logger.getLogger(MongoSourceTask.class))) {
759+
MongoCollection<Document> coll = getAndCreateCollection();
760+
HashMap<String, String> cfg =
761+
new HashMap<String, String>() {
762+
{
763+
put(MongoSourceConfig.DATABASE_CONFIG, coll.getNamespace().getDatabaseName());
764+
put(MongoSourceConfig.COLLECTION_CONFIG, coll.getNamespace().getCollectionName());
765+
put(MongoSourceConfig.FULL_DOCUMENT_CONFIG, "updateLookup");
766+
put(MongoSourceConfig.POLL_MAX_BATCH_SIZE_CONFIG, "5");
767+
put(MongoSourceConfig.ERRORS_TOLERANCE_CONFIG, ErrorTolerance.ALL.value());
768+
}
769+
};
770+
771+
task.start(cfg);
772+
773+
insertMany(rangeClosed(1, 5), coll);
774+
List<SourceRecord> poll = getNextResults(task);
775+
assertEquals(5, poll.size());
776+
777+
// Poison the change stream
778+
coll.updateOne(new Document("_id", 3), Updates.set("y", new byte[(1024 * 1024 * 16) - 30]));
779+
task.poll(); // Use poll directly as no results are expected.
780+
781+
// Insert some new data and confirm new events are available post change stream restart
782+
insertMany(range(10, 15), coll);
783+
poll = getNextResults(task);
784+
785+
assertEquals(5, poll.size());
786+
assertTrue(
787+
task.logCapture.getEvents().stream()
788+
.filter(e -> e.getLevel().equals(Level.WARN))
789+
.anyMatch(
790+
e ->
791+
e.getMessage()
792+
.toString()
793+
.startsWith(
794+
"Failed to resume change stream: Query failed with error code 10334")));
795+
}
796+
}
797+
753798
private void assertSourceRecordValues(
754799
final List<? extends ChangeStreamOperation> expectedChangeStreamOperations,
755800
final List<SourceRecord> allSourceRecords,

src/main/java/com/mongodb/kafka/connect/source/MongoSourceTask.java

Lines changed: 75 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -32,15 +32,18 @@
3232
import static com.mongodb.kafka.connect.util.ConfigHelper.getMongoDriverInformation;
3333
import static com.mongodb.kafka.connect.util.ServerApiConfig.setServerApi;
3434
import static java.lang.String.format;
35+
import static java.util.Arrays.asList;
3536
import static java.util.Collections.singletonMap;
3637

3738
import java.util.ArrayList;
3839
import java.util.Collections;
3940
import java.util.HashMap;
41+
import java.util.HashSet;
4042
import java.util.List;
4143
import java.util.Locale;
4244
import java.util.Map;
4345
import java.util.Optional;
46+
import java.util.Set;
4447
import java.util.concurrent.atomic.AtomicBoolean;
4548
import java.util.function.Supplier;
4649

@@ -63,6 +66,7 @@
6366
import com.mongodb.ConnectionString;
6467
import com.mongodb.MongoClientSettings;
6568
import com.mongodb.MongoCommandException;
69+
import com.mongodb.MongoException;
6670
import com.mongodb.client.ChangeStreamIterable;
6771
import com.mongodb.client.MongoChangeStreamCursor;
6872
import com.mongodb.client.MongoClient;
@@ -119,12 +123,24 @@ public final class MongoSourceTask extends SourceTask {
119123
private static final int NAMESPACE_NOT_FOUND_ERROR = 26;
120124
private static final int ILLEGAL_OPERATION_ERROR = 20;
121125
private static final int INVALIDATED_RESUME_TOKEN_ERROR = 260;
126+
private static final int CHANGE_STREAM_FATAL_ERROR = 280;
127+
private static final int CHANGE_STREAM_HISTORY_LOST = 286;
128+
private static final int BSON_OBJECT_TOO_LARGE = 10334;
129+
private static final Set<Integer> INVALID_CHANGE_STREAM_ERRORS =
130+
new HashSet<>(
131+
asList(
132+
INVALIDATED_RESUME_TOKEN_ERROR,
133+
CHANGE_STREAM_FATAL_ERROR,
134+
CHANGE_STREAM_HISTORY_LOST,
135+
BSON_OBJECT_TOO_LARGE));
122136
private static final int UNKNOWN_FIELD_ERROR = 40415;
123137
private static final int FAILED_TO_PARSE_ERROR = 9;
124138
private static final String RESUME_TOKEN = "resume token";
139+
private static final String RESUME_POINT = "resume point";
125140
private static final String NOT_FOUND = "not found";
126141
private static final String DOES_NOT_EXIST = "does not exist";
127142
private static final String INVALID_RESUME_TOKEN = "invalid resume token";
143+
private static final String NO_LONGER_IN_THE_OPLOG = "no longer be in the oplog";
128144

129145
private final Time time;
130146
private final AtomicBoolean isRunning = new AtomicBoolean();
@@ -366,6 +382,28 @@ MongoChangeStreamCursor<? extends BsonDocument> createCursor(
366382
return tryCreateCursor(sourceConfig, mongoClient, getResumeToken(sourceConfig));
367383
}
368384

385+
private MongoChangeStreamCursor<? extends BsonDocument> tryRecreateCursor(
386+
final MongoException e) {
387+
int errorCode =
388+
e instanceof MongoCommandException
389+
? ((MongoCommandException) e).getErrorCode()
390+
: e.getCode();
391+
String errorMessage =
392+
e instanceof MongoCommandException
393+
? ((MongoCommandException) e).getErrorMessage()
394+
: e.getMessage();
395+
LOGGER.warn(
396+
"Failed to resume change stream: {} {}\n"
397+
+ "===================================================================================\n"
398+
+ "When the resume token is no longer available there is the potential for data loss.\n\n"
399+
+ "Restarting the change stream with no resume token because `errors.tolerance=all`.\n"
400+
+ "===================================================================================\n",
401+
errorMessage,
402+
errorCode);
403+
invalidatedCursor = true;
404+
return tryCreateCursor(sourceConfig, mongoClient, null);
405+
}
406+
369407
private MongoChangeStreamCursor<? extends BsonDocument> tryCreateCursor(
370408
final MongoSourceConfig sourceConfig,
371409
final MongoClient mongoClient,
@@ -394,17 +432,8 @@ private MongoChangeStreamCursor<? extends BsonDocument> tryCreateCursor(
394432
} else if (doesNotSupportsStartAfter(e)) {
395433
supportsStartAfter = false;
396434
return tryCreateCursor(sourceConfig, mongoClient, resumeToken);
397-
} else if (sourceConfig.tolerateErrors() && resumeTokenNotFound(e)) {
398-
LOGGER.warn(
399-
"Failed to resume change stream: {} {}\n"
400-
+ "===================================================================================\n"
401-
+ "When the resume token is no longer available there is the potential for data loss.\n\n"
402-
+ "Restarting the change stream with no resume token because `errors.tolerance=all`.\n"
403-
+ "===================================================================================\n",
404-
e.getErrorMessage(),
405-
e.getErrorCode());
406-
invalidatedCursor = true;
407-
return tryCreateCursor(sourceConfig, mongoClient, null);
435+
} else if (sourceConfig.tolerateErrors() && changeStreamNotValid(e)) {
436+
return tryRecreateCursor(e);
408437
}
409438
}
410439
if (e.getErrorCode() == NAMESPACE_NOT_FOUND_ERROR) {
@@ -438,7 +467,7 @@ private MongoChangeStreamCursor<? extends BsonDocument> tryCreateCursor(
438467
+ "=====================================================================================\n",
439468
e.getErrorMessage(),
440469
e.getErrorCode());
441-
if (resumeTokenNotFound(e)) {
470+
if (changeStreamNotValid(e)) {
442471
throw new ConnectException(
443472
"ResumeToken not found. Cannot create a change stream cursor", e);
444473
}
@@ -456,12 +485,19 @@ private boolean invalidatedResumeToken(final MongoCommandException e) {
456485
return e.getErrorCode() == INVALIDATED_RESUME_TOKEN_ERROR;
457486
}
458487

459-
private boolean resumeTokenNotFound(final MongoCommandException e) {
460-
String errorMessage = e.getErrorMessage().toLowerCase(Locale.ROOT);
461-
return errorMessage.contains(RESUME_TOKEN)
488+
private boolean changeStreamNotValid(final MongoException e) {
489+
if (INVALID_CHANGE_STREAM_ERRORS.contains(e.getCode())) {
490+
return true;
491+
}
492+
String errorMessage =
493+
e instanceof MongoCommandException
494+
? ((MongoCommandException) e).getErrorMessage().toLowerCase(Locale.ROOT)
495+
: e.getMessage().toLowerCase(Locale.ROOT);
496+
return (errorMessage.contains(RESUME_TOKEN) || errorMessage.contains(RESUME_POINT))
462497
&& (errorMessage.contains(NOT_FOUND)
463498
|| errorMessage.contains(DOES_NOT_EXIST)
464-
|| errorMessage.contains(INVALID_RESUME_TOKEN));
499+
|| errorMessage.contains(INVALID_RESUME_TOKEN)
500+
|| errorMessage.contains(NO_LONGER_IN_THE_OPLOG));
465501
}
466502

467503
Map<String, Object> createPartitionMap(final MongoSourceConfig sourceConfig) {
@@ -588,26 +624,38 @@ private Optional<BsonDocument> getNextDocument() {
588624
next = cursor != null ? cursor.tryNext() : null;
589625
}
590626
return Optional.ofNullable(next);
591-
} catch (Exception e) {
592-
if (cursor != null) {
593-
try {
594-
cursor.close();
595-
} catch (Exception e1) {
596-
// ignore
627+
} catch (MongoException e) {
628+
closeCursor();
629+
if (isRunning.get()) {
630+
if (sourceConfig.tolerateErrors() && changeStreamNotValid(e)) {
631+
cursor = tryRecreateCursor(e);
632+
} else {
633+
LOGGER.info(
634+
"An exception occurred when trying to get the next item from the Change Stream", e);
597635
}
598-
cursor = null;
599636
}
637+
return Optional.empty();
638+
} catch (Exception e) {
639+
closeCursor();
600640
if (isRunning.get()) {
601-
LOGGER.info(
602-
"An exception occurred when trying to get the next item from the Change Stream: {}",
603-
e.getMessage());
641+
throw new ConnectException("Unexpected error: " + e.getMessage(), e);
604642
}
605-
return Optional.empty();
606643
}
607644
}
608645
return Optional.empty();
609646
}
610647

648+
private void closeCursor() {
649+
if (cursor != null) {
650+
try {
651+
cursor.close();
652+
} catch (Exception e1) {
653+
// ignore
654+
}
655+
cursor = null;
656+
}
657+
}
658+
611659
private void invalidateCursorAndReinitialize() {
612660
invalidatedCursor = true;
613661
cursor.close();

0 commit comments

Comments
 (0)