Skip to content

Commit d781a7b

Browse files
committed
CSHARP-1716: Added support for $bucket aggregation stage.
1 parent 5d20d20 commit d781a7b

File tree

8 files changed

+519
-0
lines changed

8 files changed

+519
-0
lines changed

src/MongoDB.Driver.Core/Core/Misc/Feature.cs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ public class Feature
2525
#region static
2626
private static readonly Feature __aggregate = new Feature("Aggregate", new SemanticVersion(2, 2, 0));
2727
private static readonly Feature __aggregateAllowDiskUse = new Feature("AggregateAllowDiskUse", new SemanticVersion(2, 6, 0));
28+
private static readonly Feature __aggregateBucketStage = new Feature("AggregateBucketStage", new SemanticVersion(3, 3, 11));
2829
private static readonly Feature __aggregateCountStage = new Feature("AggregateCountStage", new SemanticVersion(3, 3, 11));
2930
private static readonly Feature __aggregateCursorResult = new Feature("AggregateCursorResult", new SemanticVersion(2, 6, 0));
3031
private static readonly Feature __aggregateExplain = new Feature("AggregateExplain", new SemanticVersion(2, 6, 0));
@@ -61,6 +62,11 @@ public class Feature
6162
/// </summary>
6263
public static Feature AggregateAllowDiskUse => __aggregateAllowDiskUse;
6364

65+
/// <summary>
66+
/// Gets the aggregate bucket stage feature.
67+
/// </summary>
68+
public static Feature AggregateBucketStage => __aggregateBucketStage;
69+
6470
/// <summary>
6571
/// Gets the aggregate count stage feature.
6672
/// </summary>
Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
/* Copyright 2016 MongoDB Inc.
2+
*
3+
* Licensed under the Apache License, Version 2.0 (the "License");
4+
* you may not use this file except in compliance with the License.
5+
* You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software
10+
* distributed under the License is distributed on an "AS IS" BASIS,
11+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
* See the License for the specific language governing permissions and
13+
* limitations under the License.
14+
*/
15+
16+
using System;
17+
using System.Collections.Generic;
18+
using System.Linq;
19+
using System.Text;
20+
using System.Threading.Tasks;
21+
using MongoDB.Bson.Serialization.Attributes;
22+
23+
namespace MongoDB.Driver
24+
{
25+
/// <summary>
26+
/// Represents the result of the $bucket stage.
27+
/// </summary>
28+
/// <typeparam name="TValue">The type of the value.</typeparam>
29+
public class AggregateBucketResult<TValue>
30+
{
31+
// constructors
32+
/// <summary>
33+
/// Initializes a new instance of the <see cref="AggregateBucketResult{TValue}"/> class.
34+
/// </summary>
35+
/// <param name="id">The inclusive lower boundary of the bucket.</param>
36+
/// <param name="count">The count.</param>
37+
public AggregateBucketResult(TValue id, long count)
38+
{
39+
Id = id;
40+
Count = count;
41+
}
42+
43+
// public properties
44+
/// <summary>
45+
/// Gets the inclusive lower boundary of the bucket.
46+
/// </summary>
47+
/// <value>
48+
/// The inclusive lower boundary of the bucket.
49+
/// </value>
50+
[BsonId]
51+
public TValue Id { get; private set; }
52+
53+
/// <summary>
54+
/// Gets the count.
55+
/// </summary>
56+
/// <value>
57+
/// The count.
58+
/// </value>
59+
[BsonElement("count")]
60+
public long Count { get; private set; }
61+
62+
// public methods
63+
/// <inheritdoc/>
64+
public override bool Equals(object obj)
65+
{
66+
var other = obj as AggregateBucketResult<TValue>;
67+
if (object.ReferenceEquals(other, null))
68+
{
69+
return false;
70+
}
71+
72+
return
73+
Id.Equals(other.Id) &&
74+
Count == other.Count;
75+
}
76+
77+
/// <inheritdoc/>
78+
public override int GetHashCode()
79+
{
80+
return 0;
81+
}
82+
}
83+
}

src/MongoDB.Driver/AggregateFluent.cs

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,76 @@ public override IAggregateFluent<TNewResult> As<TNewResult>(IBsonSerializer<TNew
6767
return Project(projection);
6868
}
6969

