Skip to content

Commit 31d4887

Browse files
committed
Change streams now save the startAtOperationTime
JAVA-2889
1 parent af121ae commit 31d4887

24 files changed

+122
-224
lines changed

config/codenarc/codenarc.xml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,9 @@
110110
<rule-config name='ParameterCount'>
111111
<property name='doNotApplyToFilesMatching' value='.*Specification.groovy'/>
112112
</rule-config>
113+
<rule-config name='NestedBlockDepth'>
114+
<property name='doNotApplyToFilesMatching' value='.*Specification.groovy'/>
115+
</rule-config>
113116
<exclude name='CrapMetric'/>
114117
<exclude name='AbcMetric'/>
115118
<exclude name='MethodSize'/>

driver-async/src/main/com/mongodb/async/client/ChangeStreamIterable.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ public interface ChangeStreamIterable<TResult> extends MongoIterable<ChangeStrea
9191
<TDocument> MongoIterable<TDocument> withDocumentClass(Class<TDocument> clazz);
9292

9393
/**
94-
* The change stream will only provides changes that occurred after the specified timestamp.
94+
* The change stream will only provide changes that occurred at or after the specified timestamp.
9595
*
9696
* <p>Any command run against the server will return an operation time that can be used here.</p>
9797
* <p>The default value is an operation time obtained from the server before the change stream was created.</p>

driver-async/src/main/com/mongodb/async/client/ClientSessionBinding.java

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525
import com.mongodb.connection.ServerDescription;
2626
import com.mongodb.internal.session.ClientSessionContext;
2727
import com.mongodb.session.SessionContext;
28-
import org.bson.BsonTimestamp;
2928

3029
import static com.mongodb.assertions.Assertions.notNull;
3130

@@ -66,11 +65,6 @@ public SessionContext getSessionContext() {
6665
return sessionContext;
6766
}
6867

69-
@Override
70-
public BsonTimestamp getClusterTime() {
71-
return wrapped.getClusterTime();
72-
}
73-
7468
@Override
7569
public void getReadConnectionSource(final SingleResultCallback<AsyncConnectionSource> callback) {
7670
wrapped.getReadConnectionSource(new SingleResultCallback<AsyncConnectionSource>() {

driver-async/src/test/functional/com/mongodb/async/client/ChangeStreamsTest.java

Lines changed: 0 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@
2727
import com.mongodb.client.test.CollectionHelper;
2828
import com.mongodb.connection.ServerVersion;
2929
import com.mongodb.event.CommandEvent;
30-
import com.mongodb.event.CommandStartedEvent;
3130
import com.mongodb.internal.connection.TestCommandListener;
3231
import com.mongodb.lang.Nullable;
3332
import org.bson.BsonArray;
@@ -216,21 +215,6 @@ private void checkExpectations() {
216215
for (int i = 0; i < expectedEvents.size(); i++) {
217216
CommandEvent expectedEvent = expectedEvents.get(i);
218217
CommandEvent event = events.get(i);
219-
if (expectedEvent instanceof CommandStartedEvent && event instanceof CommandStartedEvent) {
220-
BsonDocument eventCommand = ((CommandStartedEvent) event).getCommand();
221-
222-
BsonDocument eventChangeStream = eventCommand.getArray("pipeline").get(0).asDocument().getDocument("$changeStream");
223-
224-
BsonDocument expectedChangeStream = ((CommandStartedEvent) expectedEvent).getCommand().getArray("pipeline").get(0)
225-
.asDocument().getDocument("$changeStream");
226-
227-
if (expectedChangeStream.containsKey("startAtOperationTime") && eventChangeStream.containsKey("startAtOperationTime")) {
228-
eventChangeStream.put("startAtOperationTime", expectedChangeStream.get("startAtOperationTime"));
229-
} else {
230-
eventChangeStream.remove("startAtOperationTime");
231-
}
232-
}
233-
234218
CommandMonitoringTestHelper.assertEventsEquality(singletonList(expectedEvent), singletonList(event));
235219
}
236220
}

driver-core/src/main/com/mongodb/binding/AsyncClusterBinding.java

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@
2828
import com.mongodb.selector.ServerSelector;
2929
import com.mongodb.selector.WritableServerSelector;
3030
import com.mongodb.session.SessionContext;
31-
import org.bson.BsonTimestamp;
3231

3332
import static com.mongodb.assertions.Assertions.notNull;
3433

@@ -85,11 +84,6 @@ public SessionContext getSessionContext() {
8584
return new ReadConcernAwareNoOpSessionContext(readConcern);
8685
}
8786

88-
@Override
89-
public BsonTimestamp getClusterTime() {
90-
return cluster.getClusterTime();
91-
}
92-
9387
@Override
9488
public void getReadConnectionSource(final SingleResultCallback<AsyncConnectionSource> callback) {
9589
getAsyncClusterBindingConnectionSource(new ReadPreferenceServerSelector(readPreference), callback);

driver-core/src/main/com/mongodb/binding/AsyncReadBinding.java

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
import com.mongodb.ReadPreference;
2020
import com.mongodb.async.SingleResultCallback;
2121
import com.mongodb.session.SessionContext;
22-
import org.bson.BsonTimestamp;
2322

2423
/**
2524
* An asynchronous factory of connection sources to servers that can be read from and that satisfy the specified read preference.
@@ -42,14 +41,6 @@ public interface AsyncReadBinding extends ReferenceCounted {
4241
*/
4342
SessionContext getSessionContext();
4443

