Skip to content

Commit 2bd8c40

Browse files
committed
CSHARP-3688: Snapshot reads on Secondaries
1 parent ee1ee26 commit 2bd8c40

28 files changed

+1949
-53
lines changed

src/MongoDB.Driver.Core/Core/Bindings/CoreSession.cs

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ public sealed class CoreSession : ICoreSession
4040
private readonly IOperationClock _operationClock = new OperationClock();
4141
private readonly CoreSessionOptions _options;
4242
private readonly ICoreServerSession _serverSession;
43+
private BsonTimestamp _snapshotTime;
4344

4445
// constructors
4546
/// <summary>
@@ -109,6 +110,9 @@ public bool IsInTransaction
109110
}
110111
}
111112

113+
/// <inheritdoc />
114+
public bool IsSnapshot => _options.IsSnapshot;
115+
112116
/// <inheritdoc />
113117
public BsonTimestamp OperationTime => _operationClock.OperationTime;
114118

@@ -118,6 +122,9 @@ public bool IsInTransaction
118122
/// <inheritdoc />
119123
public ICoreServerSession ServerSession => _serverSession;
120124

125+
/// <inheritdoc />
126+
public BsonTimestamp SnapshotTime => _snapshotTime;
127+
121128
// public methods
122129
/// <inheritdoc />
123130
public void AbortTransaction(CancellationToken cancellationToken = default(CancellationToken))
@@ -383,6 +390,15 @@ public void StartTransaction(TransactionOptions transactionOptions = null)
383390
_currentTransaction = new CoreTransaction(transactionNumber, effectiveTransactionOptions);
384391
}
385392

