Skip to content

Commit 66479fe

Browse files
committed
Updated Retryable errors
Updated what is retryable for SDAM, bulk writes and change streams. JAVA-2847
1 parent fa16d59 commit 66479fe

40 files changed

+2940
-205
lines changed

driver-async/src/test/functional/com/mongodb/async/client/RetryableWritesTest.java

Lines changed: 22 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -44,13 +44,13 @@
4444
import java.util.ArrayList;
4545
import java.util.Collection;
4646
import java.util.List;
47-
import java.util.Map;
4847

4948
import static com.mongodb.ClusterFixture.getDefaultDatabaseName;
5049
import static com.mongodb.ClusterFixture.isDiscoverableReplicaSet;
5150
import static com.mongodb.ClusterFixture.isSharded;
5251
import static com.mongodb.ClusterFixture.isStandalone;
5352
import static com.mongodb.ClusterFixture.serverVersionAtLeast;
53+
import static com.mongodb.ClusterFixture.serverVersionLessThan;
5454
import static com.mongodb.async.client.Fixture.getMongoClientBuilderFromConnectionString;
5555
import static org.junit.Assert.assertEquals;
5656
import static org.junit.Assume.assumeFalse;
@@ -59,13 +59,14 @@
5959
// See https://github.com/mongodb/specifications/tree/master/source/retryable-writes/tests
6060
@RunWith(Parameterized.class)
6161
public class RetryableWritesTest extends DatabaseTestCase {
62-
private static MongoClient mongoClient;
6362
private final String filename;
6463
private final String description;
6564
private final String databaseName;
6665
private final String collectionName;
6766
private final BsonArray data;
6867
private final BsonDocument definition;
68+
private MongoClient mongoClient;
69+
private CollectionHelper<Document> collectionHelper;
6970
private MongoCollection<BsonDocument> collection;
7071
private JsonPoweredCrudTestHelper helper;
7172

@@ -80,15 +81,10 @@ public RetryableWritesTest(final String filename, final String description, fina
8081

8182
@BeforeClass
8283
public static void beforeClass() {
83-
com.mongodb.MongoClientSettings.Builder builder = getMongoClientBuilderFromConnectionString();
84-
mongoClient = MongoClients.create(builder.retryWrites(true).build());
8584
}
8685

8786
@AfterClass
8887
public static void afterClass() {
89-
if (mongoClient != null) {
90-
mongoClient.close();
91-
}
9288
}
9389

9490
@Before
@@ -116,27 +112,37 @@ public void setUp() {
116112
}
117113
}
118114
}
115+
collectionHelper = new CollectionHelper<Document>(new DocumentCodec(), new MongoNamespace(databaseName, collectionName));
116+
BsonDocument clientOptions = definition.getDocument("clientOptions", new BsonDocument());
117+
mongoClient = MongoClients.create(getMongoClientBuilderFromConnectionString()
118+
.retryWrites(clientOptions.getBoolean("retryWrites", BsonBoolean.FALSE).getValue())
119+
.build());
119120

120121
List<BsonDocument> documents = new ArrayList<BsonDocument>();
121122
for (BsonValue document : data) {
122123
documents.add(document.asDocument());
123124
}
124-
CollectionHelper<Document> collectionHelper = new CollectionHelper<Document>(new DocumentCodec(),
125-
new MongoNamespace(databaseName, collectionName));
126125

127126
collectionHelper.drop();
128127
collectionHelper.insertDocuments(documents);
129128

130129
MongoDatabase database = mongoClient.getDatabase(databaseName);
131130
collection = database.getCollection(collectionName, BsonDocument.class);
132131
helper = new JsonPoweredCrudTestHelper(description, database, collection);
133-
setFailPoint();
132+
if (definition.containsKey("failPoint")) {
133+
collectionHelper.runAdminCommand(definition.getDocument("failPoint"));
134+
}
134135
}
135136

