Skip to content

Commit 8be5914

Browse files
committed
CSHARP-3024: Support $out to a different database.
1 parent dc3849c commit 8be5914

File tree

15 files changed

+476
-139
lines changed

15 files changed

+476
-139
lines changed

src/MongoDB.Driver.Core/Core/Misc/Feature.cs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ public class Feature
3737
private static readonly Feature __aggregateLet = new Feature("AggregateLet", new SemanticVersion(3, 6, 0));
3838
private static readonly Feature __aggregateMerge = new Feature("AggregateMerge", new SemanticVersion(4, 2, 0));
3939
private static readonly Feature __aggregateOut = new Feature("AggregateOut", new SemanticVersion(2, 6, 0));
40+
private static readonly Feature __aggregateOutToDifferentDatabase = new Feature("AggregateOutToDifferentDatabase", new SemanticVersion(4, 3, 0));
4041
private static readonly Feature __aggregateToString = new Feature("AggregateToString", new SemanticVersion(4, 0, 0));
4142
private static readonly ArrayFiltersFeature __arrayFilters = new ArrayFiltersFeature("ArrayFilters", new SemanticVersion(3, 5, 11));
4243
private static readonly Feature __bypassDocumentValidation = new Feature("BypassDocumentValidation", new SemanticVersion(3, 2, 0));
@@ -159,6 +160,11 @@ public class Feature
159160
/// </summary>
160161
public static Feature AggregateOut => __aggregateOut;
161162

163+
/// <summary>
164+
/// Gets the aggregate out to a different database feature.
165+
/// </summary>
166+
public static Feature AggregateOutToDifferentDatabase => __aggregateOutToDifferentDatabase;
167+
162168
/// <summary>
163169
/// Gets the aggregate toString feature.
164170
/// </summary>

src/MongoDB.Driver.Core/Core/Operations/AggregateToCollectionOperation.cs

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ public AggregateToCollectionOperation(DatabaseNamespace databaseNamespace, IEnum
6161
_messageEncoderSettings = Ensure.IsNotNull(messageEncoderSettings, nameof(messageEncoderSettings));
6262

6363
EnsureIsOutputToCollectionPipeline();
64+
_pipeline = SimplifyOutStageIfOutputDatabaseIsSameAsInputDatabase(_pipeline);
6465
}
6566

6667
/// <summary>
@@ -288,5 +289,28 @@ private void EnsureIsOutputToCollectionPipeline()
288289
throw new ArgumentException("The last stage of the pipeline for an AggregateOutputToCollectionOperation must have a $out or $merge operator.", "pipeline");
289290
}
290291
}
292+
293+
private IReadOnlyList<BsonDocument> SimplifyOutStageIfOutputDatabaseIsSameAsInputDatabase(IReadOnlyList<BsonDocument> pipeline)
294+
{
295+
var lastStage = pipeline.Last();
296+
var lastStageName = lastStage.GetElement(0).Name;
297+
if (lastStageName == "$out" && lastStage["$out"] is BsonDocument outDocument)
298+
{
299+
var outputDatabaseName = outDocument["db"].AsString;
300+
if (outputDatabaseName == _databaseNamespace.DatabaseName)
301+
{
302+
var outputCollectionName = outDocument["coll"].AsString;
303+
var simplifiedOutStage = lastStage.Clone().AsBsonDocument;
304+
simplifiedOutStage["$out"] = outputCollectionName;
305+
306+
var modifiedPipeline = new List<BsonDocument>(pipeline);
307+
modifiedPipeline[modifiedPipeline.Count - 1] = simplifiedOutStage;
308+
309+
return modifiedPipeline;
310+
}
311+
}
312+
313+
return pipeline; // unchanged
314+
}
291315
}
292316
}

