Skip to content

Commit fc1602e

Browse files
committed
CSHARP-4144: Implemented ChangeStreamOptions.ShowExpandedEvents option for improved change stream event visibility.
1 parent f219f75 commit fc1602e

18 files changed

+1137
-10
lines changed

src/MongoDB.Driver.Core/ChangeStreamDocument.cs

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,28 @@ public ChangeStreamDocument(
6262
/// </value>
6363
public CollectionNamespace CollectionNamespace => GetValue<CollectionNamespace>(nameof(CollectionNamespace), null);
6464

65+
/// <summary>
66+
/// Gets ui field from the oplog entry corresponding to the change event.
67+
/// Only present when the showExpandedEvents change stream option is enabled and for the following event types (MongoDB 6.0 and later):
68+
/// <list type="bullet">
69+
/// <item><description><see cref="ChangeStreamOperationType.Create"/></description></item>
70+
/// <item><description><see cref="ChangeStreamOperationType.CreateIndexes"/></description></item>
71+
/// <item><description><see cref="ChangeStreamOperationType.Delete"/></description></item>
72+
/// <item><description><see cref="ChangeStreamOperationType.Drop"/></description></item>
73+
/// <item><description><see cref="ChangeStreamOperationType.DropIndexes"/></description></item>
74+
/// <item><description><see cref="ChangeStreamOperationType.Insert"/></description></item>
75+
/// <item><description><see cref="ChangeStreamOperationType.Modify"/></description></item>
76+
/// <item><description><see cref="ChangeStreamOperationType.RefineCollectionShardKey"/></description></item>
77+
/// <item><description><see cref="ChangeStreamOperationType.ReshardCollection"/></description></item>
78+
/// <item><description><see cref="ChangeStreamOperationType.ShardCollection"/></description></item>
79+
/// <item><description><see cref="ChangeStreamOperationType.Update"/></description></item>
80+
/// </list>
81+
/// </summary>
82+
/// <value>
83+
/// The UUID of the collection.
84+
/// </value>
85+
public Guid? CollectionUuid => GetValue<Guid?>(nameof(CollectionUuid), null);
86+
6587
/// <summary>
6688
/// Gets the database namespace.
6789
/// </summary>
@@ -122,6 +144,25 @@ public TDocument FullDocumentBeforeChange
122144
}
123145
}
124146

147+
/// <summary>
148+
/// Gets the description for the operation.
149+
/// Only present when the showExpandedEvents change stream option is enabled and for the following event types (MongoDB 6.0 and later):
150+
/// <list type="bullet">
151+
/// <item><description><see cref="ChangeStreamOperationType.Create"/></description></item>
152+
/// <item><description><see cref="ChangeStreamOperationType.CreateIndexes"/></description></item>
153+
/// <item><description><see cref="ChangeStreamOperationType.DropIndexes"/></description></item>
154+
/// <item><description><see cref="ChangeStreamOperationType.Modify"/></description></item>
155+
/// <item><description><see cref="ChangeStreamOperationType.RefineCollectionShardKey"/></description></item>
156+
/// <item><description><see cref="ChangeStreamOperationType.Rename"/></description></item>
157+
/// <item><description><see cref="ChangeStreamOperationType.ReshardCollection"/></description></item>
158+
/// <item><description><see cref="ChangeStreamOperationType.ShardCollection"/></description></item>
159+
/// </list>
160+
/// </summary>
161+
/// <value>
162+
/// The description of the operation.
163+
/// </value>
164+
public BsonDocument OperationDescription => GetValue<BsonDocument>(nameof(OperationDescription), null);
165+
125166
/// <summary>
126167
/// Gets the type of the operation.
127168
/// </summary>

