Skip to content

Commit 7f97934

Browse files
authored
DOCSP-41140: Change Streams (mongodb#31)
1 parent 3897174 commit 7f97934

File tree

3 files changed

+330
-4
lines changed

3 files changed

+330
-4
lines changed
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
import com.mongodb.ConnectionString
2+
import com.mongodb.MongoClientSettings
3+
import com.mongodb.client.model.Aggregates
4+
import com.mongodb.client.model.Filters
5+
import com.mongodb.client.model.Updates
6+
import com.mongodb.client.model.changestream.FullDocument
7+
import com.mongodb.kotlin.client.MongoClient
8+
9+
// start-data-class
10+
data class Restaurant(
11+
val name: String,
12+
val cuisine: String,
13+
)
14+
// end-data-class
15+
16+
fun main() {
17+
val uri = "<connection string URI>"
18+
19+
val settings = MongoClientSettings.builder()
20+
.applyConnectionString(ConnectionString(uri))
21+
.build()
22+
23+
val mongoClient = MongoClient.create(settings)
24+
val database = mongoClient.getDatabase("sample_restaurants")
25+
val collection = database.getCollection<Restaurant>("restaurants")
26+
27+
// start-open-change-stream
28+
collection.watch().forEach { change ->
29+
println(change)
30+
}
31+
// end-open-change-stream
32+
33+
// start-update-for-change-stream
34+
val filter = Filters.eq(Restaurant::name.name, "Blarney Castle")
35+
val update = Updates.set(Restaurant::cuisine.name, "Irish")
36+
37+
val result = collection.updateOne(filter, update)
38+
// end-update-for-change-stream
39+
40+
// start-change-stream-pipeline
41+
val pipeline = listOf(
42+
Aggregates.match(Filters.eq("operationType", "update"))
43+
)
44+
45+
collection.watch(pipeline).forEach { change ->
46+
println(change)
47+
}
48+
// end-change-stream-pipeline
49+
50+
// start-change-stream-post-image
51+
collection.watch().fullDocument(FullDocument.UPDATE_LOOKUP).forEach { change ->
52+
println("Received a change: $change")
53+
}
54+
// end-change-stream-post-image
55+
}
56+

source/read.txt

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ Read Data from MongoDB
2828
/read/specify-documents-to-return
2929
/read/count
3030
/read/distinct
31+
/read/change-streams
3132

3233
Overview
3334
--------
@@ -55,9 +56,9 @@ the relevant values for your MongoDB deployment.
5556
:linenos:
5657
:emphasize-lines: 20-22
5758

58-
.. .. tip::
59+
.. tip::
5960

60-
.. For instructions about how to install the {+driver-short+}, see :ref:`<kotlin-sync-quick-start>`.
61+
For instructions about how to install the {+driver-short+}, see :ref:`<kotlin-sync-download-install>`.
6162

6263
Find Documents
6364
--------------
@@ -150,5 +151,5 @@ subsequent change events in that collection:
150151
:copyable:
151152
:dedent:
152153

153-
.. TODO: To learn more about the ``watch()`` method, see the
154-
.. :ref:`kotlin-sync-change-streams` guide.
154+
To learn more about the ``watch()`` method, see the
155+
:ref:`kotlin-sync-change-streams` guide.

source/read/change-streams.txt

Lines changed: 269 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,269 @@
1+
.. _kotlin-sync-change-streams:
2+
3+
====================
4+
Monitor Data Changes
5+
====================
6+
7+
.. contents:: On this page
8+
:local:
9+
:backlinks: none
10+
:depth: 2
11+
:class: singlecol
12+
13+
.. facet::
14+
:name: genre
15+
:values: reference
16+
17+
.. meta::
18+
:keywords: watch, code example
19+
20+
Overview
21+
--------
22+
23+
In this guide, you can learn how to use the {+driver-short+} to monitor a **change stream**,
24+
allowing you to view real-time changes to your database. A change stream is a {+mdb-server+} feature that
25+
publishes data changes on a collection, database, or deployment. Your application can
26+
subscribe to a change stream and use events to perform other actions.
27+
28+
Sample Data
29+
~~~~~~~~~~~
30+
31+
The examples in this guide use the ``restaurants`` collection in the ``sample_restaurants``
32+
database from the :atlas:`Atlas sample datasets </sample-data>`. To learn how to create a
33+
free MongoDB Atlas cluster and load the sample datasets, see the
34+
:atlas:`Get Started with Atlas </getting-started>` guide.
35+
36+
The following {+language+} data class models the documents in this collection:
37+
38+
.. literalinclude:: /includes/read/change-streams.kt
39+
:start-after: start-data-class
40+
:end-before: end-data-class
41+
:language: kotlin
42+
:copyable:
43+
44+
Open a Change Stream
45+
--------------------
46+
47+
To open a change stream, call the ``watch()`` method. The instance on which you
48+
call the ``watch()`` method on determines the scope of events that the change
49+
stream listens for. You can call the ``watch()`` method on instances of the following
50+
classes:
51+
52+
- ``MongoClient``: To monitor all changes in the MongoDB deployment
53+
- ``MongoDatabase``: To monitor changes in all collections in the database
54+
- ``MongoCollection``: To monitor changes in the collection
55+
56+
Open a Change Stream Example
57+
~~~~~~~~~~~~~~~~~~~~~~~~~~~~
58+
59+
The following example opens a change stream on the ``restaurants`` collection
60+
and prints changes as they occur:
61+
62+
.. literalinclude:: /includes/read/change-streams.kt
63+
:start-after: start-open-change-stream
64+
:end-before: end-open-change-stream
65+
:language: kotlin
66+
:copyable:
67+
:dedent:
68+
69+
To begin watching for changes, run the application. Then, in a separate
70+
application or shell, perform a write operation on the ``restaurants`` collection. The
71+
following example updates a document in which the value of the ``name`` is ``"Blarney Castle"``:
72+
73+
.. _kotlin-sync-change-stream-update:
74+
75+
.. literalinclude:: /includes/read/change-streams.kt
76+
:start-after: start-update-for-change-stream
77+
:end-before: end-update-for-change-stream
78+
:language: kotlin
79+
:copyable:
80+
:dedent:
81+
82+
When you update the collection, the change stream application prints the change
83+
as it occurs. The printed change event resembles the
84+
following:
85+
86+
.. code-block:: json
87+
88+
{
89+
"_id": { ... },
90+
"operationType": "update",
91+
"clusterTime": { ... },
92+
"ns": {
93+
"db": "sample_restaurants",
94+
"coll": "restaurants"
95+
},
96+
"updateDescription": {
97+
"updatedFields": {
98+
"cuisine": "Irish"
99+
},
100+
"removedFields": [],
101+
"truncatedArrays": []
102+
}
103+
...
104+
}
105+
106+
Modify the Change Stream Output
107+
-------------------------------
108+
109+
You can pass the ``pipeline`` parameter to the ``watch()`` method to modify the
110+
change stream output. This parameter allows you to watch for only specified
111+
change events. Format the parameter as a list of objects that each represents an
112+
aggregation stage.
113+
114+
You can specify the following stages in the ``pipeline`` parameter:
115+
116+
- ``$addFields``
117+
- ``$match``
118+
- ``$project``
119+
- ``$replaceRoot``
120+
- ``$replaceWith``
121+
- ``$redact``
122+
- ``$set``
123+
- ``$unset``
124+
125+
Match Specific events Example
126+
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
127+
128+
The following example uses the ``pipeline`` parameter that includes a ``$match``
129+
to open a change stream that only records update operations:
130+
131+
.. literalinclude:: /includes/read/change-streams.kt
132+
:start-after: start-change-stream-pipeline
133+
:end-before: end-change-stream-pipeline
134+
:language: kotlin
135+
:copyable:
136+
:dedent:
137+
138+
To learn more about modifying your change stream output, see the
139+
:manual:`Modify Change Stream Output
140+
</changeStreams/#modify-change-stream-output>` section in the {+mdb-server+}
141+
manual.
142+
143+
Modify watch() Behavior
144+
-----------------------
145+
146+
You can modify the ``watch()`` by chaining methods to the ``ChangeStreamIterable``
147+
object returned by the ``watch()`` method call. If you don't specify any options, the
148+
driver does not customize the operation.
149+
150+
The following table describes methods you can use to customize the behavior
151+
of ``watch()``:
152+
153+
.. list-table::
154+
:widths: 30 70
155+
:header-rows: 1
156+
157+
* - Method
158+
- Description
159+
160+
* - ``batchSize()``
161+
- | Sets the number of documents to return per batch.
162+
163+
* - ``collation()``
164+
- | Specifies the kind of language collation to use when sorting
165+
results. For more information, see :manual:`Collation </reference/collation/#std-label-collation>`
166+
in the {+mdb-server+} manual.
167+
168+
* - ``comment()``
169+
- | Specifies a comment to attach to the operation.
170+
171+
* - ``fullDocument()``
172+
- | Sets the ``fullDocument`` value. To learn more, see the
173+
:ref:`<kotlin-sync-change-stream-pre-post-image>` section of this document.
174+
175+
* - ``fullDocumentBeforeChange()``
176+
- | Sets the ``fullDocumentBeforeChange`` value. To learn more, see the
177+
:ref:`<kotlin-sync-change-stream-pre-post-image>` section of this document.
178+
179+
* - ``maxAwaitTime()``
180+
- | Sets the maximum await execution time on the server for this operation, in
181+
milliseconds.
182+
183+
For a complete list of methods you can use to configure the ``watch()`` method, see
184+
the `ChangeStreamIterable <{+api+}/com.mongodb.kotlin.client/-change-stream-iterable/index.html>`__
185+
API documentation.
186+
187+
.. _kotlin-sync-change-stream-pre-post-image:
188+
189+
Include Pre-Images and Post-Images
190+
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
191+
192+
.. important::
193+
194+
You can enable pre-images and post-images on collections only if your
195+
deployment uses MongoDB v6.0 or later.
196+
197+
By default, when you perform an operation on a collection, the
198+
corresponding change event includes only the delta of the fields
199+
modified by that operation. To see the full document before or after a
200+
change, chain the ``fullDocumentBeforeChange()`` or the ``fullDocument()``
201+
methods to the ``watch()`` method.
202+
203+
The **pre-image** is the full version of a document *before* a change. To include the
204+
pre-image in the change stream event, pass one of the following options to the
205+
``fullDocumentBeforeChange()`` method:
206+
207+
- ``FullDocumentBeforeChange.WHEN_AVAILABLE``: The change event includes a pre-image of the
208+
modified document for change events only if the pre-image is available.
209+
- ``FullDocumentBeforeChange.REQUIRED``: The change event includes a pre-image of the
210+
modified document for change events. If the pre-image is not available, the
211+
driver raises an error.
212+
213+
The **post-image** is the full version of a document *after* a change. To include the
214+
post-image in the change stream event, pass one of the following options to the
215+
``fullDocument()`` method:
216+
217+
- ``FullDocument.UPDATE_LOOKUP``: The change event includes a copy of the entire changed
218+
document from some time after the change.
219+
- ``FullDocument.WHEN_AVAILABLE``: The change event includes a post-image of the
220+
modified document for change events only if the post-image is available.
221+
- ``FullDocument.REQUIRED``: The change event includes a post-image of the
222+
modified document for change events. If the post-image is not available, the
223+
driver raises an error.
224+
225+
The following example calls the ``watch()`` method on a collection and includes the post-image
226+
of updated documents in the results by specifying the ``fullDocument`` parameter:
227+
228+
.. literalinclude:: /includes/read/change-streams.kt
229+
:start-after: start-change-stream-post-image
230+
:end-before: end-change-stream-post-image
231+
:language: kotlin
232+
:copyable:
233+
:dedent:
234+
235+
With the change stream application running, updating a document in the
236+
``restaurants`` collection by using the :ref:`preceding update example
237+
<kotlin-sync-change-stream-update>` prints a change event resembling the following:
238+
239+
.. code-block:: none
240+
241+
ChangeStreamDocument{ operationType=update, resumeToken={"_data": "..."},
242+
namespace=sample_restaurants.restaurants, destinationNamespace=null, fullDocument=Restaurant(name=Blarney Castle, cuisine=Irish),
243+
fullDocumentBeforeChange=null, documentKey={"_id": {"$oid": "..."}},
244+
clusterTime=Timestamp{value=..., seconds=..., inc=...},
245+
updateDescription=UpdateDescription{removedFields=[], updatedFields={"cuisine": "Irish"},
246+
truncatedArrays=[], disambiguatedPaths=null}, txnNumber=null, lsid=null, splitEvent=null,
247+
wallTime=BsonDateTime{value=...}}
248+
249+
250+
To learn more about pre-images and post-images, see
251+
:manual:`Change Streams with Document Pre- and Post-Images </changeStreams#change-streams-with-document-pre--and-post-images>`
252+
in the {+mdb-server+} manual.
253+
254+
Additional Information
255+
----------------------
256+
257+
To learn more about change streams, see :manual:`Change Streams
258+
</changeStreams>` in the {+mdb-server+} manual.
259+
260+
API Documentation
261+
~~~~~~~~~~~~~~~~~
262+
263+
To learn more about any of the methods or types discussed in this
264+
guide, see the following API documentation:
265+
266+
- `MongoClient.watch() <{+api+}/com.mongodb.kotlin.client/-mongo-client/watch.html>`__
267+
- `MongoDatabase.watch() <{+api+}/com.mongodb.kotlin.client/-mongo-database/watch.html>`__
268+
- `MongoCollection.watch() <{+api+}/com.mongodb.kotlin.client/-mongo-collection/watch.html>`__
269+
- `ChangeStreamIterable <{+api+}/com.mongodb.kotlin.client/-change-stream-iterable/index.html>`__

0 commit comments

Comments
 (0)