Skip to content

Commit 5757e73

Browse files
committed
Use white list for change stream resumability
JAVA-3639 update
1 parent 73e6d55 commit 5757e73

File tree

39 files changed

+3629
-368
lines changed

39 files changed

+3629
-368
lines changed

driver-core/src/main/com/mongodb/MongoBulkWriteException.java

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -77,14 +77,10 @@ public MongoBulkWriteException(final BulkWriteResult writeResult, final List<Bul
7777
this.writeConcernError = writeConcernError;
7878
this.serverAddress = serverAddress;
7979

80-
for (final String errorLabel : errorLabels) {
81-
addLabel(errorLabel);
82-
}
80+
addLabels(errorLabels);
8381

8482
if (writeConcernError != null) {
85-
for (final String errorLabel : writeConcernError.getErrorLabels()) {
86-
addLabel(errorLabel);
87-
}
83+
addLabels(writeConcernError.getErrorLabels());
8884
}
8985
}
9086

driver-core/src/main/com/mongodb/MongoCommandException.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
import org.bson.BsonDocument;
2121
import org.bson.BsonInt32;
2222
import org.bson.BsonString;
23-
import org.bson.BsonValue;
2423
import org.bson.codecs.BsonDocumentCodec;
2524
import org.bson.codecs.EncoderContext;
2625
import org.bson.json.JsonWriter;
@@ -50,9 +49,7 @@ public MongoCommandException(final BsonDocument response, final ServerAddress ad
5049
format("Command failed with error %s: '%s' on server %s. The full response is %s", extractErrorCodeAndName(response),
5150
extractErrorMessage(response), address, getResponseAsJson(response)), address);
5251
this.response = response;
53-
for (BsonValue curErrorLabel : response.getArray("errorLabels", new BsonArray())) {
54-
addLabel(curErrorLabel.asString().getValue());
55-
}
52+
addLabels(response.getArray("errorLabels", new BsonArray()));
5653
}
5754

5855
/**

driver-core/src/main/com/mongodb/MongoException.java

Lines changed: 28 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,11 @@
1717
package com.mongodb;
1818

1919
import com.mongodb.lang.Nullable;
20+
import org.bson.BsonArray;
21+
import org.bson.BsonDocument;
22+
import org.bson.BsonValue;
2023

24+
import java.util.Collection;
2125
import java.util.Collections;
2226
import java.util.HashSet;
2327
import java.util.Set;
@@ -115,12 +119,22 @@ public MongoException(final int code, final String msg, final Throwable t) {
115119
super(msg, t);
116120
this.code = code;
117121
if (t instanceof MongoException) {
118-
for (final String errorLabel : ((MongoException) t).getErrorLabels()) {
119-
addLabel(errorLabel);
120-
}
122+
addLabels(((MongoException) t).getErrorLabels());
121123
}
122124
}
123125

126+
/**
127+
* @param code the error code
128+
* @param msg the message
129+
* @param response the response
130+
* @since 4.1
131+
*/
132+
public MongoException(final int code, final String msg, final BsonDocument response) {
133+
super(msg);
134+
this.code = code;
135+
addLabels(response.getArray("errorLabels", new BsonArray()));
136+
}
137+
124138
/**
125139
* Gets the exception code
126140
*
@@ -176,4 +190,15 @@ public boolean hasErrorLabel(final String errorLabel) {
176190
return errorLabels.contains(errorLabel);
177191
}
178192

193+
protected void addLabels(final BsonArray labels) {
194+
for (final BsonValue errorLabel : labels) {
195+
addLabel(errorLabel.asString().getValue());
196+
}
197+
}
198+
199+
protected void addLabels(final Collection<String> labels) {
200+
for (final String errorLabel : labels) {
201+
addLabel(errorLabel);
202+
}
203+
}
179204
}

driver-core/src/main/com/mongodb/MongoExecutionTimeoutException.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616

1717
package com.mongodb;
1818

19+
import org.bson.BsonDocument;
20+
1921
/**
2022
* Exception indicating that the execution of the current operation timed out as a result of the maximum operation time being exceeded.
2123
*
@@ -34,4 +36,17 @@ public MongoExecutionTimeoutException(final int code, final String message) {
3436
super(code, message);
3537

3638
}
39+
40+
/**
41+
* Construct a new instance.
42+
*
43+
* @param code the error code
44+
* @param message the error message
45+
* @param response the response
46+
* @since 4.1
47+
*/
48+
public MongoExecutionTimeoutException(final int code, final String message, final BsonDocument response) {
49+
super(code, message, response);
50+
51+
}
3752
}

driver-core/src/main/com/mongodb/MongoQueryException.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -48,9 +48,7 @@ public MongoQueryException(final ServerAddress address, final int errorCode, fin
4848
*/
4949
public MongoQueryException(final MongoCommandException commandException) {
5050
this(commandException.getServerAddress(), commandException.getErrorCode(), commandException.getErrorMessage());
51-
for (String label : commandException.getErrorLabels()) {
52-
addLabel(label);
53-
}
51+
addLabels(commandException.getErrorLabels());
5452
}
5553

5654
/**

driver-core/src/main/com/mongodb/internal/async/AsyncAggregateResponseBatchCursor.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,4 +46,11 @@ public interface AsyncAggregateResponseBatchCursor<T> extends AsyncBatchCursor<T
4646
* @return true if the first batch was empty
4747
*/
4848
boolean isFirstBatchEmpty();
49+
50+
/**
51+
* Returns the max wire version.
52+
*
53+
* @return the max wire version
54+
*/
55+
int getMaxWireVersion();
4956
}

