Skip to content

Commit 903285f

Browse files
CSHARP-2528: Add support for majority read concern level to Aggregation $out.
1 parent a196598 commit 903285f

File tree

12 files changed

+786
-53
lines changed

12 files changed

+786
-53
lines changed

src/MongoDB.Driver.Core/Core/Operations/AggregateToCollectionOperation.cs

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ public class AggregateToCollectionOperation : IWriteOperation<BsonDocument>
4343
private TimeSpan? _maxTime;
4444
private readonly MessageEncoderSettings _messageEncoderSettings;
4545
private readonly IReadOnlyList<BsonDocument> _pipeline;
46+
private ReadConcern _readConcern;
4647
private WriteConcern _writeConcern;
4748

4849
// constructors
@@ -167,6 +168,21 @@ public IReadOnlyList<BsonDocument> Pipeline
167168
get { return _pipeline; }
168169
}
169170

171+
/// <summary>
172+
/// Gets or sets the read concern.
173+
/// </summary>
174+
/// <value>
175+
/// The read concern.
176+
/// </value>
177+
public ReadConcern ReadConcern
178+
{
179+
get { return _readConcern; }
180+
set
181+
{
182+
_readConcern = value;
183+
}
184+
}
185+
170186
/// <summary>
171187
/// Gets or sets the write concern.
172188
/// </summary>
@@ -213,6 +229,9 @@ internal BsonDocument CreateCommand(ICoreSessionHandle session, ConnectionDescri
213229
var serverVersion = connectionDescription.ServerVersion;
214230
Feature.Collation.ThrowIfNotSupported(serverVersion, _collation);
215231

232+
var readConcern = _readConcern != null
233+
? ReadConcernHelper.GetReadConcernForCommand(session, connectionDescription, _readConcern)
234+
: null;
216235
var writeConcern = WriteConcernHelper.GetWriteConcernForCommandThatWrites(session, _writeConcern, serverVersion);
217236
return new BsonDocument
218237
{
@@ -222,6 +241,7 @@ internal BsonDocument CreateCommand(ICoreSessionHandle session, ConnectionDescri
222241
{ "bypassDocumentValidation", () => _bypassDocumentValidation.Value, _bypassDocumentValidation.HasValue && Feature.BypassDocumentValidation.IsSupported(serverVersion) },
223242
{ "maxTimeMS", () => MaxTimeHelper.ToMaxTimeMS(_maxTime.Value), _maxTime.HasValue },
224243
{ "collation", () => _collation.ToBsonDocument(), _collation != null },
244+
{ "readConcern", readConcern, readConcern != null },
225245
{ "writeConcern", writeConcern, writeConcern != null },
226246
{ "cursor", new BsonDocument(), serverVersion >= new SemanticVersion(3, 5, 0) },
227247
{ "hint", () => _hint, _hint != null },

src/MongoDB.Driver.Legacy/MongoCollection.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,7 @@ private IEnumerable<BsonDocument> Aggregate(IClientSessionHandle session, Aggreg
153153
BypassDocumentValidation = args.BypassDocumentValidation,
154154
Collation = args.Collation,
155155
MaxTime = args.MaxTime,
156+
ReadConcern = _settings.ReadConcern,
156157
WriteConcern = _settings.WriteConcern
157158
};
158159
ExecuteWriteOperation(session, aggregateOperation);

src/MongoDB.Driver/MongoCollectionImpl.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -742,6 +742,7 @@ private AggregateToCollectionOperation CreateAggregateToCollectionOperation<TRes
742742
Comment = options.Comment,
743743
Hint = options.Hint,
744744
MaxTime = options.MaxTime,
745+
ReadConcern = _settings.ReadConcern,
745746
WriteConcern = _settings.WriteConcern
746747
};
747748
}

tests/MongoDB.Driver.Core.Tests/Core/Operations/AggregateToCollectionOperationTests.cs

Lines changed: 40 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,10 @@
1818
using FluentAssertions;
1919
using MongoDB.Bson;
2020
using MongoDB.Bson.TestHelpers.XunitExtensions;
21-
using MongoDB.Driver.Core.Bindings;
2221
using MongoDB.Driver.Core.Clusters;
2322
using MongoDB.Driver.Core.Misc;
2423
using MongoDB.Driver.Core.TestHelpers;
2524
using MongoDB.Driver.Core.TestHelpers.XunitExtensions;
26-
using Moq;
2725
using Xunit;
2826

2927
namespace MongoDB.Driver.Core.Operations
@@ -49,6 +47,7 @@ public void Constructor_should_create_a_valid_instance()
4947
subject.BypassDocumentValidation.Should().NotHaveValue();
5048
subject.Collation.Should().BeNull();
5149
subject.MaxTime.Should().NotHaveValue();
50+
subject.ReadConcern.Should().BeNull();
5251
subject.WriteConcern.Should().BeNull();
5352
}
5453

@@ -183,6 +182,19 @@ public void MaxTime_set_should_throw_when_value_is_invalid(
183182
e.ParamName.Should().Be("value");
184183
}
185184

