Skip to content

Commit 55f1d1f

Browse files
committed
DOCSP-41140: Change Streams
1 parent ae8646c commit 55f1d1f

File tree

3 files changed

+336
-4
lines changed

3 files changed

+336
-4
lines changed
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
import com.mongodb.ConnectionString
2+
import com.mongodb.MongoClientSettings
3+
import com.mongodb.client.model.Aggregates
4+
import com.mongodb.client.model.Filters
5+
import com.mongodb.client.model.Updates
6+
import com.mongodb.client.model.changestream.FullDocument
7+
import com.mongodb.kotlin.client.MongoClient
8+
9+
// start-data-class
10+
data class Restaurant(
11+
val name: String,
12+
val cuisine: String,
13+
)
14+
// end-data-class
15+
16+
fun main() {
17+
val uri = "<connection string URI>"
18+
19+
val settings = MongoClientSettings.builder()
20+
.applyConnectionString(ConnectionString(uri))
21+
.retryWrites(true)
22+
.build()
23+
24+
val mongoClient = MongoClient.create(settings)
25+
val database = mongoClient.getDatabase("sample_restaurants")
26+
val collection = database.getCollection<Restaurant>("restaurants")
27+
28+
// start-open-change-stream
29+
collection.watch().forEach { change ->
30+
println(change)
31+
}
32+
// end-open-change-stream
33+
34+
// start-update-for-change-stream
35+
val filter = Filters.eq(Restaurant::name.name, "Blarney Castle")
36+
val update = Updates.set(Restaurant::cuisine.name, "Irish")
37+
38+
val result = collection.updateOne(filter, update)
39+
// end-update-for-change-stream
40+
41+
// start-change-stream-pipeline
42+
val pipeline = listOf(
43+
Aggregates.match(Filters.eq("operationType", "update"))
44+
)
45+
46+
collection.watch(pipeline).forEach { change ->
47+
println(change)
48+
}
49+
// end-change-stream-pipeline
50+
51+
// start-change-stream-post-image
52+
collection.watch().fullDocument(FullDocument.UPDATE_LOOKUP).forEach { change ->
53+
println("Received a change: $change")
54+
}
55+
// end-change-stream-post-image
56+
}
57+

source/read.txt

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ Read Data from MongoDB
2828
/read/specify-documents-to-return
2929
/read/count
3030
/read/distinct
31+
/read/change-streams
3132

3233
Overview
3334
--------
@@ -55,9 +56,9 @@ the relevant values for your MongoDB deployment.
5556
:linenos:
5657
:emphasize-lines: 20-22
5758

58-
.. .. tip::
59+
.. tip::
5960

60-
.. For instructions about how to install the {+driver-short+}, see :ref:`<kotlin-sync-quick-start>`.
61+
For instructions about how to install the {+driver-short+}, see :ref:`<kotlin-sync-download-install>`.
6162

6263
Find Documents
6364
--------------
@@ -150,5 +151,5 @@ subsequent change events in that collection:
150151
:copyable:
151152
:dedent:
152153

153-
.. TODO: To learn more about the ``watch()`` method, see the
154-
.. :ref:`kotlin-sync-change-streams` guide.
154+
To learn more about the ``watch()`` method, see the
155+
:ref:`kotlin-sync-change-streams` guide.

source/read/change-streams.txt

