Skip to content

Commit 3358d75

Browse files
committed
Added MongoClient.watch and MongoDatabase.watch
JAVA-2873
1 parent dc19aad commit 3358d75

File tree

53 files changed

+2897
-301
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

53 files changed

+2897
-301
lines changed

driver-async/src/main/com/mongodb/async/client/ChangeStreamIterable.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import com.mongodb.client.model.changestream.FullDocument;
2222
import com.mongodb.lang.Nullable;
2323
import org.bson.BsonDocument;
24+
import org.bson.BsonTimestamp;
2425

2526
import java.util.concurrent.TimeUnit;
2627

@@ -89,4 +90,17 @@ public interface ChangeStreamIterable<TResult> extends MongoIterable<ChangeStrea
8990
*/
9091
<TDocument> MongoIterable<TDocument> withDocumentClass(Class<TDocument> clazz);
9192

93+
/**
94+
* The change stream will only provides changes that occurred after the specified timestamp.
95+
*
96+
* <p>Any command run against the server will return an operation time that can be used here.</p>
97+
* <p>The default value is an operation time obtained from the server before the change stream was created.</p>
98+
*
99+
* @param startAtOperationTime the start at operation time
100+
* @since 3.8
101+
* @return this
102+
* @mongodb.server.release 4.0
103+
* @mongodb.driver.manual reference/method/db.runCommand/
104+
*/
105+
ChangeStreamIterable<TResult> startAtOperationTime(BsonTimestamp startAtOperationTime);
92106
}

driver-async/src/main/com/mongodb/async/client/ChangeStreamIterableImpl.java

Lines changed: 26 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,13 @@
2222
import com.mongodb.async.AsyncBatchCursor;
2323
import com.mongodb.client.model.Collation;
2424
import com.mongodb.client.model.changestream.ChangeStreamDocument;
25+
import com.mongodb.client.model.changestream.ChangeStreamLevel;
2526
import com.mongodb.client.model.changestream.FullDocument;
2627
import com.mongodb.lang.Nullable;
2728
import com.mongodb.operation.AsyncReadOperation;
2829
import com.mongodb.operation.ChangeStreamOperation;
2930
import org.bson.BsonDocument;
31+
import org.bson.BsonTimestamp;
3032
import org.bson.codecs.Codec;
3133
import org.bson.codecs.configuration.CodecRegistry;
3234
import org.bson.conversions.Bson;
@@ -44,21 +46,33 @@ final class ChangeStreamIterableImpl<TResult> extends MongoIterableImpl<ChangeSt
4446
private final CodecRegistry codecRegistry;
4547
private final List<? extends Bson> pipeline;
4648
private final Codec<ChangeStreamDocument<TResult>> codec;
49+
private final ChangeStreamLevel changeStreamLevel;
4750

4851
private FullDocument fullDocument = FullDocument.DEFAULT;
4952
private BsonDocument resumeToken;
5053
private long maxAwaitTimeMS;
5154
private Collation collation;
55+
private BsonTimestamp startAtOperationTime;
5256

5357

58+
ChangeStreamIterableImpl(@Nullable final ClientSession clientSession, final String databaseName, final CodecRegistry codecRegistry,
59+
final ReadPreference readPreference, final ReadConcern readConcern, final OperationExecutor executor,
60+
final List<? extends Bson> pipeline, final Class<TResult> resultClass,
61+
final ChangeStreamLevel changeStreamLevel) {
62+
this(clientSession, new MongoNamespace(databaseName, "ignored"), codecRegistry, readPreference, readConcern, executor, pipeline,
63+
resultClass, changeStreamLevel);
64+
}
65+
5466
ChangeStreamIterableImpl(@Nullable final ClientSession clientSession, final MongoNamespace namespace, final CodecRegistry codecRegistry,
5567
final ReadPreference readPreference, final ReadConcern readConcern, final OperationExecutor executor,
56-
final List<? extends Bson> pipeline, final Class<TResult> resultClass) {
68+
final List<? extends Bson> pipeline, final Class<TResult> resultClass,
69+
final ChangeStreamLevel changeStreamLevel) {
5770
super(clientSession, executor, readConcern, readPreference);
5871
this.namespace = notNull("namespace", namespace);
5972
this.codecRegistry = notNull("codecRegistry", codecRegistry);
6073
this.pipeline = notNull("pipeline", pipeline);
6174
this.codec = ChangeStreamDocument.createCodec(notNull("resultClass", resultClass), codecRegistry);
75+
this.changeStreamLevel = notNull("changeStreamLevel", changeStreamLevel);
6276
}
6377

