Skip to content

Commit 3d3089c

Browse files
committed
CSHARP-2027: added support for causal consistency.
1 parent b97bcb1 commit 3d3089c

39 files changed

+1237
-184
lines changed

src/MongoDB.Driver.Core/Core/Bindings/ICoreSession.cs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,14 @@ public interface ICoreSession : IDisposable
4040
/// </value>
4141
BsonDocument Id { get; }
4242

43+
/// <summary>
44+
/// Gets a value indicate whether this instance is causally consistent.
45+
/// </summary>
46+
/// <value>
47+
/// <c>true</c> if the session is causally consistent.
48+
/// </value>
49+
bool IsCausallyConsistent { get; }
50+
4351
/// <summary>
4452
/// Gets a value indicating whether this instance is implicit session.
4553
/// </summary>

src/MongoDB.Driver.Core/Core/Bindings/NoCoreSession.cs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,9 @@ public static ICoreSessionHandle NewHandle()
5454
/// <inheritdoc />
5555
public BsonDocument Id => null;
5656

57+
/// <inheritdoc />
58+
public bool IsCausallyConsistent => false;
59+
5760
/// <inheritdoc />
5861
public bool IsImplicit => true;
5962

src/MongoDB.Driver.Core/Core/Bindings/WrappingCoreSession.cs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,16 @@ public virtual BsonDocument Id
6363
}
6464
}
6565