src/MongoDB.Driver.Core/ChangeStreamDocumentSerializer.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,10 +42,12 @@ public ChangeStreamDocumentSerializer(
4242

4343
RegisterMember("ClusterTime", "clusterTime", BsonTimestampSerializer.Instance);
4444
RegisterMember("CollectionNamespace", "ns", ChangeStreamDocumentCollectionNamespaceSerializer.Instance);
45+
RegisterMember("CollectionUuid", "ui", GuidSerializer.StandardInstance);
4546
RegisterMember("DatabaseNamespace", "ns", ChangeStreamDocumentDatabaseNamespaceSerializer.Instance);
4647
RegisterMember("DocumentKey", "documentKey", BsonDocumentSerializer.Instance);
4748
RegisterMember("FullDocument", "fullDocument", _documentSerializer);
4849
RegisterMember("FullDocumentBeforeChange", "fullDocumentBeforeChange", _documentSerializer);
50+
RegisterMember("OperationDescription", "operationDescription", BsonDocumentSerializer.Instance);
4951
RegisterMember("OperationType", "operationType", ChangeStreamOperationTypeSerializer.Instance);
5052
RegisterMember("RenameTo", "to", ChangeStreamDocumentCollectionNamespaceSerializer.Instance);
5153
RegisterMember("ResumeToken", "_id", BsonDocumentSerializer.Instance);

src/MongoDB.Driver.Core/ChangeStreamOperationType.cs

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,34 @@ public enum ChangeStreamOperationType
5151
/// <summary>
5252
/// A dropDatabase operation type.
5353
/// </summary>
54-
DropDatabase
54+
DropDatabase,
55+
/// <summary>
56+
/// A createIndexes operation type.
57+
/// </summary>
58+
CreateIndexes,
59+
/// <summary>
60+
/// A dropIndexes operation type.
61+
/// </summary>
62+
DropIndexes,
63+
/// <summary>
64+
/// A modify operation type.
65+
/// </summary>
66+
Modify,
67+
/// <summary>
68+
/// A create operation type.
69+
/// </summary>
70+
Create,
71+
/// <summary>
72+
/// A shardCollection operation type.
73+
/// </summary>
74+
ShardCollection,
75+
/// <summary>
76+
/// A refineCollectionShardKey operation type.
77+
/// </summary>
78+
RefineCollectionShardKey,
79+
/// <summary>
80+
/// A reshardCollection operation type.
81+
/// </summary>
82+
ReshardCollection
5583
}
5684
}

src/MongoDB.Driver.Core/ChangeStreamOperationTypeSerializer.cs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,13 @@ public override ChangeStreamOperationType Deserialize(BsonDeserializationContext
5454
case "rename": return ChangeStreamOperationType.Rename;
5555
case "drop": return ChangeStreamOperationType.Drop;
5656
case "dropDatabase": return ChangeStreamOperationType.DropDatabase;
57+
case "createIndexes": return ChangeStreamOperationType.CreateIndexes;
58+
case "dropIndexes": return ChangeStreamOperationType.DropIndexes;
59+
case "modify": return ChangeStreamOperationType.Modify;
60+
case "create": return ChangeStreamOperationType.Create;
61+
case "shardCollection": return ChangeStreamOperationType.ShardCollection;
62+
case "refineCollectionShardKey": return ChangeStreamOperationType.RefineCollectionShardKey;
63+
case "reshardCollection": return ChangeStreamOperationType.ReshardCollection;
5764
default: return (ChangeStreamOperationType)(-1);
5865
}
5966
}
@@ -73,6 +80,13 @@ public override void Serialize(BsonSerializationContext context, BsonSerializati
7380
case ChangeStreamOperationType.Rename: writer.WriteString("rename"); break;
7481
case ChangeStreamOperationType.Drop: writer.WriteString("drop"); break;
7582
case ChangeStreamOperationType.DropDatabase: writer.WriteString("dropDatabase"); break;
83+
case ChangeStreamOperationType.CreateIndexes: writer.WriteString("createIndexes"); break;
84+
case ChangeStreamOperationType.DropIndexes: writer.WriteString("dropIndexes"); break;
85+
case ChangeStreamOperationType.Modify: writer.WriteString("modify"); break;
86+
case ChangeStreamOperationType.Create: writer.WriteString("create"); break;
87+
case ChangeStreamOperationType.ShardCollection: writer.WriteString("shardCollection"); break;
88+
case ChangeStreamOperationType.RefineCollectionShardKey: writer.WriteString("refineCollectionShardKey"); break;
89+
case ChangeStreamOperationType.ReshardCollection: writer.WriteString("reshardCollection"); break;
7690
default: throw new ArgumentException($"Invalid ChangeStreamOperationType: {value}.", nameof(value));
7791
}
7892
}