Lines changed: 274 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,274 @@
1+
.. _kotlin-sync-change-streams:
2+
3+
====================
4+
Monitor Data Changes
5+
====================
6+
7+
.. contents:: On this page
8+
:local:
9+
:backlinks: none
10+
:depth: 2
11+
:class: singlecol
12+
13+
.. facet::
14+
:name: genre
15+
:values: reference
16+
17+
.. meta::
18+
:keywords: watch, code example
19+
20+
Overview
21+
--------
22+
23+
In this guide, you can learn how to use a **change stream** to monitor real-time
24+
changes to your database. A change stream is a {+mdb-server+} feature that
25+
allows your application to subscribe to data changes on a collection, database,
26+
or deployment.
27+
28+
Sample Data
29+
~~~~~~~~~~~
30+
31+
The examples in this guide use the ``restaurants`` collection in the ``sample_restaurants``
32+
database from the :atlas:`Atlas sample datasets </sample-data>`. To learn how to create a
33+
free MongoDB Atlas cluster and load the sample datasets, see the
34+
:atlas:`Get Started with Atlas </getting-started>` guide.
35+
36+
The following {+language+} data class models the documents in this collection:
37+
38+
.. literalinclude:: /includes/read/change-streams.kt
39+
:start-after: start-data-class
40+
:end-before: end-data-class
41+
:language: kotlin
42+
:copyable:
43+
44+
Open a Change Stream
45+
--------------------
46+
47+
To open a change stream, call the ``watch()`` method. The instance on which you
48+
call the ``watch()`` method on determines the scope of events that the change
49+
stream listens for. You can call the ``watch()`` method on the following
50+
classes:
51+
52+
- ``MongoClient``: To monitor all changes in the MongoDB deployment
53+
- ``MongoDatabase``: To monitor changes in all collections in the database
54+
- ``MongoCollection``: To monitor changes in the collection
55+
56+
The following example opens a change stream on the ``restaurants`` collection
57+
and prints changes as they occur:
58+
59+
.. literalinclude:: /includes/read/change-streams.kt
60+
:start-after: start-open-change-stream
61+
:end-before: end-open-change-stream
62+
:language: kotlin
63+
:copyable:
64+
65+
To begin watching for changes, run the application. Then, in a separate
66+
application or shell, modify the ``restaurants`` collection. The following
67+
example updates a document with a ``name`` field value of ``Blarney Castle``:
68+
69+
.. _kotlin-sync-change-stream-update:
70+
71+
.. literalinclude:: /includes/read/change-streams.kt
72+
:start-after: start-update-for-change-stream
73+
:end-before: end-update-for-change-stream
74+
:language: kotlin
75+
:copyable:
76+
77+
When you update the collection, the change stream application prints the change
78+
as it occurs. The printed change event resembles the
79+
following:
80+
81+
.. code-block:: json
82+
83+
{
84+
"cursor": {
85+
"nextBatch": [
86+
{
87+
"_id": { ... },
88+
"operationType": "update",
89+
"clusterTime": { ... },
90+
...
91+
"ns": {
92+
"db": "sample_restaurants",
93+
"coll": "restaurants"
94+
},
95+
...
96+
"updateDescription": {
97+
"updatedFields": {
98+
"cuisine": "Irish"
99+
},
100+
"removedFields": [],
101+
"truncatedArrays": []
102+
}
103+
}
104+
],
105+
...
106+
}
107+
}
108+
109+
Modify the Change Stream Output
110+
-------------------------------
111+
112+
You can pass the ``pipeline`` parameter to the ``watch()`` method to modify the
113+
change stream output. This parameter allows you to watch for only specified
114+
change events. Format the parameter as a list of objects that each represent an
115+
aggregation stage.
116+
117+
You can specify the following stages in the ``pipeline`` parameter:
118+
119+
- ``$addFields``
120+
- ``$match``
121+
- ``$project``
122+
- ``$replaceRoot``
123+
- ``$replaceWith``
124+
- ``$redact``
125+
- ``$set``
126+
- ``$unset``
127+
128+
The following example uses the ``pipeline`` parameter to open a change stream
129+
that records only update operations:
130+
131+
.. literalinclude:: /includes/read/change-streams.kt
132+
:start-after: start-change-stream-pipeline
133+
:end-before: end-change-stream-pipeline
134+
:language: kotlin
135+
:copyable:
136+
137+
To learn more about modifying your change stream output, see the
138+
:manual:`Modify Change Stream Output
139+
</changeStreams/#modify-change-stream-output>` section in the {+mdb-server+}
140+
manual.
141+
142+
Modify ``watch()`` Behavior
143+
---------------------------
144+
145+
The ``watch()`` method can be modified by chaining methods to the ``ChangeStreamIterable``
146+
object returned by the ``watch()`` method call. If you don't specify any options, the
147+
driver does not customize the operation.
148+
149+
The following table describes methods you can use to customize the behavior
150+
of ``watch()``:
151+
152+
.. list-table::
153+
:widths: 30 70
154+
:header-rows: 1
155+
156+
* - Method
157+
- Description
158+
159+
* - ``batchSize()``
160+
- | Sets the number of documents to return per batch.
161+
162+
* - ``collation()``
163+
- | Specifies the kind of language collation to use when sorting
164+
results. For more information, see :manual:`Collation </reference/collation/#std-label-collation>`
165+
in the {+mdb-server+} manual.
166+
167+
* - ``comment()``
168+
- | Specifies a comment to attach to the operation.
169+
170+
* - ``forEach()``
171+
- | Performs the given action on each element and safely closes the cursor.
172+
173+
* - ``fullDocument()``
174+
- | Sets the ``fullDocument`` value. To learn more, see the
175+
:ref:`<kotlin-sync-change-stream-pre-post-image>` section of this document.
176+
177+
* - ``fullDocumentBeforeChange()``
178+
- | Sets the ``fullDocumentBeforeChange`` value. To learn more, see the
179+
:ref:`<kotlin-sync-change-stream-pre-post-image>` section of this document.
180+
181+
* - ``maxAwaitTime()``
182+
- | Sets the maximum await execution time on the server for this operation, in
183+
milliseconds.
184+
185+
* - ``toCollection()``
186+
- | Appends all elements to the given destination collection.
187+
188+
For a complete list of methods you can use to configure the ``watch()`` method, see
189+
the `ChangeStreamIterable <{+api+}/com.mongodb.kotlin.client/-change-stream-iterable/index.html>`__
190+
API documentation.
191+
192+
.. _kotlin-sync-change-stream-pre-post-image:
193+
194+
Include Pre-Images and Post-Images
195+
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
196+
197+
.. important::
198+
199+
You can enable pre-images and post-images on collections only if your
200+
deployment uses MongoDB v6.0 or later.
201+
202+
By default, when you perform an operation on a collection, the
203+
corresponding change event includes only the delta of the fields
204+
modified by that operation. To see the full document before or after a
205+
change, chain the ``fullDocumentBeforeChange()`` or the ``fullDocument()``
206+
methods to the ``watch()`` method.
207+
208+
The **pre-image** is the full version of a document *before* a change. To include the
209+
pre-image in the change stream event, pass the ``fullDocumentBeforeChange()`` method
210+
one of the following parameters:
211+
212+
- ``FullDocumentBeforeChange.WHEN_AVAILABLE``: The change event includes a pre-image of the
213+
modified document for change events only if the pre-image is available.
214+
- ``FullDocumentBeforeChange.REQUIRED``: The change event includes a pre-image of the
215+
modified document for change events. If the pre-image is not available, the
216+
driver raises an error.
217+
218+
The **post-image** is the full version of a document *after* a change. To include the
219+
post-image in the change stream event, pass the ``fullDocument`` method one of the
220+
following parameters:
221+
222+
- ``FullDocument.UPDATE_LOOKUP``: The change event includes a copy of the entire changed
223+
document from some time after the change.
224+
- ``FullDocument.WHEN_AVAILABLE``: The change event includes a post-image of the
225+
modified document for change events only if the post-image is available.
226+
- ``FullDocument.REQUIRED``: The change event includes a post-image of the
227+
modified document for change events. If the post-image is not available, the
228+
driver raises an error.
229+
230+
The following example calls the ``watch()`` method on a collection and includes the post-image
231+
of updated documents by specifying the ``fullDocument`` parameter:
232+
233+
.. literalinclude:: /includes/read/change-streams.kt
234+
:start-after: start-change-stream-post-image
235+
:end-before: end-change-stream-post-image
236+
:language: kotlin
237+
:copyable:
238+
239+
With the change stream application running, updating a document in the
240+
``restaurants`` collection by using the :ref:`preceding update example
241+
<kotlin-sync-change-stream-update>` prints a change event resembling the following:
242+
243+
244+
.. code-block:: sh
245+
246+
Received a change: ChangeStreamDocument{ operationType=update, resumeToken={"_data": "..."},
247+
namespace=sample_restaurants.restaurants, destinationNamespace=null, fullDocument=Restaurant(name=Blarney Castle, cuisine=Irish),
248+
fullDocumentBeforeChange=null, documentKey={"_id": {"$oid": "..."}},
249+
clusterTime=Timestamp{value=..., seconds=..., inc=...},
250+
updateDescription=UpdateDescription{removedFields=[], updatedFields={"cuisine": "Irish"},
251+
truncatedArrays=[], disambiguatedPaths=null}, txnNumber=null, lsid=null, splitEvent=null,
252+
wallTime=BsonDateTime{value=...}}
253+
254+
255+
To learn more about pre-images and post-images, see
256+
:manual:`Change Streams with Document Pre- and Post-Images </changeStreams#change-streams-with-document-pre--and-post-images>`
257+
in the {+mdb-server+} manual.
258+
259+
Additional Information
260+
----------------------
261+
262+
To learn more about change streams, see :manual:`Change Streams
263+
</changeStreams>` in the {+mdb-server+} manual.
264+
265+
API Documentation
266+
~~~~~~~~~~~~~~~~~
267+
268+
To learn more about any of the methods or types discussed in this
269+
guide, see the following API documentation:
270+
271+
- `MongoClient.watch() <{+api+}/com.mongodb.kotlin.client/-mongo-client/watch.html>`__
272+
- `MongoDatabase.watch() <{+api+}/com.mongodb.kotlin.client/-mongo-database/watch.html>`__
273+
- `MongoCollection.watch() <{+api+}/com.mongodb.kotlin.client/-mongo-collection/watch.html>`__
274+
- `ChangeStreamIterable <{+api+}/com.mongodb.kotlin.client/-change-stream-iterable/index.html>`__

0 commit comments

Comments
 (0)