Skip to content

Commit 15c7086

Browse files
committed
CSHARP-2023: Finished implementing and added remaining tests.
1 parent 507e0e4 commit 15c7086

16 files changed

+1062
-89
lines changed

src/MongoDB.Driver.Core/ChangeStreamOperationTypeSerializer.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ public override void Serialize(BsonSerializationContext context, BsonSerializati
5353
case ChangeStreamOperationType.Invalidate: writer.WriteString("invalidate"); break;
5454
case ChangeStreamOperationType.Replace: writer.WriteString("replace"); break;
5555
case ChangeStreamOperationType.Update: writer.WriteString("update"); break;
56-
default: throw new FormatException($"Invalid ChangeStreamOperationType: {value}.");
56+
default: throw new ArgumentException($"Invalid ChangeStreamOperationType: {value}.", nameof(value));
5757
}
5858
}
5959
}

src/MongoDB.Driver.Core/ChangeStreamOutput.cs

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -27,29 +27,29 @@ public sealed class ChangeStreamOutput<TDocument>
2727
private readonly CollectionNamespace _collectionNamespace;
2828
private readonly BsonDocument _documentKey;
2929
private readonly TDocument _fullDocument;
30-
private readonly BsonDocument _id;
3130
private readonly ChangeStreamOperationType _operationType;
31+
private readonly BsonDocument _resumeToken;
3232
private readonly ChangeStreamUpdateDescription _updateDescription;
3333

