Skip to content

Commit eadbc5c

Browse files
committed
Support postBatchResumeToken in change streams
JAVA-3200 Fix result assertion in change streams spec test JAVA-3327 Amend change stream missing resume token tests for wire version 8+ JAVA-3271 Clarify resume token used in resuming and getResumeToken JAVA-3297
1 parent dd835f5 commit eadbc5c

32 files changed

+1576
-224
lines changed
Lines changed: 205 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,205 @@
1+
/*
2+
* Copyright 2008-present MongoDB, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.mongodb.async.client;
18+
19+
import com.mongodb.MongoChangeStreamException;
20+
import com.mongodb.MongoCommandException;
21+
import com.mongodb.MongoException;
22+
import com.mongodb.MongoNamespace;
23+
import com.mongodb.MongoQueryException;
24+
import com.mongodb.async.AsyncBatchCursor;
25+
import com.mongodb.async.FutureResultCallback;
26+
import com.mongodb.client.model.Aggregates;
27+
import com.mongodb.client.model.changestream.ChangeStreamDocument;
28+
import com.mongodb.client.test.CollectionHelper;
29+
import org.bson.BsonArray;
30+
import org.bson.BsonDocument;
31+
import org.bson.BsonInt32;
32+
import org.bson.BsonString;
33+
import org.bson.Document;
34+
import org.bson.codecs.DocumentCodec;
35+
import org.junit.Before;
36+
import org.junit.Test;
37+
38+
import java.util.List;
39+
import java.util.concurrent.TimeUnit;
40+
41+
import static com.mongodb.ClusterFixture.isDiscoverableReplicaSet;
42+
import static com.mongodb.ClusterFixture.serverVersionAtLeast;
43+
import static com.mongodb.async.client.Fixture.getDefaultDatabaseName;
44+
import static java.util.Arrays.asList;
45+
import static java.util.Collections.singletonList;
46+
import static org.junit.Assert.assertEquals;
47+
import static org.junit.Assert.assertNotNull;
48+
import static org.junit.Assert.assertTrue;
49+
import static org.junit.Assume.assumeTrue;
50+
51+
// See https://github.com/mongodb/specifications/tree/master/source/change-streams/tests/README.rst#prose-tests
52+
public class ChangeStreamProseTest extends DatabaseTestCase {
53+
private BsonDocument failPointDocument;
54+
private CollectionHelper<Document> collectionHelper;
55+
56+
@Before
57+
@Override
58+
public void setUp() {
59+
assumeTrue(canRunTests());
60+
super.setUp();
61+
62+
collectionHelper = new CollectionHelper<Document>(new DocumentCodec(), new MongoNamespace(getDefaultDatabaseName(), "test"));
63+
64+
// create the collection before starting tests
65+
FutureResultCallback<Void> callback = new FutureResultCallback<Void>();
66+
collection.insertOne(Document.parse("{ _id : 0 }"), callback);
67+
futureResult(callback);
68+
}
69+
70+
//
71+
// Test that the ChangeStream will throw an exception if the server response is missing the resume token (if wire version is < 8).
72+
//
73+
@Test
74+
public void testMissingResumeTokenThrowsException() {
75+
boolean exceptionFound = false;
76+
AsyncBatchCursor<ChangeStreamDocument<Document>> cursor =
77+
createChangeStreamCursor(collection.watch(singletonList(Aggregates.project(Document.parse("{ _id : 0 }")))));
78+
79+
try {
80+
insertOneDocument();
81+
getNextBatch(cursor);
82+
} catch (MongoChangeStreamException e) {
83+
exceptionFound = true;
84+
} catch (MongoQueryException e) {
85+
if (serverVersionAtLeast(4, 1)) {
86+
exceptionFound = true;
87+
}
88+
} finally {
89+
cursor.close();
90+
}
91+
assertTrue(exceptionFound);
92+
}
93+
94+
//
95+
// Test that the ChangeStream will automatically resume one time on a resumable error (including not master)
96+
// with the initial pipeline and options, except for the addition/update of a resumeToken.
97+
//
98+
@Test
99+
public void testResumeOneTimeOnError() {
100+
assumeTrue(serverVersionAtLeast(4, 0));
101+
AsyncBatchCursor<ChangeStreamDocument<Document>> cursor = createChangeStreamCursor();
102+
103+
insertOneDocument();
104+
setFailPoint("getMore", 10107);
105+
try {
106+
assertNotNull(getNextBatch(cursor));
107+
} finally {
108+
disableFailPoint();
109+
cursor.close();
110+
}
111+
}
112+
113+
//
114+
// Test that ChangeStream will not attempt to resume on any error encountered while executing an aggregate command.
115+
//
116+
@Test
117+
public void testNoResumeForAggregateErrors() {
118+
boolean exceptionFound = false;
119+
AsyncBatchCursor<ChangeStreamDocument<Document>> cursor = null;
120+
121+
try {
122+
cursor = createChangeStreamCursor(collection.watch(singletonList(Document.parse("{ $unsupportedStage: { _id : 0 } }"))));
123+
} catch (MongoCommandException e) {
124+
exceptionFound = true;
125+
} finally {
126+
if (cursor != null) {
127+
cursor.close();
128+
}
129+
}
130+
assertTrue(exceptionFound);
131+
}
132+
133+
//
134+
// ChangeStream will not attempt to resume after encountering error code 11601 (Interrupted), 136 (CappedPositionLost),
135+
// or 237 (CursorKilled) while executing a getMore command.
136+
//
137+
@Test
138+
public void testNoResumeErrors() {
139+
assumeTrue(serverVersionAtLeast(4, 0));
140+
AsyncBatchCursor<ChangeStreamDocument<Document>> cursor = createChangeStreamCursor();
141+
insertOneDocument();
142+
143+
for (int errCode : asList(136, 237, 11601)) {
144+
try {
145+
setFailPoint("getMore", errCode);
146+
getNextBatch(cursor);
147+
} catch (MongoException e) {
148+
assertEquals(errCode, e.getCode());
149+
} finally {
150+
disableFailPoint();
151+
}
152+
}
153+
cursor.close();
154+
}
155+
156+
private void insertOneDocument() {
157+
FutureResultCallback<Void> callback = new FutureResultCallback<Void>();
158+
collection.insertOne(Document.parse("{ x: 1 }"), callback);
159+
futureResult(callback);
160+
}
161+
162+
private AsyncBatchCursor<ChangeStreamDocument<Document>> createChangeStreamCursor() {
163+
return createChangeStreamCursor(collection.watch());
164+
}
165+
166+
private AsyncBatchCursor<ChangeStreamDocument<Document>> createChangeStreamCursor(
167+
final ChangeStreamIterable<Document> changeStreamIterable) {
168+
FutureResultCallback<AsyncBatchCursor<ChangeStreamDocument<Document>>> callback =
169+
new FutureResultCallback<AsyncBatchCursor<ChangeStreamDocument<Document>>>();
170+
171+
changeStreamIterable.batchCursor(callback);
172+
return futureResult(callback);
173+
}
174+
175+
private List<ChangeStreamDocument<Document>> getNextBatch(final AsyncBatchCursor<ChangeStreamDocument<Document>> cursor) {
176+
FutureResultCallback<List<ChangeStreamDocument<Document>>> callback =
177+
new FutureResultCallback<List<ChangeStreamDocument<Document>>>();
178+
cursor.next(callback);
179+
return futureResult(callback);
180+
}
181+
182+
private void setFailPoint(final String command, final int errCode) {
183+
failPointDocument = new BsonDocument("configureFailPoint", new BsonString("failCommand"))
184+
.append("mode", new BsonDocument("times", new BsonInt32(1)))
185+
.append("data", new BsonDocument("failCommands", new BsonArray(asList(new BsonString(command))))
186+
.append("errorCode", new BsonInt32(errCode)));
187+
collectionHelper.runAdminCommand(failPointDocument);
188+
}
189+
190+
private void disableFailPoint() {
191+
collectionHelper.runAdminCommand(failPointDocument.append("mode", new BsonString("off")));
192+
}
193+
194+
private boolean canRunTests() {
195+
return isDiscoverableReplicaSet() && serverVersionAtLeast(3, 6);
196+
}
197+
198+
private <T> T futureResult(final FutureResultCallback<T> callback) {
199+
try {
200+
return callback.get(30, TimeUnit.SECONDS);
201+
} catch (Throwable t) {
202+
throw MongoException.fromThrowable(t);
203+
}
204+
}
205+
}
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
/*
2+
* Copyright 2008-present MongoDB, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.mongodb.async;
18+
19+
import org.bson.BsonDocument;
20+
import org.bson.BsonTimestamp;
21+
22+
/**
23+
* Extends the async batch cursor interface to include information included in an aggregate or getMore response.
24+
*
25+
* @param <T> The type of documents the cursor contains
26+
* @mongodb.driver.manual ../meta-driver/latest/legacy/mongodb-wire-protocol/#wire-op-get-more OP_GET_MORE
27+
* @since 3.11
28+
*/
29+
@Deprecated
30+
public interface AsyncAggregateResponseBatchCursor<T> extends AsyncBatchCursor<T> {
31+
/**
32+
* Returns the postBatchResumeToken.
33+
*
34+
* @return the postBatchResumeToken
35+
*/
36+
BsonDocument getPostBatchResumeToken();
37+
38+
/**
39+
* Returns the operation time found in the aggregate or getMore response.
40+
*
41+
* @return the operation time
42+
*/
43+
BsonTimestamp getOperationTime();
44+
45+
/**
46+
* Returns true if the first batch was empty.
47+
*
48+
* @return true if the first batch was empty
49+
*/
50+
boolean isFirstBatchEmpty();
51+
}

