Skip to content

Commit ef33873

Browse files
committed
CSHARP-2448: Implement MongoDatabase Aggregate method.
1 parent ec182f7 commit ef33873

34 files changed

+943
-120
lines changed

src/MongoDB.Driver.Core/Core/Operations/AggregateToCollectionOperation.cs

Lines changed: 28 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ public class AggregateToCollectionOperation : IWriteOperation<BsonDocument>
3939
private Collation _collation;
4040
private readonly CollectionNamespace _collectionNamespace;
4141
private string _comment;
42+
private readonly DatabaseNamespace _databaseNamespace;
4243
private BsonValue _hint;
4344
private TimeSpan? _maxTime;
4445
private readonly MessageEncoderSettings _messageEncoderSettings;
@@ -50,18 +51,30 @@ public class AggregateToCollectionOperation : IWriteOperation<BsonDocument>
5051
/// <summary>
5152
/// Initializes a new instance of the <see cref="AggregateToCollectionOperation"/> class.
5253
/// </summary>
53-
/// <param name="collectionNamespace">The collection namespace.</param>
54+
/// <param name="databaseNamespace">The database namespace.</param>
5455
/// <param name="pipeline">The pipeline.</param>
5556
/// <param name="messageEncoderSettings">The message encoder settings.</param>
56-
public AggregateToCollectionOperation(CollectionNamespace collectionNamespace, IEnumerable<BsonDocument> pipeline, MessageEncoderSettings messageEncoderSettings)
57+
public AggregateToCollectionOperation(DatabaseNamespace databaseNamespace, IEnumerable<BsonDocument> pipeline, MessageEncoderSettings messageEncoderSettings)
5758
{
58-
_collectionNamespace = Ensure.IsNotNull(collectionNamespace, nameof(collectionNamespace));
59+
_databaseNamespace = Ensure.IsNotNull(databaseNamespace, nameof(databaseNamespace));
5960
_pipeline = Ensure.IsNotNull(pipeline, nameof(pipeline)).ToList();
6061
_messageEncoderSettings = Ensure.IsNotNull(messageEncoderSettings, nameof(messageEncoderSettings));
6162

6263
EnsureIsOutputToCollectionPipeline();
6364
}
6465

66+
/// <summary>
67+
/// Initializes a new instance of the <see cref="AggregateToCollectionOperation"/> class.
68+
/// </summary>
69+
/// <param name="collectionNamespace">The collection namespace.</param>
70+
/// <param name="pipeline">The pipeline.</param>
71+
/// <param name="messageEncoderSettings">The message encoder settings.</param>
72+
public AggregateToCollectionOperation(CollectionNamespace collectionNamespace, IEnumerable<BsonDocument> pipeline, MessageEncoderSettings messageEncoderSettings)
73+
: this(Ensure.IsNotNull(collectionNamespace, nameof(collectionNamespace)).DatabaseNamespace, pipeline, messageEncoderSettings)
74+
{
75+
_collectionNamespace = collectionNamespace;
76+
}
77+
6578
// properties
6679
/// <summary>
6780
/// Gets or sets a value indicating whether the server is allowed to use the disk.
@@ -122,6 +135,17 @@ public string Comment
122135
set { _comment = value; }
123136
}
124137