70+
public override IAggregateFluent<AggregateBucketResult<TValue>> Bucket<TValue>(
71+
AggregateExpressionDefinition<TResult, TValue> groupBy,
72+
IEnumerable<TValue> boundaries,
73+
Optional<TValue> defaultBucket = default(Optional<TValue>))
74+
{
75+
const string operatorName = "$bucket";
76+
var stage = new DelegatedPipelineStageDefinition<TResult, AggregateBucketResult<TValue>>(
77+
operatorName,
78+
(s, sr) =>
79+
{
80+
var valueSerializer = sr.GetSerializer<TValue>();
81+
var renderedGroupBy = groupBy.Render(s, sr);
82+
var serializedBoundaries = boundaries.Select(b => valueSerializer.ToBsonValue(b));
83+
var serializedDefaultBucket = defaultBucket.HasValue ? valueSerializer.ToBsonValue(defaultBucket.Value) : null;
84+
var document = new BsonDocument
85+
{
86+
{ operatorName, new BsonDocument
87+
{
88+
{ "groupBy", renderedGroupBy },
89+
{ "boundaries", new BsonArray(serializedBoundaries) },
90+
{ "default", serializedDefaultBucket, serializedDefaultBucket != null }
91+
}
92+
}
93+
};
94+
return new RenderedPipelineStageDefinition<AggregateBucketResult<TValue>>(
95+
operatorName,
96+
document,
97+
sr.GetSerializer<AggregateBucketResult<TValue>>());
98+
});
99+
100+
return AppendStage(stage);
101+
}
102+
103+
public override IAggregateFluent<TNewResult> Bucket<TValue, TNewResult>(
104+
AggregateExpressionDefinition<TResult, TValue> groupBy,
105+
IEnumerable<TValue> boundaries,
106+
ProjectionDefinition<TResult, TNewResult> output,
107+
Optional<TValue> defaultBucket = default(Optional<TValue>))
108+
{
109+
const string operatorName = "$bucket";
110+
var stage = new DelegatedPipelineStageDefinition<TResult, TNewResult>(
111+
operatorName,
112+
(s, sr) =>
113+
{
114+
var valueSerializer = sr.GetSerializer<TValue>();
115+
var newResultSerializer = sr.GetSerializer<TNewResult>();
116+
var renderedGroupBy = groupBy.Render(s, sr);
117+
var serializedBoundaries = boundaries.Select(b => valueSerializer.ToBsonValue(b));
118+
var serializedDefaultBucket = defaultBucket.HasValue ? valueSerializer.ToBsonValue(defaultBucket.Value) : null;
119+
var renderedOutput = output.Render(s, sr);
120+
var document = new BsonDocument
121+
{
122+
{ operatorName, new BsonDocument
123+
{
124+
{ "groupBy", renderedGroupBy },
125+
{ "boundaries", new BsonArray(serializedBoundaries) },
126+
{ "default", serializedDefaultBucket, serializedDefaultBucket != null },
127+
{ "output", renderedOutput.Document }
128+
}
129+
}
130+
};
131+
return new RenderedPipelineStageDefinition<TNewResult>(
132+
operatorName,
133+
document,
134+
newResultSerializer);
135+
});
136+
137+
return AppendStage(stage);
138+
}
139+
70140
public override IAggregateFluent<AggregateCountResult> Count()
71141
{
72142
const string operatorName = "$count";

src/MongoDB.Driver/AggregateFluentBase.cs

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,25 @@ public abstract class AggregateFluentBase<TResult> : IOrderedAggregateFluent<TRe
4040
/// <inheritdoc />
4141
public abstract IAggregateFluent<TNewResult> As<TNewResult>(IBsonSerializer<TNewResult> newResultSerializer);
4242

43+
/// <inheritdoc />
44+
public virtual IAggregateFluent<AggregateBucketResult<TValue>> Bucket<TValue>(
45+
AggregateExpressionDefinition<TResult, TValue> groupBy,
46+
IEnumerable<TValue> boundaries,
47+
Optional<TValue> defaultBucket = default(Optional<TValue>))
48+
{
49+
throw new NotImplementedException();
50+
}
51+
52+
/// <inheritdoc />
53+
public virtual IAggregateFluent<TNewResult> Bucket<TValue, TNewResult>(
54+
AggregateExpressionDefinition<TResult, TValue> groupBy,
55+
IEnumerable<TValue> boundaries,
56+
ProjectionDefinition<TResult, TNewResult> output,
57+
Optional<TValue> defaultBucket = default(Optional<TValue>))
58+
{
59+
throw new NotImplementedException();
60+
}
61+
4362
/// <inheritdoc />
4463
public virtual IAggregateFluent<AggregateCountResult> Count()
4564
{

src/MongoDB.Driver/IAggregateFluent.cs

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,35 @@ public interface IAggregateFluent<TResult> : IAsyncCursorSource<TResult>
5858
/// <returns>The fluent aggregate interface.</returns>
5959
IAggregateFluent<TNewResult> As<TNewResult>(IBsonSerializer<TNewResult> newResultSerializer = null);
6060

61+
/// <summary>
62+
/// Appends a $bucket stage to the pipeline.
63+
/// </summary>
64+
/// <typeparam name="TValue">The type of the value.</typeparam>
65+
/// <param name="groupBy">The expression providing the value to group by.</param>
66+
/// <param name="boundaries">The bucket boundaries.</param>
67+
/// <param name="defaultBucket">The default bucket (optional).</param>
68+
/// <returns>The fluent aggregate interface.</returns>
69+
IAggregateFluent<AggregateBucketResult<TValue>> Bucket<TValue>(
70+
AggregateExpressionDefinition<TResult, TValue> groupBy,
71+
IEnumerable<TValue> boundaries,
72+
Optional<TValue> defaultBucket = default(Optional<TValue>));
73+
74+
/// <summary>
75+
/// Appends a $bucket stage to the pipeline with a custom projection.
76+
/// </summary>
77+
/// <typeparam name="TValue">The type of the value.</typeparam>
78+
/// <typeparam name="TNewResult">The type of the new result.</typeparam>
79+
/// <param name="groupBy">The expression providing the value to group by.</param>
80+
/// <param name="boundaries">The bucket boundaries.</param>
81+
/// <param name="output">The output projection.</param>
82+
/// <param name="defaultBucket">The default bucket (optional).</param>
83+
/// <returns>The fluent aggregate interface.</returns>
84+
IAggregateFluent<TNewResult> Bucket<TValue, TNewResult>(
85+
AggregateExpressionDefinition<TResult, TValue> groupBy,
86+
IEnumerable<TValue> boundaries,
87+
ProjectionDefinition<TResult, TNewResult> output,
88+
Optional<TValue> defaultBucket = default(Optional<TValue>));
89+
6190
/// <summary>
6291
/// Appends a count stage to the pipeline.
6392
/// </summary>

src/MongoDB.Driver/IAggregateFluentExtensions.cs

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
*/
1515

1616
using System;
17+
using System.Collections.Generic;
1718
using System.Linq;
1819
using System.Linq.Expressions;
1920
using System.Threading;
@@ -31,6 +32,58 @@ namespace MongoDB.Driver
3132
/// </summary>
3233
public static class IAggregateFluentExtensions
3334
{
35+
/// <summary>
36+
/// Appends a $bucket stage to the pipeline.
37+
/// </summary>
38+
/// <typeparam name="TResult">The type of the result.</typeparam>
39+
/// <typeparam name="TValue">The type of the value.</typeparam>
40+
/// <param name="aggregate">The aggregate.</param>
41+
/// <param name="groupBy">The expression providing the value to group by.</param>
42+
/// <param name="boundaries">The bucket boundaries.</param>
43+
/// <param name="defaultBucket">The default bucket (optional).</param>
44+
/// <returns>The fluent aggregate interface.</returns>
45+
public static IAggregateFluent<AggregateBucketResult<TValue>> Bucket<TResult, TValue>(
46+
this IAggregateFluent<TResult> aggregate,
47+
Expression<Func<TResult, TValue>> groupBy,
48+
IEnumerable<TValue> boundaries,
49+
Optional<TValue> defaultBucket = default(Optional<TValue>))
50+
{
51+
Ensure.IsNotNull(aggregate, nameof(aggregate));
52+
Ensure.IsNotNull(groupBy, nameof(groupBy));
53+
Ensure.IsNotNull(boundaries, nameof(boundaries));
54+
55+
var groupByDefinition = new ExpressionAggregateExpressionDefinition<TResult, TValue>(groupBy, aggregate.Options.TranslationOptions);
56+
return aggregate.Bucket(groupByDefinition, boundaries, defaultBucket);
57+
}
58+
59+
/// <summary>
60+
/// Appends a $bucket stage to the pipeline.
61+
/// </summary>
62+
/// <typeparam name="TResult">The type of the result.</typeparam>
63+
/// <typeparam name="TValue">The type of the value.</typeparam>
64+
/// <typeparam name="TNewResult">The type of the new result.</typeparam>
65+
/// <param name="aggregate">The aggregate.</param>
66+
/// <param name="groupBy">The expression providing the value to group by.</param>
67+
/// <param name="boundaries">The bucket boundaries.</param>
68+
/// <param name="output">The output projection.</param>
69+
/// <param name="defaultBucket">The default bucket (optional).</param>
70+
/// <returns>The fluent aggregate interface.</returns>
71+
public static IAggregateFluent<TNewResult> Bucket<TResult, TValue, TNewResult>(
72+
this IAggregateFluent<TResult> aggregate,
73+
Expression<Func<TResult, TValue>> groupBy,
74+
IEnumerable<TValue> boundaries,
75+
Expression<Func<IGrouping<TValue, TResult>, TNewResult>> output,
76+
Optional<TValue> defaultBucket = default(Optional<TValue>))
77+
{
78+
Ensure.IsNotNull(aggregate, nameof(aggregate));
79+
Ensure.IsNotNull(groupBy, nameof(groupBy));
80+
Ensure.IsNotNull(boundaries, nameof(boundaries));
81+
82+
var groupByDefinition = new ExpressionAggregateExpressionDefinition<TResult, TValue>(groupBy, aggregate.Options.TranslationOptions);
83+
var outputDefinition = new ExpressionBucketOutputProjection<TResult, TValue, TNewResult>(x => default(TValue), output, aggregate.Options.TranslationOptions);
84+
return aggregate.Bucket(groupByDefinition, boundaries, outputDefinition, defaultBucket);
85+
}
86+
3487
/// <summary>
3588
/// Appends a group stage to the pipeline.
3689
/// </summary>
@@ -588,5 +641,36 @@ public override RenderedProjectionDefinition<TNewResult> Render(IBsonSerializer<
588641
return AggregateGroupTranslator.Translate<TKey, TResult, TNewResult>(_idExpression, _groupExpression, documentSerializer, serializerRegistry, _translationOptions);
589642
}
590643
}
644+
645+
private sealed class ExpressionBucketOutputProjection<TResult, TValue, TNewResult> : ProjectionDefinition<TResult, TNewResult>
646+
{
647+
private readonly Expression<Func<IGrouping<TValue, TResult>, TNewResult>> _outputExpression;
648+
private readonly ExpressionTranslationOptions _translationOptions;
649+
private readonly Expression<Func<TResult, TValue>> _valueExpression;
650+
651+
public ExpressionBucketOutputProjection(
652+
Expression<Func<TResult, TValue>> valueExpression,
653+
Expression<Func<IGrouping<TValue, TResult>, TNewResult>> outputExpression,
654+
ExpressionTranslationOptions translationOptions)
655+
{
656+
_valueExpression = Ensure.IsNotNull(valueExpression, nameof(valueExpression));
657+
_outputExpression = Ensure.IsNotNull(outputExpression, nameof(outputExpression));
658+
_translationOptions = translationOptions; // can be null
659+
660+
}
661+
662+
public Expression<Func<IGrouping<TValue, TResult>, TNewResult>> OutputExpression
663+
{
664+
get { return _outputExpression; }
665+
}
666+
667+
public override RenderedProjectionDefinition<TNewResult> Render(IBsonSerializer<TResult> documentSerializer, IBsonSerializerRegistry serializerRegistry)
668+
{
669+
var renderedOutput = AggregateGroupTranslator.Translate<TValue, TResult, TNewResult>(_valueExpression, _outputExpression, documentSerializer, serializerRegistry, _translationOptions);
670+
var document = renderedOutput.Document;
671+
document.Remove("_id");
672+
return new RenderedProjectionDefinition<TNewResult>(document, renderedOutput.ProjectionSerializer);
673+
}
674+
}
591675
}
592676
}

src/MongoDB.Driver/MongoDB.Driver.csproj

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@
7777
<Compile Include="..\MongoDB.Shared\Hasher.cs">
7878
<Link>Support\Hasher.cs</Link>
7979
</Compile>
80+
<Compile Include="AggregateBucketResult.cs" />
8081
<Compile Include="AggregateCountResult.cs" />
8182
<Compile Include="AggregateExpressionDefinition.cs" />
8283
<Compile Include="AggregateFluent.cs" />

0 commit comments

Comments
 (0)