Skip to content

Commit 111ff10

Browse files
committed
change streams
1 parent d50f6c4 commit 111ff10

File tree

4 files changed

+144
-38
lines changed

4 files changed

+144
-38
lines changed

source/includes/crud/Watch.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
package org.example;
2+
13
import java.util.Arrays;
24
import java.util.List;
35

@@ -17,7 +19,7 @@ public class Watch {
1719
public static void main( String[] args ) {
1820

1921
// Replace the uri string with your MongoDB deployment's connection string
20-
String uri = "<connection string URI>";
22+
String uri = "<connection string uri>";
2123

2224
try (MongoClient mongoClient = MongoClients.create(uri)) {
2325
MongoDatabase database = mongoClient.getDatabase("sample_mflix");

source/includes/usage-examples/code-snippets/WatchCompanion.java renamed to source/includes/crud/WatchCompanion.java

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,16 @@
1-
// Performs CRUD operations to generate change events when run with the Watch application
2-
3-
4-
package usage.examples;
5-
6-
import java.util.Arrays;
1+
package org.example;
72

83
import org.bson.Document;
9-
import org.bson.types.ObjectId;
104

115
import com.mongodb.MongoException;
126
import com.mongodb.client.MongoClient;
137
import com.mongodb.client.MongoClients;
148
import com.mongodb.client.MongoCollection;
159
import com.mongodb.client.MongoDatabase;
1610
import com.mongodb.client.result.InsertOneResult;
11+
import com.mongodb.client.model.Updates;
12+
import com.mongodb.client.result.UpdateResult;
13+
import com.mongodb.client.result.DeleteResult;
1714

1815
public class WatchCompanion {
1916
public static void main(String[] args) {
@@ -27,7 +24,7 @@ public static void main(String[] args) {
2724
try {
2825
// Inserts a sample document into the "movies" collection and print its ID
2926
InsertOneResult insertResult = collection.insertOne(new Document("test", "sample movie document"));
30-
System.out.println("Success! Inserted document id: " + insertResult.getInsertedId());
27+
System.out.println("Inserted document id: " + insertResult.getInsertedId());
3128

3229
// Updates the sample document and prints the number of modified documents
3330
UpdateResult updateResult = collection.updateOne(new Document("test", "sample movie document"), Updates.set("field2", "sample movie document update"));
@@ -36,8 +33,8 @@ public static void main(String[] args) {
3633
// Deletes the sample document and prints the number of deleted documents
3734
DeleteResult deleteResult = collection.deleteOne(new Document("field2", "sample movie document update"));
3835
System.out.println("Deleted " + deleteResult.getDeletedCount() + " document.");
39-
40-
// Prints a message if any exceptions occur during the operations
36+
37+
// Prints a message if any exceptions occur during the operations
4138
} catch (MongoException me) {
4239
System.err.println("Unable to insert, update, or replace due to an error: " + me);
4340
}

source/logging-monitoring/change-streams.txt

Lines changed: 134 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ Open Change Streams
88
.. contents:: On this page
99
:local:
1010
:backlinks: none
11-
:depth: 1
11+
:depth: 2
1212
:class: singlecol
1313

1414
Overview
@@ -39,6 +39,9 @@ Open a Change Stream
3939
You can open a change stream to subscribe to specific types of data changes
4040
and produce change events in your application.
4141

42+
Select a Scope to Watch
43+
~~~~~~~~~~~~~~~~~~~~~~~
44+
4245
To open a change stream, call the ``watch()`` method on an instance of a
4346
``MongoCollection``, ``MongoDatabase``, or ``MongoClient``.
4447

@@ -49,16 +52,53 @@ To open a change stream, call the ``watch()`` method on an instance of a
4952
see the :ref:`<replica-set-oplog>` {+mdb-server+} manual page.
5053

5154
The object on which you call the ``watch()`` method on determines the scope of
52-
events that the change stream listens for.
55+
events that the change stream listens for:
56+
57+
- ``MongoCollection.watch()`` monitors a collection.
58+
- ``MongoDatabase.watch()`` monitors all collections in a database.
59+
- ``MongoClient.watch()`` monitors all changes in the connected MongoDB deployment.
5360

54-
If you call ``watch()`` on a ``MongoCollection``, the change stream monitors
55-
a collection.
61+
Filter the Events
62+
~~~~~~~~~~~~~~~~~
5663

57-
If you call ``watch()`` on a ``MongoDatabase``, the change stream monitors all
58-
collections in that database.
64+
The ``watch()`` method optionally takes an **aggregation pipeline** which
65+
consists of an array of **stages** as the first parameter to filter and
66+
transform the change event output as follows:
67+
68+
.. code-block:: java
5969

60-
If you call ``watch()`` on a ``MongoClient``, the change stream monitors all
61-
changes in the connected MongoDB deployment.
70+
List<Bson> pipeline = Arrays.asList(
71+
Aggregates.match(
72+
Filters.lt("fullDocument.runtime", 15)));
73+
ChangeStreamIterable<Document> changeStream = database.watch(pipeline);
74+
75+
Manage the Output
76+
~~~~~~~~~~~~~~~~~
77+
78+
The ``watch()`` method returns an instance of ``ChangeStreamIterable``, a class
79+
that offers several methods to access, organize, and traverse the results.
80+
``ChangeStreamIterable`` also inherits methods from its parent class,
81+
``MongoIterable`` which implements the core Java interface ``Iterable``.
82+
83+
You can call ``forEach()`` on the ``ChangeStreamIterable`` to handle
84+
events as they occur, or you can use the ``iterator()`` method which
85+
returns a ``MongoCursor`` instance that you can use to traverse the results.
86+
87+
You can call methods on the ``MongoCursor`` such as ``hasNext()`` to check
88+
whether additional results exist, ``next()`` to return the next document
89+
in the collection, or ``tryNext()``, to immediately return either
90+
the next available element in the change stream or ``null``. Unlike the
91+
``MongoCursor`` returned by other queries, a ``MongoCursor`` associated
92+
with a change stream waits until a change event arrives before
93+
returning a result from ``next()``. As a result, calls to ``next()``
94+
using a change stream's ``MongoCursor`` never throw a
95+
``java.util.NoSuchElementException``.
96+
97+
To configure options for processing the documents returned from the change
98+
stream, use member methods of the ``ChangeStreamIterable`` object returned
99+
by ``watch()``. See the link to the ``ChangeStreamIterable`` API
100+
documentation at the bottom of this example for more details on the
101+
available methods.
62102

63103
Example
64104
~~~~~~~
@@ -90,36 +130,81 @@ An insert operation on the collection produces the following output:
90130
...
91131
}
92132

93-
Watch Example: Full File
94-
~~~~~~~~~~~~~~~~~~~~~~~~
133+
Watch Example: Full Files
134+
~~~~~~~~~~~~~~~~~~~~~~~~~
95135

96136
.. include:: /includes/crud/example-intro.rst
97137

98-
This file demonstrates how to open a change stream by using the watch method.
99-
The watch method takes a pipeline as an argument to filter for only ``"insert"``
100-
and ``"update"`` events. When an insert or update event occurs on the watched
101-
collection, a log of the even is printed to the screen.
138+
This example demonstrates how to open a change stream by using the watch method.
139+
The ``Watch.java`` file calls the ``watch()`` method with a pipeline as an
140+
argument to filter for only ``"insert"`` and ``"update"`` events. The
141+
``WatchCompanion.java`` file inserts, updates and deletes a document.
142+
143+
To use the following examples, run the files in this order:
144+
145+
#. Run the ``Watch.java`` file.
146+
#. Run the ``WatchCompanion.java`` file.
147+
148+
.. note::
102149

103-
To see output in your terminal:
150+
The ``Watch.java`` file will continue running until the ``WatchCompanion.java`` file
104151

105-
#. Run the file in your editor.
106-
#. Insert or update a document in the ``sample_mflix`` database ``movies`` collection.
152+
``Watch.java``:
107153

108-
.. tip::
154+
.. literalinclude:: /includes/crud/Watch.java
155+
:language: java
109156

110-
You can insert or update documents by using :atlas:`Atlas </documents/>` or :mongosh:`mongosh </crud/>`.
157+
``WatchCompanion.java``:
111158

112-
.. io-code-block::
159+
.. literalinclude:: /includes/crud/WatchCompanion.java
160+
:language: java
113161

114-
.. input:: /includes/crud/Watch.java
115-
:language: java
116-
:dedent:
162+
Full File Example Output
163+
````````````````````````
117164

118-
.. output::
119-
:language: none
120-
:visible: false
165+
The preceding applications will generate the following output:
121166

122-
Received a change to the collection: ChangeStreamDocument{ operationType=insert, resumeToken={...}, namespace=sample_mflix.movies, ... }
167+
``Watch.java`` will capture on the ``insert`` and ``update`` operations are
168+
printed, since the aggregation pipeline filters out the ``delete`` operation:
169+
170+
.. code-block::
171+
:copyable: false
172+
173+
Received a change to the collection: ChangeStreamDocument{
174+
operationType=OperationType{value='insert'},
175+
resumeToken={"_data": "825E..."},
176+
namespace=sample_mflix.movies,
177+
destinationNamespace=null,
178+
fullDocument=Document{{_id=5ec3..., test=sample movie document}},
179+
documentKey={"_id": {"$oid": "5ec3..."}},
180+
clusterTime=Timestamp{...},
181+
updateDescription=null,
182+
txnNumber=null,
183+
lsid=null,
184+
wallTime=BsonDateTime{value=1657...}
185+
}
186+
Received a change to the collection: ChangeStreamDocument{
187+
operationType=OperationType{value='update'},
188+
resumeToken={"_data": "825E..."},
189+
namespace=sample_mflix.movies,
190+
destinationNamespace=null,
191+
fullDocument=Document{{_id=5ec3..., test=sample movie document, field2=sample movie document update}},
192+
documentKey={"_id": {"$oid": "5ec3..."}},
193+
clusterTime=Timestamp{...},
194+
updateDescription=UpdateDescription{removedFields=[], updatedFields={"field2": "sample movie document update"}},
195+
txnNumber=null,
196+
lsid=null,
197+
wallTime=BsonDateTime{value=1657...}
198+
}
199+
200+
``WatchCompanion`` will print all of the operations it completed:
201+
202+
.. code-block::
203+
:copyable: false
204+
205+
Inserted document id: BsonObjectId{value=5ec3...}
206+
Updated 1 document.
207+
Deleted 1 document.
123208

124209

125210
To learn more about the ``watch()`` method, see the following API
@@ -370,3 +455,25 @@ output:
370455

371456
For a list of options, see the `FullDocument <{+api+}/apidocs/mongodb-driver-core/com/mongodb/client/model/changestream/FullDocument.html>`__
372457
API documentation.
458+
459+
Additional Information
460+
----------------------
461+
462+
To learn more about the methods and classes used to manage change streams, see the following API documentation:
463+
464+
API Documentation
465+
~~~~~~~~~~~~~~~~~
466+
467+
- `MongoCollection.watch() <{+api+}/apidocs/mongodb-driver-sync/com/mongodb/client/MongoCollection.html#watch()>`__
468+
- `MongoDatabase.watch() <{+api+}/apidocs/mongodb-driver-sync/com/mongodb/client/MongoDatabase.html#watch()>`__
469+
- `MongoClient.watch() <{+api+}/apidocs/mongodb-driver-sync/com/mongodb/client/MongoClient.html#watch()>`__
470+
- `ChangeStreamIterable <{+api+}/apidocs/mongodb-driver-sync/com/mongodb/client/ChangeStreamIterable.html>`__
471+
- `MongoCursor <{+api+}/apidocs/mongodb-driver-sync/com/mongodb/client/MongoCursor.html>`__
472+
473+
Server Manual Entries
474+
~~~~~~~~~~~~~~~~~~~~~
475+
476+
- :manual:`Change Streams </changeStreams/>`
477+
- :manual:`Change Events </reference/change-events/>`
478+
- :manual:`Aggregation Pipeline </reference/operator/aggregation-pipeline/>`
479+
- :manual:`Aggregation Stages </changeStreams/#modify-change-stream-output>`

0 commit comments

Comments
 (0)