Skip to content

Commit 0e60679

Browse files
committed
Changed max.num.retries default to 1.
A safer default, especially as the driver now has retryable writes. KAFKA-106
1 parent 17ef77b commit 0e60679

File tree

4 files changed

+13
-8
lines changed

4 files changed

+13
-8
lines changed

CHANGELOG.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
- [KAFKA-157](https://jira.mongodb.org/browse/KAFKA-157) Improved error message for business key errors.
2323
- [KAFKA-155](https://jira.mongodb.org/browse/KAFKA-155) Fix business key update strategies to use dot notation for filters
2424
- [KAFKA-105](https://jira.mongodb.org/browse/KAFKA-105) Improve `errors.tolerance=all` support in the sink and source connectors.
25-
25+
- [KAFKA-106](https://jira.mongodb.org/browse/KAFKA-106) Changed `max.num.retries` default to 1. A safer default especially as the driver now has retryable writes.
2626

2727
## 1.2.0
2828
- [KAFKA-92](https://jira.mongodb.org/browse/KAFKA-92) Allow the Sink connector to use multiple tasks.

config/MongoSinkConnector.properties

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ value.converter.schema.registry.url=http://localhost:8081
1313
connection.uri=mongodb://mongo1:27017,mongo2:27017,mongo3:27017
1414
database=test
1515
collection=sink
16-
max.num.retries=3
16+
max.num.retries=1
1717
retries.defer.timeout=5000
1818

1919

src/main/java/com/mongodb/kafka/connect/sink/MongoSinkTask.java

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,7 @@ public void put(final Collection<SinkRecord> records) {
146146
}
147147
}
148148
});
149+
resetRemainingRetriesForTopic(topicConfig);
149150
});
150151
}
151152

@@ -206,11 +207,10 @@ private void processSinkRecords(final MongoSinkTopicConfig config, final List<Si
206207
}
207208
} catch (MongoException e) {
208209
LOGGER.warn(
209-
"Writing {} document(s) into collection [{}] failed -> remaining retries ({})",
210+
"Writing {} document(s) into collection [{}] failed.",
210211
writeModels.size(),
211-
config.getNamespace().getFullName(),
212-
getRemainingRetriesForTopic(config.getTopic()).get());
213-
checkRetriableException(config, e);
212+
config.getNamespace().getFullName());
213+
handleMongoException(config, e);
214214
} catch (Exception e) {
215215
if (!config.tolerateErrors()) {
216216
throw new DataException("Failed to write mongodb documents", e);
@@ -228,7 +228,12 @@ private AtomicInteger getRemainingRetriesForTopic(final String topic) {
228228
return remainingRetriesTopicMap.get(topic);
229229
}
230230

231-
private void checkRetriableException(final MongoSinkTopicConfig config, final MongoException e) {
231+
private void resetRemainingRetriesForTopic(final MongoSinkTopicConfig topicConfig) {
232+
getRemainingRetriesForTopic(topicConfig.getTopic())
233+
.set(topicConfig.getInt(MAX_NUM_RETRIES_CONFIG));
234+
}
235+
236+
private void handleMongoException(final MongoSinkTopicConfig config, final MongoException e) {
232237
if (getRemainingRetriesForTopic(config.getTopic()).decrementAndGet() <= 0) {
233238
if (config.logErrors()) {
234239
LOGGER.error("Error on mongodb operation", e);

src/main/java/com/mongodb/kafka/connect/sink/MongoSinkTopicConfig.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ public String value() {
103103
private static final String MAX_NUM_RETRIES_DISPLAY = "Max number of retries";
104104
private static final String MAX_NUM_RETRIES_DOC =
105105
"How often a retry should be done on write errors";
106-
private static final int MAX_NUM_RETRIES_DEFAULT = 3;
106+
private static final int MAX_NUM_RETRIES_DEFAULT = 1;
107107

108108
public static final String RETRIES_DEFER_TIMEOUT_CONFIG = "retries.defer.timeout";
109109
private static final String RETRIES_DEFER_TIMEOUT_DISPLAY = "Retry defer timeout";

0 commit comments

Comments
 (0)