Skip to content

Commit edf5764

Browse files
committed
DOCSP-33427: split large cs events (#145)
* DOCSP-33427: split large cs events * CC PR suggestions (cherry picked from commit 9667a4f)
1 parent 8c40ed2 commit edf5764

File tree

3 files changed

+101
-5
lines changed

3 files changed

+101
-5
lines changed

examples/src/test/kotlin/ChangeStreamsTest.kt

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import config.getConfig
1313
import kotlinx.coroutines.delay
1414
import kotlinx.coroutines.launch
1515
import kotlinx.coroutines.runBlocking
16+
import org.bson.BsonDocument
1617
import org.bson.Document
1718
import org.junit.jupiter.api.AfterAll
1819
import org.junit.jupiter.api.AfterEach
@@ -136,6 +137,37 @@ internal class ChangeStreamsTest {
136137

137138
}
138139

140+
// Ignore annotation added because this test requires a MongoDB 7.0 deployment
141+
@Ignore
142+
fun splitLargeChangeStreamTest() = runBlocking {
143+
val changeEvents = mutableListOf<ChangeStreamDocument<Document>>()
144+
// :snippet-start: split-large-change-stream
145+
val pipeline = listOf(BsonDocument().append("\$changeStreamSplitLargeEvent", BsonDocument()))
146+
147+
val job = launch {
148+
val changeStream = collection.watch(pipeline)
149+
changeStream.collect {
150+
println("Received a change event: $it")
151+
changeEvents.add(it) // :remove:
152+
}
153+
}
154+
// :snippet-end:
155+
156+
// Perform MongoDB operations that trigger change events...
157+
delay(1)
158+
val testData = Document("city", "Rio de Janeiro")
159+
collection.insertOne(testData)
160+
161+
// Wait for change events
162+
delay(1000)
163+
164+
// Cancel the change stream when you're done listening for events.
165+
job.cancel()
166+
167+
// Change stream only captures the insert event, not the delete event.
168+
assertEquals(1, changeEvents.size)
169+
}
170+
139171
// NOTE: Test is being ignored because it will not work with a shared M0 cluster.
140172
// Must have a local cluster with a replica set or >=M10 on Atlas to successfully run.
141173
@Ignore
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
val pipeline = listOf(BsonDocument().append("\$changeStreamSplitLargeEvent", BsonDocument()))
2+
3+
val job = launch {
4+
val changeStream = collection.watch(pipeline)
5+
changeStream.collect {
6+
println("Received a change event: $it")
7+
}
8+
}

source/fundamentals/crud/read-operations/change-streams.txt

Lines changed: 61 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,13 @@
55
Open Change Streams
66
===================
77

8+
.. facet::
9+
:name: genre
10+
:values: reference
11+
12+
.. meta::
13+
:keywords: code example, monitoring, aggregation
14+
815
.. contents:: On this page
916
:local:
1017
:backlinks: none
@@ -18,15 +25,16 @@ In this guide, you can learn how to use a **change stream** to monitor
1825
real-time changes to your database. A change stream is a {+mdb-server+}
1926
feature that allows your application to subscribe to data changes on a single
2027
collection, database, or deployment. You can specify a set of aggregation
21-
operators to filter and transform the data your application receives. When
22-
connecting to a MongoDB deployment v6.0 or later, you can configure the
23-
events to include the document data before and after the change.
28+
operators to filter and transform the data your application receives.
29+
When connecting to MongoDB v6.0 or later, you can configure the events
30+
to include the document data before and after the change.
2431

2532
Learn how to open and configure your change streams in the following
2633
sections:
2734

2835
- :ref:`<kotlin-change-stream-open>`
2936
- :ref:`<kotlin-change-stream-aggregation>`
37+
- :ref:`<kotlin-change-stream-split-large-event>`
3038
- :ref:`<kotlin-change-stream-configure-pre-post>`
3139

3240
.. _kotlin-change-stream-open:
@@ -107,9 +115,8 @@ The following code example shows how you can apply an aggregation pipeline to
107115
configure your change stream to receive change events for only insert and
108116
update operations:
109117

110-
111118
.. literalinclude:: /examples/generated/ChangeStreamsTest.snippet.apply-aggregation-operations-to-change-stream.kt
112-
:language: java
119+
:language: kotlin
113120

114121
When the change stream receives an update change event, the preceding code
115122
example outputs the following text:
@@ -122,6 +129,55 @@ example outputs the following text:
122129
resumeToken={...},
123130
...
124131

132+
.. _kotlin-change-stream-split-large-event:
133+
134+
Split Large Change Stream Events
135+
--------------------------------
136+
137+
When connecting to MongoDB v7.0 or later,
138+
you can use the ``$changeStreamSplitLargeEvent`` aggregation operator to
139+
split event documents that exceed 16 MB into smaller fragments.
140+
141+
Use the ``$changeStreamSplitLargeEvent`` operator only when you expect
142+
the change stream events to exceed the document size limit. For
143+
example, you might use this feature if your application requires full
144+
document pre-images or post-images.
145+
146+
A ``$changeStreamSplitLargeEvent`` aggregation stage returns
147+
fragments sequentially. You can access the fragments by using a change
148+
stream cursor. Each fragment document includes a ``splitEvent`` object that
149+
contains the following fields:
150+
151+
.. list-table::
152+
:header-rows: 1
153+
:widths: 35 65
154+
155+
* - Field
156+
- Description
157+
158+
* - ``fragment``
159+
- The index of the fragment, starting at ``1``
160+
161+
* - ``of``
162+
- The total number of fragments that compose the split event
163+
164+
The following example opens a change stream that includes an aggregation
165+
pipeline with an ``$changeStreamSplitLargeEvent`` aggregation stage to
166+
split large events:
167+
168+
.. literalinclude:: /examples/generated/ChangeStreamsTest.snippet.split-large-change-stream.kt
169+
:language: kotlin
170+
171+
.. note::
172+
173+
You can have only one ``$changeStreamSplitLargeEvent`` stage in your
174+
aggregation pipeline, and it must be the last stage in the pipeline.
175+
176+
To learn more about the ``$changeStreamSplitLargeEvent`` aggregation operator,
177+
see :manual:`$changeStreamSplitLargeEvent (aggregation)
178+
</reference/operator/aggregation/changeStreamSplitLargeEvent/>` in the
179+
Server manual.
180+
125181
.. _kotlin-change-stream-configure-pre-post:
126182

127183
Include Pre-images and Post-images

0 commit comments

Comments
 (0)