src/MongoDB.Driver.Legacy/MongoCollection.cs

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -165,8 +165,20 @@ private IEnumerable<BsonDocument> Aggregate(IClientSessionHandle session, Aggreg
165165
{
166166
case "$out":
167167
{
168-
var outputCollectionName = lastStage[0].AsString;
169-
outputCollectionNamespace = new CollectionNamespace(_collectionNamespace.DatabaseNamespace, outputCollectionName);
168+
var outValue = lastStage[0];
169+
DatabaseNamespace outputDatabaseNamespace;
170+
string outputCollectionName;
171+
if (outValue.IsString)
172+
{
173+
outputDatabaseNamespace = _collectionNamespace.DatabaseNamespace;
174+
outputCollectionName = outValue.AsString;
175+
}
176+
else
177+
{
178+
outputDatabaseNamespace = new DatabaseNamespace(outValue["db"].AsString);
179+
outputCollectionName = outValue["coll"].AsString;
180+
}
181+
outputCollectionNamespace = new CollectionNamespace(outputDatabaseNamespace, outputCollectionName);
170182
}
171183
break;
172184
case "$merge":

src/MongoDB.Driver/AggregateFluent.cs

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -177,20 +177,32 @@ public override IAggregateFluent<TNewResult> OfType<TNewResult>(IBsonSerializer<
177177
return WithPipeline(_pipeline.OfType(newResultSerializer));
178178
}
179179

180+
public override IAsyncCursor<TResult> Out(IMongoCollection<TResult> outputCollection, CancellationToken cancellationToken)
181+
{
182+
Ensure.IsNotNull(outputCollection, nameof(outputCollection));
183+
var aggregate = WithPipeline(_pipeline.Out(outputCollection));
184+
return aggregate.ToCursor(cancellationToken);
185+
}
186+
180187
public override IAsyncCursor<TResult> Out(string collectionName, CancellationToken cancellationToken)
181188
{
182189
Ensure.IsNotNull(collectionName, nameof(collectionName));
183190
var outputCollection = Database.GetCollection<TResult>(collectionName);
191+
return Out(outputCollection, cancellationToken);
192+
}
193+
194+
public override Task<IAsyncCursor<TResult>> OutAsync(IMongoCollection<TResult> outputCollection, CancellationToken cancellationToken)
195+
{
196+
Ensure.IsNotNull(outputCollection, nameof(outputCollection));
184197
var aggregate = WithPipeline(_pipeline.Out(outputCollection));
185-
return aggregate.ToCursor(cancellationToken);
198+
return aggregate.ToCursorAsync(cancellationToken);
186199
}
187200

188201
public override Task<IAsyncCursor<TResult>> OutAsync(string collectionName, CancellationToken cancellationToken)
189202
{
190203
Ensure.IsNotNull(collectionName, nameof(collectionName));
191204
var outputCollection = Database.GetCollection<TResult>(collectionName);
192-
var aggregate = WithPipeline(_pipeline.Out(outputCollection));
193-
return aggregate.ToCursorAsync(cancellationToken);
205+
return OutAsync(outputCollection, cancellationToken);
194206
}
195207

196208
public override IAggregateFluent<TNewResult> Project<TNewResult>(ProjectionDefinition<TResult, TNewResult> projection)

src/MongoDB.Driver/AggregateFluentBase.cs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -161,12 +161,24 @@ public virtual Task<IAsyncCursor<TOutput>> MergeAsync<TOutput>(IMongoCollection<
161161
/// <inheritdoc />
162162
public abstract IAggregateFluent<TNewResult> OfType<TNewResult>(IBsonSerializer<TNewResult> newResultSerializer) where TNewResult : TResult;
163163

164+
/// <inheritdoc />
165+
public virtual IAsyncCursor<TResult> Out(IMongoCollection<TResult> outputCollection, CancellationToken cancellationToken)
166+
{
167+
throw new NotImplementedException();
168+
}
169+
164170
/// <inheritdoc />
165171
public virtual IAsyncCursor<TResult> Out(string collectionName, CancellationToken cancellationToken)
166172
{
167173
throw new NotImplementedException();
168174
}
169175

176+
/// <inheritdoc />
177+
public virtual Task<IAsyncCursor<TResult>> OutAsync(IMongoCollection<TResult> outputCollection, CancellationToken cancellationToken)
178+
{
179+
throw new NotImplementedException();
180+
}
181+
170182
/// <inheritdoc />
171183
public abstract Task<IAsyncCursor<TResult>> OutAsync(string collectionName, CancellationToken cancellationToken);
172184

src/MongoDB.Driver/IAggregateFluent.cs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -262,6 +262,14 @@ IAggregateFluent<TNewResult> Lookup<TForeignDocument, TAsElement, TAs, TNewResul
262262
/// <returns>The fluent aggregate interface.</returns>
263263
IAggregateFluent<TNewResult> OfType<TNewResult>(IBsonSerializer<TNewResult> newResultSerializer = null) where TNewResult : TResult;
264264

265+
/// <summary>
266+
/// Appends an out stage to the pipeline and executes it, and then returns a cursor to read the contents of the output collection.
267+
/// </summary>
268+
/// <param name="outputCollection">The output collection.</param>
269+
/// <param name="cancellationToken">The cancellation token.</param>
270+
/// <returns>A cursor.</returns>
271+
IAsyncCursor<TResult> Out(IMongoCollection<TResult> outputCollection, CancellationToken cancellationToken = default(CancellationToken));
272+
265273
/// <summary>
266274
/// Appends an out stage to the pipeline and executes it, and then returns a cursor to read the contents of the output collection.
267275
/// </summary>
@@ -270,6 +278,14 @@ IAggregateFluent<TNewResult> Lookup<TForeignDocument, TAsElement, TAs, TNewResul
270278
/// <returns>A cursor.</returns>
271279
IAsyncCursor<TResult> Out(string collectionName, CancellationToken cancellationToken = default(CancellationToken));
272280

281+
/// <summary>
282+
/// Appends an out stage to the pipeline and executes it, and then returns a cursor to read the contents of the output collection.
283+
/// </summary>
284+
/// <param name="outputCollection">The output collection.</param>
285+
/// <param name="cancellationToken">The cancellation token.</param>
286+
/// <returns>A Task whose result is a cursor.</returns>
287+
Task<IAsyncCursor<TResult>> OutAsync(IMongoCollection<TResult> outputCollection, CancellationToken cancellationToken = default(CancellationToken));
288+
273289
/// <summary>
274290
/// Appends an out stage to the pipeline and executes it, and then returns a cursor to read the contents of the output collection.
275291
/// </summary>

src/MongoDB.Driver/MongoCollectionImpl.cs

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -786,8 +786,20 @@ private FindOperation<TResult> CreateAggregateToCollectionFindOperation<TResult>
786786
{
787787
case "$out":
788788
{
789-
var outputCollectionName = outStage[0].AsString;
790-
outputCollectionNamespace = new CollectionNamespace(_collectionNamespace.DatabaseNamespace, outputCollectionName);
789+
var outValue = outStage[0];
790+
DatabaseNamespace outputDatabaseNamespace;
791+
string outputCollectionName;
792+
if (outValue.IsString)
793+
{
794+
outputDatabaseNamespace = _collectionNamespace.DatabaseNamespace;
795+
outputCollectionName = outValue.AsString;
796+
}
797+
else
798+
{
799+
outputDatabaseNamespace = new DatabaseNamespace(outValue["db"].AsString);
800+
outputCollectionName = outValue["coll"].AsString;
801+
}
802+
outputCollectionNamespace = new CollectionNamespace(outputDatabaseNamespace, outputCollectionName);
791803
}
792804
break;
793805
case "$merge":

src/MongoDB.Driver/MongoDatabaseImpl.cs

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -537,8 +537,20 @@ private FindOperation<TResult> CreateAggregateToCollectionFindOperation<TResult>
537537
{
538538
case "$out":
539539
{
540-
var outputCollectionName = outStage[0].AsString;
541-
outputCollectionNamespace = new CollectionNamespace(_databaseNamespace, outputCollectionName);
540+
var outValue = outStage[0];
541+
DatabaseNamespace outputDatabaseNamespace;
542+
string outputCollectionName;
543+
if (outValue.IsString)
544+
{
545+
outputDatabaseNamespace = _databaseNamespace;
546+
outputCollectionName = outValue.AsString;
547+
}
548+
else
549+
{
550+
outputDatabaseNamespace = new DatabaseNamespace(outValue["db"].AsString);
551+
outputCollectionName = outValue["coll"].AsString;
552+
}
553+
outputCollectionNamespace = new CollectionNamespace(outputDatabaseNamespace, outputCollectionName);
542554
}
543555
break;
544556
case "$merge":

src/MongoDB.Driver/PipelineStageDefinitionBuilder.cs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1074,7 +1074,10 @@ public static PipelineStageDefinition<TInput, TInput> Out<TInput>(
10741074
IMongoCollection<TInput> outputCollection)
10751075
{
10761076
Ensure.IsNotNull(outputCollection, nameof(outputCollection));
1077-
return new BsonDocumentPipelineStageDefinition<TInput, TInput>(new BsonDocument("$out", outputCollection.CollectionNamespace.CollectionName));
1077+
var outputDatabaseName = outputCollection.Database.DatabaseNamespace.DatabaseName;
1078+
var outputCollectionName = outputCollection.CollectionNamespace.CollectionName;
1079+
var outDocument = new BsonDocument { { "db", outputDatabaseName }, { "coll", outputCollectionName } };
1080+
return new BsonDocumentPipelineStageDefinition<TInput, TInput>(new BsonDocument("$out", outDocument));
10781081
}
10791082

10801083
/// <summary>

tests/MongoDB.Driver.Core.Tests/Core/Operations/AggregateToCollectionOperationTests.cs

Lines changed: 45 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
*/
1515

1616
using System;
17+
using System.Collections.Generic;
1718
using System.Linq;
1819
using FluentAssertions;
1920
using MongoDB.Bson;
@@ -485,15 +486,55 @@ public void CreateCommand_should_return_expected_result_when_WriteConcern_is_set
485486
[SkippableTheory]
486487
[ParameterAttributeData]
487488
public void Execute_should_return_expected_result(
488-
[Values(false, true)]
489-
bool async)
489+
[Values("$out", "$merge")] string lastStageName,
490+
[Values(false, true)] bool usingDifferentOutputDatabase,
491+
[Values(false, true)] bool async)
490492
{
491493
RequireServer.Check();
494+
var pipeline = new List<BsonDocument> { BsonDocument.Parse("{ $match : { _id : 1 } }") };
495+
var inputDatabaseName = _databaseNamespace.DatabaseName;
496+
var inputCollectionName = _collectionNamespace.CollectionName;
497+
var outputDatabaseName = usingDifferentOutputDatabase ? $"{inputDatabaseName}-outputdatabase" : inputDatabaseName;
498+
var outputCollectionName = $"{inputCollectionName}-outputcollection";
499+
switch (lastStageName)
500+
{
501+
case "$out":
502+
RequireServer.Check().Supports(Feature.AggregateOut);
503+
if (usingDifferentOutputDatabase)
504+
{
505+
RequireServer.Check().Supports(Feature.AggregateOutToDifferentDatabase);
506+
pipeline.Add(BsonDocument.Parse($"{{ $out : {{ db : '{outputDatabaseName}', coll : '{outputCollectionName}' }} }}"));
507+
}
508+
else
509+
{
510+
pipeline.Add(BsonDocument.Parse($"{{ $out : '{outputCollectionName}' }}"));
511+
}
512+
break;
513+
514+
case "$merge":
515+
RequireServer.Check().Supports(Feature.AggregateMerge);
516+
if (usingDifferentOutputDatabase)
517+
{
518+
pipeline.Add(BsonDocument.Parse($"{{ $merge : {{ into : {{ db : '{outputDatabaseName}', coll : '{outputCollectionName}' }} }} }}"));
519+
}
520+
else
521+
{
522+
pipeline.Add(BsonDocument.Parse($"{{ $merge : {{ into : '{outputDatabaseName}' }} }}"));
523+
}
524+
break;
525+
526+
default:
527+
throw new Exception($"Unexpected lastStageName: \"{lastStageName}\".");
528+
}
492529
EnsureTestData();
493-
var subject = new AggregateToCollectionOperation(_collectionNamespace, __pipeline, _messageEncoderSettings);
530+
if (usingDifferentOutputDatabase)
531+
{
532+
EnsureDatabaseExists(outputDatabaseName);
533+
}
534+
var subject = new AggregateToCollectionOperation(_collectionNamespace, pipeline, _messageEncoderSettings);
494535

495536
ExecuteOperation(subject, async);
496-
var result = ReadAllFromCollection(new CollectionNamespace(_databaseNamespace, "awesome"), async);
537+
var result = ReadAllFromCollection(new CollectionNamespace(new DatabaseNamespace(outputDatabaseName), outputCollectionName), async);
497538

498539
result.Should().NotBeNull();
499540
result.Should().HaveCount(1);

0 commit comments

Comments
 (0)