Skip to content

Commit 0e2bc5f

Browse files
authored
CSHARP-4633: Avoiding Errors if Change Stream Events Exceed 16MB (#1209)
1 parent c249251 commit 0e2bc5f

15 files changed

+398
-12
lines changed

src/MongoDB.Driver.Core/ChangeStreamDocument.cs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -223,6 +223,11 @@ public TDocument FullDocumentBeforeChange
223223
/// </value>
224224
public BsonDocument ResumeToken => GetValue<BsonDocument>(nameof(ResumeToken), null);
225225

226+
/// <summary>
227+
/// Gets the split event.
228+
/// </summary>
229+
public ChangeStreamSplitEvent SplitEvent => GetValue<ChangeStreamSplitEvent>(nameof(SplitEvent), null);
230+
226231
/// <summary>
227232
/// Gets the update description.
228233
/// </summary>

src/MongoDB.Driver.Core/ChangeStreamDocumentSerializer.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ public ChangeStreamDocumentSerializer(
5252
RegisterMember("OperationType", "operationType", ChangeStreamOperationTypeSerializer.Instance);
5353
RegisterMember("RenameTo", "to", ChangeStreamDocumentCollectionNamespaceSerializer.Instance);
5454
RegisterMember("ResumeToken", "_id", BsonDocumentSerializer.Instance);
55+
RegisterMember("SplitEvent", "splitEvent", ChangeStreamSplitEventSerializer.Instance);
5556
RegisterMember("UpdateDescription", "updateDescription", ChangeStreamUpdateDescriptionSerializer.Instance);
5657
RegisterMember("WallTime", "wallTime", DateTimeSerializer.UtcInstance);
5758
}
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
/* Copyright 2010-present MongoDB Inc.
2+
*
3+
* Licensed under the Apache License, Version 2.0 (the "License");
4+
* you may not use this file except in compliance with the License.
5+
* You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software
10+
* distributed under the License is distributed on an "AS IS" BASIS,
11+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
* See the License for the specific language governing permissions and
13+
* limitations under the License.
14+
*/
15+
16+
using MongoDB.Driver.Core.Misc;
17+
18+
namespace MongoDB.Driver
19+
{
20+
/// <summary>
21+
/// Change stream splitEvent data.
22+
/// </summary>
23+
public sealed class ChangeStreamSplitEvent
24+
{
25+
/// <summary>
26+
/// Initializes a new instance of the <see cref="ChangeStreamSplitEvent" /> class.
27+
/// </summary>
28+
/// <param name="fragment">Fragment index.</param>
29+
/// <param name="of">Total number of fragments.</param>
30+
public ChangeStreamSplitEvent(int fragment, int of)
31+
{
32+
Fragment = Ensure.IsGreaterThanZero(fragment, nameof(of));
33+
Of = Ensure.IsGreaterThanZero(of, nameof(of));
34+
}
35+
36+
/// <summary>
37+
/// Gets the fragment index, starting at 1.
38+
/// </summary>
39+
public int Fragment { get; }
40+
41+
/// <summary>
42+
/// Total number of fragments for the event.
43+
/// </summary>
44+
public int Of { get; }
45+
}
46+
}
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
/* Copyright 2010-present MongoDB Inc.
2+
*
3+
* Licensed under the Apache License, Version 2.0 (the "License");
4+
* you may not use this file except in compliance with the License.
5+
* You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software
10+
* distributed under the License is distributed on an "AS IS" BASIS,
11+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
* See the License for the specific language governing permissions and
13+
* limitations under the License.
14+
*/
15+
16+
using System;
17+
using MongoDB.Bson.IO;
18+
using MongoDB.Bson.Serialization;
19+
using MongoDB.Bson.Serialization.Serializers;
20+
21+
namespace MongoDB.Driver
22+
{
23+
internal class ChangeStreamSplitEventSerializer : SealedClassSerializerBase<ChangeStreamSplitEvent>
24+
{
25+
#region static
26+
27+
public static ChangeStreamSplitEventSerializer Instance { get; } = new ChangeStreamSplitEventSerializer();
28+
29+
#endregion
30+
31+
protected override ChangeStreamSplitEvent DeserializeValue(BsonDeserializationContext context, BsonDeserializationArgs args)
32+
{
33+
var reader = context.Reader;
34+
var fragment = -1;
35+
var of = -1;
36+
37+
reader.ReadStartDocument();
38+
while (reader.ReadBsonType() != 0)
39+
{
40+
var fieldName = reader.ReadName();
41+
switch (fieldName)
42+
{
43+
case "fragment":
44+
fragment = reader.ReadInt32();
45+
break;
46+
47+
case "of":
48+
of = reader.ReadInt32();
49+
break;
50+
default:
51+
throw new FormatException($"Invalid field name: \"{fieldName}\".");
52+
}
53+
}
54+
reader.ReadEndDocument();
55+
56+
return new(fragment, of);
57+
}
58+
59+
protected override void SerializeValue(BsonSerializationContext context, BsonSerializationArgs args, ChangeStreamSplitEvent value)
60+
{
61+
var writer = context.Writer;
62+
writer.WriteStartDocument();
63+
writer.WriteName("fragment");
64+
writer.WriteInt32(value.Fragment);
65+
writer.WriteName("of");
66+
writer.WriteInt32(value.Of);
67+
writer.WriteEndDocument();
68+
}
69+
}
70+
}

src/MongoDB.Driver.Core/Core/Misc/Feature.cs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ public class Feature
5151
private static readonly Feature __changeStreamForDatabase = new Feature("ChangeStreamForDatabase", WireVersion.Server40);
5252
private static readonly Feature __changeStreamPostBatchResumeToken = new Feature("ChangeStreamPostBatchResumeToken", WireVersion.Server40);
5353
private static readonly Feature __changeStreamPrePostImages = new Feature("ChangeStreamPrePostImages", WireVersion.Server60);
54+
private static readonly Feature __changeStreamSplitEventStage = new Feature("ChangeStreamSplitEventStage", WireVersion.Server70);
5455
private static readonly Feature __clientSideEncryption = new Feature("ClientSideEncryption", WireVersion.Server42);
5556
private static readonly Feature __clusteredIndexes = new Feature("ClusteredIndexes", WireVersion.Server53);
5657
private static readonly Feature __collation = new Feature("Collation", WireVersion.Server34);
@@ -292,6 +293,11 @@ public class Feature
292293
/// </summary>
293294
public static Feature ChangeStreamPrePostImages => __changeStreamPrePostImages;
294295

296+
/// <summary>
297+
/// Gets the change stream splitEvent stage feature.
298+
/// </summary>
299+
public static Feature ChangeStreamSplitEventStage => __changeStreamSplitEventStage;
300+
295301
/// <summary>
296302
/// Gets the client side encryption feature.
297303
/// </summary>

src/MongoDB.Driver/IAggregateFluentExtensions.cs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,18 @@ public static IAggregateFluent<TNewResult> BucketAutoForLinq2<TResult, TValue, T
150150
return aggregate.AppendStage(PipelineStageDefinitionBuilder.BucketAutoForLinq2(groupBy, buckets, output, options));
151151
}
152152