src/MongoDB.Driver.Core/Core/Operations/ChangeStreamOperation.cs

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,14 @@ public interface IChangeStreamOperation<TResult> : IReadOperation<IChangeStreamC
4242
/// </value>
4343
BsonDocument ResumeAfter { get; set; }
4444

45+
/// <summary>
46+
/// Gets or sets whether the change stream should show expanded events (MongoDB 6.0 and later).
47+
/// </summary>
48+
/// <value>
49+
/// The value.
50+
/// </value>
51+
bool? ShowExpandedEvents { get; set; }
52+
4553
/// <summary>
4654
/// Gets or sets the start after value.
4755
/// </summary>
@@ -97,6 +105,7 @@ public class ChangeStreamOperation<TResult> : IChangeStreamOperation<TResult>
97105
private readonly IBsonSerializer<TResult> _resultSerializer;
98106
private BsonDocument _resumeAfter;
99107
private bool _retryRequested;
108+
private bool? _showExpandedEvents;
100109
private BsonDocument _startAfter;
101110
private BsonTimestamp _startAtOperationTime;
102111

@@ -290,6 +299,13 @@ public bool RetryRequested
290299
set => _retryRequested = value;
291300
}
292301

302+
/// <inheritdoc />
303+
public bool? ShowExpandedEvents
304+
{
305+
get => _showExpandedEvents;
306+
set => _showExpandedEvents = value;
307+
}
308+
293309
/// <inheritdoc />
294310
public BsonDocument StartAfter
295311
{
@@ -432,7 +448,8 @@ private BsonDocument CreateChangeStreamStage()
432448
{ "fullDocument", () => ToString(_fullDocument), _fullDocument != ChangeStreamFullDocumentOption.Default },
433449
{ "fullDocumentBeforeChange", () => ToString(_fullDocumentBeforeChangeOption), _fullDocumentBeforeChangeOption != ChangeStreamFullDocumentBeforeChangeOption.Default },
434450
{ "allChangesForCluster", true, _collectionNamespace == null && _databaseNamespace == null },
435-
{ "startAfter", _startAfter, _startAfter != null},
451+
{ "showExpandedEvents", _showExpandedEvents, _showExpandedEvents.HasValue },
452+
{ "startAfter", _startAfter, _startAfter != null },
436453
{ "startAtOperationTime", _startAtOperationTime, _startAtOperationTime != null },
437454
{ "resumeAfter", _resumeAfter, _resumeAfter != null }
438455
};

src/MongoDB.Driver/ChangeStreamHelper.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,7 @@ private static void SetOperationOptions<TResult>(
145145
ChangeStreamOptions options,
146146
ReadConcern readConcern)
147147
{
148-
options = options ?? new ChangeStreamOptions();
148+
options ??= new ChangeStreamOptions();
149149

150150
operation.BatchSize = options.BatchSize;
151151
operation.Collation = options.Collation;
@@ -155,6 +155,7 @@ private static void SetOperationOptions<TResult>(
155155
operation.MaxAwaitTime = options.MaxAwaitTime;
156156
operation.ReadConcern = readConcern;
157157
operation.ResumeAfter = options.ResumeAfter;
158+
operation.ShowExpandedEvents = options.ShowExpandedEvents;
158159
operation.StartAfter = options.StartAfter;
159160
operation.StartAtOperationTime = options.StartAtOperationTime;
160161
}

src/MongoDB.Driver/ChangeStreamOptions.cs

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ public class ChangeStreamOptions
3232
private ChangeStreamFullDocumentBeforeChangeOption _fullDocumentBeforeChange = ChangeStreamFullDocumentBeforeChangeOption.Default;
3333
private TimeSpan? _maxAwaitTime;
3434
private BsonDocument _resumeAfter;
35+
private bool? _showExpandedEvents;
3536
private BsonDocument _startAfter;
3637
private BsonTimestamp _startAtOperationTime;
3738

@@ -120,6 +121,28 @@ public BsonDocument ResumeAfter
120121
set { _resumeAfter = value; }
121122
}
122123