45-
/**
46-
* Get the last seen cluster time
47-
*
48-
* @since 3.8
49-
* @return the last seen cluster time
50-
*/
51-
BsonTimestamp getClusterTime();
52-
5344
/**
5445
* Returns a connection source to a server that satisfies the specified read preference.
5546
* @param callback the to be passed the connection source

driver-core/src/main/com/mongodb/binding/AsyncSingleConnectionReadBinding.java

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,8 @@
2020
import com.mongodb.async.SingleResultCallback;
2121
import com.mongodb.connection.AsyncConnection;
2222
import com.mongodb.connection.ServerDescription;
23-
import com.mongodb.session.SessionContext;
2423
import com.mongodb.internal.connection.NoOpSessionContext;
25-
import org.bson.BsonTimestamp;
24+
import com.mongodb.session.SessionContext;
2625

2726
import static com.mongodb.assertions.Assertions.notNull;
2827

@@ -60,11 +59,6 @@ public SessionContext getSessionContext() {
6059
return NoOpSessionContext.INSTANCE;
6160
}
6261

63-
@Override
64-
public BsonTimestamp getClusterTime() {
65-
throw new UnsupportedOperationException();
66-
}
67-
6862
@Override
6963
public void getReadConnectionSource(final SingleResultCallback<AsyncConnectionSource> callback) {
7064
callback.onResult(new AsyncSingleConnectionSource(), null);

driver-core/src/main/com/mongodb/binding/ClusterBinding.java

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@
2727
import com.mongodb.selector.ServerSelector;
2828
import com.mongodb.selector.WritableServerSelector;
2929
import com.mongodb.session.SessionContext;
30-
import org.bson.BsonTimestamp;
3130

3231
import static com.mongodb.assertions.Assertions.notNull;
3332

@@ -87,11 +86,6 @@ public SessionContext getSessionContext() {
8786
return new ReadConcernAwareNoOpSessionContext(readConcern);
8887
}
8988

90-
@Override
91-
public BsonTimestamp getClusterTime() {
92-
return cluster.getClusterTime();
93-
}
94-
9589
@Override
9690
public ConnectionSource getWriteConnectionSource() {
9791
return new ClusterBindingConnectionSource(new WritableServerSelector());

driver-core/src/main/com/mongodb/binding/ReadBinding.java

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818

1919
import com.mongodb.ReadPreference;
2020
import com.mongodb.session.SessionContext;
21-
import org.bson.BsonTimestamp;
2221

2322
/**
2423
* A factory of connection sources to servers that can be read from and that satisfy the specified read preference.
@@ -47,14 +46,6 @@ public interface ReadBinding extends ReferenceCounted {
4746
*/
4847
SessionContext getSessionContext();
4948

50-
/**
51-
* Get the last seen cluster time
52-
*
53-
* @since 3.8
54-
* @return the last seen cluster time
55-
*/
56-
BsonTimestamp getClusterTime();
57-
5849
@Override
5950
ReadBinding retain();
6051
}

driver-core/src/main/com/mongodb/binding/SingleConnectionReadBinding.java

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,8 @@
1919
import com.mongodb.ReadPreference;
2020
import com.mongodb.connection.Connection;
2121
import com.mongodb.connection.ServerDescription;
22-
import com.mongodb.session.SessionContext;
2322
import com.mongodb.internal.connection.NoOpSessionContext;
24-
import org.bson.BsonTimestamp;
23+
import com.mongodb.session.SessionContext;
2524

2625
import static com.mongodb.assertions.Assertions.notNull;
2726

@@ -65,11 +64,6 @@ public SessionContext getSessionContext() {
6564
return NoOpSessionContext.INSTANCE;
6665
}
6766

68-
@Override
69-
public BsonTimestamp getClusterTime() {
70-
throw new UnsupportedOperationException();
71-
}
72-
7367
@Override
7468
public ReadBinding retain() {
7569
super.retain();

0 commit comments

Comments
 (0)