driver-core/src/main/com/mongodb/internal/connection/ProtocolHelper.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -236,7 +236,7 @@ public static MongoException createSpecialException(final BsonDocument response,
236236
int errorCode = getErrorCode(response);
237237
String errorMessage = getErrorMessage(response, errorMessageFieldName);
238238
if (ErrorCategory.fromErrorCode(errorCode) == ErrorCategory.EXECUTION_TIMEOUT) {
239-
return new MongoExecutionTimeoutException(errorCode, errorMessage);
239+
return new MongoExecutionTimeoutException(errorCode, errorMessage, response);
240240
} else if (errorMessage.contains("not master or secondary") || errorMessage.contains("node is recovering")
241241
|| RECOVERING_CODES.contains(errorCode)) {
242242
return new MongoNodeIsRecoveringException(response, serverAddress);

driver-core/src/main/com/mongodb/internal/operation/AggregateResponseBatchCursor.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,4 +49,11 @@ public interface AggregateResponseBatchCursor<T> extends BatchCursor<T> {
4949
* @return true if the first batch was empty
5050
*/
5151
boolean isFirstBatchEmpty();
52+
53+
/**
54+
* Returns the max wire version.
55+
*
56+
* @return the max wire version
57+
*/
58+
int getMaxWireVersion();
5259
}

driver-core/src/main/com/mongodb/internal/operation/AsyncChangeStreamBatchCursor.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
final class AsyncChangeStreamBatchCursor<T> implements AsyncAggregateResponseBatchCursor<T> {
4141
private final AsyncReadBinding binding;
4242
private final ChangeStreamOperation<T> changeStreamOperation;
43+
private final int maxWireVersion;
4344

4445
private volatile BsonDocument resumeToken;
4546
private volatile AsyncAggregateResponseBatchCursor<RawBsonDocument> wrapped;
@@ -53,12 +54,14 @@ final class AsyncChangeStreamBatchCursor<T> implements AsyncAggregateResponseBat
5354
AsyncChangeStreamBatchCursor(final ChangeStreamOperation<T> changeStreamOperation,
5455
final AsyncAggregateResponseBatchCursor<RawBsonDocument> wrapped,
5556
final AsyncReadBinding binding,
56-
final BsonDocument resumeToken) {
57+
final BsonDocument resumeToken,
58+
final int maxWireVersion) {
5759
this.changeStreamOperation = changeStreamOperation;
5860
this.wrapped = wrapped;
5961
this.binding = binding;
6062
binding.retain();
6163
this.resumeToken = resumeToken;
64+
this.maxWireVersion = maxWireVersion;
6265
}
6366

6467
AsyncAggregateResponseBatchCursor<RawBsonDocument> getWrapped() {
@@ -141,6 +144,11 @@ public boolean isFirstBatchEmpty() {
141144
return wrapped.isFirstBatchEmpty();
142145
}
143146

147+
@Override
148+
public int getMaxWireVersion() {
149+
return maxWireVersion;
150+
}
151+
144152
private void cachePostBatchResumeToken(final AsyncAggregateResponseBatchCursor<RawBsonDocument> queryBatchCursor) {
145153
if (queryBatchCursor.getPostBatchResumeToken() != null) {
146154
resumeToken = queryBatchCursor.getPostBatchResumeToken();
@@ -209,7 +217,7 @@ public void onResult(final List<RawBsonDocument> result, final Throwable t) {
209217
if (t == null) {
210218
endOperationInProgress();
211219
callback.onResult(result, null);
212-
} else if (isRetryableError(t)) {
220+
} else if (isRetryableError(t, maxWireVersion)) {
213221
wrapped.close();
214222
retryOperation(asyncBlock, callback, tryNext);
215223
} else {

driver-core/src/main/com/mongodb/internal/operation/AsyncQueryBatchCursor.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ class AsyncQueryBatchCursor<T> implements AsyncAggregateResponseBatchCursor<T> {
7171
private volatile BsonDocument postBatchResumeToken;
7272
private volatile BsonTimestamp operationTime;
7373
private volatile boolean firstBatchEmpty;
74+
private volatile int maxWireVersion = 0;
7475

7576
/* protected by `this` */
7677
private boolean isOperationInProgress = false;
@@ -108,6 +109,9 @@ class AsyncQueryBatchCursor<T> implements AsyncAggregateResponseBatchCursor<T> {
108109
killCursor(connection);
109110
}
110111
}
112+
if (connection != null) {
113+
this.maxWireVersion = connection.getDescription().getMaxWireVersion();
114+
}
111115
}
112116

113117
@Override
@@ -177,6 +181,11 @@ public boolean isFirstBatchEmpty() {
177181
return firstBatchEmpty;
178182
}
179183

184+
@Override
185+
public int getMaxWireVersion() {
186+
return maxWireVersion;
187+
}
188+
180189
private void next(final SingleResultCallback<List<T>> callback, final boolean tryNext) {
181190
if (isClosed()) {
182191
callback.onResult(null, new MongoException(format("%s called after the cursor was closed.",

0 commit comments

Comments
 (0)