124+
/// <summary>
125+
/// Gets or sets whether the change stream should show expanded events (MongoDB 6.0 and later).
126+
/// Expanded change stream events include:
127+
/// <list type="bullet">
128+
/// <item><description><see cref="ChangeStreamOperationType.Create"/></description></item>
129+
/// <item><description><see cref="ChangeStreamOperationType.CreateIndexes"/></description></item>
130+
/// <item><description><see cref="ChangeStreamOperationType.DropIndexes"/></description></item>
131+
/// <item><description><see cref="ChangeStreamOperationType.Modify"/></description></item>
132+
/// <item><description><see cref="ChangeStreamOperationType.RefineCollectionShardKey"/></description></item>
133+
/// <item><description><see cref="ChangeStreamOperationType.ReshardCollection"/></description></item>
134+
/// <item><description><see cref="ChangeStreamOperationType.ShardCollection"/></description></item>
135+
/// </list>
136+
/// </summary>
137+
/// <value>
138+
/// The value.
139+
/// </value>
140+
public bool? ShowExpandedEvents
141+
{
142+
get { return _showExpandedEvents; }
143+
set { _showExpandedEvents = value; }
144+
}
145+
123146
/// <summary>
124147
/// Gets or sets the start after.
125148
/// </summary>

tests/MongoDB.Driver.Core.Tests/ChangeStreamDocumentSerializerTests.cs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,13 +36,15 @@ public void constructor_should_initialize_instance()
3636
var result = new ChangeStreamDocumentSerializer<BsonDocument>(documentSerializer);
3737

3838
result._documentSerializer().Should().BeSameAs(documentSerializer);
39-
result._memberSerializationInfo().Count.Should().Be(11);
39+
result._memberSerializationInfo().Count.Should().Be(13);
4040
AssertRegisteredMember(result, "ClusterTime", "clusterTime", BsonTimestampSerializer.Instance);
4141
AssertRegisteredMember(result, "CollectionNamespace", "ns", ChangeStreamDocumentCollectionNamespaceSerializer.Instance);
42+
AssertRegisteredMember(result, "CollectionUuid", "ui", GuidSerializer.StandardInstance);
4243
AssertRegisteredMember(result, "DatabaseNamespace", "ns", ChangeStreamDocumentDatabaseNamespaceSerializer.Instance);
4344
AssertRegisteredMember(result, "DocumentKey", "documentKey", BsonDocumentSerializer.Instance);
4445
AssertRegisteredMember(result, "FullDocument", "fullDocument", documentSerializer);
4546
AssertRegisteredMember(result, "FullDocumentBeforeChange", "fullDocumentBeforeChange", documentSerializer);
47+
AssertRegisteredMember(result, "OperationDescription", "operationDescription", BsonDocumentSerializer.Instance);
4648
AssertRegisteredMember(result, "OperationType", "operationType", ChangeStreamOperationTypeSerializer.Instance);
4749
AssertRegisteredMember(result, "RenameTo", "to", ChangeStreamDocumentCollectionNamespaceSerializer.Instance);
4850
AssertRegisteredMember(result, "ResumeToken", "_id", BsonDocumentSerializer.Instance);

tests/MongoDB.Driver.Core.Tests/ChangeStreamDocumentTests.cs

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,29 @@ public void CollectionNamespace_should_allow_extra_elements()
169169
subject.BackingDocument["ns"]["newField2"]["x"].AsInt32.Should().Be(1);
170170
}
171171

172+
[Fact]
173+
public void CollectionUuid_should_return_expected_result()
174+
{
175+
var value = Guid.NewGuid();
176+
var backingDocument = new BsonDocument { { "other", 1 }, { "ui", new BsonBinaryData(value, GuidRepresentation.Standard) } };
177+
var subject = CreateSubject(backingDocument: backingDocument);
178+
179+
var result = subject.CollectionUuid;
180+
181+
result.Should().Be(value);
182+
}
183+
184+
[Fact]
185+
public void CollectionUuid_should_return_null_when_not_present()
186+
{
187+
var backingDocument = new BsonDocument { { "other", 1 } };
188+
var subject = CreateSubject(backingDocument: backingDocument);
189+
190+
var result = subject.CollectionUuid;
191+
192+
result.Should().NotHaveValue();
193+
}
194+
172195
[Fact]
173196
public void DatabaseNamespace_should_return_expected_result()
174197
{
@@ -289,6 +312,29 @@ public void FullDocument_should_return_null_when_not_present()
289312
result.Should().BeNull();
290313
}
291314