3434
// constructors
3535
/// <summary>
36-
/// Initializes a new instance of the <see cref="ChangeStreamOutput{TDocument}"/> class.
36+
/// Initializes a new instance of the <see cref="ChangeStreamOutput{TDocument}" /> class.
3737
/// </summary>
38-
/// <param name="id">The identifier.</param>
38+
/// <param name="resumeToken">The resume token.</param>
3939
/// <param name="operationType">Type of the operation.</param>
4040
/// <param name="collectionNamespace">Namespace of the collection.</param>
4141
/// <param name="documentKey">The document key.</param>
4242
/// <param name="updateDescription">The update description.</param>
4343
/// <param name="fullDocument">The full document.</param>
4444
public ChangeStreamOutput(
45-
BsonDocument id,
45+
BsonDocument resumeToken,
4646
ChangeStreamOperationType operationType,
4747
CollectionNamespace collectionNamespace,
4848
BsonDocument documentKey,
4949
ChangeStreamUpdateDescription updateDescription,
5050
TDocument fullDocument)
5151
{
52-
_id = Ensure.IsNotNull(id, nameof(id));
52+
_resumeToken = Ensure.IsNotNull(resumeToken, nameof(resumeToken));
5353
_operationType = operationType;
5454
_collectionNamespace = collectionNamespace; // can be null when operationType is Invalidate
5555
_documentKey = documentKey; // can be null
@@ -83,20 +83,20 @@ public ChangeStreamOutput(
8383
public TDocument FullDocument => _fullDocument;
8484

8585
/// <summary>
86-
/// Gets the identifier.
86+
/// Gets the type of the operation.
8787
/// </summary>
8888
/// <value>
89-
/// The identifier.
89+
/// The type of the operation.
9090
/// </value>
91-
public BsonDocument Id => _id;
91+
public ChangeStreamOperationType OperationType => _operationType;
9292

9393
/// <summary>
94-
/// Gets the type of the operation.
94+
/// Gets the resume token.
9595
/// </summary>
9696
/// <value>
97-
/// The type of the operation.
97+
/// The resume token.
9898
/// </value>
99-
public ChangeStreamOperationType OperationType => _operationType;
99+
public BsonDocument ResumeToken => _resumeToken;
100100

101101
/// <summary>
102102
/// Gets the update description.

src/MongoDB.Driver.Core/ChangeStreamOutputSerializer.cs

Lines changed: 22 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -29,39 +29,36 @@ namespace MongoDB.Driver
2929
/// <typeparam name="TDocument">The type of the document.</typeparam>
3030
public class ChangeStreamOutputSerializer<TDocument> : SealedClassSerializerBase<ChangeStreamOutput<TDocument>>
3131
{
32+
#region static
33+
// private static fields
34+
private static readonly IBsonSerializer<ChangeStreamOperationType> __operationTypeSerializer = new ChangeStreamOperationTypeSerializer();
35+
private readonly ChangeStreamUpdateDescriptionSerializer __updateDescriptionSerializer = new ChangeStreamUpdateDescriptionSerializer();
36+
#endregion
37+
3238
// private fields
3339
private readonly IBsonSerializer<TDocument> _documentSerializer;
34-
private readonly IBsonSerializer<ChangeStreamOperationType> _operationTypeSerializer;
35-
private readonly ChangeStreamFullDocumentOption _fullDocument;
36-
private readonly ChangeStreamUpdateDescriptionSerializer _updateDescriptionSerializer;
3740

3841
// constructors
3942
/// <summary>
4043
/// Initializes a new instance of the <see cref="ChangeStreamOutputSerializer{TDocument}"/> class.
4144
/// </summary>
4245
/// <param name="documentSerializer">The document serializer.</param>
43-
/// <param name="fullDocument">The options.</param>
4446
public ChangeStreamOutputSerializer(
45-
IBsonSerializer<TDocument> documentSerializer,
46-
ChangeStreamFullDocumentOption fullDocument)
47+
IBsonSerializer<TDocument> documentSerializer)
4748
{
4849
_documentSerializer = Ensure.IsNotNull(documentSerializer, nameof(documentSerializer));
49-
_fullDocument = fullDocument;
50-
51-
_operationTypeSerializer = new ChangeStreamOperationTypeSerializer();
52-
_updateDescriptionSerializer = new ChangeStreamUpdateDescriptionSerializer();
5350
}
5451

5552
// public methods
5653
/// <inheritdoc />
57-
public override ChangeStreamOutput<TDocument> Deserialize(BsonDeserializationContext context, BsonDeserializationArgs args)
54+
protected override ChangeStreamOutput<TDocument> DeserializeValue(BsonDeserializationContext context, BsonDeserializationArgs args)
5855
{
5956
var reader = context.Reader;
6057

6158
CollectionNamespace collectionNamespace = null;
6259
BsonDocument documentKey = null;
6360
TDocument fullDocument = default(TDocument);
64-
BsonDocument id = null;
61+
BsonDocument resumeToken = null;
6562
ChangeStreamOperationType? operationType = null;
6663
ChangeStreamUpdateDescription updateDescription = null;
6764

@@ -72,7 +69,7 @@ public override ChangeStreamOutput<TDocument> Deserialize(BsonDeserializationCon
7269
switch (fieldName)
7370
{
7471
case "_id":
75-
id = BsonDocumentSerializer.Instance.Deserialize(context);
72+
resumeToken = BsonDocumentSerializer.Instance.Deserialize(context);
7673
break;
7774

7875
case "ns":
@@ -96,11 +93,11 @@ public override ChangeStreamOutput<TDocument> Deserialize(BsonDeserializationCon
9693
break;
9794

9895
case "operationType":
99-
operationType = _operationTypeSerializer.Deserialize(context);
96+
operationType = __operationTypeSerializer.Deserialize(context);
10097
break;
10198

10299
case "updateDescription":
103-
updateDescription = _updateDescriptionSerializer.Deserialize(context);
100+
updateDescription = __updateDescriptionSerializer.Deserialize(context);
104101
break;
105102

106103
default:
@@ -110,7 +107,7 @@ public override ChangeStreamOutput<TDocument> Deserialize(BsonDeserializationCon
110107
reader.ReadEndDocument();
111108

112109
return new ChangeStreamOutput<TDocument>(
113-
id,
110+
resumeToken,
114111
operationType.Value,
115112
collectionNamespace,
116113
documentKey,
@@ -124,11 +121,14 @@ protected override void SerializeValue(BsonSerializationContext context, BsonSer
124121
var writer = context.Writer;
125122
writer.WriteStartDocument();
126123
writer.WriteName("_id");
127-
BsonDocumentSerializer.Instance.Serialize(context, value.Id);
124+
BsonDocumentSerializer.Instance.Serialize(context, value.ResumeToken);
128125
writer.WriteName("operationType");
129-
_operationTypeSerializer.Serialize(context, value.OperationType);
130-
writer.WriteName("ns");
131-
SerializeCollectionNamespace(writer, value.CollectionNamespace);
126+
__operationTypeSerializer.Serialize(context, value.OperationType);
127+
if (value.CollectionNamespace != null)
128+
{
129+
writer.WriteName("ns");
130+
SerializeCollectionNamespace(writer, value.CollectionNamespace);
131+
}
132132
if (value.DocumentKey != null)
133133
{
134134
writer.WriteName("documentKey");
@@ -137,9 +137,9 @@ protected override void SerializeValue(BsonSerializationContext context, BsonSer
137137
if (value.UpdateDescription != null)
138138
{
139139
writer.WriteName("updateDescription");
140-
_updateDescriptionSerializer.Serialize(context, value.UpdateDescription);
140+
__updateDescriptionSerializer.Serialize(context, value.UpdateDescription);
141141
}
142-
if (ShouldSerializeFullDocument(value))
142+
if (value.FullDocument != null)
143143
{
144144
writer.WriteName("fullDocument");
145145
_documentSerializer.Serialize(context, value.FullDocument);
@@ -186,21 +186,5 @@ private void SerializeCollectionNamespace(IBsonWriter writer, CollectionNamespac
186186
writer.WriteString(value.CollectionName);
187187
writer.WriteEndDocument();
188188
}
189-
190-
private bool ShouldSerializeFullDocument(ChangeStreamOutput<TDocument> value)
191-
{
192-
switch (value.OperationType)
193-
{
194-
case ChangeStreamOperationType.Insert:
195-
case ChangeStreamOperationType.Replace:
196-
return true;
197-
198-
case ChangeStreamOperationType.Update:
199-
return _fullDocument == ChangeStreamFullDocumentOption.UpdateLookup;
200-
201-
default:
202-
return false;
203-
}
204-
}
205189
}
206190
}

src/MongoDB.Driver.Core/ChangeStreamUpdateDescriptionSerializer.cs

Lines changed: 5 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -27,15 +27,9 @@ namespace MongoDB.Driver
2727
/// </summary>
2828
public class ChangeStreamUpdateDescriptionSerializer : SealedClassSerializerBase<ChangeStreamUpdateDescription>
2929
{
30-
private readonly IBsonSerializer<string[]> _stringArraySerializer;
31-
32-
/// <summary>
33-
/// Initializes a new instance of the <see cref="ChangeStreamUpdateDescriptionSerializer"/> class.
34-
/// </summary>
35-
public ChangeStreamUpdateDescriptionSerializer()
36-
{
37-
_stringArraySerializer = new ArraySerializer<string>();
38-
}
30+
#region static
31+
private static readonly IBsonSerializer<string[]> __stringArraySerializer = new ArraySerializer<string>();
32+
#endregion
3933

4034
/// <inheritdoc />
4135
protected override ChangeStreamUpdateDescription DeserializeValue(BsonDeserializationContext context, BsonDeserializationArgs args)
@@ -56,7 +50,7 @@ protected override ChangeStreamUpdateDescription DeserializeValue(BsonDeserializ
5650
break;
5751

5852
case "removedFields":
59-
removedFields = _stringArraySerializer.Deserialize(context);
53+
removedFields = __stringArraySerializer.Deserialize(context);
6054
break;
6155

6256
default:
@@ -77,7 +71,7 @@ protected override void SerializeValue(BsonSerializationContext context, BsonSer
7771
writer.WriteName("updatedFields");
7872
BsonDocumentSerializer.Instance.Serialize(context, value.UpdatedFields);
7973
writer.WriteName("removedFields");
80-
_stringArraySerializer.Serialize(context, value.RemovedFields);
74+
__stringArraySerializer.Serialize(context, value.RemovedFields);
8175
writer.WriteEndDocument();
8276
}
8377
}

src/MongoDB.Driver.Core/Core/Operations/ChangeStreamCursor.cs

Lines changed: 34 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -31,16 +31,16 @@ namespace MongoDB.Driver.Core.Operations
3131
/// </summary>
3232
/// <typeparam name="TDocument">The type of the output documents.</typeparam>
3333
/// <seealso cref="MongoDB.Driver.IAsyncCursor{TOutput}" />
34-
public sealed class ChangeStreamCursor<TDocument> : IAsyncCursor<TDocument>
34+
internal sealed class ChangeStreamCursor<TDocument> : IAsyncCursor<TDocument>
3535
{
3636
// private fields
3737
private readonly IReadBinding _binding;
38-
private readonly ChangeStreamOperation<TDocument> _changeStreamOperation;
38+
private readonly IChangeStreamOperation<TDocument> _changeStreamOperation;
3939
private IEnumerable<TDocument> _current;
4040
private IAsyncCursor<RawBsonDocument> _cursor;
4141
private bool _disposed;
4242
private IBsonSerializer<TDocument> _documentSerializer;
43-
private BsonDocument _resumeAfter;
43+
private BsonDocument _resumeToken;
4444

4545
// public properties
4646
/// <inheritdoc />
@@ -58,7 +58,7 @@ public ChangeStreamCursor(
5858
IAsyncCursor<RawBsonDocument> cursor,
5959
IBsonSerializer<TDocument> documentSerializer,
6060
IReadBinding binding,
61-
ChangeStreamOperation<TDocument> changeStreamOperation)
61+
IChangeStreamOperation<TDocument> changeStreamOperation)
6262
{
6363
_cursor = Ensure.IsNotNull(cursor, nameof(cursor));
6464
_documentSerializer = Ensure.IsNotNull(documentSerializer, nameof(documentSerializer));
@@ -90,26 +90,48 @@ public void Dispose()
9090
{
9191
if (CanResumeAfter(ex))
9292
{
93-
_cursor = _changeStreamOperation.Resume(_binding, _resumeAfter, cancellationToken);
93+
_cursor = _changeStreamOperation.Resume(_binding, _resumeToken, cancellationToken);
9494
hasMore = _cursor.MoveNext(cancellationToken);
9595
}
96-
throw;
96+
else
97+
{
98+
throw;
99+
}
97100
}
98101

99102
ProcessBatch(hasMore);
100103
return hasMore;
101104
}
102105

103106
/// <inheritdoc/>
104-
public Task<bool> MoveNextAsync(CancellationToken cancellationToken = default(CancellationToken))
107+
public async Task<bool> MoveNextAsync(CancellationToken cancellationToken = default(CancellationToken))
105108
{
106-
throw new NotImplementedException();
109+
bool hasMore;
110+
try
111+
{
112+
hasMore = await _cursor.MoveNextAsync(cancellationToken).ConfigureAwait(false);
113+
}
114+
catch (Exception ex)
115+
{
116+
if (CanResumeAfter(ex))
117+
{
118+
_cursor = await _changeStreamOperation.ResumeAsync(_binding, _resumeToken, cancellationToken).ConfigureAwait(false);
119+
hasMore = await _cursor.MoveNextAsync(cancellationToken).ConfigureAwait(false);
120+
}
121+
else
122+
{
123+
throw;
124+
}
125+
}
126+
127+
ProcessBatch(hasMore);
128+
return hasMore;
107129
}
108130

109131
// private methods
110-
private bool CanResumeAfter(Exception ex)
132+
private bool CanResumeAfter(Exception exception)
111133
{
112-
return false; // TODO: implement
134+
return exception is IOException || exception is MongoCursorNotFoundException || exception is MongoNotPrimaryException;
113135
}
114136

115137
[System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Usage", "CA2202:Do not dispose objects multiple times")]
@@ -134,7 +156,7 @@ private IEnumerable<TDocument> DeserializeDocuments(IEnumerable<RawBsonDocument>
134156
{
135157
if (!rawDocument.Contains("_id"))
136158
{
137-
throw new MongoException("Cannot provide resume functionality when the resume token is missing.");
159+
throw new MongoClientException("Cannot provide resume functionality when the resume token is missing.");
138160
}
139161

140162
var document = DeserializeDocument(rawDocument);
@@ -145,7 +167,7 @@ private IEnumerable<TDocument> DeserializeDocuments(IEnumerable<RawBsonDocument>
145167

146168
if (lastRawDocument != null)
147169
{
148-
_resumeAfter = lastRawDocument["_id"].DeepClone().AsBsonDocument;
170+
_resumeToken = lastRawDocument["_id"].DeepClone().AsBsonDocument;
149171
}
150172

151173
return documents;

0 commit comments

Comments
 (0)