Skip to content

Commit 17ef77b

Browse files
committed
Support errors.tolerance
Scenarios covered: Source - Missing / invalid / not found Resume Tokens (Integration test with a mocked offsetStorageReader) - Poison pill message - invalid schema Sink - Poison pill messages - Invalid Key / Values types & invalid documents - Errors thrown by PostProcessors - Debezium CDC handler errors / poison pills KAFKA-105
1 parent 57e0cc8 commit 17ef77b

File tree

17 files changed

+1115
-92
lines changed

17 files changed

+1115
-92
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
- [KAFKA-78](https://jira.mongodb.org/browse/KAFKA-78) Added dead letter queue support for the source connector.
2222
- [KAFKA-157](https://jira.mongodb.org/browse/KAFKA-157) Improved error message for business key errors.
2323
- [KAFKA-155](https://jira.mongodb.org/browse/KAFKA-155) Fix business key update strategies to use dot notation for filters
24+
- [KAFKA-105](https://jira.mongodb.org/browse/KAFKA-105) Improve `errors.tolerance=all` support in the sink and source connectors.
2425

2526

2627
## 1.2.0

src/integrationTest/java/com/mongodb/kafka/connect/log/LogCapture.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,10 @@ public List<LoggingEvent> getEvents() {
4545
return new ArrayList<>(loggingEvents);
4646
}
4747

48+
public void reset() {
49+
loggingEvents.clear();
50+
}
51+
4852
@Override
4953
public void close() {
5054
logger.removeAppender(APPENDER_NAME);

src/integrationTest/java/com/mongodb/kafka/connect/mongodb/MongoKafkaTestCase.java

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -157,28 +157,30 @@ public MongoCollection<Document> getAndCreateCollection() {
157157

158158
private static final String SIMPLE_DOCUMENT = "{_id: %s}";
159159

160+
public List<Document> createDocuments(final IntStream stream) {
161+
return createDocuments(stream, SIMPLE_DOCUMENT);
162+
}
163+
164+
public List<Document> createDocuments(final IntStream stream, final String json) {
165+
return stream.mapToObj(i -> Document.parse(format(json, i))).collect(toList());
166+
}
167+
160168
public List<Document> insertMany(
161169
final IntStream stream, final MongoCollection<?>... collections) {
162170
return insertMany(stream, SIMPLE_DOCUMENT, collections);
163171
}
164172

165173
public List<Document> insertMany(
166174
final IntStream stream, final String json, final MongoCollection<?>... collections) {
167-
List<Document> docs = stream.mapToObj(i -> Document.parse(format(json, i))).collect(toList());
175+
List<Document> docs = createDocuments(stream, json);
168176
for (MongoCollection<?> c : collections) {
169-
LOGGER.debug("Inserting into {} ", c.getNamespace().getFullName());
177+
LOGGER.debug("Inserting {} documents into {} ", docs.size(), c.getNamespace().getFullName());
170178
c.withDocumentClass(Document.class).insertMany(docs);
171179
}
172180
return docs;
173181
}
174182

175-
public void assertCollection(
176-
final MongoCollection<BsonDocument> source, final MongoCollection<BsonDocument> destination) {
177-
assertCollection(source.find().into(new ArrayList<>()), destination);
178-
}
179-
180-
public void assertCollection(
181-
final List<BsonDocument> expected, final MongoCollection<BsonDocument> destination) {
183+
public <T> void assertCollection(final List<T> expected, final MongoCollection<T> destination) {
182184
int counter = 0;
183185
int retryCount = 0;
184186
while (retryCount < DEFAULT_MAX_RETRIES) {

0 commit comments

Comments
 (0)