138+
/// <summary>
139+
/// Gets the database namespace.
140+
/// </summary>
141+
/// <value>
142+
/// The database namespace.
143+
/// </value>
144+
public DatabaseNamespace DatabaseNamespace
145+
{
146+
get { return _databaseNamespace; }
147+
}
148+
125149
/// <summary>
126150
/// Gets or sets the hint. This must either be a BsonString representing the index name or a BsonDocument representing the key pattern of the index.
127151
/// </summary>
@@ -235,7 +259,7 @@ internal BsonDocument CreateCommand(ICoreSessionHandle session, ConnectionDescri
235259
var writeConcern = WriteConcernHelper.GetWriteConcernForCommandThatWrites(session, _writeConcern, serverVersion);
236260
return new BsonDocument
237261
{
238-
{ "aggregate", _collectionNamespace.CollectionName },
262+
{ "aggregate", _collectionNamespace == null ? (BsonValue)1 : _collectionNamespace.CollectionName },
239263
{ "pipeline", new BsonArray(_pipeline) },
240264
{ "allowDiskUse", () => _allowDiskUse.Value, _allowDiskUse.HasValue },
241265
{ "bypassDocumentValidation", () => _bypassDocumentValidation.Value, _bypassDocumentValidation.HasValue && Feature.BypassDocumentValidation.IsSupported(serverVersion) },

src/MongoDB.Driver/AggregateFluent.cs

Lines changed: 86 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -23,29 +23,22 @@
2323

2424
namespace MongoDB.Driver
2525
{
26-
internal class AggregateFluent<TDocument, TResult> : AggregateFluentBase<TResult>
26+
internal abstract class AggregateFluent<TInput, TResult> : AggregateFluentBase<TResult>
2727
{
2828
// fields
29-
private readonly IMongoCollection<TDocument> _collection;
30-
private readonly AggregateOptions _options;
31-
private readonly PipelineDefinition<TDocument, TResult> _pipeline;
32-
private readonly IClientSessionHandle _session;
29+
protected readonly AggregateOptions _options;
30+
protected readonly PipelineDefinition<TInput, TResult> _pipeline;
31+
protected readonly IClientSessionHandle _session;
3332

3433
// constructors
35-
public AggregateFluent(IClientSessionHandle session, IMongoCollection<TDocument> collection, PipelineDefinition<TDocument, TResult> pipeline, AggregateOptions options)
34+
protected AggregateFluent(IClientSessionHandle session, PipelineDefinition<TInput, TResult> pipeline, AggregateOptions options)
3635
{
3736
_session = session; // can be null
38-
_collection = Ensure.IsNotNull(collection, nameof(collection));
3937
_pipeline = Ensure.IsNotNull(pipeline, nameof(pipeline));
4038
_options = Ensure.IsNotNull(options, nameof(options));
4139
}
4240

4341
// properties
44-
public override IMongoDatabase Database
45-
{
46-
get { return _collection.Database; }
47-
}
48-
4942
public override AggregateOptions Options
5043
{
5144
get { return _options; }
@@ -143,7 +136,7 @@ public override IAggregateFluent<TResult> Limit(int limit)
143136
public override IAggregateFluent<TNewResult> Lookup<TForeignDocument, TNewResult>(string foreignCollectionName, FieldDefinition<TResult> localField, FieldDefinition<TForeignDocument> foreignField, FieldDefinition<TNewResult> @as, AggregateLookupOptions<TForeignDocument, TNewResult> options)
144137
{
145138
Ensure.IsNotNull(foreignCollectionName, nameof(foreignCollectionName));
146-
var foreignCollection = _collection.Database.GetCollection<TForeignDocument>(foreignCollectionName);
139+
var foreignCollection = Database.GetCollection<TForeignDocument>(foreignCollectionName);
147140
return WithPipeline(_pipeline.Lookup(foreignCollection, localField, foreignField, @as, options));
148141
}
149142

@@ -218,7 +211,7 @@ public override IOrderedAggregateFluent<TResult> ThenBy(SortDefinition<TResult>
218211
var combinedSort = Builders<TResult>.Sort.Combine(oldSort, newSort);
219212
var combinedSortStage = PipelineStageDefinitionBuilder.Sort(combinedSort);
220213
stages[stages.Count - 1] = combinedSortStage;
221-
var newPipeline = new PipelineStagePipelineDefinition<TDocument, TResult>(stages);
214+
var newPipeline = new PipelineStagePipelineDefinition<TInput, TResult>(stages);
222215
return (IOrderedAggregateFluent<TResult>)WithPipeline(newPipeline);
223216
}
224217

@@ -232,6 +225,34 @@ public override IAggregateFluent<TNewResult> Unwind<TNewResult>(FieldDefinition<
232225
return WithPipeline(_pipeline.Unwind(field, options));
233226
}
234227

228+
public override string ToString()
229+
{
230+
return $"aggregate({_pipeline})";
231+
}
232+
233+
protected abstract IAggregateFluent<TNewResult> WithPipeline<TNewResult>(PipelineDefinition<TInput, TNewResult> pipeline);
234+
}
235+
236+
internal class CollectionAggregateFluent<TDocument, TResult> : AggregateFluent<TDocument, TResult>
237+
{
238+
// private fields
239+
private readonly IMongoCollection<TDocument> _collection;
240+
241+
// constructors
242+
public CollectionAggregateFluent(
243+
IClientSessionHandle session,
244+
IMongoCollection<TDocument> collection,
245+
PipelineDefinition<TDocument, TResult> pipeline,
246+
AggregateOptions options)
247+
: base(session, pipeline, options)
248+
{
249+
_collection = Ensure.IsNotNull(collection, nameof(collection));
250+
}
251+
252+
// public properties
253+
public override IMongoDatabase Database => _collection.Database;
254+
255+
// public methods
235256
public override IAsyncCursor<TResult> ToCursor(CancellationToken cancellationToken)
236257
{
237258
if (_session == null)
@@ -256,14 +277,61 @@ public override Task<IAsyncCursor<TResult>> ToCursorAsync(CancellationToken canc
256277
}
257278
}
258279

259-
public override string ToString()
280+
// protected methods
281+
protected override IAggregateFluent<TNewResult> WithPipeline<TNewResult>(PipelineDefinition<TDocument, TNewResult> pipeline)
260282
{
261-
return $"aggregate({_pipeline})";
283+
return new CollectionAggregateFluent<TDocument, TNewResult>(_session, _collection, pipeline, _options);
284+
}
285+
}
286+
287+
internal class DatabaseAggregateFluent<TResult> : AggregateFluent<NoPipelineInput, TResult>
288+
{
289+
// private fields
290+
private readonly IMongoDatabase _database;
291+
292+
// constructors
293+
public DatabaseAggregateFluent(
294+
IClientSessionHandle session,
295+
IMongoDatabase database,
296+
PipelineDefinition<NoPipelineInput, TResult> pipeline,
297+
AggregateOptions options)
298+
: base(session, pipeline, options)
299+
{
300+
_database = Ensure.IsNotNull(database, nameof(database));
301+
}
302+
303+
// public properties
304+
public override IMongoDatabase Database => _database;
305+
306+
// public methods
307+
public override IAsyncCursor<TResult> ToCursor(CancellationToken cancellationToken)
308+
{
309+
if (_session == null)
310+
{
311+
return _database.Aggregate(_pipeline, _options, cancellationToken);
312+
}
313+
else
314+
{
315+
return _database.Aggregate(_session, _pipeline, _options, cancellationToken);
316+
}
317+
}
318+
319+
public override Task<IAsyncCursor<TResult>> ToCursorAsync(CancellationToken cancellationToken)
320+
{
321+
if (_session == null)
322+
{
323+
return _database.AggregateAsync(_pipeline, _options, cancellationToken);
324+
}
325+
else
326+
{
327+
return _database.AggregateAsync(_session, _pipeline, _options, cancellationToken);
328+
}
262329
}
263330

264-
public IAggregateFluent<TNewResult> WithPipeline<TNewResult>(PipelineDefinition<TDocument, TNewResult> pipeline)
331+
// protected methods
332+
protected override IAggregateFluent<TNewResult> WithPipeline<TNewResult>(PipelineDefinition<NoPipelineInput, TNewResult> pipeline)
265333
{
266-
return new AggregateFluent<TDocument, TNewResult>(_session, _collection, pipeline, _options);
334+
return new DatabaseAggregateFluent<TNewResult>(_session, _database, pipeline, _options);
267335
}
268336
}
269337
}

src/MongoDB.Driver/IMongoCollectionExtensions.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ public static class IMongoCollectionExtensions
4141
public static IAggregateFluent<TDocument> Aggregate<TDocument>(this IMongoCollection<TDocument> collection, AggregateOptions options = null)
4242
{
4343
var emptyPipeline = new EmptyPipelineDefinition<TDocument>(collection.DocumentSerializer);
44-
return new AggregateFluent<TDocument, TDocument>(null, collection, emptyPipeline, options ?? new AggregateOptions());
44+
return new CollectionAggregateFluent<TDocument, TDocument>(null, collection, emptyPipeline, options ?? new AggregateOptions());
4545
}
4646

4747
/// <summary>
@@ -58,7 +58,7 @@ public static IAggregateFluent<TDocument> Aggregate<TDocument>(this IMongoCollec
5858
{
5959
Ensure.IsNotNull(session, nameof(session));
6060
var emptyPipeline = new EmptyPipelineDefinition<TDocument>(collection.DocumentSerializer);
61-
return new AggregateFluent<TDocument, TDocument>(session, collection, emptyPipeline, options ?? new AggregateOptions());
61+
return new CollectionAggregateFluent<TDocument, TDocument>(session, collection, emptyPipeline, options ?? new AggregateOptions());
6262
}
6363

6464
/// <summary>

src/MongoDB.Driver/IMongoDatabase.cs

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,52 @@ public interface IMongoDatabase
4646
/// </summary>
4747
MongoDatabaseSettings Settings { get; }
4848

49+
/// <summary>
50+
/// Runs an aggregation pipeline.
51+
/// </summary>
52+
/// <typeparam name="TResult">The type of the result.</typeparam>
53+
/// <param name="pipeline">The pipeline.</param>
54+
/// <param name="options">The options.</param>
55+
/// <param name="cancellationToken">The cancellation token.</param>
56+
/// <returns>A cursor.</returns>
57+
IAsyncCursor<TResult> Aggregate<TResult>(PipelineDefinition<NoPipelineInput, TResult> pipeline, AggregateOptions options = null, CancellationToken cancellationToken = default(CancellationToken));
58+
59+
/// <summary>
60+
/// Runs an aggregation pipeline.
61+
/// </summary>
62+
/// <typeparam name="TResult">The type of the result.</typeparam>
63+
/// <param name="session">The session.</param>
64+
/// <param name="pipeline">The pipeline.</param>
65+
/// <param name="options">The options.</param>
66+
/// <param name="cancellationToken">The cancellation token.</param>
67+
/// <returns>
68+
/// A cursor.
69+
/// </returns>
70+
IAsyncCursor<TResult> Aggregate<TResult>(IClientSessionHandle session, PipelineDefinition<NoPipelineInput, TResult> pipeline, AggregateOptions options = null, CancellationToken cancellationToken = default(CancellationToken));
71+
72+
/// <summary>
73+
/// Runs an aggregation pipeline.
74+
/// </summary>
75+
/// <typeparam name="TResult">The type of the result.</typeparam>
76+
/// <param name="pipeline">The pipeline.</param>
77+
/// <param name="options">The options.</param>
78+
/// <param name="cancellationToken">The cancellation token.</param>
79+
/// <returns>A Task whose result is a cursor.</returns>
80+
Task<IAsyncCursor<TResult>> AggregateAsync<TResult>(PipelineDefinition<NoPipelineInput, TResult> pipeline, AggregateOptions options = null, CancellationToken cancellationToken = default(CancellationToken));
81+
82+
/// <summary>
83+
/// Runs an aggregation pipeline.
84+
/// </summary>
85+
/// <typeparam name="TResult">The type of the result.</typeparam>
86+
/// <param name="session">The session.</param>
87+
/// <param name="pipeline">The pipeline.</param>
88+
/// <param name="options">The options.</param>
89+
/// <param name="cancellationToken">The cancellation token.</param>
90+
/// <returns>
91+
/// A Task whose result is a cursor.
92+
/// </returns>
93+
Task<IAsyncCursor<TResult>> AggregateAsync<TResult>(IClientSessionHandle session, PipelineDefinition<NoPipelineInput, TResult> pipeline, AggregateOptions options = null, CancellationToken cancellationToken = default(CancellationToken));
94+
4995
/// <summary>
5096
/// Creates the collection with the specified name.
5197
/// </summary>

src/MongoDB.Driver/IMongoDatabaseExtensions.cs

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,36 @@ namespace MongoDB.Driver
2626
/// </summary>
2727
public static class IMongoDatabaseExtensions
2828
{
29+
/// <summary>
30+
/// Begins a fluent aggregation interface.
31+
/// </summary>
32+
/// <param name="database">The database.</param>
33+
/// <param name="options">The options.</param>
34+
/// <returns>
35+
/// A fluent aggregate interface.
36+
/// </returns>
37+
public static IAggregateFluent<NoPipelineInput> Aggregate(this IMongoDatabase database, AggregateOptions options = null)
38+
{
39+
var emptyPipeline = new EmptyPipelineDefinition<NoPipelineInput>(NoPipelineInputSerializer.Instance);
40+
return new DatabaseAggregateFluent<NoPipelineInput>(null, database, emptyPipeline, options ?? new AggregateOptions());
41+
}
42+
43+
/// <summary>
44+
/// Begins a fluent aggregation interface.
45+
/// </summary>
46+
/// <param name="database">The database.</param>
47+
/// <param name="session">The session.</param>
48+
/// <param name="options">The options.</param>
49+
/// <returns>
50+
/// A fluent aggregate interface.
51+
/// </returns>
52+
public static IAggregateFluent<NoPipelineInput> Aggregate(this IMongoDatabase database, IClientSessionHandle session, AggregateOptions options = null)
53+
{
54+
Ensure.IsNotNull(session, nameof(session));
55+
var emptyPipeline = new EmptyPipelineDefinition<NoPipelineInput>(NoPipelineInputSerializer.Instance);
56+
return new DatabaseAggregateFluent<NoPipelineInput>(session, database, emptyPipeline, options ?? new AggregateOptions());
57+
}
58+
2959
/// <summary>
3060
/// Watches changes on all collection in a database.
3161
/// </summary>

src/MongoDB.Driver/MongoDatabaseBase.cs

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,30 @@ public abstract class MongoDatabaseBase : IMongoDatabase
3737
public abstract MongoDatabaseSettings Settings { get; }
3838

3939
// public methods
40+
/// <inheritdoc />
41+
public virtual IAsyncCursor<TResult> Aggregate<TResult>(PipelineDefinition<NoPipelineInput, TResult> pipeline, AggregateOptions options = null, CancellationToken cancellationToken = default(CancellationToken))
42+
{
43+
throw new NotImplementedException();
44+
}
45+
46+
/// <inheritdoc />
47+
public virtual IAsyncCursor<TResult> Aggregate<TResult>(IClientSessionHandle session, PipelineDefinition<NoPipelineInput, TResult> pipeline, AggregateOptions options = null, CancellationToken cancellationToken = default(CancellationToken))
48+
{
49+
throw new NotImplementedException();
50+
}
51+
52+
/// <inheritdoc />
53+
public virtual Task<IAsyncCursor<TResult>> AggregateAsync<TResult>(PipelineDefinition<NoPipelineInput, TResult> pipeline, AggregateOptions options = null, CancellationToken cancellationToken = default(CancellationToken))
54+
{
55+
throw new NotImplementedException();
56+
}
57+
58+
/// <inheritdoc />
59+
public virtual Task<IAsyncCursor<TResult>> AggregateAsync<TResult>(IClientSessionHandle session, PipelineDefinition<NoPipelineInput, TResult> pipeline, AggregateOptions options = null, CancellationToken cancellationToken = default(CancellationToken))
60+
{
61+
throw new NotImplementedException();
62+
}
63+
4064
/// <inheritdoc />
4165
public virtual void CreateCollection(string name, CreateCollectionOptions options = null, CancellationToken cancellationToken = default(CancellationToken))
4266
{

0 commit comments

Comments
 (0)