Skip to content

Commit e96d404

Browse files
committed
Improved error logging for bulk write exceptions.
KAFKA-237
1 parent e7ce896 commit e96d404

File tree

3 files changed

+71
-3
lines changed

3 files changed

+71
-3
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
## 1.6.0
66

77
### Improvements
8+
- [KAFKA-237](https://jira.mongodb.org/browse/KAFKA-237) Improved error logging for bulk write exceptions.
89
- [KAFKA-181](https://jira.mongodb.org/browse/KAFKA-181) Added support for serverApi.
910
- [KAFKA-228](https://jira.mongodb.org/browse/KAFKA-228) Added support for automatic timeseries collection creation for 5.0
1011
- [KAFKA-215](https://jira.mongodb.org/browse/KAFKA-215) Added mongo specific override options for error handling properties

src/integrationTest/java/com/mongodb/kafka/connect/sink/MongoSinkTaskIntegrationTest.java

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,8 @@
4040
import org.apache.kafka.connect.errors.DataException;
4141
import org.apache.kafka.connect.sink.SinkRecord;
4242
import org.apache.kafka.connect.sink.SinkTask;
43+
import org.apache.log4j.Level;
44+
import org.apache.log4j.Logger;
4345
import org.junit.jupiter.api.AfterEach;
4446
import org.junit.jupiter.api.BeforeEach;
4547
import org.junit.jupiter.api.DisplayName;
@@ -49,6 +51,7 @@
4951

5052
import org.bson.Document;
5153

54+
import com.mongodb.kafka.connect.log.LogCapture;
5255
import com.mongodb.kafka.connect.mongodb.MongoKafkaTestCase;
5356

5457
@RunWith(JUnitPlatform.class)
@@ -116,6 +119,38 @@ void testSinkProcessesTimeseriesData() {
116119
}
117120
}
118121

122+
@Test
123+
@DisplayName("Ensure bulk write operation error write models are included in the log")
124+
void testBulkWriteOperationErrorWriteModelsIncludedInTheLog() {
125+
try (LogCapture logCapture = new LogCapture(Logger.getLogger(MongoSinkTask.class))) {
126+
try (AutoCloseableSinkTask task = createSinkTask()) {
127+
Map<String, String> cfg = createSettings();
128+
cfg.put(
129+
MongoSinkTopicConfig.WRITEMODEL_STRATEGY_CONFIG,
130+
"com.mongodb.kafka.connect.sink.writemodel.strategy.InsertOneDefaultStrategy");
131+
task.start(cfg);
132+
List<Document> documents = createDocuments(rangeClosed(1, 10));
133+
documents.add(new Document("_id", 4));
134+
List<SinkRecord> sinkRecords = createRecords(documents);
135+
try {
136+
task.put(sinkRecords);
137+
} catch (Exception e) {
138+
// ignore
139+
}
140+
}
141+
142+
assertTrue(
143+
logCapture.getEvents().stream()
144+
.filter(e -> e.getLevel().equals(Level.ERROR))
145+
.anyMatch(
146+
e ->
147+
e.getMessage()
148+
.toString()
149+
.startsWith(
150+
"WriteErrors: [BulkWriteError{writeModel=InsertOneModel{document={\"_id\": 4}}")));
151+
}
152+
}
153+
119154
@Test
120155
@DisplayName("Ensure sink can handle Tombstone null events")
121156
void testSinkCanHandleTombstoneNullEvents() {

src/main/java/com/mongodb/kafka/connect/sink/MongoSinkTask.java

Lines changed: 35 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import static com.mongodb.kafka.connect.util.TimeseriesValidation.validateCollection;
2727
import static java.util.Collections.emptyList;
2828

29+
import java.util.ArrayList;
2930
import java.util.Collection;
3031
import java.util.HashSet;
3132
import java.util.List;
@@ -54,6 +55,7 @@
5455
import com.mongodb.MongoClientSettings;
5556
import com.mongodb.MongoException;
5657
import com.mongodb.MongoNamespace;
58+
import com.mongodb.bulk.BulkWriteError;
5759
import com.mongodb.bulk.BulkWriteResult;
5860
import com.mongodb.client.MongoClient;
5961
import com.mongodb.client.MongoClients;
@@ -236,7 +238,7 @@ private void bulkWriteBatch(final List<MongoProcessedSinkRecordData> batch) {
236238
"Writing {} document(s) into collection [{}] failed.",
237239
writeModels.size(),
238240
namespace.getFullName());
239-
handleMongoException(config, e);
241+
handleMongoException(config, writeModels, e);
240242
} catch (InterruptedException e) {
241243
Thread.currentThread().interrupt();
242244
throw new DataException("Rate limiting was interrupted", e);
@@ -279,14 +281,19 @@ private AtomicInteger getRemainingRetriesForTopic(final String topic) {
279281
return remainingRetriesTopicMap.get(topic);
280282
}
281283

282-
private void handleMongoException(final MongoSinkTopicConfig config, final MongoException e) {
284+
private void handleMongoException(
285+
final MongoSinkTopicConfig config,
286+
final List<WriteModel<BsonDocument>> writeModels,
287+
final MongoException e) {
283288
if (getRemainingRetriesForTopic(config.getTopic()).decrementAndGet() <= 0) {
284289
if (config.logErrors()) {
285290
LOGGER.error("Error on mongodb operation", e);
286291
if (e instanceof MongoBulkWriteException) {
287292
LOGGER.error("Mongodb bulk write (partially) failed", e);
288293
LOGGER.error("WriteResult: {}", ((MongoBulkWriteException) e).getWriteResult());
289-
LOGGER.error("WriteErrors: {}", ((MongoBulkWriteException) e).getWriteErrors());
294+
LOGGER.error(
295+
"WriteErrors: {}",
296+
generateWriteErrors(((MongoBulkWriteException) e).getWriteErrors(), writeModels));
290297
LOGGER.error(
291298
"WriteConcernError: {}", ((MongoBulkWriteException) e).getWriteConcernError());
292299
}
@@ -301,4 +308,29 @@ private void handleMongoException(final MongoSinkTopicConfig config, final Mongo
301308
throw new RetriableException(e.getMessage(), e);
302309
}
303310
}
311+
312+
private String generateWriteErrors(
313+
final List<BulkWriteError> bulkWriteErrorList,
314+
final List<WriteModel<BsonDocument>> writeModels) {
315+
List<String> errorString = new ArrayList<>();
316+
for (final BulkWriteError bulkWriteError : bulkWriteErrorList) {
317+
if (bulkWriteError.getIndex() < writeModels.size()) {
318+
errorString.add(
319+
"BulkWriteError{"
320+
+ "writeModel="
321+
+ writeModels.get(bulkWriteError.getIndex())
322+
+ ", code="
323+
+ bulkWriteError.getCode()
324+
+ ", message='"
325+
+ bulkWriteError.getMessage()
326+
+ '\''
327+
+ ", details="
328+
+ bulkWriteError.getDetails()
329+
+ '}');
330+
} else {
331+
errorString.add(bulkWriteError.toString());
332+
}
333+
}
334+
return "[" + String.join(", ", errorString) + "]";
335+
}
304336
}

0 commit comments

Comments
 (0)