Skip to content

Commit 7c88c5c

Browse files
Divjot Arorarfblue2
authored andcommitted
Add mongo/aggregateopt
GODRIVER-272 Change-Id: I8101516ee7506501a2f5de2f7a634d33321b22c8
1 parent c6a2b29 commit 7c88c5c

File tree

3 files changed

+567
-0
lines changed

3 files changed

+567
-0
lines changed

internal/testutil/helpers/helpers.go

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,42 @@ import (
2020

2121
"github.com/mongodb/mongo-go-driver/core/connstring"
2222
"github.com/stretchr/testify/require"
23+
"reflect"
2324
)
2425

26+
// Test helpers
27+
28+
// IsNil returns true if the object is nil
29+
func IsNil(object interface{}) bool {
30+
if object == nil {
31+
return true
32+
}
33+
34+
value := reflect.ValueOf(object)
35+
kind := value.Kind()
36+
37+
// checking to see if type is Chan, Func, Interface, Map, Ptr, or Slice
38+
if kind >= reflect.Chan && kind <= reflect.Slice && value.IsNil() {
39+
return true
40+
}
41+
42+
return false
43+
}
44+
45+
// RequireNotNil throws an error if var is nil
46+
func RequireNotNil(t *testing.T, variable interface{}, msgFormat string, msgVars ...interface{}) {
47+
if IsNil(variable) {
48+
t.Errorf(msgFormat, msgVars...)
49+
}
50+
}
51+
52+
// RequireNil throws an error if var is not nil
53+
func RequireNil(t *testing.T, variable interface{}, msgFormat string, msgVars ...interface{}) {
54+
if !IsNil(variable) {
55+
t.Errorf(msgFormat, msgVars...)
56+
}
57+
}
58+
2559
// FindJSONFilesInDir finds the JSON files in a directory.
2660
func FindJSONFilesInDir(t *testing.T, dir string) []string {
2761
files := make([]string, 0)

mongo/aggregateopt/aggregateopt.go

Lines changed: 328 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,328 @@
1+
package aggregateopt
2+
3+
import (
4+
"time"
5+
6+
"reflect"
7+
8+
"github.com/mongodb/mongo-go-driver/core/option"
9+
)
10+
11+
var aggregateBundle = new(AggregateBundle)
12+
13+
// Aggregate is options for the aggregate() function
14+
type Aggregate interface {
15+
aggregate()
16+
ConvertOption() option.Optioner
17+
}
18+
19+
// AggregateBundle is a bundle of Aggregate options
20+
type AggregateBundle struct {
21+
option Aggregate
22+
next *AggregateBundle
23+
}
24+
25+
// Implement the Aggregate interface
26+
func (ab *AggregateBundle) aggregate() {}
27+
28+
// ConvertOption implements the Aggregate interface
29+
func (ab *AggregateBundle) ConvertOption() option.Optioner { return nil }
30+
31+
// BundleAggregate bundles Aggregate options
32+
func BundleAggregate(opts ...Aggregate) *AggregateBundle {
33+
head := aggregateBundle
34+
35+
for _, opt := range opts {
36+
newBundle := AggregateBundle{
37+
option: opt,
38+
next: head,
39+
}
40+
41+
head = &newBundle
42+
}
43+
44+
return head
45+
}
46+
47+
// AllowDiskUse adds an option to allow aggregation stages to write to temporary files.
48+
func (ab *AggregateBundle) AllowDiskUse(b bool) *AggregateBundle {
49+
bundle := &AggregateBundle{
50+
option: OptAllowDiskUse(b),
51+
next: ab,
52+
}
53+
54+
return bundle
55+
}
56+
57+
// BatchSize adds an option to specify the number of documents to return in every batch.
58+
func (ab *AggregateBundle) BatchSize(i int32) *AggregateBundle {
59+
bundle := &AggregateBundle{
60+
option: OptBatchSize(i),
61+
next: ab,
62+
}
63+
64+
return bundle
65+
}
66+
67+
// BypassDocumentValidation adds an option to allow the write to opt-out of document-level validation.
68+
func (ab *AggregateBundle) BypassDocumentValidation(b bool) *AggregateBundle {
69+
bundle := &AggregateBundle{
70+
option: OptBypassDocumentValidation(b),
71+
next: ab,
72+
}
73+
74+
return bundle
75+
}
76+
77+
//Collation adds an option to specify a Collation.
78+
func (ab *AggregateBundle) Collation(c option.Collation) *AggregateBundle {
79+
bundle := &AggregateBundle{
80+
option: OptCollation{Collation: &c},
81+
next: ab,
82+
}
83+
84+
return bundle
85+
}
86+
87+
// MaxTime adds an option to specify the maximum amount of time to allow the query to run.
88+
func (ab *AggregateBundle) MaxTime(d time.Duration) *AggregateBundle {
89+
bundle := &AggregateBundle{
90+
option: OptMaxTime(d),
91+
next: ab,
92+
}
93+
94+
return bundle
95+
}
96+
97+
// Comment adds an option to specify a string to help trace the operation through the database profiler, currentOp, and logs.
98+
func (ab *AggregateBundle) Comment(s string) *AggregateBundle {
99+
bundle := &AggregateBundle{
100+
option: OptComment(s),
101+
next: ab,
102+
}
103+
104+
return bundle
105+
}
106+
107+
// Hint adds an option to specify the index to use for the aggregation.
108+
func (ab *AggregateBundle) Hint(hint interface{}) *AggregateBundle {
109+
bundle := &AggregateBundle{
110+
option: OptHint{hint},
111+
next: ab,
112+
}
113+
114+
return bundle
115+
}
116+
117+
// Calculates the total length of a bundle, accounting for nested bundles.
118+
func (ab *AggregateBundle) bundleLength() int {
119+
if ab == nil {
120+
return 0
121+
}
122+
123+
bundleLen := 0
124+
for ; ab != nil && ab.option != nil; ab = ab.next {
125+
if converted, ok := ab.option.(*AggregateBundle); ok {
126+
// nested bundle
127+
bundleLen += converted.bundleLength()
128+
continue
129+
}
130+
131+
bundleLen++
132+
}
133+
134+
return bundleLen
135+
}
136+
137+
// Unbundle transforms a bundle into a slice of options, optionally deduplicating
138+
func (ab *AggregateBundle) Unbundle(deduplicate bool) ([]option.Optioner, error) {
139+
140+
options, err := ab.unbundle()
141+
if err != nil {
142+
return nil, err
143+
}
144+
145+
if !deduplicate {
146+
return options, nil
147+
}
148+
149+
// iterate backwards and make dedup slice
150+
optionsSet := make(map[reflect.Type]struct{})
151+
152+
for i := len(options) - 1; i >= 0; i-- {
153+
currOption := options[i]
154+
optionType := reflect.TypeOf(currOption)
155+
156+
if _, ok := optionsSet[optionType]; ok {
157+
// option already found
158+
options = append(options[:i], options[i+1:]...)
159+
continue
160+
}
161+
162+
optionsSet[optionType] = struct{}{}
163+
}
164+
165+
return options, nil
166+
}
167+
168+
// Helper that recursively unwraps bundle into slice of options
169+
func (ab *AggregateBundle) unbundle() ([]option.Optioner, error) {
170+
if ab == nil {
171+
return nil, nil
172+
}
173+
174+
listLen := ab.bundleLength()
175+
176+
options := make([]option.Optioner, listLen)
177+
index := listLen - 1
178+
179+
for listHead := ab; listHead != nil && listHead.option != nil; listHead = listHead.next {
180+
// if the current option is a nested bundle, Unbundle it and add its options to the current array
181+
if converted, ok := listHead.option.(*AggregateBundle); ok {
182+
nestedOptions, err := converted.unbundle()
183+
if err != nil {
184+
return nil, err
185+
}
186+
187+
// where to start inserting nested options
188+
startIndex := index - len(nestedOptions) + 1
189+
190+
// add nested options in order
191+
for _, nestedOp := range nestedOptions {
192+
options[startIndex] = nestedOp
193+
startIndex++
194+
}
195+
index -= len(nestedOptions)
196+
continue
197+
}
198+
199+
options[index] = listHead.option.ConvertOption()
200+
index--
201+
}
202+
203+
return options, nil
204+
}
205+
206+
// String implements the Stringer interface
207+
func (ab *AggregateBundle) String() string {
208+
if ab == nil {
209+
return ""
210+
}
211+
212+
str := ""
213+
for head := ab; head != nil && head.option != nil; head = head.next {
214+
if converted, ok := head.option.(*AggregateBundle); ok {
215+
str += converted.String()
216+
continue
217+
}
218+
219+
str += head.option.ConvertOption().String() + "\n"
220+
}
221+
222+
return str
223+
}
224+
225+
// AllowDiskUse allows aggregation stages to write to temporary files.
226+
func AllowDiskUse(b bool) OptAllowDiskUse {
227+
return OptAllowDiskUse(b)
228+
}
229+
230+
// BatchSize specifies the number of documents to return in every batch.
231+
func BatchSize(i int32) OptBatchSize {
232+
return OptBatchSize(i)
233+
}
234+
235+
// BypassDocumentValidation allows the write to opt-out of document-level validation.
236+
func BypassDocumentValidation(b bool) OptBypassDocumentValidation {
237+
return OptBypassDocumentValidation(b)
238+
}
239+
240+
// Collation specifies a collation.
241+
func Collation(c option.Collation) OptCollation {
242+
return OptCollation{Collation: &c}
243+
}
244+
245+
// MaxTime specifies the maximum amount of time to allow the query to run.
246+
func MaxTime(d time.Duration) OptMaxTime {
247+
return OptMaxTime(d)
248+
}
249+
250+
// Comment allows users to specify a string to help trace the operation through the database profiler, currentOp, and logs.
251+
func Comment(s string) OptComment {
252+
return OptComment(s)
253+
}
254+
255+
// Hint specifies the index to use for the aggregation.
256+
func Hint(hint interface{}) OptHint {
257+
return OptHint{hint}
258+
}
259+
260+
// OptAllowDiskUse allows aggregation stages to write to temporary files.
261+
type OptAllowDiskUse option.OptAllowDiskUse
262+
263+
func (OptAllowDiskUse) aggregate() {}
264+
265+
// ConvertOption implements the Aggregate interface
266+
func (opt OptAllowDiskUse) ConvertOption() option.Optioner {
267+
return option.OptAllowDiskUse(opt)
268+
}
269+
270+
// OptBatchSize specifies the number of documents to return in every batch.
271+
type OptBatchSize option.OptBatchSize
272+
273+
func (OptBatchSize) aggregate() {}
274+
275+
// ConvertOption implements the Aggregate interface
276+
func (opt OptBatchSize) ConvertOption() option.Optioner {
277+
return option.OptBatchSize(opt)
278+
}
279+
280+
// OptBypassDocumentValidation allows the write to opt-out of document-level validation.
281+
type OptBypassDocumentValidation option.OptBypassDocumentValidation
282+
283+
// ConvertOption implements the Aggregate interface
284+
func (opt OptBypassDocumentValidation) ConvertOption() option.Optioner {
285+
return option.OptBypassDocumentValidation(opt)
286+
}
287+
288+
func (OptBypassDocumentValidation) aggregate() {}
289+
290+
// OptCollation specifies a collation.
291+
type OptCollation option.OptCollation
292+
293+
func (OptCollation) aggregate() {}
294+
295+
// ConvertOption implements the Aggregate interface
296+
func (opt OptCollation) ConvertOption() option.Optioner {
297+
return option.OptCollation(opt)
298+
}
299+
300+
// OptMaxTime specifies the maximum amount of time to allow the query to run.
301+
type OptMaxTime option.OptMaxTime
302+
303+
func (OptMaxTime) aggregate() {}
304+
305+
// ConvertOption implements the Aggregate interface
306+
func (opt OptMaxTime) ConvertOption() option.Optioner {
307+
return option.OptMaxTime(opt)
308+
}
309+
310+
// OptComment allows users to specify a string to help trace the operation through the database profiler, currentOp, and logs.
311+
type OptComment option.OptComment
312+
313+
func (OptComment) aggregate() {}
314+
315+
// ConvertOption implements the Aggregate interface
316+
func (opt OptComment) ConvertOption() option.Optioner {
317+
return option.OptComment(opt)
318+
}
319+
320+
// OptHint specifies the index to use for the aggregation.
321+
type OptHint option.OptHint
322+
323+
func (OptHint) aggregate() {}
324+
325+
// ConvertOption implements the Aggregate interface
326+
func (opt OptHint) ConvertOption() option.Optioner {
327+
return option.OptHint(opt)
328+
}

0 commit comments

Comments
 (0)