@@ -21,7 +21,10 @@ import com.mongodb.OperationFunctionalSpecification
21
21
import com.mongodb.ReadConcern
22
22
import com.mongodb.WriteConcern
23
23
import com.mongodb.client.model.CreateCollectionOptions
24
+ import com.mongodb.client.model.changestream.ChangeStreamDocument
24
25
import com.mongodb.client.model.changestream.FullDocument
26
+ import com.mongodb.client.model.changestream.OperationType
27
+ import com.mongodb.client.model.changestream.UpdateDescription
25
28
import com.mongodb.client.test.CollectionHelper
26
29
import org.bson.BsonArray
27
30
import org.bson.BsonDocument
@@ -30,12 +33,15 @@ import org.bson.BsonInt64
30
33
import org.bson.BsonString
31
34
import org.bson.Document
32
35
import org.bson.codecs.BsonDocumentCodec
36
+ import org.bson.codecs.BsonValueCodecProvider
33
37
import org.bson.codecs.DocumentCodec
38
+ import org.bson.codecs.ValueCodecProvider
34
39
import spock.lang.IgnoreIf
35
40
36
41
import static com.mongodb.ClusterFixture.isDiscoverableReplicaSet
37
42
import static com.mongodb.ClusterFixture.serverVersionAtLeast
38
43
import static java.util.concurrent.TimeUnit.MILLISECONDS
44
+ import static org.bson.codecs.configuration.CodecRegistries.fromProviders
39
45
40
46
@IgnoreIf ({ !(serverVersionAtLeast(3 , 5 ) && isDiscoverableReplicaSet()) })
41
47
class ChangeStreamOperationSpecification extends OperationFunctionalSpecification {
@@ -95,7 +101,7 @@ class ChangeStreamOperationSpecification extends OperationFunctionalSpecificatio
95
101
async << [true , false ]
96
102
}
97
103
98
- def ' should support return the expected results' () {
104
+ def ' should return the expected results' () {
99
105
given :
100
106
def helper = getHelper()
101
107
@@ -133,6 +139,120 @@ class ChangeStreamOperationSpecification extends OperationFunctionalSpecificatio
133
139
async << [true , false ]
134
140
}
135
141
142
+ def ' should decode insert to ChangeStreamDocument ' () {
143
+ given :
144
+ def helper = getHelper()
145
+
146
+ def pipeline = [BsonDocument . parse(' {$match: {operationType: "insert"}}' )]
147
+ def operation = new ChangeStreamOperation<BsonDocument > (helper. getNamespace(), FullDocument . DEFAULT , pipeline,
148
+ ChangeStreamDocument . createCodec(BsonDocument , fromProviders(new BsonValueCodecProvider (), new ValueCodecProvider ())))
149
+
150
+ when :
151
+ def cursor = execute(operation, false )
152
+ helper. insertDocuments(BsonDocument . parse(' { _id : 2, x : 2 }' ))
153
+ ChangeStreamDocument<BsonDocument > next = next(cursor, false ). get(0 )
154
+
155
+ then :
156
+ next. getResumeToken() != null
157
+ next. getDocumentKey() == BsonDocument . parse(' { _id : 2 }' )
158
+ next. getFullDocument() == BsonDocument . parse(' { _id : 2, x : 2 }' )
159
+ next. getNamespace() == helper. getNamespace()
160
+ next. getOperationType() == OperationType . INSERT
161
+ next. getUpdateDescription() == null
162
+ }
163
+
164
+ def ' should decode update to ChangeStreamDocument ' () {
165
+ given :
166
+ def helper = getHelper()
167
+
168
+ def pipeline = [BsonDocument . parse(' {$match: {operationType: "update"}}' )]
169
+ def operation = new ChangeStreamOperation<BsonDocument > (helper. getNamespace(), FullDocument . UPDATE_LOOKUP , pipeline,
170
+ ChangeStreamDocument . createCodec(BsonDocument , fromProviders(new BsonValueCodecProvider (), new ValueCodecProvider ())))
171
+ helper. insertDocuments(BsonDocument . parse(' { _id : 2, x : 2, y : 3 }' ))
172
+
173
+ when :
174
+ def cursor = execute(operation, false )
175
+ helper. updateOne(BsonDocument . parse(' { _id : 2}' ), BsonDocument . parse(' { $set : {x : 3}, $unset : {y : 1}}' ))
176
+ ChangeStreamDocument<BsonDocument > next = next(cursor, false ). get(0 )
177
+
178
+ then :
179
+ next. getResumeToken() != null
180
+ next. getDocumentKey() == BsonDocument . parse(' { _id : 2 }' )
181
+ next. getFullDocument() == BsonDocument . parse(' { _id : 2, x : 3 }' )
182
+ next. getNamespace() == helper. getNamespace()
183
+ next. getOperationType() == OperationType . UPDATE
184
+ next. getUpdateDescription() == new UpdateDescription ([' y' ], BsonDocument . parse(' {x : 3}' ))
185
+ }
186
+
187
+ def ' should decode replace to ChangeStreamDocument ' () {
188
+ given :
189
+ def helper = getHelper()
190
+
191
+ def pipeline = [BsonDocument . parse(' {$match: {operationType: "replace"}}' )]
192
+ def operation = new ChangeStreamOperation<BsonDocument > (helper. getNamespace(), FullDocument . UPDATE_LOOKUP , pipeline,
193
+ ChangeStreamDocument . createCodec(BsonDocument , fromProviders(new BsonValueCodecProvider (), new ValueCodecProvider ())))
194
+ helper. insertDocuments(BsonDocument . parse(' { _id : 2, x : 2, y : 3 }' ))
195
+
196
+ when :
197
+ def cursor = execute(operation, false )
198
+ helper. replaceOne(BsonDocument . parse(' { _id : 2}' ), BsonDocument . parse(' { _id : 2, x : 3}' ), false )
199
+ ChangeStreamDocument<BsonDocument > next = next(cursor, false ). get(0 )
200
+
201
+ then :
202
+ next. getResumeToken() != null
203
+ next. getDocumentKey() == BsonDocument . parse(' { _id : 2 }' )
204
+ next. getFullDocument() == BsonDocument . parse(' { _id : 2, x : 3 }' )
205
+ next. getNamespace() == helper. getNamespace()
206
+ next. getOperationType() == OperationType . REPLACE
207
+ next. getUpdateDescription() == null
208
+ }
209
+
210
+ def ' should decode delete to ChangeStreamDocument ' () {
211
+ given :
212
+ def helper = getHelper()
213
+
214
+ def pipeline = [BsonDocument . parse(' {$match: {operationType: "delete"}}' )]
215
+ def operation = new ChangeStreamOperation<BsonDocument > (helper. getNamespace(), FullDocument . UPDATE_LOOKUP , pipeline,
216
+ ChangeStreamDocument . createCodec(BsonDocument , fromProviders(new BsonValueCodecProvider (), new ValueCodecProvider ())))
217
+ helper. insertDocuments(BsonDocument . parse(' { _id : 2, x : 2, y : 3 }' ))
218
+
219
+ when :
220
+ def cursor = execute(operation, false )
221
+ helper. deleteOne(BsonDocument . parse(' { _id : 2}' ))
222
+ ChangeStreamDocument<BsonDocument > next = next(cursor, false ). get(0 )
223
+
224
+ then :
225
+ next. getResumeToken() != null
226
+ next. getDocumentKey() == BsonDocument . parse(' { _id : 2 }' )
227
+ next. getFullDocument() == null
228
+ next. getNamespace() == helper. getNamespace()
229
+ next. getOperationType() == OperationType . DELETE
230
+ next. getUpdateDescription() == null
231
+ }
232
+
233
+ def ' should decode invalidate to ChangeStreamDocument ' () {
234
+ given :
235
+ def helper = getHelper()
236
+
237
+ def pipeline = [BsonDocument . parse(' {$match: {operationType: "invalidate"}}' )]
238
+ def operation = new ChangeStreamOperation<BsonDocument > (helper. getNamespace(), FullDocument . UPDATE_LOOKUP , pipeline,
239
+ ChangeStreamDocument . createCodec(BsonDocument , fromProviders(new BsonValueCodecProvider (), new ValueCodecProvider ())))
240
+ helper. insertDocuments(BsonDocument . parse(' { _id : 2, x : 2, y : 3 }' ))
241
+
242
+ when :
243
+ def cursor = execute(operation, false )
244
+ helper. drop()
245
+ ChangeStreamDocument<BsonDocument > next = next(cursor, false ). get(0 )
246
+
247
+ then :
248
+ next. getResumeToken() != null
249
+ next. getDocumentKey() == null
250
+ next. getFullDocument() == null
251
+ next. getNamespace() == null
252
+ next. getOperationType() == OperationType . INVALIDATE
253
+ next. getUpdateDescription() == null
254
+ }
255
+
136
256
def ' should throw if the _id field is projected out' () {
137
257
given :
138
258
def helper = getHelper()
0 commit comments