153+
/// <summary>
154+
/// Appends a $changeStreamSplitLargeEvent stage to the pipeline.
155+
/// </summary>
156+
/// <typeparam name="TResult">The type of the result.</typeparam>
157+
/// <param name="aggregate">The aggregate.</param>
158+
/// <returns>The fluent aggregate interface.</returns>
159+
public static IAggregateFluent<ChangeStreamDocument<TResult>> ChangeStreamSplitLargeEvent<TResult>(this IAggregateFluent<ChangeStreamDocument<TResult>> aggregate)
160+
{
161+
Ensure.IsNotNull(aggregate, nameof(aggregate));
162+
return aggregate.AppendStage(PipelineStageDefinitionBuilder.ChangeStreamSplitLargeEvent<TResult>());
163+
}
164+
153165
/// <summary>
154166
/// Appends a $densify stage to the pipeline.
155167
/// </summary>

src/MongoDB.Driver/PipelineDefinitionBuilder.cs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -300,6 +300,18 @@ public static PipelineDefinition<TInput, ChangeStreamDocument<TIntermediate>> Ch
300300
return pipeline.AppendStage(PipelineStageDefinitionBuilder.ChangeStream<TIntermediate>(options));
301301
}
302302

303+
/// <summary>
304+
/// Appends a $changeStreamSplitLargeEvent stage.
305+
/// </summary>
306+
/// <typeparam name="TInput">The type of the input documents.</typeparam>
307+
/// <param name="pipeline">The pipeline.</param>
308+
/// <returns>A new pipeline with an additional stage.</returns>
309+
public static PipelineDefinition<ChangeStreamDocument<TInput>, ChangeStreamDocument<TInput>> ChangeStreamSplitLargeEvent<TInput>(
310+
this PipelineDefinition<ChangeStreamDocument<TInput>, ChangeStreamDocument<TInput>> pipeline)
311+
{
312+
return pipeline.AppendStage(PipelineStageDefinitionBuilder.ChangeStreamSplitLargeEvent<TInput>());
313+
}
314+
303315
/// <summary>
304316
/// Appends a $count stage to the pipeline.
305317
/// </summary>

