-
Notifications
You must be signed in to change notification settings - Fork 10
DOCSP-41140: Change Streams #31
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from 3 commits
Commits
Show all changes
7 commits
Select commit
Hold shift + click to select a range
55f1d1f
DOCSP-41140: Change Streams
mcmorisi 81e1af5
Fixes
mcmorisi d1557b7
Address RR feedback
mcmorisi 16e4508
Further feedback
mcmorisi 71ff25a
Address technical feedback
mcmorisi 5e97ce5
Address further technical feedback
mcmorisi 587b2d5
Fix
mcmorisi File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,57 @@ | ||
import com.mongodb.ConnectionString | ||
import com.mongodb.MongoClientSettings | ||
import com.mongodb.client.model.Aggregates | ||
import com.mongodb.client.model.Filters | ||
import com.mongodb.client.model.Updates | ||
import com.mongodb.client.model.changestream.FullDocument | ||
import com.mongodb.kotlin.client.MongoClient | ||
|
||
// start-data-class | ||
data class Restaurant( | ||
val name: String, | ||
val cuisine: String, | ||
) | ||
// end-data-class | ||
|
||
fun main() { | ||
val uri = "<connection string URI>" | ||
|
||
val settings = MongoClientSettings.builder() | ||
.applyConnectionString(ConnectionString(uri)) | ||
.retryWrites(true) | ||
.build() | ||
|
||
val mongoClient = MongoClient.create(settings) | ||
val database = mongoClient.getDatabase("sample_restaurants") | ||
val collection = database.getCollection<Restaurant>("restaurants") | ||
|
||
// start-open-change-stream | ||
collection.watch().forEach { change -> | ||
println(change) | ||
} | ||
// end-open-change-stream | ||
|
||
// start-update-for-change-stream | ||
val filter = Filters.eq(Restaurant::name.name, "Blarney Castle") | ||
val update = Updates.set(Restaurant::cuisine.name, "Irish") | ||
|
||
val result = collection.updateOne(filter, update) | ||
// end-update-for-change-stream | ||
|
||
// start-change-stream-pipeline | ||
val pipeline = listOf( | ||
Aggregates.match(Filters.eq("operationType", "update")) | ||
) | ||
|
||
collection.watch(pipeline).forEach { change -> | ||
println(change) | ||
} | ||
// end-change-stream-pipeline | ||
|
||
// start-change-stream-post-image | ||
collection.watch().fullDocument(FullDocument.UPDATE_LOOKUP).forEach { change -> | ||
println("Received a change: $change") | ||
} | ||
// end-change-stream-post-image | ||
} | ||
|
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,284 @@ | ||
.. _kotlin-sync-change-streams: | ||
|
||
==================== | ||
Monitor Data Changes | ||
==================== | ||
|
||
.. contents:: On this page | ||
:local: | ||
:backlinks: none | ||
:depth: 2 | ||
:class: singlecol | ||
|
||
.. facet:: | ||
:name: genre | ||
:values: reference | ||
|
||
.. meta:: | ||
:keywords: watch, code example | ||
|
||
Overview | ||
-------- | ||
|
||
In this guide, you can learn how to use the {+driver-short+} to monitor a **change stream**, | ||
allowing you to view real-time changes to your database. A change stream is a {+mdb-server+} feature that | ||
publishes data changes on a collection, database, or deployment. Your application can | ||
subscribe to a change stream and use events to perform other actions. | ||
|
||
Sample Data | ||
~~~~~~~~~~~ | ||
|
||
The examples in this guide use the ``restaurants`` collection in the ``sample_restaurants`` | ||
database from the :atlas:`Atlas sample datasets </sample-data>`. To learn how to create a | ||
free MongoDB Atlas cluster and load the sample datasets, see the | ||
:atlas:`Get Started with Atlas </getting-started>` guide. | ||
|
||
The following {+language+} data class models the documents in this collection: | ||
|
||
.. literalinclude:: /includes/read/change-streams.kt | ||
:start-after: start-data-class | ||
:end-before: end-data-class | ||
:language: kotlin | ||
:copyable: | ||
|
||
Open a Change Stream | ||
-------------------- | ||
|
||
To open a change stream, call the ``watch()`` method. The instance on which you | ||
call the ``watch()`` method on determines the scope of events that the change | ||
stream listens for. You can call the ``watch()`` method on instances of the following | ||
classes: | ||
|
||
- ``MongoClient``: To monitor all changes in the MongoDB deployment | ||
- ``MongoDatabase``: To monitor changes in all collections in the database | ||
- ``MongoCollection``: To monitor changes in the collection | ||
|
||
rustagir marked this conversation as resolved.
Show resolved
Hide resolved
|
||
Open a Change Stream Example | ||
~~~~~~~~~~~~~~~~~~~~~~~~~~~~ | ||
|
||
The following example opens a change stream on the ``restaurants`` collection | ||
and prints changes as they occur: | ||
|
||
.. literalinclude:: /includes/read/change-streams.kt | ||
:start-after: start-open-change-stream | ||
:end-before: end-open-change-stream | ||
:language: kotlin | ||
:copyable: | ||
:dedent: | ||
|
||
To begin watching for changes, run the application. Then, in a separate | ||
application or shell, perform a write operation on the ``restaurants`` collection. The | ||
following example updates a document in which the value of the ``name`` is ``"Blarney Castle"``: | ||
|
||
.. _kotlin-sync-change-stream-update: | ||
|
||
.. literalinclude:: /includes/read/change-streams.kt | ||
:start-after: start-update-for-change-stream | ||
:end-before: end-update-for-change-stream | ||
:language: kotlin | ||
:copyable: | ||
:dedent: | ||
|
||
When you update the collection, the change stream application prints the change | ||
as it occurs. The printed change event resembles the | ||
following: | ||
|
||
.. code-block:: json | ||
mcmorisi marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
{ | ||
"cursor": { | ||
"nextBatch": [ | ||
{ | ||
"_id": { ... }, | ||
"operationType": "update", | ||
"clusterTime": { ... }, | ||
... | ||
"ns": { | ||
"db": "sample_restaurants", | ||
"coll": "restaurants" | ||
}, | ||
... | ||
jyemin marked this conversation as resolved.
Show resolved
Hide resolved
|
||
"updateDescription": { | ||
"updatedFields": { | ||
"cuisine": "Irish" | ||
}, | ||
"removedFields": [], | ||
"truncatedArrays": [] | ||
} | ||
} | ||
], | ||
... | ||
} | ||
} | ||
|
||
Modify the Change Stream Output | ||
------------------------------- | ||
|
||
You can pass the ``pipeline`` parameter to the ``watch()`` method to modify the | ||
change stream output. This parameter allows you to watch for only specified | ||
change events. Format the parameter as a list of objects that each represents an | ||
aggregation stage. | ||
|
||
You can specify the following stages in the ``pipeline`` parameter: | ||
|
||
- ``$addFields`` | ||
- ``$match`` | ||
- ``$project`` | ||
- ``$replaceRoot`` | ||
- ``$replaceWith`` | ||
- ``$redact`` | ||
- ``$set`` | ||
- ``$unset`` | ||
|
||
rustagir marked this conversation as resolved.
Show resolved
Hide resolved
|
||
Open a Change Stream with a Pipeline Example | ||
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ | ||
mcmorisi marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
The following example uses the ``pipeline`` parameter to open a change stream | ||
that records only update operations: | ||
mcmorisi marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
.. literalinclude:: /includes/read/change-streams.kt | ||
:start-after: start-change-stream-pipeline | ||
:end-before: end-change-stream-pipeline | ||
:language: kotlin | ||
:copyable: | ||
:dedent: | ||
|
||
To learn more about modifying your change stream output, see the | ||
:manual:`Modify Change Stream Output | ||
</changeStreams/#modify-change-stream-output>` section in the {+mdb-server+} | ||
manual. | ||
|
||
Modify watch() Behavior | ||
----------------------- | ||
|
||
You can modify the ``watch()`` by chaining methods to the ``ChangeStreamIterable`` | ||
object returned by the ``watch()`` method call. If you don't specify any options, the | ||
driver does not customize the operation. | ||
|
||
The following table describes methods you can use to customize the behavior | ||
of ``watch()``: | ||
|
||
.. list-table:: | ||
:widths: 30 70 | ||
:header-rows: 1 | ||
|
||
* - Method | ||
- Description | ||
|
||
* - ``batchSize()`` | ||
- | Sets the number of documents to return per batch. | ||
|
||
* - ``collation()`` | ||
- | Specifies the kind of language collation to use when sorting | ||
results. For more information, see :manual:`Collation </reference/collation/#std-label-collation>` | ||
in the {+mdb-server+} manual. | ||
|
||
* - ``comment()`` | ||
- | Specifies a comment to attach to the operation. | ||
|
||
* - ``forEach()`` | ||
jyemin marked this conversation as resolved.
Show resolved
Hide resolved
|
||
- | Performs the given action on each element and safely closes the cursor. | ||
|
||
* - ``fullDocument()`` | ||
- | Sets the ``fullDocument`` value. To learn more, see the | ||
:ref:`<kotlin-sync-change-stream-pre-post-image>` section of this document. | ||
|
||
* - ``fullDocumentBeforeChange()`` | ||
- | Sets the ``fullDocumentBeforeChange`` value. To learn more, see the | ||
:ref:`<kotlin-sync-change-stream-pre-post-image>` section of this document. | ||
|
||
* - ``maxAwaitTime()`` | ||
- | Sets the maximum await execution time on the server for this operation, in | ||
milliseconds. | ||
|
||
* - ``toCollection()`` | ||
jyemin marked this conversation as resolved.
Show resolved
Hide resolved
|
||
- | Inserts all elements into the given destination collection. | ||
|
||
For a complete list of methods you can use to configure the ``watch()`` method, see | ||
the `ChangeStreamIterable <{+api+}/com.mongodb.kotlin.client/-change-stream-iterable/index.html>`__ | ||
rustagir marked this conversation as resolved.
Show resolved
Hide resolved
jyemin marked this conversation as resolved.
Show resolved
Hide resolved
|
||
API documentation. | ||
|
||
.. _kotlin-sync-change-stream-pre-post-image: | ||
|
||
Include Pre-Images and Post-Images | ||
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ | ||
|
||
.. important:: | ||
|
||
You can enable pre-images and post-images on collections only if your | ||
deployment uses MongoDB v6.0 or later. | ||
|
||
By default, when you perform an operation on a collection, the | ||
corresponding change event includes only the delta of the fields | ||
modified by that operation. To see the full document before or after a | ||
change, chain the ``fullDocumentBeforeChange()`` or the ``fullDocument()`` | ||
methods to the ``watch()`` method. | ||
|
||
The **pre-image** is the full version of a document *before* a change. To include the | ||
pre-image in the change stream event, pass the ``fullDocumentBeforeChange()`` method | ||
one of the following parameters: | ||
mcmorisi marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
- ``FullDocumentBeforeChange.WHEN_AVAILABLE``: The change event includes a pre-image of the | ||
modified document for change events only if the pre-image is available. | ||
- ``FullDocumentBeforeChange.REQUIRED``: The change event includes a pre-image of the | ||
modified document for change events. If the pre-image is not available, the | ||
driver raises an error. | ||
|
||
The **post-image** is the full version of a document *after* a change. To include the | ||
post-image in the change stream event, pass the ``fullDocument()`` method one of the | ||
following parameters: | ||
mcmorisi marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
- ``FullDocument.UPDATE_LOOKUP``: The change event includes a copy of the entire changed | ||
document from some time after the change. | ||
- ``FullDocument.WHEN_AVAILABLE``: The change event includes a post-image of the | ||
modified document for change events only if the post-image is available. | ||
- ``FullDocument.REQUIRED``: The change event includes a post-image of the | ||
modified document for change events. If the post-image is not available, the | ||
driver raises an error. | ||
|
||
The following example calls the ``watch()`` method on a collection and includes the post-image | ||
of updated documents by specifying the ``fullDocument`` parameter: | ||
mcmorisi marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
.. literalinclude:: /includes/read/change-streams.kt | ||
:start-after: start-change-stream-post-image | ||
:end-before: end-change-stream-post-image | ||
:language: kotlin | ||
:copyable: | ||
:dedent: | ||
|
||
With the change stream application running, updating a document in the | ||
``restaurants`` collection by using the :ref:`preceding update example | ||
<kotlin-sync-change-stream-update>` prints a change event resembling the following: | ||
|
||
mcmorisi marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
.. code-block:: sh | ||
mcmorisi marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
ChangeStreamDocument{ operationType=update, resumeToken={"_data": "..."}, | ||
namespace=sample_restaurants.restaurants, destinationNamespace=null, fullDocument=Restaurant(name=Blarney Castle, cuisine=Irish), | ||
fullDocumentBeforeChange=null, documentKey={"_id": {"$oid": "..."}}, | ||
clusterTime=Timestamp{value=..., seconds=..., inc=...}, | ||
updateDescription=UpdateDescription{removedFields=[], updatedFields={"cuisine": "Irish"}, | ||
truncatedArrays=[], disambiguatedPaths=null}, txnNumber=null, lsid=null, splitEvent=null, | ||
wallTime=BsonDateTime{value=...}} | ||
|
||
|
||
To learn more about pre-images and post-images, see | ||
:manual:`Change Streams with Document Pre- and Post-Images </changeStreams#change-streams-with-document-pre--and-post-images>` | ||
in the {+mdb-server+} manual. | ||
|
||
Additional Information | ||
---------------------- | ||
|
||
To learn more about change streams, see :manual:`Change Streams | ||
</changeStreams>` in the {+mdb-server+} manual. | ||
|
||
API Documentation | ||
~~~~~~~~~~~~~~~~~ | ||
|
||
To learn more about any of the methods or types discussed in this | ||
guide, see the following API documentation: | ||
|
||
- `MongoClient.watch() <{+api+}/com.mongodb.kotlin.client/-mongo-client/watch.html>`__ | ||
- `MongoDatabase.watch() <{+api+}/com.mongodb.kotlin.client/-mongo-database/watch.html>`__ | ||
- `MongoCollection.watch() <{+api+}/com.mongodb.kotlin.client/-mongo-collection/watch.html>`__ | ||
- `ChangeStreamIterable <{+api+}/com.mongodb.kotlin.client/-change-stream-iterable/index.html>`__ | ||
Comment on lines
+266
to
+269
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Note for reviewer: these links are broken at the moment. Rea has a change that has yet to be merged that changes the |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.