393+
/// <inheritdoc />
394+
public void SetSnapshotTimeIfNeeded(BsonTimestamp snapshotTime)
395+
{
396+
if (IsSnapshot && _snapshotTime == null)
397+
{
398+
_snapshotTime = snapshotTime;
399+
}
400+
}
401+
386402
/// <inheritdoc />
387403
public void WasUsed()
388404
{
@@ -450,6 +466,10 @@ private void EnsureCommitTransactionCanBeCalled(string methodName)
450466

451467
private void EnsureStartTransactionCanBeCalled()
452468
{
469+
if (IsSnapshot)
470+
{
471+
throw new MongoClientException("Transactions are not supported in snapshot sessions.");
472+
}
453473
if (_currentTransaction == null)
454474
{
455475
EnsureTransactionsAreSupported();

src/MongoDB.Driver.Core/Core/Bindings/CoreSessionOptions.cs

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,21 +24,25 @@ public class CoreSessionOptions
2424
private readonly TransactionOptions _defaultTransactionOptions;
2525
private readonly bool _isCausallyConsistent;
2626
private readonly bool _isImplicit;
27+
private readonly bool _isSnapshot;
2728

2829
// constructors
2930
/// <summary>
3031
/// Initializes a new instance of the <see cref="CoreSessionOptions" /> class.
3132
/// </summary>
3233
/// <param name="isCausallyConsistent">if set to <c>true</c> this session is causally consistent]</param>
3334
/// <param name="isImplicit">if set to <c>true</c> this session is an implicit session.</param>
35+
/// <param name="isSnapshot">if set to <c>true</c> this session is a snapshot session.</param>
3436
/// <param name="defaultTransactionOptions">The default transaction options.</param>
3537
public CoreSessionOptions(
3638
bool isCausallyConsistent = false,
3739
bool isImplicit = false,
38-
TransactionOptions defaultTransactionOptions = null)
40+
TransactionOptions defaultTransactionOptions = null,
41+
bool isSnapshot = false)
3942
{
4043
_isCausallyConsistent = isCausallyConsistent;
4144
_isImplicit = isImplicit;
45+
_isSnapshot = isSnapshot;
4246
_defaultTransactionOptions = defaultTransactionOptions;
4347
}
4448

@@ -66,5 +70,13 @@ public CoreSessionOptions(
6670
/// <c>true</c> if this session is an implicit session; otherwise, <c>false</c>.
6771
/// </value>
6872
public bool IsImplicit => _isImplicit;
73+
74+
/// <summary>
75+
/// Gets a value indicating whether this session is a snapshot session.
76+
/// </summary>
77+
/// <value>
78+
/// <c>true</c> if this session is a snapshot session; otherwise, <c>false</c>.
79+
/// </value>
80+
public bool IsSnapshot => _isSnapshot;
6981
}
7082
}

src/MongoDB.Driver.Core/Core/Bindings/ICoreSession.cs

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,14 @@ public interface ICoreSession : IDisposable
8282
/// </value>
8383
bool IsInTransaction { get; }
8484

85+
/// <summary>
86+
/// Gets a value indicate whether this instance is a snapshot session.
87+
/// </summary>
88+
/// <value>
89+
/// <c>true</c> if the session is a snapshot session.
90+
/// </value>
91+
bool IsSnapshot { get; }
92+
8593
/// <summary>
8694
/// Gets the operation time.
8795
/// </summary>
@@ -106,6 +114,14 @@ public interface ICoreSession : IDisposable
106114
/// </value>
107115
ICoreServerSession ServerSession { get; }
108116

117+
/// <summary>
118+
/// Gets the snapshot time.
119+
/// </summary>
120+
/// <value>
121+
/// The snapshot time.
122+
/// </value>
123+
BsonTimestamp SnapshotTime { get; }
124+
109125
// methods
110126
/// <summary>
111127
/// Aborts the transaction.
@@ -167,6 +183,12 @@ public interface ICoreSession : IDisposable
167183
/// <param name="transactionOptions">The transaction options.</param>
168184
void StartTransaction(TransactionOptions transactionOptions = null);
169185

186+
/// <summary>
187+
/// Sets the snapshot time if not set.
188+
/// </summary>
189+
/// <param name="snapshotTime">The snapshot time.</param>
190+
void SetSnapshotTimeIfNeeded(BsonTimestamp snapshotTime);
191+
170192
/// <summary>
171193
/// Called by the driver when the session is used (i.e. sent to the server).
172194
/// </summary>

src/MongoDB.Driver.Core/Core/Bindings/NoCoreSession.cs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,9 @@ public static ICoreSessionHandle NewHandle()
7272
/// <inheritdoc />
7373
public bool IsInTransaction => false;
7474

75+
/// <inheritdoc />
76+
public bool IsSnapshot => false;
77+
7578
/// <inheritdoc />
7679
public BsonTimestamp OperationTime => null;
7780

@@ -81,6 +84,9 @@ public static ICoreSessionHandle NewHandle()
8184
/// <inheritdoc />
8285
public ICoreServerSession ServerSession => NoCoreServerSession.Instance;
8386

87+
/// <inheritdoc />
88+
public BsonTimestamp SnapshotTime => null;
89+
8490
// public methods
8591
/// <inheritdoc />
8692
public void AbortTransaction(CancellationToken cancellationToken = default(CancellationToken))
@@ -143,6 +149,11 @@ public void StartTransaction(TransactionOptions transactionOptions = null)
143149
throw new NotSupportedException("NoCoreSession does not support StartTransaction.");
144150
}
145151

152+
/// <inheritdoc />
153+
public void SetSnapshotTimeIfNeeded(BsonTimestamp snapshotTime)
154+
{
155+
}
156+
146157
/// <inheritdoc />
147158
public void WasUsed()
148159
{

src/MongoDB.Driver.Core/Core/Bindings/WrappingCoreSession.cs

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,16 @@ public virtual bool IsInTransaction
115115
}
116116
}
117117

118+
/// <inheritdoc />
119+
public bool IsSnapshot
120+
{
121+
get
122+
{
123+
ThrowIfDisposed();
124+
return _wrapped.IsSnapshot;
125+
}
126+
}
127+
118128
/// <inheritdoc />
119129
public virtual BsonTimestamp OperationTime
120130
{
@@ -145,6 +155,16 @@ public virtual ICoreServerSession ServerSession
145155
}
146156
}
147157

158+
/// <inheritdoc />
159+
public BsonTimestamp SnapshotTime
160+
{
161+
get
162+
{
163+
ThrowIfDisposed();
164+
return _wrapped.SnapshotTime;
165+
}
166+
}
167+
148168
/// <summary>
149169
/// Gets the wrapped session.
150170
/// </summary>
@@ -237,6 +257,13 @@ public virtual void StartTransaction(TransactionOptions transactionOptions = nul
237257
_wrapped.StartTransaction(transactionOptions);
238258
}
239259