driver-core/src/main/com/mongodb/operation/AggregateOperationImpl.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -263,13 +263,14 @@ private QueryResult<T> createQueryResult(final BsonDocument result, final Connec
263263
}
264264
}
265265

266-
private CommandReadTransformer<BsonDocument, BatchCursor<T>> transformer() {
267-
return new CommandReadTransformer<BsonDocument, BatchCursor<T>>() {
266+
private CommandReadTransformer<BsonDocument, AggregateResponseBatchCursor<T>> transformer() {
267+
return new CommandReadTransformer<BsonDocument, AggregateResponseBatchCursor<T>>() {
268268
@Override
269-
public BatchCursor<T> apply(final BsonDocument result, final ConnectionSource source, final Connection connection) {
269+
public AggregateResponseBatchCursor<T> apply(final BsonDocument result, final ConnectionSource source,
270+
final Connection connection) {
270271
QueryResult<T> queryResult = createQueryResult(result, connection.getDescription());
271272
return new QueryBatchCursor<T>(queryResult, 0, batchSize != null ? batchSize : 0, maxAwaitTimeMS, decoder, source,
272-
connection);
273+
connection, result);
273274
}
274275
};
275276
}
@@ -281,7 +282,7 @@ public AsyncBatchCursor<T> apply(final BsonDocument result, final AsyncConnectio
281282
final AsyncConnection connection) {
282283
QueryResult<T> queryResult = createQueryResult(result, connection.getDescription());
283284
return new AsyncQueryBatchCursor<T>(queryResult, 0, batchSize != null ? batchSize : 0, maxAwaitTimeMS, decoder,
284-
source, connection);
285+
source, connection, result);
285286
}
286287
};
287288
}
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
/*
2+
* Copyright 2008-present MongoDB, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.mongodb.operation;
18+
19+
import com.mongodb.annotations.NotThreadSafe;
20+
import org.bson.BsonDocument;
21+
import org.bson.BsonTimestamp;
22+
23+
/**
24+
* Extends the batch cursor interface to include information included in an aggregate or getMore response.
25+
*
26+
* @param <T> The type of documents the cursor contains
27+
* @mongodb.driver.manual ../meta-driver/latest/legacy/mongodb-wire-protocol/#wire-op-get-more OP_GET_MORE
28+
* @since 3.11
29+
*/
30+
@NotThreadSafe
31+
@Deprecated
32+
public interface AggregateResponseBatchCursor<T> extends BatchCursor<T> {
33+
/**
34+
* Returns the postBatchResumeToken.
35+
*
36+
* @return the postBatchResumeToken
37+
*/
38+
BsonDocument getPostBatchResumeToken();
39+
40+
/**
41+
* Returns the operation time found in the aggregate or getMore response.
42+
*
43+
* @return the operation time
44+
*/
45+
BsonTimestamp getOperationTime();
46+
47+
/**
48+
* Returns true if the first batch was empty.
49+
*
50+
* @return true if the first batch was empty
51+
*/
52+
boolean isFirstBatchEmpty();
53+
}

0 commit comments

Comments
 (0)