315+
[Fact]
316+
public void OperationDescription_should_return_expected_result()
317+
{
318+
var value = new BsonDocument("x", 1234);
319+
var backingDocument = new BsonDocument { { "other", 1 }, { "operationDescription", value } };
320+
var subject = CreateSubject(backingDocument: backingDocument);
321+
322+
var result = subject.OperationDescription;
323+
324+
result.Should().Be(value);
325+
}
326+
327+
[Fact]
328+
public void OperationDescription_should_return_null_when_not_present()
329+
{
330+
var backingDocument = new BsonDocument { { "other", 1 } };
331+
var subject = CreateSubject(backingDocument: backingDocument);
332+
333+
var result = subject.OperationDescription;
334+
335+
result.Should().BeNull();
336+
}
337+
292338
[Theory]
293339
[InlineData("insert", ChangeStreamOperationType.Insert)]
294340
[InlineData("update", ChangeStreamOperationType.Update)]

tests/MongoDB.Driver.Core.Tests/ChangeStreamOperationTypeSerializerTests.cs

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,13 @@ public class ChangeStreamOperationTypeSerializerTests
3333
[InlineData("\"rename\"", ChangeStreamOperationType.Rename)]
3434
[InlineData("\"drop\"", ChangeStreamOperationType.Drop)]
3535
[InlineData("\"dropDatabase\"", ChangeStreamOperationType.DropDatabase)]
36+
[InlineData("\"createIndexes\"", ChangeStreamOperationType.CreateIndexes)]
37+
[InlineData("\"dropIndexes\"", ChangeStreamOperationType.DropIndexes)]
38+
[InlineData("\"modify\"", ChangeStreamOperationType.Modify)]
39+
[InlineData("\"create\"", ChangeStreamOperationType.Create)]
40+
[InlineData("\"shardCollection\"", ChangeStreamOperationType.ShardCollection)]
41+
[InlineData("\"refineCollectionShardKey\"", ChangeStreamOperationType.RefineCollectionShardKey)]
42+
[InlineData("\"reshardCollection\"", ChangeStreamOperationType.ReshardCollection)]
3643
public void Deserialize_should_return_expected_result(string json, ChangeStreamOperationType expectedResult)
3744
{
3845
var subject = CreateSubject();
@@ -72,6 +79,13 @@ public void Deserialize_should_return_negative_one_when_input_is_invalid()
7279
[InlineData(ChangeStreamOperationType.Rename, "\"rename\"")]
7380
[InlineData(ChangeStreamOperationType.Drop, "\"drop\"")]
7481
[InlineData(ChangeStreamOperationType.DropDatabase, "\"dropDatabase\"")]
82+
[InlineData(ChangeStreamOperationType.CreateIndexes, "\"createIndexes\"")]
83+
[InlineData(ChangeStreamOperationType.DropIndexes, "\"dropIndexes\"")]
84+
[InlineData(ChangeStreamOperationType.Modify, "\"modify\"")]
85+
[InlineData(ChangeStreamOperationType.Create, "\"create\"")]
86+
[InlineData(ChangeStreamOperationType.ShardCollection, "\"shardCollection\"")]
87+
[InlineData(ChangeStreamOperationType.RefineCollectionShardKey, "\"refineCollectionShardKey\"")]
88+
[InlineData(ChangeStreamOperationType.ReshardCollection, "\"reshardCollection\"")]
7589
public void Serialize_should_have_expected_result(ChangeStreamOperationType value, string expectedResult)
7690
{
7791
var subject = CreateSubject();
@@ -90,7 +104,7 @@ public void Serialize_should_have_expected_result(ChangeStreamOperationType valu
90104

91105
[Theory]
92106
[InlineData(-1)]
93-
[InlineData(8)]
107+
[InlineData(15)]
94108
public void Serialize_should_throw_when_value_is_invalid(int valueAsInt)
95109
{
96110
var subject = CreateSubject();

0 commit comments

Comments
 (0)