Skip to content

Commit f0ad2a2

Browse files
committed
ChangeStream spec Resumable Error definition is too broad
JAVA-3293
1 parent d8cea99 commit f0ad2a2

File tree

4 files changed

+70
-7
lines changed

4 files changed

+70
-7
lines changed

driver-async/src/test/functional/com/mongodb/async/client/ChangeStreamsTest.java

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
import java.net.URISyntaxException;
4848
import java.util.ArrayList;
4949
import java.util.Collection;
50+
import java.util.Collections;
5051
import java.util.LinkedList;
5152
import java.util.List;
5253
import java.util.Queue;
@@ -59,6 +60,7 @@
5960
import static java.util.Collections.singletonList;
6061
import static org.junit.Assert.assertEquals;
6162
import static org.junit.Assert.assertNull;
63+
import static org.junit.Assert.assertTrue;
6264
import static org.junit.Assume.assumeFalse;
6365
import static org.junit.Assume.assumeNotNull;
6466

@@ -143,8 +145,20 @@ public void shouldPassAllOutcomes() {
143145
private void checkStreamValues(final BsonDocument result, final AsyncBatchCursor<ChangeStreamDocument<BsonDocument>> cursor){
144146

145147
BsonArray expectedResults = result.getArray("success", new BsonArray());
148+
Queue<ChangeStreamDocument<BsonDocument>> results = null;
146149

147-
Queue<ChangeStreamDocument<BsonDocument>> results = getResults(cursor);
150+
try {
151+
results = getResults(cursor);
152+
} catch (MongoException e) {
153+
if (result.containsKey("error")) {
154+
final BsonDocument error = result.getDocument("error");
155+
assertTrue(e.getCode() == error.getInt32("code").intValue()
156+
|| !Collections.disjoint(e.getErrorLabels(), error.getArray("errorLabels")));
157+
return;
158+
} else {
159+
throw e;
160+
}
161+
}
148162
for (BsonValue expectedResult : expectedResults) {
149163
BsonDocument expected = expectedResult.asDocument();
150164

@@ -195,7 +209,7 @@ private AsyncBatchCursor<ChangeStreamDocument<BsonDocument>> createCursor(final
195209
}
196210

197211
private void checkExpectations() {
198-
if (definition.getArray("expectations").size() > 0) {
212+
if (definition.containsKey("expectations") && definition.getArray("expectations").size() > 0) {
199213

200214
String database = definition.getString("target").getValue().equals("client") ? "admin" : namespace.getDatabaseName();
201215
List<CommandEvent> expectedEvents = getExpectedEvents(definition.getArray("expectations"), database, new BsonDocument());

driver-core/src/main/com/mongodb/operation/ChangeStreamBatchCursorHelper.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,14 @@
2222
import com.mongodb.MongoNotPrimaryException;
2323
import com.mongodb.MongoSocketException;
2424

25+
import java.util.Collections;
2526
import java.util.List;
2627

2728
import static java.util.Arrays.asList;
2829

2930
final class ChangeStreamBatchCursorHelper {
3031
private static final List<Integer> UNRETRYABLE_SERVER_ERROR_CODES = asList(136, 237, 280, 11601);
32+
private static final List<String> NONRESUMABLE_CHANGE_STREAM_ERROR_LABELS = asList("NonResumableChangeStreamError");
3133

3234
static boolean isRetryableError(final Throwable t) {
3335
if (!(t instanceof MongoException) || t instanceof MongoChangeStreamException) {
@@ -36,7 +38,8 @@ static boolean isRetryableError(final Throwable t) {
3638
|| t instanceof MongoSocketException) {
3739
return true;
3840
} else {
39-
return !UNRETRYABLE_SERVER_ERROR_CODES.contains(((MongoException) t).getCode());
41+
return !UNRETRYABLE_SERVER_ERROR_CODES.contains(((MongoException) t).getCode())
42+
&& Collections.disjoint(NONRESUMABLE_CHANGE_STREAM_ERROR_LABELS, ((MongoException) t).getErrorLabels());
4043
}
4144
}
4245

driver-core/src/test/resources/change-streams/change-streams-errors.json

Lines changed: 38 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -54,9 +54,7 @@
5454
"cursor": {},
5555
"pipeline": [
5656
{
57-
"$changeStream": {
58-
"fullDocument": "default"
59-
}
57+
"$changeStream": {}
6058
},
6159
{
6260
"$unsupported": "foo"
@@ -73,6 +71,43 @@
7371
"code": 40324
7472
}
7573
}
74+
},
75+
{
76+
"description": "Change Stream should error when _id is projected out",
77+
"minServerVersion": "4.1.11",
78+
"target": "collection",
79+
"topology": [
80+
"replicaset",
81+
"sharded"
82+
],
83+
"changeStreamPipeline": [
84+
{
85+
"$project": {
86+
"_id": 0
87+
}
88+
}
89+
],
90+
"changeStreamOptions": {},
91+
"operations": [
92+
{
93+
"database": "change-stream-tests",
94+
"collection": "test",
95+
"name": "insertOne",
96+
"arguments": {
97+
"document": {
98+
"z": 3
99+
}
100+
}
101+
}
102+
],
103+
"result": {
104+
"error": {
105+
"code": 280,
106+
"errorLabels": [
107+
"NonResumableChangeStreamError"
108+
]
109+
}
110+
}
76111
}
77112
]
78113
}

driver-sync/src/test/functional/com/mongodb/client/ChangeStreamsTest.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
import java.net.URISyntaxException;
4545
import java.util.ArrayList;
4646
import java.util.Collection;
47+
import java.util.Collections;
4748
import java.util.List;
4849

4950
import static com.mongodb.JsonTestServerVersionChecker.skipTest;
@@ -52,6 +53,7 @@
5253
import static java.lang.String.format;
5354
import static java.util.Collections.singletonList;
5455
import static org.junit.Assert.assertEquals;
56+
import static org.junit.Assert.assertTrue;
5557
import static org.junit.Assume.assumeFalse;
5658

5759
// See https://github.com/mongodb/specifications/tree/master/source/change-streams/tests
@@ -154,6 +156,15 @@ private void checkStreamValues(final BsonDocument result, final MongoCursor<Chan
154156

155157
assertEquals(expected.get("fullDocument"), actual.getFullDocument());
156158
}
159+
if (result.containsKey("error")) {
160+
BsonDocument error = result.getDocument("error");
161+
try {
162+
cursor.next();
163+
} catch (MongoException e) {
164+
assertTrue(e.getCode() == error.getInt32("code").intValue()
165+
|| !Collections.disjoint(e.getErrorLabels(), error.getArray("errorLabels")));
166+
}
167+
}
157168
}
158169

159170
@Nullable
@@ -169,7 +180,7 @@ private MongoCursor<ChangeStreamDocument<BsonDocument>> createCursor(final BsonD
169180
}
170181

171182
private void checkExpectations() {
172-
if (definition.getArray("expectations").size() > 0) {
183+
if (definition.containsKey("expectations") && definition.getArray("expectations").size() > 0) {
173184

174185
String database = definition.getString("target").getValue().equals("client") ? "admin" : namespace.getDatabaseName();
175186
List<CommandEvent> expectedEvents = getExpectedEvents(definition.getArray("expectations"), database, new BsonDocument());

0 commit comments

Comments
 (0)