6478
@Override
@@ -104,24 +118,24 @@ AsyncReadOperation<AsyncBatchCursor<TDocument>> asAsyncReadOperation() {
104118
};
105119
}
106120

121+
@Override
122+
public ChangeStreamIterable<TResult> startAtOperationTime(final BsonTimestamp startAtOperationTime) {
123+
this.startAtOperationTime = notNull("startAtOperationTime", startAtOperationTime);
124+
return this;
125+
}
126+
107127
@Override
108128
AsyncReadOperation<AsyncBatchCursor<ChangeStreamDocument<TResult>>> asAsyncReadOperation() {
109129
return createChangeStreamOperation(codec);
110130
}
111131

112132
private <S> AsyncReadOperation<AsyncBatchCursor<S>> createChangeStreamOperation(final Codec<S> codec) {
113-
List<BsonDocument> aggregateList = createBsonDocumentList(pipeline);
114-
115-
ChangeStreamOperation<S> changeStreamOperation = new ChangeStreamOperation<S>(namespace, fullDocument, aggregateList, codec)
116-
.maxAwaitTime(maxAwaitTimeMS, MILLISECONDS)
133+
return new ChangeStreamOperation<S>(namespace, fullDocument, createBsonDocumentList(pipeline), codec, changeStreamLevel)
117134
.batchSize(getBatchSize())
118-
.collation(collation);
119-
120-
if (resumeToken != null) {
121-
changeStreamOperation.resumeAfter(resumeToken);
122-
}
123-
124-
return changeStreamOperation;
135+
.collation(collation)
136+
.maxAwaitTime(maxAwaitTimeMS, MILLISECONDS)
137+
.resumeAfter(resumeToken)
138+
.startAtOperationTime(startAtOperationTime);
125139
}
126140

