Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 56 additions & 0 deletions source/includes/read/change-streams.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
import com.mongodb.ConnectionString
import com.mongodb.MongoClientSettings
import com.mongodb.client.model.Aggregates
import com.mongodb.client.model.Filters
import com.mongodb.client.model.Updates
import com.mongodb.client.model.changestream.FullDocument
import com.mongodb.kotlin.client.MongoClient

// start-data-class
data class Restaurant(
val name: String,
val cuisine: String,
)
// end-data-class

fun main() {
val uri = "<connection string URI>"

val settings = MongoClientSettings.builder()
.applyConnectionString(ConnectionString(uri))
.build()

val mongoClient = MongoClient.create(settings)
val database = mongoClient.getDatabase("sample_restaurants")
val collection = database.getCollection<Restaurant>("restaurants")

// start-open-change-stream
collection.watch().forEach { change ->
println(change)
}
// end-open-change-stream

// start-update-for-change-stream
val filter = Filters.eq(Restaurant::name.name, "Blarney Castle")
val update = Updates.set(Restaurant::cuisine.name, "Irish")

val result = collection.updateOne(filter, update)
// end-update-for-change-stream

// start-change-stream-pipeline
val pipeline = listOf(
Aggregates.match(Filters.eq("operationType", "update"))
)

collection.watch(pipeline).forEach { change ->
println(change)
}
// end-change-stream-pipeline

// start-change-stream-post-image
collection.watch().fullDocument(FullDocument.UPDATE_LOOKUP).forEach { change ->
println("Received a change: $change")
}
// end-change-stream-post-image
}

9 changes: 5 additions & 4 deletions source/read.txt
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ Read Data from MongoDB
/read/specify-documents-to-return
/read/count
/read/distinct
/read/change-streams

Overview
--------
Expand Down Expand Up @@ -55,9 +56,9 @@ the relevant values for your MongoDB deployment.
:linenos:
:emphasize-lines: 20-22

.. .. tip::
.. tip::

.. For instructions about how to install the {+driver-short+}, see :ref:`<kotlin-sync-quick-start>`.
For instructions about how to install the {+driver-short+}, see :ref:`<kotlin-sync-download-install>`.

Find Documents
--------------
Expand Down Expand Up @@ -150,5 +151,5 @@ subsequent change events in that collection:
:copyable:
:dedent:

.. TODO: To learn more about the ``watch()`` method, see the
.. :ref:`kotlin-sync-change-streams` guide.
To learn more about the ``watch()`` method, see the
:ref:`kotlin-sync-change-streams` guide.
284 changes: 284 additions & 0 deletions source/read/change-streams.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,284 @@
.. _kotlin-sync-change-streams:

====================
Monitor Data Changes
====================

.. contents:: On this page
:local:
:backlinks: none
:depth: 2
:class: singlecol

.. facet::
:name: genre
:values: reference

.. meta::
:keywords: watch, code example

Overview
--------

In this guide, you can learn how to use the {+driver-short+} to monitor a **change stream**,
allowing you to view real-time changes to your database. A change stream is a {+mdb-server+} feature that
publishes data changes on a collection, database, or deployment. Your application can
subscribe to a change stream and use events to perform other actions.

Sample Data
~~~~~~~~~~~

The examples in this guide use the ``restaurants`` collection in the ``sample_restaurants``
database from the :atlas:`Atlas sample datasets </sample-data>`. To learn how to create a
free MongoDB Atlas cluster and load the sample datasets, see the
:atlas:`Get Started with Atlas </getting-started>` guide.

The following {+language+} data class models the documents in this collection:

.. literalinclude:: /includes/read/change-streams.kt
:start-after: start-data-class
:end-before: end-data-class
:language: kotlin
:copyable:

Open a Change Stream
--------------------

To open a change stream, call the ``watch()`` method. The instance on which you
call the ``watch()`` method on determines the scope of events that the change
stream listens for. You can call the ``watch()`` method on instances of the following
classes:

- ``MongoClient``: To monitor all changes in the MongoDB deployment
- ``MongoDatabase``: To monitor changes in all collections in the database
- ``MongoCollection``: To monitor changes in the collection

Open a Change Stream Example
~~~~~~~~~~~~~~~~~~~~~~~~~~~~

The following example opens a change stream on the ``restaurants`` collection
and prints changes as they occur:

.. literalinclude:: /includes/read/change-streams.kt
:start-after: start-open-change-stream
:end-before: end-open-change-stream
:language: kotlin
:copyable:
:dedent:

To begin watching for changes, run the application. Then, in a separate
application or shell, perform a write operation on the ``restaurants`` collection. The
following example updates a document in which the value of the ``name`` is ``"Blarney Castle"``:

.. _kotlin-sync-change-stream-update:

.. literalinclude:: /includes/read/change-streams.kt
:start-after: start-update-for-change-stream
:end-before: end-update-for-change-stream
:language: kotlin
:copyable:
:dedent:

When you update the collection, the change stream application prints the change
as it occurs. The printed change event resembles the
following:

.. code-block:: json
:emphasize-lines: 14-20

{
"cursor": {
"nextBatch": [
{
"_id": { ... },
"operationType": "update",
"clusterTime": { ... },
...
"ns": {
"db": "sample_restaurants",
"coll": "restaurants"
},
...
"updateDescription": {
"updatedFields": {
"cuisine": "Irish"
},
"removedFields": [],
"truncatedArrays": []
}
}
],
...
}
}

Modify the Change Stream Output
-------------------------------

You can pass the ``pipeline`` parameter to the ``watch()`` method to modify the
change stream output. This parameter allows you to watch for only specified
change events. Format the parameter as a list of objects that each represents an
aggregation stage.

You can specify the following stages in the ``pipeline`` parameter:

- ``$addFields``
- ``$match``
- ``$project``
- ``$replaceRoot``
- ``$replaceWith``
- ``$redact``
- ``$set``
- ``$unset``

Match Specific events Example
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

The following example uses the ``pipeline`` parameter that includes a ``$match``
to open a change stream that only records update operations:

.. literalinclude:: /includes/read/change-streams.kt
:start-after: start-change-stream-pipeline
:end-before: end-change-stream-pipeline
:language: kotlin
:copyable:
:dedent:

To learn more about modifying your change stream output, see the
:manual:`Modify Change Stream Output
</changeStreams/#modify-change-stream-output>` section in the {+mdb-server+}
manual.

Modify watch() Behavior
-----------------------

You can modify the ``watch()`` by chaining methods to the ``ChangeStreamIterable``
object returned by the ``watch()`` method call. If you don't specify any options, the
driver does not customize the operation.

The following table describes methods you can use to customize the behavior
of ``watch()``:

.. list-table::
:widths: 30 70
:header-rows: 1

* - Method
- Description

* - ``batchSize()``
- | Sets the number of documents to return per batch.

* - ``collation()``
- | Specifies the kind of language collation to use when sorting
results. For more information, see :manual:`Collation </reference/collation/#std-label-collation>`
in the {+mdb-server+} manual.

* - ``comment()``
- | Specifies a comment to attach to the operation.

* - ``forEach()``
- | Performs the given action on each element and safely closes the cursor.

* - ``fullDocument()``
- | Sets the ``fullDocument`` value. To learn more, see the
:ref:`<kotlin-sync-change-stream-pre-post-image>` section of this document.

* - ``fullDocumentBeforeChange()``
- | Sets the ``fullDocumentBeforeChange`` value. To learn more, see the
:ref:`<kotlin-sync-change-stream-pre-post-image>` section of this document.

* - ``maxAwaitTime()``
- | Sets the maximum await execution time on the server for this operation, in
milliseconds.

* - ``toCollection()``
- | Inserts all elements into the given destination collection.

