Skip to content

Commit 507e0e4

Browse files
committed
CSHARP-2023: More progres and lots of tests.
1 parent 42ad0f1 commit 507e0e4

36 files changed

+1557
-173
lines changed

src/MongoDB.Driver/ChangeStreamFullDocumentOption.cs renamed to src/MongoDB.Driver.Core/ChangeStreamFullDocumentOptions.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,10 @@ public enum ChangeStreamFullDocumentOption
2323
/// <summary>
2424
/// Do not return the full document.
2525
/// </summary>
26-
None = 0,
26+
Default = 0,
2727
/// <summary>
2828
/// Lookup the full document and return it.
2929
/// </summary>
30-
Lookup
30+
UpdateLookup
3131
}
3232
}

src/MongoDB.Driver/ChangeStreamOutput.cs renamed to src/MongoDB.Driver.Core/ChangeStreamOutput.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ public ChangeStreamOutput(
5151
{
5252
_id = Ensure.IsNotNull(id, nameof(id));
5353
_operationType = operationType;
54-
_collectionNamespace = Ensure.IsNotNull(collectionNamespace, nameof(collectionNamespace));
54+
_collectionNamespace = collectionNamespace; // can be null when operationType is Invalidate
5555
_documentKey = documentKey; // can be null
5656
_updateDescription = updateDescription; // can be null
5757
_fullDocument = fullDocument; // can be null

src/MongoDB.Driver/ChangeStreamOutputSerializer.cs renamed to src/MongoDB.Driver.Core/ChangeStreamOutputSerializer.cs

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -32,21 +32,21 @@ public class ChangeStreamOutputSerializer<TDocument> : SealedClassSerializerBase
3232
// private fields
3333
private readonly IBsonSerializer<TDocument> _documentSerializer;
3434
private readonly IBsonSerializer<ChangeStreamOperationType> _operationTypeSerializer;
35-
private readonly ChangeStreamOptions _options;
35+
private readonly ChangeStreamFullDocumentOption _fullDocument;
3636
private readonly ChangeStreamUpdateDescriptionSerializer _updateDescriptionSerializer;
3737

3838
// constructors
3939
/// <summary>
4040
/// Initializes a new instance of the <see cref="ChangeStreamOutputSerializer{TDocument}"/> class.
4141
/// </summary>
4242
/// <param name="documentSerializer">The document serializer.</param>
43-
/// <param name="options">The options.</param>
43+
/// <param name="fullDocument">The options.</param>
4444
public ChangeStreamOutputSerializer(
4545
IBsonSerializer<TDocument> documentSerializer,
46-
ChangeStreamOptions options)
46+
ChangeStreamFullDocumentOption fullDocument)
4747
{
4848
_documentSerializer = Ensure.IsNotNull(documentSerializer, nameof(documentSerializer));
49-
_options = options;
49+
_fullDocument = fullDocument;
5050

5151
_operationTypeSerializer = new ChangeStreamOperationTypeSerializer();
5252
_updateDescriptionSerializer = new ChangeStreamUpdateDescriptionSerializer();
@@ -65,6 +65,7 @@ public override ChangeStreamOutput<TDocument> Deserialize(BsonDeserializationCon
6565
ChangeStreamOperationType? operationType = null;
6666
ChangeStreamUpdateDescription updateDescription = null;
6767

68+
reader.ReadStartDocument();
6869
while (reader.ReadBsonType() != 0)
6970
{
7071
var fieldName = reader.ReadName();
@@ -83,21 +84,30 @@ public override ChangeStreamOutput<TDocument> Deserialize(BsonDeserializationCon
8384
break;
8485

8586
case "fullDocument":
86-
fullDocument = _documentSerializer.Deserialize(context);
87+
if (reader.CurrentBsonType == BsonType.Null)
88+
{
89+
reader.ReadNull();
90+
fullDocument = default(TDocument);
91+
}
92+
else
93+
{
94+
fullDocument = _documentSerializer.Deserialize(context);
95+
}
8796
break;
8897

8998
case "operationType":
9099
operationType = _operationTypeSerializer.Deserialize(context);
91100
break;
92101

93-
case "udpateDescription":
94-
_updateDescriptionSerializer.Deserialize(context);
102+
case "updateDescription":
103+
updateDescription = _updateDescriptionSerializer.Deserialize(context);
95104
break;
96105

97106
default:
98107
throw new FormatException($"Invalid field name: \"{fieldName}\".");
99108
}
100109
}
110+
reader.ReadEndDocument();
101111

102112
return new ChangeStreamOutput<TDocument>(
103113
id,
@@ -186,7 +196,7 @@ private bool ShouldSerializeFullDocument(ChangeStreamOutput<TDocument> value)
186196
return true;
187197

188198
case ChangeStreamOperationType.Update:
189-
return _options == null || _options.FullDocument.HasValue && _options.FullDocument.Value == ChangeStreamFullDocumentOption.Lookup;
199+
return _fullDocument == ChangeStreamFullDocumentOption.UpdateLookup;
190200

191201
default:
192202
return false;

src/MongoDB.Driver/ChangeStreamUpdateDescriptionSerializer.cs renamed to src/MongoDB.Driver.Core/ChangeStreamUpdateDescriptionSerializer.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ protected override ChangeStreamUpdateDescription DeserializeValue(BsonDeserializ
4545
BsonDocument updatedFields = null;
4646
string[] removedFields = null;
4747

48+
reader.ReadStartDocument();
4849
while (reader.ReadBsonType() != 0)
4950
{
5051
var fieldName = reader.ReadName();
@@ -62,6 +63,7 @@ protected override ChangeStreamUpdateDescription DeserializeValue(BsonDeserializ
6263
throw new FormatException($"Invalid field name: \"{fieldName}\".");
6364
}
6465
}
66+
reader.ReadEndDocument();
6567

6668
return new ChangeStreamUpdateDescription(updatedFields, removedFields);
6769
}

src/MongoDB.Driver.Core/Core/Misc/Feature.cs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ public class Feature
3434
private static readonly Feature __aggregateOut = new Feature("Aggregate", new SemanticVersion(2, 6, 0));
3535
private static readonly ArrayFiltersFeature __arrayFilters = new ArrayFiltersFeature("ArrayFilters", new SemanticVersion(3, 5, 11));
3636
private static readonly Feature __bypassDocumentValidation = new Feature("BypassDocumentValidation", new SemanticVersion(3, 2, 0));
37+
private static readonly Feature __changeStreamStage = new Feature("ChangeStreamStage", new SemanticVersion(3, 5, 11));
3738
private static readonly CollationFeature __collation = new CollationFeature("Collation", new SemanticVersion(3, 3, 11));
3839
private static readonly CommandsThatWriteAcceptWriteConcernFeature __commandsThatWriteAcceptWriteConcern = new CommandsThatWriteAcceptWriteConcernFeature("CommandsThatWriteAcceptWriteConcern", new SemanticVersion(3, 3, 11));
3940
private static readonly Feature __createIndexesCommand = new Feature("CreateIndexesCommand", new SemanticVersion(3, 0, 0));
@@ -111,6 +112,11 @@ public class Feature
111112
/// </summary>
112113
public static Feature BypassDocumentValidation => __bypassDocumentValidation;
113114

115+
/// <summary>
116+
/// Gets the aggregate $changeStream stage feature.
117+
/// </summary>
118+
public static Feature ChangeStreamStage => __changeStreamStage;
119+
114120
/// <summary>
115121
/// Gets the collation feature.
116122
/// </summary>

src/MongoDB.Driver.Core/Core/Operations/AggregateOperation.cs

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ public class AggregateOperation<TResult> : IReadOperation<IAsyncCursor<TResult>>
4040
private int? _batchSize;
4141
private Collation _collation;
4242
private readonly CollectionNamespace _collectionNamespace;
43+
private TimeSpan? _maxAwaitTime;
4344
private TimeSpan? _maxTime;
4445
private readonly MessageEncoderSettings _messageEncoderSettings;
4546
private readonly IReadOnlyList<BsonDocument> _pipeline;
@@ -108,6 +109,18 @@ public CollectionNamespace CollectionNamespace
108109
get { return _collectionNamespace; }
109110
}
110111

112+
/// <summary>
113+
/// Gets or sets the maximum await time.
114+
/// </summary>
115+
/// <value>
116+
/// The maximum await time.
117+
/// </value>
118+
public TimeSpan? MaxAwaitTime
119+
{
120+
get { return _maxAwaitTime; }
121+
set { _maxAwaitTime = value; }
122+
}
123+
111124
/// <summary>
112125
/// Gets or sets the maximum time the server should spend on this operation.
113126
/// </summary>
@@ -288,7 +301,8 @@ private AsyncCursor<TResult> CreateCursorFromCursorResult(IChannelSourceHandle c
288301
_batchSize,
289302
null, // limit
290303
_resultSerializer,
291-
MessageEncoderSettings);
304+
MessageEncoderSettings,
305+
_maxAwaitTime);
292306
}
293307

294308
private AsyncCursor<TResult> CreateCursorFromInlineResult(BsonDocument command, AggregateResult result)
@@ -302,7 +316,8 @@ private AsyncCursor<TResult> CreateCursorFromInlineResult(BsonDocument command,
302316
null, // batchSize
303317
null, // limit
304318
_resultSerializer,
305-
MessageEncoderSettings);
319+
MessageEncoderSettings,
320+
_maxAwaitTime);
306321
}
307322