136137
@After
137138
public void cleanUp() {
138-
if (canRunTests()) {
139-
unsetFailPoint();
139+
if (mongoClient != null) {
140+
mongoClient.close();
141+
}
142+
if (collectionHelper != null && definition.containsKey("failPoint")) {
143+
collectionHelper.runAdminCommand(new BsonDocument("configureFailPoint",
144+
definition.getDocument("failPoint").getString("configureFailPoint"))
145+
.append("mode", new BsonString("off")));
140146
}
141147
}
142148

@@ -172,6 +178,10 @@ public static Collection<Object[]> data() throws URISyntaxException, IOException
172178
List<Object[]> data = new ArrayList<Object[]>();
173179
for (File file : JsonPoweredTestHelper.getTestFiles("/retryable-writes")) {
174180
BsonDocument testDocument = JsonPoweredTestHelper.getTestDocument(file);
181+
if (testDocument.containsKey("minServerVersion")
182+
&& serverVersionLessThan(testDocument.getString("minServerVersion").getValue())) {
183+
continue;
184+
}
175185
for (BsonValue test : testDocument.getArray("tests")) {
176186
data.add(new Object[]{file.getName(), test.asDocument().getString("description").getValue(),
177187
testDocument.getArray("data"), test.asDocument()});
@@ -184,25 +194,6 @@ private boolean canRunTests() {
184194
return serverVersionAtLeast(3, 6) && isDiscoverableReplicaSet();
185195
}
186196

187-
private void setFailPoint() {
188-
if (definition.containsKey("failPoint")) {
189-
BsonDocument command = new BsonDocument("configureFailPoint", new BsonString("onPrimaryTransactionalWrite"));
190-
for (Map.Entry<String, BsonValue> args : definition.getDocument("failPoint").entrySet()) {
191-
command.put(args.getKey(), args.getValue());
192-
}
193-
FutureResultCallback<Document> futureResultCallback = new FutureResultCallback<Document>();
194-
mongoClient.getDatabase("admin").runCommand(command, futureResultCallback);
195-
futureResult(futureResultCallback);
196-
}
197-
}
198-
199-
private void unsetFailPoint() {
200-
FutureResultCallback<Document> futureResultCallback = new FutureResultCallback<Document>();
201-
mongoClient.getDatabase("admin").runCommand(
202-
BsonDocument.parse("{ configureFailPoint: 'onPrimaryTransactionalWrite', mode: 'off'}"), futureResultCallback);
203-
futureResult(futureResultCallback);
204-
}
205-
206197
<T> T futureResult(final FutureResultCallback<T> callback) {
207198
try {
208199
return callback.get();

driver-async/src/test/functional/com/mongodb/async/client/TransactionsTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -220,7 +220,7 @@ public void cleanUp() {
220220
mongoClient.close();
221221
}
222222

223-
if (definition.containsKey("failPoint")) {
223+
if (collectionHelper != null && definition.containsKey("failPoint")) {
224224
collectionHelper.runAdminCommand(new BsonDocument("configureFailPoint",
225225
definition.getDocument("failPoint").getString("configureFailPoint"))
226226
.append("mode", new BsonString("off")));

driver-core/src/main/com/mongodb/internal/connection/DefaultServer.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -191,18 +191,23 @@ public void onResult(final T result, final Throwable t) {
191191
}, LOGGER));
192192
}
193193

194+
@SuppressWarnings("unchecked")
194195
@Override
195196
public <T> T execute(final CommandProtocol<T> protocol, final InternalConnection connection,
196197
final SessionContext sessionContext) {
197198
try {
198199
protocol.sessionContext(new ClusterClockAdvancingSessionContext(sessionContext, clusterClock));
199200
return protocol.execute(connection);
201+
} catch (MongoWriteConcernWithResponseException e) {
202+
invalidate();
203+
return (T) e.getResponse();
200204
} catch (MongoException e) {
201205
handleThrowable(e);
202206
throw e;
203207
}
204208
}
205209

210+
@SuppressWarnings("unchecked")
206211
@Override
207212
public <T> void executeAsync(final CommandProtocol<T> protocol, final InternalConnection connection,
208213
final SessionContext sessionContext, final SingleResultCallback<T> callback) {
@@ -211,8 +216,13 @@ public <T> void executeAsync(final CommandProtocol<T> protocol, final InternalCo
211216
@Override
212217
public void onResult(final T result, final Throwable t) {
213218
if (t != null) {
214-
handleThrowable(t);
215-
callback.onResult(null, t);
219+
if (t instanceof MongoWriteConcernWithResponseException) {
220+
invalidate();
221+
callback.onResult((T) ((MongoWriteConcernWithResponseException) t).getResponse(), null);
222+
} else {
223+
handleThrowable(t);
224+
callback.onResult(null, t);
225+
}
216226
} else {
217227
callback.onResult(result, null);
218228
}

driver-core/src/main/com/mongodb/internal/connection/InternalStreamConnection.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@
6161
import static com.mongodb.internal.async.ErrorHandlingResultCallback.errorHandlingCallback;
6262
import static com.mongodb.internal.connection.MessageHeader.MESSAGE_HEADER_LENGTH;
6363
import static com.mongodb.internal.connection.OpCode.OP_COMPRESSED;
64+
import static com.mongodb.internal.connection.ProtocolHelper.createSpecialWriteConcernException;
6465
import static com.mongodb.internal.connection.ProtocolHelper.getClusterTime;
6566
import static com.mongodb.internal.connection.ProtocolHelper.getCommandFailureException;
6667
import static com.mongodb.internal.connection.ProtocolHelper.getMessageSettings;
@@ -295,7 +296,7 @@ private <T> T receiveCommandMessageResponse(final CommandMessage message, final
295296

296297
commandEventSender.sendSucceededEvent(responseBuffers);
297298

298-
return new ReplyMessage<T>(responseBuffers, decoder, message.getId()).getDocuments().get(0);
299+
return getCommandResult(decoder, responseBuffers, message.getId());
299300
} finally {
300301
responseBuffers.close();
301302
}
@@ -373,8 +374,8 @@ public void onResult(final ResponseBuffers responseBuffers, final Throwable t) {
373374
throw commandFailureException;
374375
}
375376
commandEventSender.sendSucceededEvent(responseBuffers);
376-
T result = new ReplyMessage<T>(responseBuffers, decoder, messageId).getDocuments().get(0);
377377

378+
T result = getCommandResult(decoder, responseBuffers, messageId);
378379
callback.onResult(result, null);
379380
} catch (Throwable localThrowable) {
380381
callback.onResult(null, localThrowable);
@@ -388,6 +389,15 @@ public void onResult(final ResponseBuffers responseBuffers, final Throwable t) {
388389
});
389390
}
390391

392+
private <T> T getCommandResult(final Decoder<T> decoder, final ResponseBuffers responseBuffers, final int messageId) {
393+
T result = new ReplyMessage<T>(responseBuffers, decoder, messageId).getDocuments().get(0);
394+
MongoException writeConcernBasedError = createSpecialWriteConcernException(responseBuffers, description.getServerAddress());
395+
if (writeConcernBasedError != null) {
396+
throw new MongoWriteConcernWithResponseException(writeConcernBasedError, result);
397+
}
398+
return result;
399+
}
400+
391401
@Override
392402
public void sendMessage(final List<ByteBuf> byteBuffers, final int lastRequestId) {
393403
notNull("stream is open", stream);
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
/*
2+
* Copyright 2008-present MongoDB, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.mongodb.internal.connection;
18+
19+
import com.mongodb.MongoException;
20+
21+
public class MongoWriteConcernWithResponseException extends MongoException {
22+
private static final long serialVersionUID = 1707360842648550287L;
23+
private final MongoException cause;
24+
private final Object response;
25+
26+
public MongoWriteConcernWithResponseException(final MongoException exception, final Object response) {
27+
super(exception.getCode(), exception.getMessage(), exception);
28+
this.cause = exception;
29+
this.response = response;
30+
}
31+
32+
@Override
33+
public MongoException getCause() {
34+
return cause;
35+
}
36+
37+
public Object getResponse() {
38+
return response;
39+
}
40+
}

driver-core/src/main/com/mongodb/internal/connection/ProtocolHelper.java

Lines changed: 32 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -49,11 +49,17 @@
4949
import org.bson.io.BsonOutput;
5050
import org.bson.io.ByteBufferBsonInput;
5151

52+
import java.util.List;
53+
5254
import static java.lang.String.format;
55+
import static java.util.Arrays.asList;
5356
import static org.bson.codecs.BsonValueCodecProvider.getClassForBsonType;
5457
import static org.bson.codecs.configuration.CodecRegistries.fromProviders;
5558

56-
final class ProtocolHelper {
59+
/**
60+
* This class is NOT part of the public API. It may change at any time without notification.
61+
*/
62+
public final class ProtocolHelper {
5763
private static final Logger PROTOCOL_EVENT_LOGGER = Loggers.getLogger("protocol.event");
5864
private static final CodecRegistry REGISTRY = fromProviders(new BsonValueCodecProvider());
5965

@@ -94,6 +100,15 @@ static boolean isCommandOk(final ResponseBuffers responseBuffers) {
94100
}
95101
}
96102

103+
static MongoException createSpecialWriteConcernException(final ResponseBuffers responseBuffers, final ServerAddress serverAddress) {
104+
BsonValue writeConcernError = getField(createBsonReader(responseBuffers), "writeConcernError");
105+
if (writeConcernError == null) {
106+
return null;
107+
} else {
108+
return createSpecialException(writeConcernError.asDocument(), serverAddress, "errmsg");
109+
}
110+
}
111+
97112
static BsonTimestamp getOperationTime(final ResponseBuffers responseBuffers) {
98113
try {
99114
BsonValue operationTime = getField(createBsonReader(responseBuffers), "operationTime");
@@ -215,19 +230,28 @@ static RequestMessage.EncodingMetadata encodeMessageWithMetadata(final RequestMe
215230
}
216231
}
217232

218-
private static MongoException createSpecialException(final BsonDocument response, final ServerAddress serverAddress,
219-
final String errorMessageFieldName) {
220-
if (ErrorCategory.fromErrorCode(getErrorCode(response)) == ErrorCategory.EXECUTION_TIMEOUT) {
221-
return new MongoExecutionTimeoutException(getErrorCode(response), getErrorMessage(response, errorMessageFieldName));
222-
} else if (getErrorMessage(response, errorMessageFieldName).startsWith("not master")) {
223-
return new MongoNotPrimaryException(serverAddress);
224-
} else if (getErrorMessage(response, errorMessageFieldName).startsWith("node is recovering")) {
233+
private static final List<Integer> NOT_MASTER_CODES = asList(10107, 13435);
234+
private static final List<Integer> RECOVERING_CODES = asList(11600, 11602, 13436, 189, 91);
235+
public static MongoException createSpecialException(final BsonDocument response, final ServerAddress serverAddress,
236+
final String errorMessageFieldName) {
237+
int errorCode = getErrorCode(response);
238+
String errorMessage = getErrorMessage(response, errorMessageFieldName);
239+
if (ErrorCategory.fromErrorCode(errorCode) == ErrorCategory.EXECUTION_TIMEOUT) {
240+
return new MongoExecutionTimeoutException(errorCode, errorMessage);
241+
} else if (errorMessage.contains("not master or secondary") || errorMessage.contains("node is recovering")
242+
|| RECOVERING_CODES.contains(errorCode)) {
225243
return new MongoNodeIsRecoveringException(serverAddress);
244+
} else if (errorMessage.contains("not master") || NOT_MASTER_CODES.contains(errorCode)) {
245+
return new MongoNotPrimaryException(serverAddress);
246+
} else if (response.containsKey("writeConcernError")) {
247+
return createSpecialException(response.getDocument("writeConcernError"), serverAddress, "errmsg");
226248
} else {
227249
return null;
228250
}
229251
}
230252

253+
254+
231255
private static boolean hasWriteError(final BsonDocument response) {
232256
String err = WriteConcernException.extractErrorMessage(response);
233257
return err != null && err.length() > 0;

driver-core/src/main/com/mongodb/internal/operation/WriteConcernHelper.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,14 @@
1616

1717
package com.mongodb.internal.operation;
1818

19+
import com.mongodb.MongoException;
1920
import com.mongodb.MongoWriteConcernException;
2021
import com.mongodb.ServerAddress;
2122
import com.mongodb.WriteConcern;
2223
import com.mongodb.WriteConcernResult;
2324
import com.mongodb.bulk.WriteConcernError;
2425
import com.mongodb.connection.ConnectionDescription;
26+
import com.mongodb.internal.connection.ProtocolHelper;
2527
import org.bson.BsonDocument;
2628
import org.bson.BsonString;
2729

@@ -41,10 +43,18 @@ public static void appendWriteConcernToCommand(final WriteConcern writeConcern,
4143

4244
public static void throwOnWriteConcernError(final BsonDocument result, final ServerAddress serverAddress) {
4345
if (hasWriteConcernError(result)) {
46+
throwOnSpecialException(result, serverAddress);
4447
throw createWriteConcernException(result, serverAddress);
4548
}
4649
}
4750

51+
public static void throwOnSpecialException(final BsonDocument result, final ServerAddress serverAddress) {
52+
MongoException specialException = ProtocolHelper.createSpecialException(result, serverAddress, "errmsg");
53+
if (specialException != null) {
54+
throw specialException;
55+
}
56+
}
57+
4858
public static boolean hasWriteConcernError(final BsonDocument result) {
4959
return result.containsKey("writeConcernError");
5060
}

driver-core/src/main/com/mongodb/operation/AsyncChangeStreamBatchCursor.java

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,6 @@
1717
package com.mongodb.operation;
1818

1919
import com.mongodb.MongoChangeStreamException;
20-
import com.mongodb.MongoCursorNotFoundException;
21-
import com.mongodb.MongoNotPrimaryException;
22-
import com.mongodb.MongoSocketException;
2320
import com.mongodb.async.AsyncBatchCursor;
2421
import com.mongodb.async.SingleResultCallback;
2522
import com.mongodb.binding.AsyncReadBinding;
@@ -30,6 +27,7 @@
3027
import java.util.List;
3128

3229
import static com.mongodb.internal.async.ErrorHandlingResultCallback.errorHandlingCallback;
30+
import static com.mongodb.operation.ChangeStreamBatchCursorHelper.isRetryableError;
3331
import static com.mongodb.operation.OperationHelper.LOGGER;
3432

3533
final class AsyncChangeStreamBatchCursor<T> implements AsyncBatchCursor<T> {
@@ -104,9 +102,7 @@ private void resumeableOperation(final AsyncBlock asyncBlock, final SingleResult
104102
public void onResult(final List<RawBsonDocument> result, final Throwable t) {
105103
if (t == null) {
106104
callback.onResult(result, null);
107-
} else if (t instanceof MongoNotPrimaryException
108-
|| t instanceof MongoCursorNotFoundException
109-
|| t instanceof MongoSocketException) {
105+
} else if (isRetryableError(t)) {
110106
wrapped.close();
111107
retryOperation(asyncBlock, callback);
112108
} else {

0 commit comments

Comments
 (0)