260+
/// <inheritdoc />
261+
public void SetSnapshotTimeIfNeeded(BsonTimestamp snapshotTime)
262+
{
263+
ThrowIfDisposed();
264+
_wrapped.SetSnapshotTimeIfNeeded(snapshotTime);
265+
}
266+
240267
/// <inheritdoc />
241268
public virtual void WasUsed()
242269
{

src/MongoDB.Driver.Core/Core/Operations/AggregateOperation.cs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -282,6 +282,9 @@ public IAsyncCursor<TResult> Execute(RetryableReadContext context, CancellationT
282282
{
283283
var operation = CreateOperation(context);
284284
var result = operation.Execute(context, cancellationToken);
285+
286+
context.ChannelSource.Session.SetSnapshotTimeIfNeeded(result.AtClusterTime);
287+
285288
return CreateCursor(context.ChannelSource, context.Channel, operation.Command, result);
286289
}
287290
}
@@ -307,6 +310,9 @@ public async Task<IAsyncCursor<TResult>> ExecuteAsync(RetryableReadContext conte
307310
{
308311
var operation = CreateOperation(context);
309312
var result = await operation.ExecuteAsync(context, cancellationToken).ConfigureAwait(false);
313+
314+
context.ChannelSource.Session.SetSnapshotTimeIfNeeded(result.AtClusterTime);
315+
310316
return CreateCursor(context.ChannelSource, context.Channel, operation.Command, result);
311317
}
312318
}
@@ -422,6 +428,7 @@ private void EnsureIsReadOnlyPipeline()
422428

423429
private class AggregateResult
424430
{
431+
public BsonTimestamp AtClusterTime;
425432
public long? CursorId;
426433
public CollectionNamespace CollectionNamespace;
427434
public BsonDocument PostBatchResumeToken;
@@ -485,6 +492,10 @@ public override AggregateResult Deserialize(BsonDeserializationContext context,
485492
var elementName = reader.ReadName();
486493
switch (elementName)
487494
{
495+
case "atClusterTime":
496+
result.AtClusterTime = BsonTimestampSerializer.Instance.Deserialize(context);
497+
break;
498+
488499
case "id":
489500
result.CursorId = new Int64Serializer().Deserialize(context);
490501
break;

src/MongoDB.Driver.Core/Core/Operations/DistinctOperation.cs

Lines changed: 59 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,10 @@
1414
*/
1515

1616
using System;
17-
using System.Collections.Generic;
1817
using System.Threading;
1918
using System.Threading.Tasks;
2019
using MongoDB.Bson;
20+
using MongoDB.Bson.IO;
2121
using MongoDB.Bson.Serialization;
2222
using MongoDB.Bson.Serialization.Serializers;
2323
using MongoDB.Driver.Core.Bindings;
@@ -172,8 +172,11 @@ public IAsyncCursor<TValue> Execute(IReadBinding binding, CancellationToken canc
172172
using (var context = RetryableReadContext.Create(binding, _retryRequested, cancellationToken))
173173
{
174174
var operation = CreateOperation(context);
175-
var values = operation.Execute(context, cancellationToken);
176-
return new SingleBatchAsyncCursor<TValue>(values);
175+
var result = operation.Execute(context, cancellationToken);
176+
177+
binding.Session.SetSnapshotTimeIfNeeded(result.AtClusterTime);
178+
179+
return new SingleBatchAsyncCursor<TValue>(result.Values);
177180
}
178181
}
179182

@@ -185,8 +188,11 @@ public async Task<IAsyncCursor<TValue>> ExecuteAsync(IReadBinding binding, Cance
185188
using (var context = await RetryableReadContext.CreateAsync(binding, _retryRequested, cancellationToken).ConfigureAwait(false))
186189
{
187190
var operation = CreateOperation(context);
188-
var values = await operation.ExecuteAsync(context, cancellationToken).ConfigureAwait(false);
189-
return new SingleBatchAsyncCursor<TValue>(values);
191+
var result = await operation.ExecuteAsync(context, cancellationToken).ConfigureAwait(false);
192+
193+
binding.Session.SetSnapshotTimeIfNeeded(result.AtClusterTime);
194+
195+
return new SingleBatchAsyncCursor<TValue>(result.Values);
190196
}
191197
}
192198

@@ -208,15 +214,59 @@ internal BsonDocument CreateCommand(ConnectionDescription connectionDescription,
208214
};
209215
}
210216

211-
private ReadCommandOperation<TValue[]> CreateOperation(RetryableReadContext context)
217+
private ReadCommandOperation<DistinctResult> CreateOperation(RetryableReadContext context)
212218
{
213219
var command = CreateCommand(context.Channel.ConnectionDescription, context.Binding.Session);
214-
var valueArraySerializer = new ArraySerializer<TValue>(_valueSerializer);
215-
var resultSerializer = new ElementDeserializer<TValue[]>("values", valueArraySerializer);
216-
return new ReadCommandOperation<TValue[]>(_collectionNamespace.DatabaseNamespace, command, resultSerializer, _messageEncoderSettings)
220+
var serializer = new DistinctResultDeserializer(_valueSerializer);
221+
222+
return new ReadCommandOperation<DistinctResult>(_collectionNamespace.DatabaseNamespace, command, serializer, _messageEncoderSettings)
217223
{
218224
RetryRequested = _retryRequested // might be overridden by retryable read context
219225
};
220226
}
227+
228+
private sealed class DistinctResult
229+
{
230+
public BsonTimestamp AtClusterTime;
231+
public TValue[] Values;
232+
}
233+
234+
private sealed class DistinctResultDeserializer : SerializerBase<DistinctResult>
235+
{
236+
private readonly IBsonSerializer<TValue> _valueSerializer;
237+
238+
public DistinctResultDeserializer(IBsonSerializer<TValue> valuesSerializer)
239+
{
240+
_valueSerializer = valuesSerializer;
241+
}
242+
243+
public override DistinctResult Deserialize(BsonDeserializationContext context, BsonDeserializationArgs args)
244+
{
245+
var reader = context.Reader;
246+
var result = new DistinctResult();
247+
reader.ReadStartDocument();
248+
while (reader.ReadBsonType() != 0)
249+
{
250+
var elementName = reader.ReadName();
251+
switch (elementName)
252+
{
253+
case "atClusterTime":
254+
result.AtClusterTime = BsonTimestampSerializer.Instance.Deserialize(context);
255+
break;
256+
257+
case "values":
258+
var arraySerializer = new ArraySerializer<TValue>(_valueSerializer);
259+
result.Values = arraySerializer.Deserialize(context);
260+
break;
261+
262+
default:
263+
reader.SkipValue();
264+
break;
265+
}
266+
}
267+
reader.ReadEndDocument();
268+
return result;
269+
}
270+
}
221271
}
222272
}

src/MongoDB.Driver.Core/Core/Operations/FindCommandOperation.cs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -486,6 +486,11 @@ private AsyncCursor<TDocument> CreateCursor(IChannelSourceHandle channelSource,
486486
var collectionNamespace = CollectionNamespace.FromFullName(cursorDocument["ns"].AsString);
487487
var firstBatch = CreateFirstCursorBatch(cursorDocument);
488488

489+
if (cursorDocument.TryGetValue("atClusterTime", out var atClusterTime))
490+
{
491+
channelSource.Session.SetSnapshotTimeIfNeeded(atClusterTime.AsBsonTimestamp);
492+
}
493+
489494
return new AsyncCursor<TDocument>(
490495
getMoreChannelSource,
491496
collectionNamespace,

src/MongoDB.Driver.Core/Core/Operations/ReadConcernHelper.cs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,19 @@ public static BsonDocument GetReadConcernForFirstCommandInTransaction(ICoreSessi
3636
private static BsonDocument ToBsonDocument(ICoreSession session, ConnectionDescription connectionDescription, ReadConcern readConcern)
3737
{
3838
var sessionsAreSupported = connectionDescription.IsMasterResult.LogicalSessionTimeout != null || connectionDescription.ServiceId.HasValue;
39+
40+
// snapshot
41+
if (sessionsAreSupported && session.IsSnapshot)
42+
{
43+
var readConcernDocument = ReadConcern.Snapshot.ToBsonDocument();
44+
if (session.SnapshotTime != null)
45+
{
46+
readConcernDocument.Add("atClusterTime", session.SnapshotTime);
47+
}
48+
return readConcernDocument;
49+
}
50+
51+
// causal consistency
3952
var shouldSendAfterClusterTime = sessionsAreSupported && session.IsCausallyConsistent && session.OperationTime != null;
4053
var shouldSendReadConcern = !readConcern.IsServerDefault || shouldSendAfterClusterTime;
4154

0 commit comments

Comments
 (0)