308323
private void EnsureIsReadOnlyPipeline()
Lines changed: 176 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,176 @@
1+
/* Copyright 2017 MongoDB Inc.
2+
*
3+
* Licensed under the Apache License, Version 2.0 (the "License");
4+
* you may not use this file except in compliance with the License.
5+
* You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software
10+
* distributed under the License is distributed on an "AS IS" BASIS,
11+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
* See the License for the specific language governing permissions and
13+
* limitations under the License.
14+
*/
15+
16+
using MongoDB.Bson;
17+
using MongoDB.Bson.IO;
18+
using MongoDB.Bson.Serialization;
19+
using MongoDB.Driver.Core.Bindings;
20+
using MongoDB.Driver.Core.Misc;
21+
using System;
22+
using System.Collections.Generic;
23+
using System.IO;
24+
using System.Threading;
25+
using System.Threading.Tasks;
26+
27+
namespace MongoDB.Driver.Core.Operations
28+
{
29+
/// <summary>
30+
/// A change stream cursor.
31+
/// </summary>
32+
/// <typeparam name="TDocument">The type of the output documents.</typeparam>
33+
/// <seealso cref="MongoDB.Driver.IAsyncCursor{TOutput}" />
34+
public sealed class ChangeStreamCursor<TDocument> : IAsyncCursor<TDocument>
35+
{
36+
// private fields
37+
private readonly IReadBinding _binding;
38+
private readonly ChangeStreamOperation<TDocument> _changeStreamOperation;
39+
private IEnumerable<TDocument> _current;
40+
private IAsyncCursor<RawBsonDocument> _cursor;
41+
private bool _disposed;
42+
private IBsonSerializer<TDocument> _documentSerializer;
43+
private BsonDocument _resumeAfter;
44+
45+
// public properties
46+
/// <inheritdoc />
47+
public IEnumerable<TDocument> Current => _current;
48+
49+
// constructors
50+
/// <summary>
51+
/// Initializes a new instance of the <see cref="ChangeStreamCursor{TDocument}" /> class.
52+
/// </summary>
53+
/// <param name="cursor">The cursor.</param>
54+
/// <param name="documentSerializer">The document serializer.</param>
55+
/// <param name="binding">The binding.</param>
56+
/// <param name="changeStreamOperation">The change stream operation.</param>
57+
public ChangeStreamCursor(
58+
IAsyncCursor<RawBsonDocument> cursor,
59+
IBsonSerializer<TDocument> documentSerializer,
60+
IReadBinding binding,
61+
ChangeStreamOperation<TDocument> changeStreamOperation)
62+
{
63+
_cursor = Ensure.IsNotNull(cursor, nameof(cursor));
64+
_documentSerializer = Ensure.IsNotNull(documentSerializer, nameof(documentSerializer));
65+
_binding = Ensure.IsNotNull(binding, nameof(binding));
66+
_changeStreamOperation = Ensure.IsNotNull(changeStreamOperation, nameof(changeStreamOperation));
67+
}
68+
69+
// public methods
70+
/// <inheritdoc />
71+
public void Dispose()
72+
{
73+
if (!_disposed)
74+
{
75+
_disposed = true;
76+
_cursor.Dispose();
77+
_binding.Dispose();
78+
}
79+
}
80+
81+
/// <inheritdoc/>
82+
public bool MoveNext(CancellationToken cancellationToken = default(CancellationToken))
83+
{
84+
bool hasMore;
85+
try
86+
{
87+
hasMore = _cursor.MoveNext(cancellationToken);
88+
}
89+
catch (Exception ex)
90+
{
91+
if (CanResumeAfter(ex))
92+
{
93+
_cursor = _changeStreamOperation.Resume(_binding, _resumeAfter, cancellationToken);
94+
hasMore = _cursor.MoveNext(cancellationToken);
95+
}
96+
throw;
97+
}
98+
99+
ProcessBatch(hasMore);
100+
return hasMore;
101+
}
102+
103+
/// <inheritdoc/>
104+
public Task<bool> MoveNextAsync(CancellationToken cancellationToken = default(CancellationToken))
105+
{
106+
throw new NotImplementedException();
107+
}
108+
109+
// private methods
110+
private bool CanResumeAfter(Exception ex)
111+
{
112+
return false; // TODO: implement
113+
}
114+
115+
[System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Usage", "CA2202:Do not dispose objects multiple times")]
116+
private TDocument DeserializeDocument(RawBsonDocument rawDocument)
117+
{
118+
var slice = rawDocument.Slice;
119+
var bytes = slice.AccessBackingBytes(0);
120+
using (var memoryStream = new MemoryStream(bytes.Array, bytes.Offset, bytes.Count))
121+
using (var reader = new BsonBinaryReader(memoryStream))
122+
{
123+
var context = BsonDeserializationContext.CreateRoot(reader);
124+
return _documentSerializer.Deserialize(context);
125+
}
126+
}
127+
128+
private IEnumerable<TDocument> DeserializeDocuments(IEnumerable<RawBsonDocument> rawDocuments)
129+
{
130+
var documents = new List<TDocument>();
131+
RawBsonDocument lastRawDocument = null;
132+
133+
foreach (var rawDocument in rawDocuments)
134+
{
135+
if (!rawDocument.Contains("_id"))
136+
{
137+
throw new MongoException("Cannot provide resume functionality when the resume token is missing.");
138+
}
139+
140+
var document = DeserializeDocument(rawDocument);
141+
documents.Add(document);
142+
143+
lastRawDocument = rawDocument;
144+
}
145+
146+
if (lastRawDocument != null)
147+
{
148+
_resumeAfter = lastRawDocument["_id"].DeepClone().AsBsonDocument;
149+
}
150+
151+
return documents;
152+
}
153+
154+
private void ProcessBatch(bool hasMore)
155+
{
156+
if (hasMore)
157+
{
158+
try
159+
{
160+
_current = DeserializeDocuments(_cursor.Current);
161+
}
162+
finally
163+
{
164+
foreach (var rawDocument in _cursor.Current)
165+
{
166+
rawDocument.Dispose();
167+
}
168+
}
169+
}
170+
else
171+
{
172+
_current = null;
173+
}
174+
}
175+
}
176+
}

0 commit comments

Comments
 (0)