Skip to content

Commit 557df86

Browse files
committed
CSHARP-2225: ChangeStreamCursor MoveNext sometimes throws EndOfStreamException
1 parent 1c187c9 commit 557df86

File tree

2 files changed

+45
-4
lines changed

2 files changed

+45
-4
lines changed

src/MongoDB.Driver.Core/Core/Operations/ChangeStreamCursor.cs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -137,10 +137,8 @@ private bool CanResumeAfter(Exception exception)
137137
[System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Usage", "CA2202:Do not dispose objects multiple times")]
138138
private TDocument DeserializeDocument(RawBsonDocument rawDocument)
139139
{
140-
var slice = rawDocument.Slice;
141-
var bytes = slice.AccessBackingBytes(0);
142-
using (var memoryStream = new MemoryStream(bytes.Array, bytes.Offset, bytes.Count))
143-
using (var reader = new BsonBinaryReader(memoryStream))
140+
using (var stream = new ByteBufferStream(rawDocument.Slice, ownsBuffer: false))
141+
using (var reader = new BsonBinaryReader(stream))
144142
{
145143
var context = BsonDeserializationContext.CreateRoot(reader);
146144
return _documentSerializer.Deserialize(context);

tests/MongoDB.Driver.Core.Tests/Core/Operations/ChangeStreamOperationTests.cs

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -350,6 +350,49 @@ public void Execute_should_return_expected_results_for_inserts(
350350
}
351351
}
352352

353+
[SkippableTheory]
354+
[ParameterAttributeData]
355+
public void Execute_should_return_expected_results_for_large_batch(
356+
[Values(1, 2, 3)] int numberOfChunks,
357+
[Values(false, true)] bool async)
358+
{
359+
RequireServer.Check().Supports(Feature.ChangeStreamStage).ClusterTypes(ClusterType.ReplicaSet, ClusterType.Sharded);
360+
EnsureDatabaseExists();
361+
DropCollection();
362+
363+
var pipeline = new[] { BsonDocument.Parse("{ $match : { operationType : \"insert\" } }") };
364+
var resultSerializer = new ChangeStreamDocumentSerializer<BsonDocument>(BsonDocumentSerializer.Instance);
365+
var messageEncoderSettings = new MessageEncoderSettings();
366+
var subject = new ChangeStreamOperation<ChangeStreamDocument<BsonDocument>>(_collectionNamespace, pipeline, resultSerializer, messageEncoderSettings)
367+
{
368+
FullDocument = ChangeStreamFullDocumentOption.UpdateLookup
369+
};
370+
using (var cursor = ExecuteOperation(subject, async))
371+
{
372+
var filler = new string('x', (numberOfChunks - 1) * 65536);
373+
var document = new BsonDocument { { "_id", 1 }, { "filler", filler } };
374+
Insert(document);
375+
376+
ChangeStreamDocument<BsonDocument> changeStreamDocument;
377+
do
378+
{
379+
if (async)
380+
{
381+
cursor.MoveNextAsync().GetAwaiter().GetResult();
382+
}
383+
else
384+
{
385+
cursor.MoveNext();
386+
}
387+
388+
changeStreamDocument = cursor.Current.FirstOrDefault();
389+
}
390+
while (changeStreamDocument == null);
391+
392+
changeStreamDocument.FullDocument.Should().Be(document);
393+
}
394+
}
395+
353396
[SkippableTheory]
354397
[ParameterAttributeData]
355398
public void Execute_should_return_expected_results_for_updates(

0 commit comments

Comments
 (0)