For a complete list of methods you can use to configure the ``watch()`` method, see
the `ChangeStreamIterable <{+api+}/com.mongodb.kotlin.client/-change-stream-iterable/index.html>`__
API documentation.

.. _kotlin-sync-change-stream-pre-post-image:

Include Pre-Images and Post-Images
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

.. important::

You can enable pre-images and post-images on collections only if your
deployment uses MongoDB v6.0 or later.

By default, when you perform an operation on a collection, the
corresponding change event includes only the delta of the fields
modified by that operation. To see the full document before or after a
change, chain the ``fullDocumentBeforeChange()`` or the ``fullDocument()``
methods to the ``watch()`` method.

The **pre-image** is the full version of a document *before* a change. To include the
pre-image in the change stream event, pass one of the following options to the
``fullDocumentBeforeChange()`` method:

- ``FullDocumentBeforeChange.WHEN_AVAILABLE``: The change event includes a pre-image of the
modified document for change events only if the pre-image is available.
- ``FullDocumentBeforeChange.REQUIRED``: The change event includes a pre-image of the
modified document for change events. If the pre-image is not available, the
driver raises an error.

The **post-image** is the full version of a document *after* a change. To include the
post-image in the change stream event, pass one of the following options to the
``fullDocument()`` method:

- ``FullDocument.UPDATE_LOOKUP``: The change event includes a copy of the entire changed
document from some time after the change.
- ``FullDocument.WHEN_AVAILABLE``: The change event includes a post-image of the
modified document for change events only if the post-image is available.
- ``FullDocument.REQUIRED``: The change event includes a post-image of the
modified document for change events. If the post-image is not available, the
driver raises an error.

The following example calls the ``watch()`` method on a collection and includes the post-image
of updated documents in the results by specifying the ``fullDocument`` parameter:

.. literalinclude:: /includes/read/change-streams.kt
:start-after: start-change-stream-post-image
:end-before: end-change-stream-post-image
:language: kotlin
:copyable:
:dedent:

With the change stream application running, updating a document in the
``restaurants`` collection by using the :ref:`preceding update example
<kotlin-sync-change-stream-update>` prints a change event resembling the following:

.. code-block:: none

ChangeStreamDocument{ operationType=update, resumeToken={"_data": "..."},
namespace=sample_restaurants.restaurants, destinationNamespace=null, fullDocument=Restaurant(name=Blarney Castle, cuisine=Irish),
fullDocumentBeforeChange=null, documentKey={"_id": {"$oid": "..."}},
clusterTime=Timestamp{value=..., seconds=..., inc=...},
updateDescription=UpdateDescription{removedFields=[], updatedFields={"cuisine": "Irish"},
truncatedArrays=[], disambiguatedPaths=null}, txnNumber=null, lsid=null, splitEvent=null,
wallTime=BsonDateTime{value=...}}


To learn more about pre-images and post-images, see
:manual:`Change Streams with Document Pre- and Post-Images </changeStreams#change-streams-with-document-pre--and-post-images>`
in the {+mdb-server+} manual.

Additional Information
----------------------

To learn more about change streams, see :manual:`Change Streams
</changeStreams>` in the {+mdb-server+} manual.

API Documentation
~~~~~~~~~~~~~~~~~

To learn more about any of the methods or types discussed in this
guide, see the following API documentation:

- `MongoClient.watch() <{+api+}/com.mongodb.kotlin.client/-mongo-client/watch.html>`__
- `MongoDatabase.watch() <{+api+}/com.mongodb.kotlin.client/-mongo-database/watch.html>`__
- `MongoCollection.watch() <{+api+}/com.mongodb.kotlin.client/-mongo-collection/watch.html>`__
- `ChangeStreamIterable <{+api+}/com.mongodb.kotlin.client/-change-stream-iterable/index.html>`__
Comment on lines +266 to +269
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note for reviewer: these links are broken at the moment.

Rea has a change that has yet to be merged that changes the {+api+} source constant. These will work once it's merged.

Loading