Skip to content

Commit 5fee4af

Browse files
committed
Include string value of operationType in ChangeStreamDocument
* Allows applications using an older driver to access a newer operation type, for which there is no enumerated value in OperationType * Allows operationType to be round-tripped with no information loss JAVA-4470
1 parent 0dbd939 commit 5fee4af

File tree

4 files changed

+103
-33
lines changed

4 files changed

+103
-33
lines changed

driver-core/src/main/com/mongodb/client/model/changestream/ChangeStreamDocument.java

Lines changed: 61 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,9 @@
1717
package com.mongodb.client.model.changestream;
1818

1919
import com.mongodb.MongoNamespace;
20-
import com.mongodb.assertions.Assertions;
2120
import com.mongodb.lang.Nullable;
2221
import org.bson.BsonDocument;
2322
import org.bson.BsonInt64;
24-
import org.bson.BsonString;
2523
import org.bson.BsonTimestamp;
2624
import org.bson.codecs.Codec;
2725
import org.bson.codecs.configuration.CodecRegistry;
@@ -30,6 +28,8 @@
3028
import org.bson.codecs.pojo.annotations.BsonIgnore;
3129
import org.bson.codecs.pojo.annotations.BsonProperty;
3230

31+
import java.util.Objects;
32+
3333
/**
3434
* Represents the {@code $changeStream} aggregation output document.
3535
*
@@ -48,6 +48,9 @@ public final class ChangeStreamDocument<TDocument> {
4848
private final TDocument fullDocument;
4949
private final BsonDocument documentKey;
5050
private final BsonTimestamp clusterTime;
51+
@BsonProperty("operationType")
52+
private final String operationTypeString;
53+
@BsonIgnore
5154
private final OperationType operationType;
5255
private final UpdateDescription updateDescription;
5356
private final BsonInt64 txnNumber;
@@ -56,7 +59,7 @@ public final class ChangeStreamDocument<TDocument> {
5659
/**
5760
* Creates a new instance
5861
*
59-
* @param operationType the operation type
62+
* @param operationTypeString the operation type
6063
* @param resumeToken the resume token
6164
* @param namespaceDocument the BsonDocument representing the namespace
6265
* @param destinationNamespaceDocument the BsonDocument representing the destinatation namespace
@@ -67,10 +70,10 @@ public final class ChangeStreamDocument<TDocument> {
6770
* @param txnNumber the transaction number
6871
* @param lsid the identifier for the session associated with the transaction
6972
*
70-
* @since 3.11
73+
* @since 4.6
7174
*/
7275
@BsonCreator
73-
public ChangeStreamDocument(@BsonProperty("operationType") final OperationType operationType,
76+
public ChangeStreamDocument(@BsonProperty("operationType") final String operationTypeString,
7477
@BsonProperty("resumeToken") final BsonDocument resumeToken,
7578
@Nullable @BsonProperty("ns") final BsonDocument namespaceDocument,
7679
@Nullable @BsonProperty("to") final BsonDocument destinationNamespaceDocument,
@@ -86,16 +89,42 @@ public ChangeStreamDocument(@BsonProperty("operationType") final OperationType o
8689
this.documentKey = documentKey;
8790
this.fullDocument = fullDocument;
8891
this.clusterTime = clusterTime;
89-
this.operationType = operationType;
92+
this.operationTypeString = operationTypeString;
93+
this.operationType = OperationType.fromString(operationTypeString);
9094
this.updateDescription = updateDescription;
9195
this.txnNumber = txnNumber;
9296
this.lsid = lsid;
9397
}
9498

95-
private static BsonDocument namespaceToDocument(final MongoNamespace namespace) {
96-
Assertions.notNull("namespace", namespace);
97-
return new BsonDocument("db", new BsonString(namespace.getDatabaseName()))
98-
.append("coll", new BsonString(namespace.getCollectionName()));
99+
/**
100+
* Creates a new instance
101+
*
102+
* @param operationType the operation type
103+
* @param resumeToken the resume token
104+
* @param namespaceDocument the BsonDocument representing the namespace
105+
* @param destinationNamespaceDocument the BsonDocument representing the destinatation namespace
106+
* @param fullDocument the full document
107+
* @param documentKey a document containing the _id of the changed document
108+
* @param clusterTime the cluster time at which the change occured
109+
* @param updateDescription the update description
110+
* @param txnNumber the transaction number
111+
* @param lsid the identifier for the session associated with the transaction
112+
*
113+
* @since 3.11
114+
*/
115+
@Deprecated
116+
public ChangeStreamDocument(final OperationType operationType,
117+
final BsonDocument resumeToken,
118+
final BsonDocument namespaceDocument,
119+
final BsonDocument destinationNamespaceDocument,
120+
final TDocument fullDocument,
121+
final BsonDocument documentKey,
122+
final BsonTimestamp clusterTime,
123+
final UpdateDescription updateDescription,
124+
final BsonInt64 txnNumber,
125+
final BsonDocument lsid) {
126+
this(operationType.getValue(), resumeToken, namespaceDocument, destinationNamespaceDocument, fullDocument, documentKey,
127+
clusterTime, updateDescription, txnNumber, lsid);
99128
}
100129

101130
/**
@@ -240,6 +269,24 @@ public BsonTimestamp getClusterTime() {
240269
return clusterTime;
241270
}
242271

272+
273+
/**
274+
* Returns the operation type as a string.
275+
*
276+
* <p>
277+
* This method is useful when using a driver release that has not yet been updated to include a newer operation type in the
278+
* {@link OperationType} enum. In that case, {@link #getOperationType()} will return {@link OperationType#OTHER} and this method can
279+
* be used to retrieve the actual operation type as a string value.
280+
* </p>
281+
*
282+
* @return the operation type as a string
283+
* @since 4.6
284+
* @see #getOperationType()
285+
*/
286+
public String getOperationTypeString() {
287+
return operationTypeString;
288+
}
289+
243290
/**
244291
* Returns the operationType
245292
*
@@ -324,10 +371,10 @@ public boolean equals(final Object o) {
324371
if (documentKey != null ? !documentKey.equals(that.documentKey) : that.documentKey != null) {
325372
return false;
326373
}
327-
if (clusterTime != null ? !clusterTime.equals(that.clusterTime) : that.clusterTime != null) {
374+
if (!Objects.equals(operationTypeString, that.operationTypeString)) {
328375
return false;
329376
}
330-
if (operationType != that.operationType) {
377+
if (clusterTime != null ? !clusterTime.equals(that.clusterTime) : that.clusterTime != null) {
331378
return false;
332379
}
333380
if (updateDescription != null ? !updateDescription.equals(that.updateDescription) : that.updateDescription != null) {
@@ -351,7 +398,7 @@ public int hashCode() {
351398
result = 31 * result + (fullDocument != null ? fullDocument.hashCode() : 0);
352399
result = 31 * result + (documentKey != null ? documentKey.hashCode() : 0);
353400
result = 31 * result + (clusterTime != null ? clusterTime.hashCode() : 0);
354-
result = 31 * result + (operationType != null ? operationType.hashCode() : 0);
401+
result = 31 * result + (operationTypeString != null ? operationTypeString.hashCode() : 0);
355402
result = 31 * result + (updateDescription != null ? updateDescription.hashCode() : 0);
356403
result = 31 * result + (txnNumber != null ? txnNumber.hashCode() : 0);
357404
result = 31 * result + (lsid != null ? lsid.hashCode() : 0);
@@ -361,7 +408,7 @@ public int hashCode() {
361408
@Override
362409
public String toString() {
363410
return "ChangeStreamDocument{"
364-
+ " operationType=" + operationType
411+
+ " operationType=" + operationTypeString
365412
+ ", resumeToken=" + resumeToken
366413
+ ", namespace=" + getNamespace()
367414
+ ", destinationNamespace=" + getDestinationNamespace()

driver-core/src/main/com/mongodb/client/model/changestream/ChangeStreamDocumentCodec.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,6 @@ final class ChangeStreamDocumentCodec<TResult> implements Codec<ChangeStreamDocu
4343

4444
ClassModelBuilder<ChangeStreamDocument> classModelBuilder = ClassModel.builder(ChangeStreamDocument.class);
4545
((PropertyModelBuilder<TResult>) classModelBuilder.getProperty("fullDocument")).codec(codecRegistry.get(fullDocumentClass));
46-
((PropertyModelBuilder<OperationType>) classModelBuilder.getProperty("operationType")).codec(OPERATION_TYPE_CODEC);
4746
ClassModel<ChangeStreamDocument> changeStreamDocumentClassModel = classModelBuilder.build();
4847

4948
PojoCodecProvider provider = PojoCodecProvider.builder()

driver-core/src/test/unit/com/mongodb/client/model/changestream/ChangeStreamDocumentCodecSpecification.groovy

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ class ChangeStreamDocumentCodecSpecification extends Specification {
4646
then:
4747
BsonDocument.parse(json) == writer.getDocument()
4848

49+
when:
4950
BsonReader bsonReader = new BsonDocumentReader(writer.getDocument())
5051
ChangeStreamDocument actual = codec.decode(bsonReader, DecoderContext.builder().build())
5152

@@ -54,7 +55,7 @@ class ChangeStreamDocumentCodecSpecification extends Specification {
5455

5556
where:
5657
changeStreamDocument << [
57-
new ChangeStreamDocument<Document>(OperationType.INSERT,
58+
new ChangeStreamDocument<Document>(OperationType.INSERT.value,
5859
BsonDocument.parse('{token: true}'),
5960
BsonDocument.parse('{db: "engineering", coll: "users"}'),
6061
null,
@@ -64,7 +65,7 @@ class ChangeStreamDocumentCodecSpecification extends Specification {
6465
,
6566
null, null, null
6667
),
67-
new ChangeStreamDocument<Document>(OperationType.UPDATE,
68+
new ChangeStreamDocument<Document>(OperationType.UPDATE.value,
6869
BsonDocument.parse('{token: true}'),
6970
BsonDocument.parse('{db: "engineering", coll: "users"}'),
7071
null,
@@ -75,7 +76,7 @@ class ChangeStreamDocumentCodecSpecification extends Specification {
7576
new UpdateDescription(['phoneNumber'], BsonDocument.parse('{email: "[email protected]"}'), null),
7677
null, null
7778
),
78-
new ChangeStreamDocument<Document>(OperationType.UPDATE,
79+
new ChangeStreamDocument<Document>(OperationType.UPDATE.value,
7980
BsonDocument.parse('{token: true}'),
8081
BsonDocument.parse('{db: "engineering", coll: "users"}'),
8182
null,
@@ -87,7 +88,7 @@ class ChangeStreamDocumentCodecSpecification extends Specification {
8788
singletonList(new TruncatedArray('education', 2))),
8889
null, null
8990
),
90-
new ChangeStreamDocument<Document>(OperationType.REPLACE,
91+
new ChangeStreamDocument<Document>(OperationType.REPLACE.value,
9192
BsonDocument.parse('{token: true}'),
9293
BsonDocument.parse('{db: "engineering", coll: "users"}'),
9394
null,
@@ -97,7 +98,7 @@ class ChangeStreamDocumentCodecSpecification extends Specification {
9798
,
9899
null, null, null
99100
),
100-
new ChangeStreamDocument<Document>(OperationType.DELETE,
101+
new ChangeStreamDocument<Document>(OperationType.DELETE.value,
101102
BsonDocument.parse('{token: true}'),
102103
BsonDocument.parse('{db: "engineering", coll: "users"}'),
103104
null,
@@ -107,7 +108,7 @@ class ChangeStreamDocumentCodecSpecification extends Specification {
107108
,
108109
null, null, null
109110
),
110-
new ChangeStreamDocument<Document>(OperationType.DROP,
111+
new ChangeStreamDocument<Document>(OperationType.DROP.value,
111112
BsonDocument.parse('{token: true}'),
112113
BsonDocument.parse('{db: "engineering", coll: "users"}'),
113114
null,
@@ -117,7 +118,7 @@ class ChangeStreamDocumentCodecSpecification extends Specification {
117118
,
118119
null, null, null
119120
),
120-
new ChangeStreamDocument<Document>(OperationType.RENAME,
121+
new ChangeStreamDocument<Document>(OperationType.RENAME.value,
121122
BsonDocument.parse('{token: true}'),
122123
BsonDocument.parse('{db: "engineering", coll: "users"}'),
123124
BsonDocument.parse('{db: "engineering", coll: "people"}'),
@@ -127,7 +128,7 @@ class ChangeStreamDocumentCodecSpecification extends Specification {
127128
,
128129
null, null, null
129130
),
130-
new ChangeStreamDocument<Document>(OperationType.DROP_DATABASE,
131+
new ChangeStreamDocument<Document>(OperationType.DROP_DATABASE.value,
131132
BsonDocument.parse('{token: true}'),
132133
BsonDocument.parse('{db: "engineering"}'),
133134
null,
@@ -137,7 +138,7 @@ class ChangeStreamDocumentCodecSpecification extends Specification {
137138
,
138139
null, null, null
139140
),
140-
new ChangeStreamDocument<Document>(OperationType.INVALIDATE,
141+
new ChangeStreamDocument<Document>(OperationType.INVALIDATE.value,
141142
BsonDocument.parse('{token: true}'),
142143
null,
143144
null,
@@ -147,7 +148,7 @@ class ChangeStreamDocumentCodecSpecification extends Specification {
147148
,
148149
null, null, null
149150
),
150-
new ChangeStreamDocument<Document>(OperationType.INSERT,
151+
new ChangeStreamDocument<Document>(OperationType.INSERT.value,
151152
BsonDocument.parse('{token: true}'),
152153
BsonDocument.parse('{db: "engineering", coll: "users"}'),
153154
null,

driver-core/src/test/unit/com/mongodb/client/model/changestream/ChangeStreamDocumentSpecification.groovy

Lines changed: 31 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -40,11 +40,11 @@ class ChangeStreamDocumentSpecification extends Specification {
4040
def clusterTime = new BsonTimestamp(1234, 2)
4141
def operationType = OperationType.UPDATE
4242
def updateDesc = new UpdateDescription(['a', 'b'], BsonDocument.parse('{c: 1}'), null)
43-
def txnNumber = new BsonInt64(1);
44-
def lsid = BsonDocument.parse('{id: 1, uid: 1}');
43+
def txnNumber = new BsonInt64(1)
44+
def lsid = BsonDocument.parse('{id: 1, uid: 1}')
4545

4646
when:
47-
def changeStreamDocument = new ChangeStreamDocument<BsonDocument>(operationType, resumeToken, namespaceDocument,
47+
def changeStreamDocument = new ChangeStreamDocument<BsonDocument>(operationType.value, resumeToken, namespaceDocument,
4848
destinationNamespaceDocument, fullDocument, documentKey, clusterTime, updateDesc, null, null)
4949

5050
then:
@@ -56,14 +56,36 @@ class ChangeStreamDocumentSpecification extends Specification {
5656
changeStreamDocument.getNamespaceDocument() == namespaceDocument
5757
changeStreamDocument.getDestinationNamespace() == destinationNamespace
5858
changeStreamDocument.getDestinationNamespaceDocument() == destinationNamespaceDocument
59+
changeStreamDocument.getOperationTypeString() == operationType.value
5960
changeStreamDocument.getOperationType() == operationType
6061
changeStreamDocument.getUpdateDescription() == updateDesc
6162
changeStreamDocument.getDatabaseName() == namespace.getDatabaseName()
6263
changeStreamDocument.getTxnNumber() == null
6364
changeStreamDocument.getLsid() == null
6465

6566
when:
66-
def changeStreamDocumentWithTxnInfo = new ChangeStreamDocument<BsonDocument>(operationType, resumeToken,
67+
//noinspection GrDeprecatedAPIUsage
68+
changeStreamDocument = new ChangeStreamDocument<BsonDocument>(operationType, resumeToken, namespaceDocument,
69+
destinationNamespaceDocument, fullDocument, documentKey, clusterTime, updateDesc, null, null)
70+
71+
then:
72+
changeStreamDocument.getResumeToken() == resumeToken
73+
changeStreamDocument.getFullDocument() == fullDocument
74+
changeStreamDocument.getDocumentKey() == documentKey
75+
changeStreamDocument.getClusterTime() == clusterTime
76+
changeStreamDocument.getNamespace() == namespace
77+
changeStreamDocument.getNamespaceDocument() == namespaceDocument
78+
changeStreamDocument.getDestinationNamespace() == destinationNamespace
79+
changeStreamDocument.getDestinationNamespaceDocument() == destinationNamespaceDocument
80+
changeStreamDocument.getOperationTypeString() == operationType.value
81+
changeStreamDocument.getOperationType() == operationType
82+
changeStreamDocument.getUpdateDescription() == updateDesc
83+
changeStreamDocument.getDatabaseName() == namespace.getDatabaseName()
84+
changeStreamDocument.getTxnNumber() == null
85+
changeStreamDocument.getLsid() == null
86+
87+
when:
88+
def changeStreamDocumentWithTxnInfo = new ChangeStreamDocument<BsonDocument>(operationType.value, resumeToken,
6789
namespaceDocument, destinationNamespaceDocument, fullDocument, documentKey, clusterTime, updateDesc,
6890
txnNumber, lsid)
6991

@@ -76,6 +98,7 @@ class ChangeStreamDocumentSpecification extends Specification {
7698
changeStreamDocumentWithTxnInfo.getNamespaceDocument() == namespaceDocument
7799
changeStreamDocumentWithTxnInfo.getDestinationNamespace() == destinationNamespace
78100
changeStreamDocumentWithTxnInfo.getDestinationNamespaceDocument() == destinationNamespaceDocument
101+
changeStreamDocumentWithTxnInfo.getOperationTypeString() == operationType.value
79102
changeStreamDocumentWithTxnInfo.getOperationType() == operationType
80103
changeStreamDocumentWithTxnInfo.getUpdateDescription() == updateDesc
81104
changeStreamDocumentWithTxnInfo.getDatabaseName() == namespace.getDatabaseName()
@@ -91,8 +114,8 @@ class ChangeStreamDocumentSpecification extends Specification {
91114
def clusterTime = new BsonTimestamp(1234, 2)
92115
def operationType = OperationType.DROP_DATABASE
93116
def updateDesc = new UpdateDescription(['a', 'b'], BsonDocument.parse('{c: 1}'), emptyList())
94-
def changeStreamDocumentNullNamespace = new ChangeStreamDocument<BsonDocument>(operationType, resumeToken, (BsonDocument) null,
95-
(BsonDocument) null, fullDocument, documentKey, clusterTime, updateDesc, null, null)
117+
def changeStreamDocumentNullNamespace = new ChangeStreamDocument<BsonDocument>(operationType.value, resumeToken,
118+
(BsonDocument) null, (BsonDocument) null, fullDocument, documentKey, clusterTime, updateDesc, null, null)
96119

97120
expect:
98121
changeStreamDocumentNullNamespace.getDatabaseName() == null
@@ -113,9 +136,9 @@ class ChangeStreamDocumentSpecification extends Specification {
113136
def operationType = OperationType.DROP_DATABASE
114137
def updateDesc = new UpdateDescription(['a', 'b'], BsonDocument.parse('{c: 1}'), singletonList(new TruncatedArray('d', 1)))
115138

116-
def changeStreamDocument = new ChangeStreamDocument<BsonDocument>(operationType, resumeToken, namespaceDocument,
139+
def changeStreamDocument = new ChangeStreamDocument<BsonDocument>(operationType.value, resumeToken, namespaceDocument,
117140
(BsonDocument) null, fullDocument, documentKey, clusterTime, updateDesc, null, null)
118-
def changeStreamDocumentEmptyNamespace = new ChangeStreamDocument<BsonDocument>(operationType, resumeToken,
141+
def changeStreamDocumentEmptyNamespace = new ChangeStreamDocument<BsonDocument>(operationType.value, resumeToken,
119142
namespaceDocumentEmpty, (BsonDocument) null, fullDocument, documentKey, clusterTime, updateDesc,
120143
null, null)
121144

0 commit comments

Comments
 (0)