Skip to content

Commit 8e31820

Browse files
committed
Add support for 'dropDatabase' and 'rename' operation types
This commit adjusts the ChangeStreamDocument so that it takes in a BsonDocument for the namespace instead of a MongoNamespace object. This is to combat the issue of the dropDatabase operation responding with a ChangeStreamDocument that only contains the databaseName and not the collectionName. Tests were updated to account for this as well. Additionally, a test for the rename operation type was added. JAVA-2966
1 parent 63e565b commit 8e31820

File tree

10 files changed

+269
-55
lines changed

10 files changed

+269
-55
lines changed

driver-async/src/test/unit/com/mongodb/async/client/ChangeStreamIterableSpecification.groovy

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,8 @@ class ChangeStreamIterableSpecification extends Specification {
131131
given:
132132
def count = 0
133133
def cannedResults = ['{_id: 1}', '{_id: 2}', '{_id: 3}'].collect {
134-
new ChangeStreamDocument(RawBsonDocument.parse(it), null, Document.parse(it), BsonDocument.parse(it), null, null)
134+
new ChangeStreamDocument(RawBsonDocument.parse(it), new BsonDocument(), Document.parse(it),
135+
BsonDocument.parse(it), null, null, null)
135136
}
136137
def executor = new TestOperationExecutor([cursor(cannedResults), cursor(cannedResults), cursor(cannedResults),
137138
cursor(cannedResults), cursor(cannedResults)])

driver-core/src/main/com/mongodb/client/model/changestream/ChangeStreamDocument.java

Lines changed: 93 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,16 @@
1717
package com.mongodb.client.model.changestream;
1818

1919
import com.mongodb.MongoNamespace;
20+
import com.mongodb.assertions.Assertions;
2021
import com.mongodb.lang.Nullable;
2122
import org.bson.BsonDocument;
23+
import org.bson.BsonString;
2224
import org.bson.BsonTimestamp;
2325
import org.bson.codecs.Codec;
2426
import org.bson.codecs.configuration.CodecRegistry;
2527
import org.bson.codecs.pojo.annotations.BsonCreator;
2628
import org.bson.codecs.pojo.annotations.BsonId;
29+
import org.bson.codecs.pojo.annotations.BsonIgnore;
2730
import org.bson.codecs.pojo.annotations.BsonProperty;
2831

2932
/**
@@ -39,8 +42,7 @@ public final class ChangeStreamDocument<TDocument> {
3942

4043
@BsonId()
4144
private final BsonDocument resumeToken;
42-
@BsonProperty("ns")
43-
private final MongoNamespace namespace;
45+
private final BsonDocument namespaceDocument;
4446
private final TDocument fullDocument;
4547
private final BsonDocument documentKey;
4648
private final BsonTimestamp clusterTime;
@@ -60,12 +62,12 @@ public final class ChangeStreamDocument<TDocument> {
6062
* UpdateDescription)}
6163
*/
6264
@Deprecated
63-
public ChangeStreamDocument(@BsonProperty("resumeToken") final BsonDocument resumeToken,
64-
@BsonProperty("namespace") final MongoNamespace namespace,
65-
@BsonProperty("fullDocument") final TDocument fullDocument,
66-
@BsonProperty("documentKey") final BsonDocument documentKey,
67-
@BsonProperty("operationType") final OperationType operationType,
68-
@BsonProperty("updateDescription") final UpdateDescription updateDescription) {
65+
public ChangeStreamDocument(final BsonDocument resumeToken,
66+
final MongoNamespace namespace,
67+
final TDocument fullDocument,
68+
final BsonDocument documentKey,
69+
final OperationType operationType,
70+
final UpdateDescription updateDescription) {
6971
this(resumeToken, namespace, fullDocument, documentKey, null, operationType, updateDescription);
7072
}
7173

@@ -80,23 +82,54 @@ public ChangeStreamDocument(@BsonProperty("resumeToken") final BsonDocument resu
8082
* @param operationType the operation type
8183
* @param updateDescription the update description
8284
*/
85+
@Deprecated
86+
public ChangeStreamDocument(final BsonDocument resumeToken,
87+
final MongoNamespace namespace,
88+
final TDocument fullDocument,
89+
final BsonDocument documentKey,
90+
@Nullable final BsonTimestamp clusterTime,
91+
final OperationType operationType,
92+
final UpdateDescription updateDescription) {
93+
this(resumeToken, namespaceToDocument(namespace), fullDocument, documentKey,
94+
clusterTime, operationType, updateDescription);
95+
}
96+
97+
/**
98+
* Creates a new instance
99+
*
100+
* @param resumeToken the resume token
101+
* @param namespaceDocument the BsonDocument representing the namespace
102+
* @param fullDocument the full document
103+
* @param documentKey a document containing the _id of the changed document
104+
* @param clusterTime the cluster time at which the change occured
105+
* @param operationType the operation type
106+
* @param updateDescription the update description
107+
*
108+
* @since 3.8
109+
*/
83110
@BsonCreator
84111
public ChangeStreamDocument(@BsonProperty("resumeToken") final BsonDocument resumeToken,
85-
@BsonProperty("namespace") final MongoNamespace namespace,
112+
@BsonProperty("ns") final BsonDocument namespaceDocument,
86113
@BsonProperty("fullDocument") final TDocument fullDocument,
87114
@BsonProperty("documentKey") final BsonDocument documentKey,
88115
@Nullable @BsonProperty("clusterTime") final BsonTimestamp clusterTime,
89116
@BsonProperty("operationType") final OperationType operationType,
90117
@BsonProperty("updateDescription") final UpdateDescription updateDescription) {
91118
this.resumeToken = resumeToken;
92-
this.namespace = namespace;
119+
this.namespaceDocument = namespaceDocument;
93120
this.documentKey = documentKey;
94121
this.fullDocument = fullDocument;
95122
this.clusterTime = clusterTime;
96123
this.operationType = operationType;
97124
this.updateDescription = updateDescription;
98125
}
99126

127+
private static BsonDocument namespaceToDocument(final MongoNamespace namespace) {
128+
Assertions.notNull("namespace", namespace);
129+
return new BsonDocument("db", new BsonString(namespace.getDatabaseName()))
130+
.append("coll", new BsonString(namespace.getCollectionName()));
131+
}
132+
100133
/**
101134
* Returns the resumeToken
102135
*
@@ -109,10 +142,55 @@ public BsonDocument getResumeToken() {
109142
/**
110143
* Returns the namespace
111144
*
112-
* @return the namespace
145+
* The invalidate operation type does include a MongoNamespace in the ChangeStreamDocument response. The
146+
* dropDatabase operation type includes a MongoNamespace, but does not include a collection name as part
147+
* of the namespace.
148+
*
149+
* @return the namespace. If the namespaceDocument is null or if it is missing either the 'db' or 'coll' keys,
150+
* then this will return null.
113151
*/
152+
@BsonIgnore @Nullable
114153
public MongoNamespace getNamespace() {
115-
return namespace;
154+
if (namespaceDocument == null) {
155+
return null;
156+
}
157+
if (!namespaceDocument.containsKey("db") || !namespaceDocument.containsKey("coll")) {
158+
return null;
159+
}
160+
161+
return new MongoNamespace(namespaceDocument.getString("db").getValue(), namespaceDocument.getString("coll").getValue());
162+
}
163+
164+
/**
165+
* Returns the namespaceDocument
166+
*
167+
* The namespaceDocument is a BsonDocument containing the values associated with a MongoNamespace. The
168+
* 'db' key refers to the database name and the 'coll' key refers to the collection name.
169+
*
170+
* @return the namespaceDocument
171+
* @since 3.8
172+
*/
173+
@BsonProperty("ns")
174+
public BsonDocument getNamespaceDocument() {
175+
return namespaceDocument;
176+
}
177+
178+
/**
179+
* Returns the database name
180+
*
181+
* @return the databaseName. If the namespaceDocument is null or if it is missing the 'db' key, then this will
182+
* return null.
183+
* @since 3.8
184+
*/
185+
@BsonIgnore @Nullable
186+
public String getDatabaseName() {
187+
if (namespaceDocument == null) {
188+
return null;
189+
}
190+
if (!namespaceDocument.containsKey("db")) {
191+
return null;
192+
}
193+
return namespaceDocument.getString("db").getValue();
116194
}
117195

118196
/**
@@ -196,7 +274,7 @@ public boolean equals(final Object o) {
196274
if (resumeToken != null ? !resumeToken.equals(that.resumeToken) : that.resumeToken != null) {
197275
return false;
198276
}
199-
if (namespace != null ? !namespace.equals(that.namespace) : that.namespace != null) {
277+
if (namespaceDocument != null ? !namespaceDocument.equals(that.namespaceDocument) : that.namespaceDocument != null) {
200278
return false;
201279
}
202280
if (fullDocument != null ? !fullDocument.equals(that.fullDocument) : that.fullDocument != null) {
@@ -221,7 +299,7 @@ public boolean equals(final Object o) {
221299
@Override
222300
public int hashCode() {
223301
int result = resumeToken != null ? resumeToken.hashCode() : 0;
224-
result = 31 * result + (namespace != null ? namespace.hashCode() : 0);
302+
result = 31 * result + (namespaceDocument != null ? namespaceDocument.hashCode() : 0);
225303
result = 31 * result + (fullDocument != null ? fullDocument.hashCode() : 0);
226304
result = 31 * result + (documentKey != null ? documentKey.hashCode() : 0);
227305
result = 31 * result + (clusterTime != null ? clusterTime.hashCode() : 0);
@@ -234,7 +312,7 @@ public int hashCode() {
234312
public String toString() {
235313
return "ChangeStreamDocument{"
236314
+ "resumeToken=" + resumeToken
237-
+ ", namespace=" + namespace
315+
+ ", namespace=" + getNamespace()
238316
+ ", fullDocument=" + fullDocument
239317
+ ", documentKey=" + documentKey
240318
+ ", clusterTime=" + clusterTime

driver-core/src/main/com/mongodb/client/model/changestream/ChangeStreamDocumentCodec.java

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,9 @@
1717
package com.mongodb.client.model.changestream;
1818

1919
import com.mongodb.MongoNamespace;
20-
import org.bson.BsonDocument;
2120
import org.bson.BsonReader;
22-
import org.bson.BsonTimestamp;
2321
import org.bson.BsonWriter;
24-
import org.bson.codecs.BsonDocumentCodec;
25-
import org.bson.codecs.BsonTimestampCodec;
22+
import org.bson.codecs.BsonValueCodecProvider;
2623
import org.bson.codecs.Codec;
2724
import org.bson.codecs.DecoderContext;
2825
import org.bson.codecs.EncoderContext;
@@ -38,18 +35,14 @@
3835
@SuppressWarnings({"unchecked", "rawtypes"})
3936
final class ChangeStreamDocumentCodec<TResult> implements Codec<ChangeStreamDocument<TResult>> {
4037

41-
private static final BsonDocumentCodec BSON_DOCUMENT_CODEC = new BsonDocumentCodec();
4238
private static final OperationTypeCodec OPERATION_TYPE_CODEC = new OperationTypeCodec();
4339

4440
private final Codec<ChangeStreamDocument<TResult>> codec;
4541

4642
ChangeStreamDocumentCodec(final Class<TResult> fullDocumentClass, final CodecRegistry codecRegistry) {
4743

4844
ClassModelBuilder<ChangeStreamDocument> classModelBuilder = ClassModel.builder(ChangeStreamDocument.class);
49-
((PropertyModelBuilder<BsonTimestamp>) classModelBuilder.getProperty("clusterTime")).codec(new BsonTimestampCodec());
50-
((PropertyModelBuilder<BsonDocument>) classModelBuilder.getProperty("documentKey")).codec(BSON_DOCUMENT_CODEC);
5145
((PropertyModelBuilder<TResult>) classModelBuilder.getProperty("fullDocument")).codec(codecRegistry.get(fullDocumentClass));
52-
((PropertyModelBuilder<BsonDocument>) classModelBuilder.getProperty("resumeToken")).codec(BSON_DOCUMENT_CODEC);
5346
((PropertyModelBuilder<OperationType>) classModelBuilder.getProperty("operationType")).codec(OPERATION_TYPE_CODEC);
5447
ClassModel<ChangeStreamDocument> changeStreamDocumentClassModel = classModelBuilder.build();
5548

@@ -59,7 +52,7 @@ final class ChangeStreamDocumentCodec<TResult> implements Codec<ChangeStreamDocu
5952
.register(changeStreamDocumentClassModel)
6053
.build();
6154

62-
CodecRegistry registry = fromRegistries(fromProviders(provider), codecRegistry);
55+
CodecRegistry registry = fromRegistries(fromProviders(provider, new BsonValueCodecProvider()), codecRegistry);
6356
this.codec = (Codec<ChangeStreamDocument<TResult>>) (Codec<? extends ChangeStreamDocument>)
6457
registry.get(ChangeStreamDocument.class);
6558
}

driver-core/src/main/com/mongodb/client/model/changestream/OperationType.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,20 @@ public enum OperationType {
5555
*/
5656
DROP("drop"),
5757

58+
/**
59+
* The dropDatabase operation type
60+
*
61+
* @since 3.8.2
62+
*/
63+
DROP_DATABASE("dropDatabase"),
64+
65+
/**
66+
* The rename operation type for renaming collections
67+
*
68+
* @since 3.8.2
69+
*/
70+
RENAME("rename"),
71+
5872
/**
5973
* The other operation type.
6074
*

driver-core/src/test/functional/com/mongodb/client/test/CollectionHelper.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -380,6 +380,17 @@ public void killAllSessions() {
380380
}
381381
}
382382

383+
public void renameCollection(final MongoNamespace newNamespace) {
384+
try {
385+
new CommandWriteOperation<BsonDocument>("admin",
386+
new BsonDocument("renameCollection", new BsonString(getNamespace().getFullName()))
387+
.append("to", new BsonString(newNamespace.getFullName())),
388+
new BsonDocumentCodec()).execute(getBinding());
389+
} catch (MongoCommandException e) {
390+
// do nothing
391+
}
392+
}
393+
383394
public void runAdminCommand(final BsonDocument command) {
384395
new CommandWriteOperation<BsonDocument>("admin", command, new BsonDocumentCodec()).execute(getBinding());
385396
}

driver-core/src/test/functional/com/mongodb/operation/ChangeStreamOperationSpecification.groovy

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package com.mongodb.operation
1818

1919
import com.mongodb.MongoChangeStreamException
20+
import com.mongodb.MongoNamespace
2021
import com.mongodb.OperationFunctionalSpecification
2122
import com.mongodb.ReadConcern
2223
import com.mongodb.WriteConcern
@@ -331,6 +332,63 @@ class ChangeStreamOperationSpecification extends OperationFunctionalSpecificatio
331332
waitForLastRelease(getCluster())
332333
}
333334

335+
@IgnoreIf({ !serverVersionAtLeast([4, 0, 1]) })
336+
def 'should decode dropDatabase to ChangeStreamDocument '() {
337+
given:
338+
def helper = getHelper()
339+
340+
def pipeline = [BsonDocument.parse('{$match: {operationType: "dropDatabase"}}')]
341+
def operation = new ChangeStreamOperation<BsonDocument>(helper.getNamespace(), FullDocument.UPDATE_LOOKUP, pipeline,
342+
ChangeStreamDocument.createCodec(BsonDocument, fromProviders(new BsonValueCodecProvider(), new ValueCodecProvider())),
343+
ChangeStreamLevel.DATABASE)
344+
helper.insertDocuments(BsonDocument.parse('{ _id : 2, x : 2, y : 3 }'))
345+
346+
when:
347+
def cursor = execute(operation, false)
348+
helper.dropDatabase('JavaDriverTest')
349+
ChangeStreamDocument<BsonDocument> next = next(cursor, false).get(0)
350+
351+
then:
352+
next.getResumeToken() != null
353+
next.getDocumentKey() == null
354+
next.getFullDocument() == null
355+
next.getDatabaseName() == 'JavaDriverTest'
356+
next.getOperationType() == OperationType.DROP_DATABASE
357+
next.getUpdateDescription() == null
358+
359+
cleanup:
360+
cursor?.close()
361+
waitForLastRelease(getCluster())
362+
}
363+
364+
@IgnoreIf({ !serverVersionAtLeast([4, 0, 1]) })
365+
def 'should decode rename to ChangeStreamDocument '() {
366+
given:
367+
def helper = getHelper()
368+
369+
def pipeline = [BsonDocument.parse('{$match: {operationType: "rename"}}')]
370+
def operation = new ChangeStreamOperation<BsonDocument>(helper.getNamespace(), FullDocument.UPDATE_LOOKUP, pipeline,
371+
ChangeStreamDocument.createCodec(BsonDocument, fromProviders(new BsonValueCodecProvider(), new ValueCodecProvider())))
372+
def newNamespace = new MongoNamespace('JavaDriverTest', 'newCollectionName')
373+
helper.insertDocuments(BsonDocument.parse('{ _id : 2, x : 2, y : 3 }'))
374+
375+
when:
376+
def cursor = execute(operation, false)
377+
helper.renameCollection(newNamespace)
378+
ChangeStreamDocument<BsonDocument> next = next(cursor, false).get(0)
379+
380+
then:
381+
next.getResumeToken() != null
382+
next.getDocumentKey() == null
383+
next.getFullDocument() == null
384+
next.getNamespace() == helper.getNamespace()
385+
next.getOperationType() == OperationType.RENAME
386+
next.getUpdateDescription() == null
387+
388+
cleanup:
389+
cursor?.close()
390+
waitForLastRelease(getCluster())
391+
}
334392

335393
def 'should throw if the _id field is projected out'() {
336394
given:

0 commit comments

Comments
 (0)