Skip to content

Commit 22c09c9

Browse files
committed
CSHARP-2746: Ensure support for $out to S3.
1 parent 569e1d5 commit 22c09c9

15 files changed

+898
-32
lines changed

src/MongoDB.Driver/AggregateFluent.cs

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -274,6 +274,30 @@ public CollectionAggregateFluent(
274274
public override IMongoDatabase Database => _collection.Database;
275275

276276
// public methods
277+
public override void ToCollection(CancellationToken cancellationToken)
278+
{
279+
if (_session == null)
280+
{
281+
_collection.AggregateToCollection(_pipeline, _options, cancellationToken);
282+
}
283+
else
284+
{
285+
_collection.AggregateToCollection(_session, _pipeline, _options, cancellationToken);
286+
}
287+
}
288+
289+
public override Task ToCollectionAsync(CancellationToken cancellationToken)
290+
{
291+
if (_session == null)
292+
{
293+
return _collection.AggregateToCollectionAsync(_pipeline, _options, cancellationToken);
294+
}
295+
else
296+
{
297+
return _collection.AggregateToCollectionAsync(_session, _pipeline, _options, cancellationToken);
298+
}
299+
}
300+
277301
public override IAsyncCursor<TResult> ToCursor(CancellationToken cancellationToken)
278302
{
279303
if (_session == null)
@@ -325,6 +349,30 @@ public DatabaseAggregateFluent(
325349
public override IMongoDatabase Database => _database;
326350

327351
// public methods
352+
public override void ToCollection(CancellationToken cancellationToken)
353+
{
354+
if (_session == null)
355+
{
356+
_database.AggregateToCollection(_pipeline, _options, cancellationToken);
357+
}
358+
else
359+
{
360+
_database.AggregateToCollection(_session, _pipeline, _options, cancellationToken);
361+
}
362+
}
363+
364+
public override Task ToCollectionAsync(CancellationToken cancellationToken)
365+
{
366+
if (_session == null)
367+
{
368+
return _database.AggregateToCollectionAsync(_pipeline, _options, cancellationToken);
369+
}
370+
else
371+
{
372+
return _database.AggregateToCollectionAsync(_session, _pipeline, _options, cancellationToken);
373+
}
374+
}
375+
328376
public override IAsyncCursor<TResult> ToCursor(CancellationToken cancellationToken)
329377
{
330378
if (_session == null)

src/MongoDB.Driver/AggregateFluentBase.cs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -212,6 +212,18 @@ public virtual IAggregateFluent<TNewResult> Unwind<TNewResult>(FieldDefinition<T
212212
throw new NotImplementedException();
213213
}
214214

215+
/// <inheritdoc />
216+
public virtual void ToCollection(CancellationToken cancellationToken)
217+
{
218+
throw new NotImplementedException();
219+
}
220+
221+
/// <inheritdoc />
222+
public virtual Task ToCollectionAsync(CancellationToken cancellationToken)
223+
{
224+
throw new NotImplementedException();
225+
}
226+
215227
/// <inheritdoc />
216228
public virtual IAsyncCursor<TResult> ToCursor(CancellationToken cancellationToken)
217229
{

src/MongoDB.Driver/FilteredMongoCollectionBase.cs

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,30 @@ protected IMongoCollection<TDocument> WrappedCollection
9898
return _wrappedCollection.AggregateAsync(session, filteredPipeline, options, cancellationToken);
9999
}
100100

101+
public override void AggregateToCollection<TResult>(PipelineDefinition<TDocument, TResult> pipeline, AggregateOptions options = null, CancellationToken cancellationToken = default(CancellationToken))
102+
{
103+
var filteredPipeline = CreateFilteredPipeline(pipeline);
104+
_wrappedCollection.AggregateToCollection(filteredPipeline, options, cancellationToken);
105+
}
106+
107+
public override void AggregateToCollection<TResult>(IClientSessionHandle session, PipelineDefinition<TDocument, TResult> pipeline, AggregateOptions options = null, CancellationToken cancellationToken = default(CancellationToken))
108+
{
109+
var filteredPipeline = CreateFilteredPipeline(pipeline);
110+
_wrappedCollection.AggregateToCollection(session, filteredPipeline, options, cancellationToken);
111+
}
112+
113+
public override Task AggregateToCollectionAsync<TResult>(PipelineDefinition<TDocument, TResult> pipeline, AggregateOptions options = null, CancellationToken cancellationToken = default(CancellationToken))
114+
{
115+
var filteredPipeline = CreateFilteredPipeline(pipeline);
116+
return _wrappedCollection.AggregateToCollectionAsync(filteredPipeline, options, cancellationToken);
117+
}
118+
119+
public override Task AggregateToCollectionAsync<TResult>(IClientSessionHandle session, PipelineDefinition<TDocument, TResult> pipeline, AggregateOptions options = null, CancellationToken cancellationToken = default(CancellationToken))
120+
{
121+
var filteredPipeline = CreateFilteredPipeline(pipeline);
122+
return _wrappedCollection.AggregateToCollectionAsync(session, filteredPipeline, options, cancellationToken);
123+
}
124+
101125
public override BulkWriteResult<TDocument> BulkWrite(IEnumerable<WriteModel<TDocument>> requests, BulkWriteOptions options = null, CancellationToken cancellationToken = default(CancellationToken))
102126
{
103127
return _wrappedCollection.BulkWrite(CombineModelFilters(requests), options, cancellationToken);

src/MongoDB.Driver/IAggregateFluent.cs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -326,6 +326,19 @@ IAggregateFluent<TNewResult> Lookup<TForeignDocument, TAsElement, TAs, TNewResul
326326
/// <returns>The fluent aggregate interface.</returns>
327327
IAggregateFluent<AggregateSortByCountResult<TId>> SortByCount<TId>(AggregateExpressionDefinition<TResult, TId> id);
328328

329+
/// <summary>
330+
/// Executes an aggregation pipeline that writes the results to a collection.
331+
/// </summary>
332+
/// <param name="cancellationToken">The cancellation token.</param>
333+
void ToCollection(CancellationToken cancellationToken = default(CancellationToken));
334+
335+
/// <summary>
336+
/// Executes an aggregation pipeline that writes the results to a collection.
337+
/// </summary>
338+
/// <param name="cancellationToken">The cancellation token.</param>
339+
/// <returns>A Task.</returns>
340+
Task ToCollectionAsync(CancellationToken cancellationToken = default(CancellationToken));
341+
329342
/// <summary>
330343
/// Appends an unwind stage to the pipeline.
331344
/// </summary>

src/MongoDB.Driver/IMongoCollection.cs

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,46 @@ public interface IMongoCollection<TDocument>
104104
/// </returns>
105105
Task<IAsyncCursor<TResult>> AggregateAsync<TResult>(IClientSessionHandle session, PipelineDefinition<TDocument, TResult> pipeline, AggregateOptions options = null, CancellationToken cancellationToken = default(CancellationToken));
106106

107+
/// <summary>
108+
/// Runs an aggregation pipeline whose results are written to a collection.
109+
/// </summary>
110+
/// <typeparam name="TResult">The type of the result.</typeparam>
111+
/// <param name="pipeline">The pipeline.</param>
112+
/// <param name="options">The options.</param>
113+
/// <param name="cancellationToken">The cancellation token.</param>
114+
void AggregateToCollection<TResult>(PipelineDefinition<TDocument, TResult> pipeline, AggregateOptions options = null, CancellationToken cancellationToken = default(CancellationToken));
115+
116+
/// <summary>
117+
/// Runs an aggregation pipeline whose results are written to a collection.
118+
/// </summary>
119+
/// <typeparam name="TResult">The type of the result.</typeparam>
120+
/// <param name="session">The session.</param>
121+
/// <param name="pipeline">The pipeline.</param>
122+
/// <param name="options">The options.</param>
123+
/// <param name="cancellationToken">The cancellation token.</param>
124+
void AggregateToCollection<TResult>(IClientSessionHandle session, PipelineDefinition<TDocument, TResult> pipeline, AggregateOptions options = null, CancellationToken cancellationToken = default(CancellationToken));
125+
126+
/// <summary>
127+
/// Runs an aggregation pipeline whose results are written to a collection.
128+
/// </summary>
129+
/// <typeparam name="TResult">The type of the result.</typeparam>
130+
/// <param name="pipeline">The pipeline.</param>
131+
/// <param name="options">The options.</param>
132+
/// <param name="cancellationToken">The cancellation token.</param>
133+
/// <returns>A Task.</returns>
134+
Task AggregateToCollectionAsync<TResult>(PipelineDefinition<TDocument, TResult> pipeline, AggregateOptions options = null, CancellationToken cancellationToken = default(CancellationToken));
135+
136+
/// <summary>
137+
/// Runs an aggregation pipeline whose results are written to a collection.
138+
/// </summary>
139+
/// <typeparam name="TResult">The type of the result.</typeparam>
140+
/// <param name="session">The session.</param>
141+
/// <param name="pipeline">The pipeline.</param>
142+
/// <param name="options">The options.</param>
143+
/// <param name="cancellationToken">The cancellation token.</param>
144+
/// <returns>A Task.</returns>
145+
Task AggregateToCollectionAsync<TResult>(IClientSessionHandle session, PipelineDefinition<TDocument, TResult> pipeline, AggregateOptions options = null, CancellationToken cancellationToken = default(CancellationToken));
146+
107147
/// <summary>
108148
/// Performs multiple write operations.
109149
/// </summary>

src/MongoDB.Driver/IMongoDatabase.cs

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,46 @@ public interface IMongoDatabase
9292
/// </returns>
9393
Task<IAsyncCursor<TResult>> AggregateAsync<TResult>(IClientSessionHandle session, PipelineDefinition<NoPipelineInput, TResult> pipeline, AggregateOptions options = null, CancellationToken cancellationToken = default(CancellationToken));
9494

95+
/// <summary>
96+
/// Runs an aggregation pipeline.
97+
/// </summary>
98+
/// <typeparam name="TResult">The type of the result.</typeparam>
99+
/// <param name="pipeline">The pipeline.</param>
100+
/// <param name="options">The options.</param>
101+
/// <param name="cancellationToken">The cancellation token.</param>
102+
void AggregateToCollection<TResult>(PipelineDefinition<NoPipelineInput, TResult> pipeline, AggregateOptions options = null, CancellationToken cancellationToken = default(CancellationToken));
103+
104+
/// <summary>
105+
/// Runs an aggregation pipeline.
106+
/// </summary>
107+
/// <typeparam name="TResult">The type of the result.</typeparam>
108+
/// <param name="session">The session.</param>
109+
/// <param name="pipeline">The pipeline.</param>
110+
/// <param name="options">The options.</param>
111+
/// <param name="cancellationToken">The cancellation token.</param>
112+
void AggregateToCollection<TResult>(IClientSessionHandle session, PipelineDefinition<NoPipelineInput, TResult> pipeline, AggregateOptions options = null, CancellationToken cancellationToken = default(CancellationToken));
113+
114+
/// <summary>
115+
/// Runs an aggregation pipeline.
116+
/// </summary>
117+
/// <typeparam name="TResult">The type of the result.</typeparam>
118+
/// <param name="pipeline">The pipeline.</param>
119+
/// <param name="options">The options.</param>
120+
/// <param name="cancellationToken">The cancellation token.</param>
121+
/// <returns>A Task.</returns>
122+
Task AggregateToCollectionAsync<TResult>(PipelineDefinition<NoPipelineInput, TResult> pipeline, AggregateOptions options = null, CancellationToken cancellationToken = default(CancellationToken));
123+
124+
/// <summary>
125+
/// Runs an aggregation pipeline.
126+
/// </summary>
127+
/// <typeparam name="TResult">The type of the result.</typeparam>
128+
/// <param name="session">The session.</param>
129+
/// <param name="pipeline">The pipeline.</param>
130+
/// <param name="options">The options.</param>
131+
/// <param name="cancellationToken">The cancellation token.</param>
132+
/// <returns>A Task.</returns>
133+
Task AggregateToCollectionAsync<TResult>(IClientSessionHandle session, PipelineDefinition<NoPipelineInput, TResult> pipeline, AggregateOptions options = null, CancellationToken cancellationToken = default(CancellationToken));
134+
95135
/// <summary>
96136
/// Creates the collection with the specified name.
97137
/// </summary>

src/MongoDB.Driver/MongoCollectionBase.cs

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,30 @@ public abstract class MongoCollectionBase<TDocument> : IMongoCollection<TDocumen
6666
throw new NotImplementedException();
6767
}
6868

69+
/// <inheritdoc />
70+
public virtual void AggregateToCollection<TResult>(PipelineDefinition<TDocument, TResult> pipeline, AggregateOptions options = null, CancellationToken cancellationToken = default(CancellationToken))
71+
{
72+
throw new NotImplementedException();
73+
}
74+
75+
/// <inheritdoc />
76+
public virtual void AggregateToCollection<TResult>(IClientSessionHandle session, PipelineDefinition<TDocument, TResult> pipeline, AggregateOptions options = null, CancellationToken cancellationToken = default(CancellationToken))
77+
{
78+
throw new NotImplementedException();
79+
}
80+
81+
/// <inheritdoc />
82+
public virtual Task AggregateToCollectionAsync<TResult>(PipelineDefinition<TDocument, TResult> pipeline, AggregateOptions options = null, CancellationToken cancellationToken = default(CancellationToken))
83+
{
84+
throw new NotImplementedException();
85+
}
86+
87+
/// <inheritdoc />
88+
public virtual Task AggregateToCollectionAsync<TResult>(IClientSessionHandle session, PipelineDefinition<TDocument, TResult> pipeline, AggregateOptions options = null, CancellationToken cancellationToken = default(CancellationToken))
89+
{
90+
throw new NotImplementedException();
91+
}
92+
6993
/// <inheritdoc />
7094
public virtual BulkWriteResult<TDocument> BulkWrite(IEnumerable<WriteModel<TDocument>> requests, BulkWriteOptions options = null, CancellationToken cancellationToken = default(CancellationToken))
7195
{

src/MongoDB.Driver/MongoCollectionImpl.cs

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,50 @@ public override MongoCollectionSettings Settings
155155
}
156156
}
157157

158+
public override void AggregateToCollection<TResult>(PipelineDefinition<TDocument, TResult> pipeline, AggregateOptions options, CancellationToken cancellationToken = default(CancellationToken))
159+
{
160+
UsingImplicitSession(session => AggregateToCollection(session, pipeline, options, cancellationToken), cancellationToken);
161+
}
162+
163+
public override void AggregateToCollection<TResult>(IClientSessionHandle session, PipelineDefinition<TDocument, TResult> pipeline, AggregateOptions options, CancellationToken cancellationToken = default(CancellationToken))
164+
{
165+
Ensure.IsNotNull(session, nameof(session));
166+
var renderedPipeline = Ensure.IsNotNull(pipeline, nameof(pipeline)).Render(_documentSerializer, _settings.SerializerRegistry);
167+
options = options ?? new AggregateOptions();
168+
169+
var lastStage = renderedPipeline.Documents.LastOrDefault();
170+
var lastStageName = lastStage?.GetElement(0).Name;
171+
if (lastStage == null || (lastStageName != "$out" && lastStageName != "$merge"))
172+
{
173+
throw new InvalidOperationException("AggregateToCollection requires that the last stage be $out or $merge.");
174+
}
175+
176+
var aggregateOperation = CreateAggregateToCollectionOperation(renderedPipeline, options);
177+
ExecuteWriteOperation(session, aggregateOperation, cancellationToken);
178+
}
179+
180+
public override Task AggregateToCollectionAsync<TResult>(PipelineDefinition<TDocument, TResult> pipeline, AggregateOptions options, CancellationToken cancellationToken = default(CancellationToken))
181+
{
182+
return UsingImplicitSessionAsync(session => AggregateToCollectionAsync(session, pipeline, options, cancellationToken), cancellationToken);
183+
}
184+
185+
public override async Task AggregateToCollectionAsync<TResult>(IClientSessionHandle session, PipelineDefinition<TDocument, TResult> pipeline, AggregateOptions options, CancellationToken cancellationToken = default(CancellationToken))
186+
{
187+
Ensure.IsNotNull(session, nameof(session));
188+
var renderedPipeline = Ensure.IsNotNull(pipeline, nameof(pipeline)).Render(_documentSerializer, _settings.SerializerRegistry);
189+
options = options ?? new AggregateOptions();
190+
191+
var lastStage = renderedPipeline.Documents.LastOrDefault();
192+
var lastStageName = lastStage?.GetElement(0).Name;
193+
if (lastStage == null || (lastStageName != "$out" && lastStageName != "$merge"))
194+
{
195+
throw new InvalidOperationException("AggregateToCollectionAsync requires that the last stage be $out or $merge.");
196+
}
197+
198+
var aggregateOperation = CreateAggregateToCollectionOperation(renderedPipeline, options);
199+
await ExecuteWriteOperationAsync(session, aggregateOperation, cancellationToken).ConfigureAwait(false);
200+
}
201+
158202
public override BulkWriteResult<TDocument> BulkWrite(IEnumerable<WriteModel<TDocument>> requests, BulkWriteOptions options, CancellationToken cancellationToken = default(CancellationToken))
159203
{
160204
return UsingImplicitSession(session => BulkWrite(session, requests, options, cancellationToken), cancellationToken);

src/MongoDB.Driver/MongoDatabaseBase.cs

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,30 @@ public abstract class MongoDatabaseBase : IMongoDatabase
6161
throw new NotImplementedException();
6262
}
6363

64+
/// <inheritdoc />
65+
public virtual void AggregateToCollection<TResult>(PipelineDefinition<NoPipelineInput, TResult> pipeline, AggregateOptions options = null, CancellationToken cancellationToken = default(CancellationToken))
66+
{
67+
throw new NotImplementedException();
68+
}
69+
70+
/// <inheritdoc />
71+
public virtual void AggregateToCollection<TResult>(IClientSessionHandle session, PipelineDefinition<NoPipelineInput, TResult> pipeline, AggregateOptions options = null, CancellationToken cancellationToken = default(CancellationToken))
72+
{
73+
throw new NotImplementedException();
74+
}
75+
76+
/// <inheritdoc />
77+
public virtual Task AggregateToCollectionAsync<TResult>(PipelineDefinition<NoPipelineInput, TResult> pipeline, AggregateOptions options = null, CancellationToken cancellationToken = default(CancellationToken))
78+
{
79+
throw new NotImplementedException();
80+
}
81+
82+
/// <inheritdoc />
83+
public virtual Task AggregateToCollectionAsync<TResult>(IClientSessionHandle session, PipelineDefinition<NoPipelineInput, TResult> pipeline, AggregateOptions options = null, CancellationToken cancellationToken = default(CancellationToken))
84+
{
85+
throw new NotImplementedException();
86+
}
87+
6488
/// <inheritdoc />
6589
public virtual void CreateCollection(string name, CreateCollectionOptions options = null, CancellationToken cancellationToken = default(CancellationToken))
6690
{

0 commit comments

Comments
 (0)