185+
[Theory]
186+
[ParameterAttributeData]
187+
public void ReadConcern_get_and_set_should_work([Values(ReadConcernLevel.Local, ReadConcernLevel.Majority)] ReadConcernLevel level)
188+
{
189+
var subject = new AggregateToCollectionOperation(_collectionNamespace, __pipeline, _messageEncoderSettings);
190+
var value = new ReadConcern(new Optional<ReadConcernLevel?>(level));
191+
192+
subject.ReadConcern = value;
193+
var result = subject.ReadConcern;
194+
195+
result.Should().BeSameAs(value);
196+
}
197+
186198
[Theory]
187199
[ParameterAttributeData]
188200
public void WriteConcern_get_and_set_should_work(
@@ -390,6 +402,32 @@ public void CreateCommand_should_return_expected_result_when_MaxTime_is_set(long
390402
result["maxTimeMS"].BsonType.Should().Be(BsonType.Int32);
391403
}
392404

405+
[Theory]
406+
[ParameterAttributeData]
407+
public void CreateCommand_should_return_expected_result_when_ReadConcern_is_set(
408+
[Values(ReadConcernLevel.Majority)] ReadConcernLevel readConcernLevel,
409+
[Values(false, true)] bool withReadConcern)
410+
{
411+
var subject = new AggregateToCollectionOperation(_collectionNamespace, __pipeline, _messageEncoderSettings);
412+
if (withReadConcern)
413+
{
414+
subject.ReadConcern = new ReadConcern(readConcernLevel);
415+
};
416+
var session = OperationTestHelper.CreateSession();
417+
var connectionDescription = OperationTestHelper.CreateConnectionDescription();
418+
419+
var result = subject.CreateCommand(session, connectionDescription);
420+
421+
var expectedResult = new BsonDocument
422+
{
423+
{ "aggregate", _collectionNamespace.CollectionName },
424+
{ "pipeline", new BsonArray(__pipeline) },
425+
{ "readConcern", () => subject.ReadConcern.ToBsonDocument(), withReadConcern },
426+
{ "cursor", new BsonDocument() }
427+
};
428+
result.Should().Be(expectedResult);
429+
}
430+
393431
[Theory]
394432
[ParameterAttributeData]
395433
public void CreateCommand_should_return_expected_result_when_WriteConcern_is_set(

tests/MongoDB.Driver.Tests/MongoCollectionImplTests.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,7 @@ public void Aggregate_should_execute_an_AggregateToCollectionOperation_and_a_Fin
143143
[Values(false, true)] bool async)
144144
{
145145
var writeConcern = new WriteConcern(1);
146+
var readConcern = new ReadConcern(ReadConcernLevel.Majority);
146147
var subject = CreateSubject<BsonDocument>().WithWriteConcern(writeConcern);
147148
var session = CreateSession(usingSession);
148149
var pipeline = new EmptyPipelineDefinition<BsonDocument>()
@@ -199,6 +200,7 @@ public void Aggregate_should_execute_an_AggregateToCollectionOperation_and_a_Fin
199200
aggregateOperation.Hint.Should().Be(options.Hint);
200201
aggregateOperation.MaxTime.Should().Be(options.MaxTime);
201202
aggregateOperation.Pipeline.Should().Equal(renderedPipeline.Documents);
203+
aggregateOperation.ReadConcern.Should().Be(readConcern);
202204
aggregateOperation.WriteConcern.Should().BeSameAs(writeConcern);
203205

204206
var mockCursor = new Mock<IAsyncCursor<BsonDocument>>();

tests/MongoDB.Driver.Tests/Specifications/change-streams/ChangeStreamTestRunner.cs

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -236,15 +236,11 @@ private void ExecuteOperation(IMongoClient client, BsonDocument operation)
236236
var test = CrudOperationTestFactory.CreateTest(name);
237237

238238
var arguments = (BsonDocument)operation.GetValue("arguments", new BsonDocument());
239-
string reason;
240-
if (!test.CanExecute(DriverTestConfiguration.Client.Cluster.Description, arguments, out reason))
241-
{
242-
throw new SkipException(reason);
243-
}
239+
test.SkipIfNotSupported(arguments);
244240

245241
var database = client.GetDatabase(operation["database"].AsString);
246242
var collection = database.GetCollection<BsonDocument>(operation["collection"].AsString);
247-
test.Execute(DriverTestConfiguration.Client.Cluster.Description, database, collection, arguments, outcome: null, async: false);
243+
test.Execute(DriverTestConfiguration.Client.Cluster.Description, database, collection, arguments, outcome: null, isErrorExpected: false, async: false);
248244
}
249245

250246
private List<CommandStartedEvent> GetEvents(EventCapturer eventCapturer)

tests/MongoDB.Driver.Tests/Specifications/crud/AggregateTest.cs

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,10 @@
1515

1616
using System.Collections.Generic;
1717
using System.Linq;
18-
using System.Threading.Tasks;
1918
using FluentAssertions;
2019
using MongoDB.Bson;
21-
using MongoDB.Driver.Core.Clusters;
2220
using MongoDB.Driver.Core.Misc;
21+
using MongoDB.Driver.Core.TestHelpers.XunitExtensions;
2322

2423
namespace MongoDB.Driver.Tests.Specifications.crud
2524
{
@@ -28,15 +27,13 @@ public class AggregateTest : CrudOperationWithResultTestBase<List<BsonDocument>>
2827
private List<BsonDocument> _stages;
2928
private AggregateOptions _options = new AggregateOptions();
3029

31-
public override bool CanExecute(ClusterDescription clusterDescription, BsonDocument arguments, out string reason)
30+
public override void SkipIfNotSupported(BsonDocument arguments)
3231
{
33-
var version = clusterDescription.Servers[0].Version;
34-
reason = string.Format("Server must be at least 2.6.0. Current server is {0}.", version);
35-
return !(version < new SemanticVersion(2, 6, 0) &&
36-
((BsonArray)arguments["pipeline"])
37-
.Cast<BsonDocument>()
38-
.Last()
39-
.Contains("$out"));
32+
var lastStage = arguments["pipeline"].AsBsonArray.Last().AsBsonDocument;
33+
if (lastStage.GetElement(0).Name == "$out")
34+
{
35+
RequireServer.Check().Supports(Feature.AggregateOut);
36+
}
4037
}
4138

4239
protected override bool TrySetArgument(string name, BsonValue value)

tests/MongoDB.Driver.Tests/Specifications/crud/CrudOperationTestBase.cs

Lines changed: 34 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -14,25 +14,29 @@
1414
*/
1515

1616
using System;
17-
using System.Threading.Tasks;
1817
using FluentAssertions;
1918
using MongoDB.Bson;
2019
using MongoDB.Driver.Core.Clusters;
21-
using MongoDB.Driver.Core.Misc;
2220

2321
namespace MongoDB.Driver.Tests.Specifications.crud
2422
{
2523
public abstract class CrudOperationTestBase : ICrudOperationTest
2624
{
25+
public Exception ActualException { get; set; }
2726
protected ClusterDescription ClusterDescription { get; private set; }
2827

29-
public virtual bool CanExecute(ClusterDescription clusterDescription, BsonDocument arguments, out string reason)
28+
public virtual void SkipIfNotSupported(BsonDocument arguments)
3029
{
31-
reason = null;
32-
return true;
3330
}
3431

35-
public void Execute(ClusterDescription clusterDescription, IMongoDatabase database, IMongoCollection<BsonDocument> collection, BsonDocument arguments, BsonDocument outcome, bool async)
32+
public void Execute(
33+
ClusterDescription clusterDescription,
34+
IMongoDatabase database,
35+
IMongoCollection<BsonDocument> collection,
36+
BsonDocument arguments,
37+
BsonDocument outcome,
38+
bool isErrorExpected,
39+
bool async)
3640
{
3741
ClusterDescription = clusterDescription;
3842

@@ -44,8 +48,20 @@ public void Execute(ClusterDescription clusterDescription, IMongoDatabase databa
4448
}
4549
}
4650

47-
Execute(collection, outcome, async);
51+
try
52+
{
53+
Execute(collection, outcome, async);
54+
}
55+
catch (Exception ex) when (isErrorExpected)
56+
{
57+
ActualException = ex;
58+
}
59+
60+
AssertOutcome(outcome, database, collection);
61+
}
4862

63+
protected virtual void AssertOutcome(BsonDocument outcome, IMongoDatabase database, IMongoCollection<BsonDocument> collection)
64+
{
4965
if (outcome != null && outcome.Contains("collection"))
5066
{
5167
var collectionToVerify = collection;
@@ -70,14 +86,22 @@ protected virtual void VerifyCollection(IMongoCollection<BsonDocument> collectio
7086

7187
public abstract class CrudOperationWithResultTestBase<TResult> : CrudOperationTestBase
7288
{
89+
private TResult _result;
90+
7391
protected sealed override void Execute(IMongoCollection<BsonDocument> collection, BsonDocument outcome, bool async)
7492
{
75-
var actualResult = ExecuteAndGetResult(collection, async);
76-
if (outcome != null)
93+
_result = ExecuteAndGetResult(collection, async);
94+
}
95+
96+
protected override void AssertOutcome(BsonDocument outcome, IMongoDatabase database, IMongoCollection<BsonDocument> collection)
97+
{
98+
if (outcome != null && outcome.Contains("result"))
7799
{
78100
var expectedResult = ConvertExpectedResult(outcome["result"]);
79-
VerifyResult(actualResult, expectedResult);
101+
VerifyResult(_result, expectedResult);
80102
}
103+
104+
base.AssertOutcome(outcome, database, collection);
81105
}
82106

83107
protected abstract TResult ConvertExpectedResult(BsonValue expectedResult);

0 commit comments

Comments
 (0)