66+
/// <inheritdoc />
67+
public virtual bool IsCausallyConsistent
68+
{
69+
get
70+
{
71+
ThrowIfDisposed();
72+
return _wrapped.IsCausallyConsistent;
73+
}
74+
}
75+
6676
/// <inheritdoc />
6777
public virtual bool IsImplicit
6878
{

src/MongoDB.Driver.Core/Core/Operations/AggregateOperation.cs

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
using MongoDB.Bson.Serialization;
2424
using MongoDB.Bson.Serialization.Serializers;
2525
using MongoDB.Driver.Core.Bindings;
26+
using MongoDB.Driver.Core.Connections;
2627
using MongoDB.Driver.Core.Events;
2728
using MongoDB.Driver.Core.Misc;
2829
using MongoDB.Driver.Core.WireProtocol.Messages.Encoders;
@@ -202,7 +203,7 @@ public IAsyncCursor<TResult> Execute(IReadBinding binding, CancellationToken can
202203
using (var channel = channelSource.GetChannel(cancellationToken))
203204
using (var channelBinding = new ChannelReadBinding(channelSource.Server, channel, binding.ReadPreference, binding.Session.Fork()))
204205
{
205-
var operation = CreateOperation(channel);
206+
var operation = CreateOperation(channel, channelBinding);
206207
var result = operation.Execute(channelBinding, cancellationToken);
207208
return CreateCursor(channelSource, channel, operation.Command, result);
208209
}
@@ -219,7 +220,7 @@ public async Task<IAsyncCursor<TResult>> ExecuteAsync(IReadBinding binding, Canc
219220
using (var channel = await channelSource.GetChannelAsync(cancellationToken).ConfigureAwait(false))
220221
using (var channelBinding = new ChannelReadBinding(channelSource.Server, channel, binding.ReadPreference, binding.Session.Fork()))
221222
{
222-
var operation = CreateOperation(channel);
223+
var operation = CreateOperation(channel, channelBinding);
223224
var result = await operation.ExecuteAsync(channelBinding, cancellationToken).ConfigureAwait(false);
224225
return CreateCursor(channelSource, channel, operation.Command, result);
225226
}
@@ -240,24 +241,25 @@ public IReadOperation<BsonDocument> ToExplainOperation(ExplainVerbosity verbosit
240241
};
241242
}
242243

243-
internal BsonDocument CreateCommand(SemanticVersion serverVersion)
244+
internal BsonDocument CreateCommand(ConnectionDescription connectionDescription, ICoreSession session)
244245
{
245-
Feature.ReadConcern.ThrowIfNotSupported(serverVersion, _readConcern);
246-
Feature.Collation.ThrowIfNotSupported(serverVersion, _collation);
246+
Feature.ReadConcern.ThrowIfNotSupported(connectionDescription.ServerVersion, _readConcern);
247+
Feature.Collation.ThrowIfNotSupported(connectionDescription.ServerVersion, _collation);
247248

248249
var command = new BsonDocument
249250
{
250251
{ "aggregate", _collectionNamespace.CollectionName },
251252
{ "pipeline", new BsonArray(_pipeline) },
252253
{ "allowDiskUse", () => _allowDiskUse.Value, _allowDiskUse.HasValue },
253254
{ "maxTimeMS", () => _maxTime.Value.TotalMilliseconds, _maxTime.HasValue },
254-
{ "readConcern", () => _readConcern.ToBsonDocument(), !_readConcern.IsServerDefault },
255255
{ "collation", () => _collation.ToBsonDocument(), _collation != null }
256256
};
257257

258-
if (Feature.AggregateCursorResult.IsSupported(serverVersion))
258+
ReadConcernHelper.AppendReadConcern(command, _readConcern, connectionDescription, session);
259+
260+
if (Feature.AggregateCursorResult.IsSupported(connectionDescription.ServerVersion))
259261
{
260-
var useCursor = _useCursor.GetValueOrDefault(true) || serverVersion >= new SemanticVersion(3, 5, 0);
262+
var useCursor = _useCursor.GetValueOrDefault(true) || connectionDescription.ServerVersion >= new SemanticVersion(3, 5, 0);
261263
if (useCursor)
262264
{
263265
command["cursor"] = new BsonDocument
@@ -270,9 +272,9 @@ internal BsonDocument CreateCommand(SemanticVersion serverVersion)
270272
return command;
271273
}
272274

273-
private ReadCommandOperation<AggregateResult> CreateOperation(IChannelHandle channel)
275+
private ReadCommandOperation<AggregateResult> CreateOperation(IChannel channel, IBinding binding)
274276
{
275-
var command = CreateCommand(channel.ConnectionDescription.ServerVersion);
277+
var command = CreateCommand(channel.ConnectionDescription, binding.Session);
276278
var serializer = new AggregateResultDeserializer(_resultSerializer);
277279
return new ReadCommandOperation<AggregateResult>(CollectionNamespace.DatabaseNamespace, command, serializer, MessageEncoderSettings);
278280
}

src/MongoDB.Driver.Core/Core/Operations/CountOperation.cs

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
using MongoDB.Bson;
2020
using MongoDB.Bson.Serialization.Serializers;
2121
using MongoDB.Driver.Core.Bindings;
22+
using MongoDB.Driver.Core.Connections;
2223
using MongoDB.Driver.Core.Misc;
2324
using MongoDB.Driver.Core.WireProtocol.Messages.Encoders;
2425

@@ -159,22 +160,25 @@ public long? Skip
159160
}
160161

161162
// methods
162-
internal BsonDocument CreateCommand(SemanticVersion serverVersion)
163+
internal BsonDocument CreateCommand(ConnectionDescription connectionDescription, ICoreSession session)
163164
{
164-
Feature.ReadConcern.ThrowIfNotSupported(serverVersion, _readConcern);
165-
Feature.Collation.ThrowIfNotSupported(serverVersion, _collation);
165+
Feature.ReadConcern.ThrowIfNotSupported(connectionDescription.ServerVersion, _readConcern);
166+
Feature.Collation.ThrowIfNotSupported(connectionDescription.ServerVersion, _collation);
166167

167-
return new BsonDocument
168+
var command = new BsonDocument
168169
{
169170
{ "count", _collectionNamespace.CollectionName },
170171
{ "query", _filter, _filter != null },
171172
{ "limit", () => _limit.Value, _limit.HasValue },
172173
{ "skip", () => _skip.Value, _skip.HasValue },
173174
{ "hint", _hint, _hint != null },
174175
{ "maxTimeMS", () => _maxTime.Value.TotalMilliseconds, _maxTime.HasValue },
175-
{ "readConcern", () => _readConcern.ToBsonDocument(), !_readConcern.IsServerDefault },
176176
{ "collation", () => _collation.ToBsonDocument(), _collation != null }
177177
};
178+
179+
ReadConcernHelper.AppendReadConcern(command, _readConcern, connectionDescription, session);
180+
181+
return command;
178182
}
179183

180184
/// <inheritdoc/>
@@ -185,7 +189,7 @@ public long Execute(IReadBinding binding, CancellationToken cancellationToken)
185189
using (var channel = channelSource.GetChannel(cancellationToken))
186190
using (var channelBinding = new ChannelReadBinding(channelSource.Server, channel, binding.ReadPreference, binding.Session.Fork()))
187191
{
188-
var operation = CreateOperation(channel.ConnectionDescription.ServerVersion);
192+
var operation = CreateOperation(channel, channelBinding);
189193
var document = operation.Execute(channelBinding, cancellationToken);
190194
return document["n"].ToInt64();
191195
}
@@ -199,15 +203,15 @@ public async Task<long> ExecuteAsync(IReadBinding binding, CancellationToken can
199203
using (var channel = await channelSource.GetChannelAsync(cancellationToken).ConfigureAwait(false))
200204
using (var channelBinding = new ChannelReadBinding(channelSource.Server, channel, binding.ReadPreference, binding.Session.Fork()))
201205
{
202-
var operation = CreateOperation(channel.ConnectionDescription.ServerVersion);
206+
var operation = CreateOperation(channel, channelBinding);
203207
var document = await operation.ExecuteAsync(channelBinding, cancellationToken).ConfigureAwait(false);
204208
return document["n"].ToInt64();
205209
}
206210
}
207211

208-
private ReadCommandOperation<BsonDocument> CreateOperation(SemanticVersion serverVersion)
212+
private ReadCommandOperation<BsonDocument> CreateOperation(IChannel channel, IBinding binding)
209213
{
210-
var command = CreateCommand(serverVersion);
214+
var command = CreateCommand(channel.ConnectionDescription, binding.Session);
211215
return new ReadCommandOperation<BsonDocument>(_collectionNamespace.DatabaseNamespace, command, BsonDocumentSerializer.Instance, _messageEncoderSettings);
212216
}
213217
}

src/MongoDB.Driver.Core/Core/Operations/DistinctOperation.cs

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
using MongoDB.Bson.Serialization;
2222
using MongoDB.Bson.Serialization.Serializers;
2323
using MongoDB.Driver.Core.Bindings;
24+
using MongoDB.Driver.Core.Connections;
2425
using MongoDB.Driver.Core.Misc;
2526
using MongoDB.Driver.Core.WireProtocol.Messages.Encoders;
2627

@@ -159,7 +160,7 @@ public IAsyncCursor<TValue> Execute(IReadBinding binding, CancellationToken canc
159160
using (var channel = channelSource.GetChannel(cancellationToken))
160161
using (var channelBinding = new ChannelReadBinding(channelSource.Server, channel, binding.ReadPreference, binding.Session.Fork()))
161162
{
162-
var operation = CreateOperation(channel.ConnectionDescription.ServerVersion);
163+
var operation = CreateOperation(channel, channelBinding);
163164
var values = operation.Execute(channelBinding, cancellationToken);
164165
return new SingleBatchAsyncCursor<TValue>(values);
165166
}
@@ -173,32 +174,34 @@ public async Task<IAsyncCursor<TValue>> ExecuteAsync(IReadBinding binding, Cance
173174
using (var channel = await channelSource.GetChannelAsync(cancellationToken).ConfigureAwait(false))
174175
using (var channelBinding = new ChannelReadBinding(channelSource.Server, channel, binding.ReadPreference, binding.Session.Fork()))
175176
{
176-
var operation = CreateOperation(channel.ConnectionDescription.ServerVersion);
177+
var operation = CreateOperation(channel, channelBinding);
177178
var values = await operation.ExecuteAsync(channelBinding, cancellationToken).ConfigureAwait(false);
178179
return new SingleBatchAsyncCursor<TValue>(values);
179180
}
180181
}
181182

182183
// private methods
183-
internal BsonDocument CreateCommand(SemanticVersion serverVersion)
184+
internal BsonDocument CreateCommand(ConnectionDescription connectionDescription, ICoreSession session)
184185
{
185-
Feature.ReadConcern.ThrowIfNotSupported(serverVersion, _readConcern);
186-
Feature.Collation.ThrowIfNotSupported(serverVersion, _collation);
186+
Feature.ReadConcern.ThrowIfNotSupported(connectionDescription.ServerVersion, _readConcern);
187+
Feature.Collation.ThrowIfNotSupported(connectionDescription.ServerVersion, _collation);
187188

188-
return new BsonDocument
189+
var command = new BsonDocument
189190
{
190191
{ "distinct", _collectionNamespace.CollectionName },
191192
{ "key", _fieldName },
192193
{ "query", _filter, _filter != null },
193194
{ "maxTimeMS", () => _maxTime.Value.TotalMilliseconds, _maxTime.HasValue },
194-
{ "readConcern", () => _readConcern.ToBsonDocument(), !_readConcern.IsServerDefault },
195195
{ "collation", () => _collation.ToBsonDocument(), _collation != null }
196-
};
196+
};
197+
198+
ReadConcernHelper.AppendReadConcern(command, _readConcern, connectionDescription, session);
199+
return command;
197200
}
198201

199-
private ReadCommandOperation<TValue[]> CreateOperation(SemanticVersion serverVersion)
202+
private ReadCommandOperation<TValue[]> CreateOperation(IChannel channel, IBinding binding)
200203
{
201-
var command = CreateCommand(serverVersion);
204+
var command = CreateCommand(channel.ConnectionDescription, binding.Session);
202205
var valueArraySerializer = new ArraySerializer<TValue>(_valueSerializer);
203206
var resultSerializer = new ElementDeserializer<TValue[]>("values", valueArraySerializer);
204207
return new ReadCommandOperation<TValue[]>(_collectionNamespace.DatabaseNamespace, command, resultSerializer, _messageEncoderSettings);

src/MongoDB.Driver.Core/Core/Operations/FindCommandOperation.cs

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
using MongoDB.Bson.Serialization;
2222
using MongoDB.Bson.Serialization.Serializers;
2323
using MongoDB.Driver.Core.Bindings;
24+
using MongoDB.Driver.Core.Connections;
2425
using MongoDB.Driver.Core.Events;
2526
using MongoDB.Driver.Core.Misc;
2627
using MongoDB.Driver.Core.Servers;
@@ -412,13 +413,13 @@ public BsonDocument Sort
412413
}
413414

414415
// methods
415-
internal BsonDocument CreateCommand(SemanticVersion serverVersion, ServerType serverType)
416+
internal BsonDocument CreateCommand(ConnectionDescription connectionDescription, ICoreSession session)
416417
{
417-
Feature.ReadConcern.ThrowIfNotSupported(serverVersion, _readConcern);
418-
Feature.Collation.ThrowIfNotSupported(serverVersion, _collation);
418+
Feature.ReadConcern.ThrowIfNotSupported(connectionDescription.ServerVersion, _readConcern);
419+
Feature.Collation.ThrowIfNotSupported(connectionDescription.ServerVersion, _collation);
419420

420421
var firstBatchSize = _firstBatchSize ?? (_batchSize > 0 ? _batchSize : null);
421-
var isShardRouter = serverType == ServerType.ShardRouter;
422+
var isShardRouter = connectionDescription.IsMasterResult.ServerType == ServerType.ShardRouter;
422423

423424
var command = new BsonDocument
424425
{
@@ -444,10 +445,11 @@ internal BsonDocument CreateCommand(SemanticVersion serverVersion, ServerType se
444445
{ "noCursorTimeout", () => _noCursorTimeout.Value, _noCursorTimeout.HasValue },
445446
{ "awaitData", true, _cursorType == CursorType.TailableAwait },
446447
{ "allowPartialResults", () => _allowPartialResults.Value, _allowPartialResults.HasValue && isShardRouter },
447-
{ "readConcern", () => _readConcern.ToBsonDocument(), !_readConcern.IsServerDefault },
448448
{ "collation", () => _collation.ToBsonDocument(), _collation != null }
449449
};
450450

451+
ReadConcernHelper.AppendReadConcern(command, _readConcern, connectionDescription, session);
452+
451453
return command;
452454
}
453455

@@ -497,7 +499,7 @@ private CursorBatch<TDocument> CreateCursorBatch(BsonDocument commandResult)
497499

498500
using (EventContext.BeginFind(_batchSize, _limit))
499501
{
500-
var operation = CreateOperation(channel.ConnectionDescription.ServerVersion, channelSource.ServerDescription.Type);
502+
var operation = CreateOperation(channel, channelBinding);
501503
var commandResult = operation.Execute(channelBinding, cancellationToken);
502504
return CreateCursor(channelSource, commandResult, slaveOk);
503505
}
@@ -519,16 +521,16 @@ private CursorBatch<TDocument> CreateCursorBatch(BsonDocument commandResult)
519521

520522
using (EventContext.BeginFind(_batchSize, _limit))
521523
{
522-
var operation = CreateOperation(channel.ConnectionDescription.ServerVersion, channelSource.ServerDescription.Type);
524+
var operation = CreateOperation(channel, channelBinding);
523525
var commandResult = await operation.ExecuteAsync(channelBinding, cancellationToken).ConfigureAwait(false);
524526
return CreateCursor(channelSource, commandResult, slaveOk);
525527
}
526528
}
527529
}
528530

529-
private ReadCommandOperation<BsonDocument> CreateOperation(SemanticVersion serverVersion, ServerType serverType)
531+
private ReadCommandOperation<BsonDocument> CreateOperation(IChannel channel, IBinding binding)
530532
{
531-
var command = CreateCommand(serverVersion, serverType);
533+
var command = CreateCommand(channel.ConnectionDescription, binding.Session);
532534
var operation = new ReadCommandOperation<BsonDocument>(
533535
_collectionNamespace.DatabaseNamespace,
534536
command,

src/MongoDB.Driver.Core/Core/Operations/GeoNearOperation.cs

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
using MongoDB.Bson;
2020
using MongoDB.Bson.Serialization;
2121
using MongoDB.Driver.Core.Bindings;
22+
using MongoDB.Driver.Core.Connections;
2223
using MongoDB.Driver.Core.Misc;
2324
using MongoDB.Driver.Core.WireProtocol.Messages.Encoders;
2425

@@ -190,7 +191,7 @@ public TResult Execute(IReadBinding binding, CancellationToken cancellationToken
190191
using (var channel = channelSource.GetChannel(cancellationToken))
191192
using (var channelBinding = new ChannelReadBinding(channelSource.Server, channel, binding.ReadPreference, binding.Session.Fork()))
192193
{
193-
var operation = CreateOperation(channel.ConnectionDescription.ServerVersion);
194+
var operation = CreateOperation(channel, channelBinding);
194195
return operation.Execute(channelBinding, cancellationToken);
195196
}
196197
}
@@ -203,17 +204,17 @@ public async Task<TResult> ExecuteAsync(IReadBinding binding, CancellationToken
203204
using (var channel = await channelSource.GetChannelAsync(cancellationToken).ConfigureAwait(false))
204205
using (var channelBinding = new ChannelReadBinding(channelSource.Server, channel, binding.ReadPreference, binding.Session.Fork()))
205206
{
206-
var operation = CreateOperation(channel.ConnectionDescription.ServerVersion);
207+
var operation = CreateOperation(channel, channelBinding);
207208
return await operation.ExecuteAsync(channelBinding, cancellationToken).ConfigureAwait(false);
208209
}
209210
}
210211

211-
internal BsonDocument CreateCommand(SemanticVersion serverVersion)
212+
internal BsonDocument CreateCommand(ConnectionDescription connectionDescription, ICoreSession session)
212213
{
213-
Feature.ReadConcern.ThrowIfNotSupported(serverVersion, _readConcern);
214-
Feature.Collation.ThrowIfNotSupported(serverVersion, _collation);
214+
Feature.ReadConcern.ThrowIfNotSupported(connectionDescription.ServerVersion, _readConcern);
215+
Feature.Collation.ThrowIfNotSupported(connectionDescription.ServerVersion, _collation);
215216

216-
return new BsonDocument
217+
var command = new BsonDocument
217218
{
218219
{ "geoNear", _collectionNamespace.CollectionName },
219220
{ "near", _near },
@@ -225,14 +226,16 @@ internal BsonDocument CreateCommand(SemanticVersion serverVersion)
225226
{ "includeLocs", () => _includeLocs.Value, _includeLocs.HasValue },
226227
{ "uniqueDocs", () => _uniqueDocs.Value, _uniqueDocs.HasValue },
227228
{ "maxTimeMS", () => _maxTime.Value.TotalMilliseconds, _maxTime.HasValue },
228-
{ "readConcern", _readConcern.ToBsonDocument(), !_readConcern.IsServerDefault },
229229
{ "collation", () => _collation.ToBsonDocument(), _collation != null }
230230
};
231+
232+
ReadConcernHelper.AppendReadConcern(command, _readConcern, connectionDescription, session);
233+
return command;
231234
}
232235

233-
private ReadCommandOperation<TResult> CreateOperation(SemanticVersion serverVersion)
236+
private ReadCommandOperation<TResult> CreateOperation(IChannel channel, IBinding binding)
234237
{
235-
var command = CreateCommand(serverVersion);
238+
var command = CreateCommand(channel.ConnectionDescription, binding.Session);
236239
return new ReadCommandOperation<TResult>(
237240
_collectionNamespace.DatabaseNamespace,
238241
command,

0 commit comments

Comments
 (0)