127141
private List<BsonDocument> createBsonDocumentList(final List<? extends Bson> pipeline) {

driver-async/src/main/com/mongodb/async/client/ClientSessionBinding.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import com.mongodb.connection.ServerDescription;
2626
import com.mongodb.internal.session.ClientSessionContext;
2727
import com.mongodb.session.SessionContext;
28+
import org.bson.BsonTimestamp;
2829

2930
import static com.mongodb.assertions.Assertions.notNull;
3031

@@ -65,6 +66,11 @@ public SessionContext getSessionContext() {
6566
return sessionContext;
6667
}
6768

69+
@Override
70+
public BsonTimestamp getClusterTime() {
71+
return wrapped.getClusterTime();
72+
}
73+
6874
@Override
6975
public void getReadConnectionSource(final SingleResultCallback<AsyncConnectionSource> callback) {
7076
wrapped.getReadConnectionSource(new SingleResultCallback<AsyncConnectionSource>() {

driver-async/src/main/com/mongodb/async/client/MongoClient.java

Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,10 @@
2020
import com.mongodb.annotations.Immutable;
2121
import com.mongodb.async.SingleResultCallback;
2222
import org.bson.Document;
23+
import org.bson.conversions.Bson;
2324

2425
import java.io.Closeable;
26+
import java.util.List;
2527

2628
/**
2729
* A client-side representation of a MongoDB cluster. Instances can represent either a standalone MongoDB instance, a replica set,
@@ -146,4 +148,100 @@ public interface MongoClient extends Closeable {
146148
*/
147149
<TResult> ListDatabasesIterable<TResult> listDatabases(ClientSession clientSession, Class<TResult> resultClass);
148150

151+
/**
152+
* Creates a change stream for this client.
153+
*
154+
* @return the change stream iterable
155+
* @mongodb.driver.dochub core/changestreams Change Streams
156+
* @since 3.8
157+
* @mongodb.server.release 4.0
158+
*/
159+
ChangeStreamIterable<Document> watch();
160+
161+
/**
162+
* Creates a change stream for this client.
163+
*
164+
* @param resultClass the class to decode each document into
165+
* @param <TResult> the target document type of the iterable.
166+
* @return the change stream iterable
167+
* @mongodb.driver.dochub core/changestreams Change Streams
168+
* @since 3.8
169+
* @mongodb.server.release 4.0
170+
*/
171+
<TResult> ChangeStreamIterable<TResult> watch(Class<TResult> resultClass);
172+
173+
/**
174+
* Creates a change stream for this client.
175+
*
176+
* @param pipeline the aggregation pipeline to apply to the change stream.
177+
* @return the change stream iterable
178+
* @mongodb.driver.dochub core/changestreams Change Streams
179+
* @since 3.8
180+
* @mongodb.server.release 4.0
181+
*/
182+
ChangeStreamIterable<Document> watch(List<? extends Bson> pipeline);
183+
184+
/**
185+
* Creates a change stream for this client.
186+
*
187+
* @param pipeline the aggregation pipeline to apply to the change stream
188+
* @param resultClass the class to decode each document into
189+
* @param <TResult> the target document type of the iterable.
190+
* @return the change stream iterable
191+
* @mongodb.driver.dochub core/changestreams Change Streams
192+
* @since 3.8
193+
* @mongodb.server.release 4.0
194+
*/
195+
<TResult> ChangeStreamIterable<TResult> watch(List<? extends Bson> pipeline, Class<TResult> resultClass);
196+
197+
/**
198+
* Creates a change stream for this client.
199+
*
200+
* @param clientSession the client session with which to associate this operation
201+
* @return the change stream iterable
202+
* @since 3.8
203+
* @mongodb.server.release 4.0
204+
* @mongodb.driver.dochub core/changestreams Change Streams
205+
*/
206+
ChangeStreamIterable<Document> watch(ClientSession clientSession);
207+
208+
/**
209+
* Creates a change stream for this client.
210+
*
211+
* @param clientSession the client session with which to associate this operation
212+
* @param resultClass the class to decode each document into
213+
* @param <TResult> the target document type of the iterable.
214+
* @return the change stream iterable
215+
* @since 3.8
216+
* @mongodb.server.release 4.0
217+
* @mongodb.driver.dochub core/changestreams Change Streams
218+
*/
219+
<TResult> ChangeStreamIterable<TResult> watch(ClientSession clientSession, Class<TResult> resultClass);
220+
221+
/**
222+
* Creates a change stream for this client.
223+
*
224+
* @param clientSession the client session with which to associate this operation
225+
* @param pipeline the aggregation pipeline to apply to the change stream.
226+
* @return the change stream iterable
227+
* @since 3.8
228+
* @mongodb.server.release 4.0
229+
* @mongodb.driver.dochub core/changestreams Change Streams
230+
*/
231+
ChangeStreamIterable<Document> watch(ClientSession clientSession, List<? extends Bson> pipeline);
232+
233+
/**
234+
* Creates a change stream for this client.
235+
*
236+
* @param clientSession the client session with which to associate this operation
237+
* @param pipeline the aggregation pipeline to apply to the change stream
238+
* @param resultClass the class to decode each document into
239+
* @param <TResult> the target document type of the iterable.
240+
* @return the change stream iterable
241+
* @since 3.8
242+
* @mongodb.server.release 4.0
243+
* @mongodb.driver.dochub core/changestreams Change Streams
244+
*/
245+
<TResult> ChangeStreamIterable<TResult> watch(ClientSession clientSession, List<? extends Bson> pipeline, Class<TResult> resultClass);
246+
149247
}

driver-async/src/main/com/mongodb/async/client/MongoClientImpl.java

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,16 +21,20 @@
2121
import com.mongodb.MongoClientException;
2222
import com.mongodb.ReadPreference;
2323
import com.mongodb.async.SingleResultCallback;
24+
import com.mongodb.client.model.changestream.ChangeStreamLevel;
2425
import com.mongodb.connection.Cluster;
2526
import com.mongodb.diagnostics.logging.Logger;
2627
import com.mongodb.diagnostics.logging.Loggers;
2728
import com.mongodb.internal.session.ServerSessionPool;
2829
import com.mongodb.lang.Nullable;
2930
import org.bson.BsonDocument;
3031
import org.bson.Document;
32+
import org.bson.conversions.Bson;
3133

3234
import java.io.Closeable;
3335
import java.io.IOException;
36+
import java.util.Collections;
37+
import java.util.List;
3438

3539
import static com.mongodb.assertions.Assertions.notNull;
3640
import static com.mongodb.internal.async.ErrorHandlingResultCallback.errorHandlingCallback;
@@ -157,6 +161,56 @@ public <TResult> ListDatabasesIterable<TResult> listDatabases(final ClientSessio
157161
return createListDatabasesIterable(clientSession, resultClass);
158162
}
159163

164+
165+
@Override
166+
public ChangeStreamIterable<Document> watch() {
167+
return watch(Collections.<Bson>emptyList());
168+
}
169+
170+
@Override
171+
public <TResult> ChangeStreamIterable<TResult> watch(final Class<TResult> resultClass) {
172+
return watch(Collections.<Bson>emptyList(), resultClass);
173+
}
174+
175+
@Override
176+
public ChangeStreamIterable<Document> watch(final List<? extends Bson> pipeline) {
177+
return watch(pipeline, Document.class);
178+
}
179+
180+
@Override
181+
public <TResult> ChangeStreamIterable<TResult> watch(final List<? extends Bson> pipeline, final Class<TResult> resultClass) {
182+
return createChangeStreamIterable(null, pipeline, resultClass);
183+
}
184+
185+
@Override
186+
public ChangeStreamIterable<Document> watch(final ClientSession clientSession) {
187+
return watch(clientSession, Collections.<Bson>emptyList(), Document.class);
188+
}
189+
190+
@Override
191+
public <TResult> ChangeStreamIterable<TResult> watch(final ClientSession clientSession, final Class<TResult> resultClass) {
192+
return watch(clientSession, Collections.<Bson>emptyList(), resultClass);
193+
}
194+
195+
@Override
196+
public ChangeStreamIterable<Document> watch(final ClientSession clientSession, final List<? extends Bson> pipeline) {
197+
return watch(clientSession, pipeline, Document.class);
198+
}
199+
200+
@Override
201+
public <TResult> ChangeStreamIterable<TResult> watch(final ClientSession clientSession, final List<? extends Bson> pipeline,
202+
final Class<TResult> resultClass) {
203+
notNull("clientSession", clientSession);
204+
return createChangeStreamIterable(clientSession, pipeline, resultClass);
205+
}
206+
207+
private <TResult> ChangeStreamIterable<TResult> createChangeStreamIterable(@Nullable final ClientSession clientSession,
208+
final List<? extends Bson> pipeline,
209+
final Class<TResult> resultClass) {
210+
return new ChangeStreamIterableImpl<TResult>(clientSession, "admin", settings.getCodecRegistry(),
211+
settings.getReadPreference(), settings.getReadConcern(), executor, pipeline, resultClass, ChangeStreamLevel.CLIENT);
212+
}
213+
160214
private <T> ListDatabasesIterable<T> createListDatabasesIterable(@Nullable final ClientSession clientSession, final Class<T> clazz) {
161215
return new ListDatabasesIterableImpl<T>(clientSession, clazz, settings.getCodecRegistry(),
162216
ReadPreference.primary(), executor);

driver-async/src/main/com/mongodb/async/client/MongoCollectionImpl.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
import com.mongodb.client.model.ReplaceOptions;
4646
import com.mongodb.client.model.UpdateOptions;
4747
import com.mongodb.client.model.WriteModel;
48+
import com.mongodb.client.model.changestream.ChangeStreamLevel;
4849
import com.mongodb.client.result.DeleteResult;
4950
import com.mongodb.client.result.UpdateResult;
5051
import com.mongodb.internal.operation.AsyncOperations;
@@ -343,7 +344,7 @@ private <TResult> ChangeStreamIterable<TResult> createChangeStreamIterable(@Null
343344
final List<? extends Bson> pipeline,
344345
final Class<TResult> resultClass) {
345346
return new ChangeStreamIterableImpl<TResult>(clientSession, namespace, codecRegistry, readPreference, readConcern, executor,
346-
pipeline, resultClass);
347+
pipeline, resultClass, ChangeStreamLevel.COLLECTION);
347348
}
348349

349350
@Override

0 commit comments

Comments
 (0)