src/MongoDB.Driver/PipelineStageDefinitionBuilder.cs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -382,6 +382,14 @@ public static PipelineStageDefinition<TInput, ChangeStreamDocument<TInput>> Chan
382382
return stage;
383383
}
384384

385+
/// <summary>
386+
/// Creates a $changeStreamSplitLargeEvent stage.
387+
/// </summary>
388+
/// <typeparam name="TInput">The type of the input documents.</typeparam>
389+
/// <returns>The stage.</returns>
390+
public static PipelineStageDefinition<ChangeStreamDocument<TInput>, ChangeStreamDocument<TInput>> ChangeStreamSplitLargeEvent<TInput>() =>
391+
(PipelineStageDefinition<ChangeStreamDocument<TInput>, ChangeStreamDocument<TInput>>)new BsonDocument("$changeStreamSplitLargeEvent", new BsonDocument());
392+
385393
/// <summary>
386394
/// Creates a $count stage.
387395
/// </summary>

tests/MongoDB.Driver.Core.Tests/ChangeStreamDocumentSerializerTests.cs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
/* Copyright 2017-present MongoDB Inc.
1+
/* Copyright 2010-present MongoDB Inc.
22
*
33
* Licensed under the Apache License, Version 2.0 (the "License");
44
* you may not use this file except in compliance with the License.
@@ -36,7 +36,7 @@ public void constructor_should_initialize_instance()
3636
var result = new ChangeStreamDocumentSerializer<BsonDocument>(documentSerializer);
3737

3838
result._documentSerializer().Should().BeSameAs(documentSerializer);
39-
result._memberSerializationInfo().Count.Should().Be(14);
39+
result._memberSerializationInfo().Count.Should().Be(15);
4040
AssertRegisteredMember(result, "ClusterTime", "clusterTime", BsonTimestampSerializer.Instance);
4141
AssertRegisteredMember(result, "CollectionNamespace", "ns", ChangeStreamDocumentCollectionNamespaceSerializer.Instance);
4242
AssertRegisteredMember(result, "CollectionUuid", "ui", GuidSerializer.StandardInstance);
@@ -49,6 +49,7 @@ public void constructor_should_initialize_instance()
4949
AssertRegisteredMember(result, "OperationType", "operationType", ChangeStreamOperationTypeSerializer.Instance);
5050
AssertRegisteredMember(result, "RenameTo", "to", ChangeStreamDocumentCollectionNamespaceSerializer.Instance);
5151
AssertRegisteredMember(result, "ResumeToken", "_id", BsonDocumentSerializer.Instance);
52+
AssertRegisteredMember(result, "SplitEvent", "splitEvent", ChangeStreamSplitEventSerializer.Instance);
5253
AssertRegisteredMember(result, "UpdateDescription", "updateDescription", ChangeStreamUpdateDescriptionSerializer.Instance);
5354
AssertRegisteredMember(result, "WallTime", "wallTime", DateTimeSerializer.UtcInstance);
5455
}

tests/MongoDB.Driver.Core.Tests/ChangeStreamDocumentTests.cs

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
/* Copyright 2018-present MongoDB Inc.
1+
/* Copyright 2010-present MongoDB Inc.
22
*
33
* Licensed under the Apache License, Version 2.0 (the "License");
44
* you may not use this file except in compliance with the License.
@@ -461,6 +461,18 @@ public void ResumeToken_should_return_null_when_not_present()
461461
result.Should().BeNull();
462462
}
463463

464+
[Fact]
465+
public void SplitEvent_should_return_null_when_not_present()
466+
{
467+
var value = new BsonDocument("x", 1234);
468+
var backingDocument = new BsonDocument { { "other", 1 } };
469+
var subject = CreateSubject(backingDocument: backingDocument);
470+
471+
var result = subject.SplitEvent;
472+
473+
result.Should().BeNull();
474+
}
475+
464476
[Theory]
465477
[InlineData("{}", new string[0], null)]
466478
[InlineData("{ x : 1 }", new[] { "a", "b", "c" }, null)]

0 commit comments

Comments
 (0)