Skip to content

Commit aeeb79a

Browse files
committed
Added new functions
1 parent 291ba76 commit aeeb79a

11 files changed

+390
-105
lines changed

datum.go

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
11
package gorethink
22

33
import (
4-
"code.google.com/p/goprotobuf/proto"
54
"encoding/json"
65
"fmt"
7-
p "github.com/dancannon/gorethink/ql2"
86
"reflect"
7+
8+
"code.google.com/p/goprotobuf/proto"
9+
p "github.com/dancannon/gorethink/ql2"
910
)
1011

1112
// Converts a query term to a datum. If the term cannot be converted to a datum
@@ -165,7 +166,7 @@ func deconstructDatum(datum *p.Datum, opts map[string]interface{}) (interface{},
165166
func convertPseudotype(obj map[string]interface{}, opts map[string]interface{}) (interface{}, error) {
166167
if reqlType, ok := obj["$reql_type$"]; ok {
167168
if reqlType == "TIME" {
168-
// load timeformat, set to native if the option was not set
169+
// load timeFormat, set to native if the option was not set
169170
timeFormat := "native"
170171
if opt, ok := opts["time_format"]; ok {
171172
if sopt, ok := opt.(string); ok {
@@ -182,6 +183,24 @@ func convertPseudotype(obj map[string]interface{}, opts map[string]interface{})
182183
} else {
183184
return nil, fmt.Errorf("Unknown time_format run option \"%s\".", reqlType)
184185
}
186+
} else if reqlType == "GROUPED_DATA" {
187+
// load groupFormat, set to native if the option was not set
188+
groupFormat := "native"
189+
if opt, ok := opts["group_format"]; ok {
190+
if sopt, ok := opt.(string); ok {
191+
groupFormat = sopt
192+
} else {
193+
return nil, fmt.Errorf("Invalid group_format run option \"%s\".", opt)
194+
}
195+
}
196+
197+
if groupFormat == "native" {
198+
return reqlGroupedDataToObj(obj)
199+
} else if groupFormat == "raw" {
200+
return obj, nil
201+
} else {
202+
return nil, fmt.Errorf("Unknown group_format run option \"%s\".", reqlType)
203+
}
185204
} else {
186205
return obj, nil
187206
}

example_query_select_test.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,9 @@ package gorethink_test
22

33
import (
44
"fmt"
5-
r "github.com/dancannon/gorethink"
65
"log"
6+
7+
r "github.com/dancannon/gorethink"
78
)
89

910
func ExampleRqlTerm_Get() {
@@ -64,6 +65,7 @@ func ExampleRqlTerm_GetAll_compound() {
6465
r.Db("test").Table("table").IndexCreateFunc("full_name", func(row r.RqlTerm) interface{} {
6566
return []interface{}{row.Field("first_name"), row.Field("last_name")}
6667
}).Run(sess)
68+
r.Db("test").Table("table").IndexWait().Run(sess)
6769

6870
// Fetch the row from the database
6971
row, err := r.Db("test").Table("table").GetAllByIndex("full_name", []interface{}{"John", "Smith"}).RunRow(sess)

query_aggregation.go

Lines changed: 87 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -8,30 +8,42 @@ import (
88
// These commands are used to compute smaller values from large sequences.
99

1010
// Produce a single value from a sequence through repeated application of a
11-
// reduction function.
12-
//
13-
// The reduce function gets invoked repeatedly not only for the input values but
14-
// also for results of previous reduce invocations. The type and format of the
15-
// object that is passed in to reduce must be the same with the one returned
16-
// from reduce.
17-
func (t RqlTerm) Reduce(f, base interface{}) RqlTerm {
18-
return newRqlTermFromPrevVal(t, "Reduce", p.Term_REDUCE, []interface{}{funcWrap(f)}, map[string]interface{}{"base": base})
11+
// reduction function
12+
func (t RqlTerm) Reduce(f interface{}) RqlTerm {
13+
return newRqlTermFromPrevVal(t, "Reduce", p.Term_REDUCE, []interface{}{funcWrap(f)}, map[string]interface{}{})
1914
}
2015

21-
// Count the number of elements in the sequence.
22-
func (t RqlTerm) Count() RqlTerm {
23-
return newRqlTermFromPrevVal(t, "Count", p.Term_COUNT, []interface{}{}, map[string]interface{}{})
16+
// Remove duplicate elements from the sequence.
17+
func (t RqlTerm) Distinct() RqlTerm {
18+
return newRqlTermFromPrevVal(t, "Distinct", p.Term_DISTINCT, []interface{}{}, map[string]interface{}{})
2419
}
2520

26-
// Count the number of elements in the sequence. CountFiltered uses the argument
27-
// passed to it to filter the results before counting.
28-
func (t RqlTerm) CountFiltered(f interface{}) RqlTerm {
29-
return newRqlTermFromPrevVal(t, "Count", p.Term_COUNT, []interface{}{funcWrap(f)}, map[string]interface{}{})
21+
// Takes a stream and partitions it into multiple groups based on the
22+
// fields or functions provided. Commands chained after group will be
23+
// called on each of these grouped sub-streams, producing grouped data.
24+
func (t RqlTerm) Group(fieldOrFunctions ...interface{}) RqlTerm {
25+
for k, v := range fieldOrFunctions {
26+
fieldOrFunctions[k] = funcWrap(v)
27+
}
28+
29+
return newRqlTermFromPrevVal(t, "Group", p.Term_GROUP, fieldOrFunctions, map[string]interface{}{})
3030
}
3131

32-
// Remove duplicate elements from the sequence.
33-
func (t RqlTerm) Distinct() RqlTerm {
34-
return newRqlTermFromPrevVal(t, "Distinct", p.Term_DISTINCT, []interface{}{}, map[string]interface{}{})
32+
// Takes a stream and partitions it into multiple groups based on the
33+
// fields or functions provided. Commands chained after group will be
34+
// called on each of these grouped sub-streams, producing grouped data.
35+
func (t RqlTerm) GroupByIndex(index interface{}, fieldOrFunctions ...interface{}) RqlTerm {
36+
for k, v := range fieldOrFunctions {
37+
fieldOrFunctions[k] = funcWrap(v)
38+
}
39+
40+
return newRqlTermFromPrevVal(t, "Group", p.Term_GROUP, fieldOrFunctions, map[string]interface{}{
41+
"index": index,
42+
})
43+
}
44+
45+
func (t RqlTerm) Ungroup() RqlTerm {
46+
return newRqlTermFromPrevVal(t, "Ungroup", p.Term_UNGROUP, []interface{}{}, map[string]interface{}{})
3547
}
3648

3749
//Returns whether or not a sequence contains all the specified values, or if
@@ -48,23 +60,65 @@ func (t RqlTerm) Contains(args ...interface{}) RqlTerm {
4860
// Aggregators
4961
// These standard aggregator objects are to be used in conjunction with group_by.
5062

51-
// Count the total size of the group.
52-
func Count() RqlTerm {
53-
return Expr(map[string]interface{}{
54-
"COUNT": true,
55-
})
63+
// Count the number of elements in the sequence. With a single argument,
64+
// count the number of elements equal to it. If the argument is a function,
65+
// it is equivalent to calling filter before count.
66+
func (t RqlTerm) Count(filters ...interface{}) RqlTerm {
67+
for k, v := range filters {
68+
filters[k] = funcWrap(v)
69+
}
70+
71+
return newRqlTermFromPrevVal(t, "Count", p.Term_COUNT, filters, map[string]interface{}{})
5672
}
5773

58-
// Compute the sum of the given field in the group.
59-
func Sum(arg interface{}) RqlTerm {
60-
return Expr(map[string]interface{}{
61-
"SUM": arg,
62-
})
74+
// Sums all the elements of a sequence. If called with a field name, sums all
75+
// the values of that field in the sequence, skipping elements of the sequence
76+
// that lack that field. If called with a function, calls that function on every
77+
// element of the sequence and sums the results, skipping elements of the
78+
// sequence where that function returns null or a non-existence error.
79+
func (t RqlTerm) Sum(fieldOrFunctions ...interface{}) RqlTerm {
80+
for k, v := range fieldOrFunctions {
81+
fieldOrFunctions[k] = funcWrap(v)
82+
}
83+
84+
return newRqlTermFromPrevVal(t, "Sum", p.Term_SUM, fieldOrFunctions, map[string]interface{}{})
6385
}
6486

65-
// Compute the average value of the given attribute for the group.
66-
func Avg(arg interface{}) RqlTerm {
67-
return Expr(map[string]interface{}{
68-
"AVG": arg,
69-
})
87+
// Averages all the elements of a sequence. If called with a field name, averages
88+
// all the values of that field in the sequence, skipping elements of the sequence
89+
// that lack that field. If called with a function, calls that function on every
90+
// element of the sequence and averages the results, skipping elements of the
91+
// sequence where that function returns null or a non-existence error.
92+
func (t RqlTerm) Avg(fieldOrFunctions ...interface{}) RqlTerm {
93+
for k, v := range fieldOrFunctions {
94+
fieldOrFunctions[k] = funcWrap(v)
95+
}
96+
97+
return newRqlTermFromPrevVal(t, "Sum", p.Term_SUM, fieldOrFunctions, map[string]interface{}{})
98+
}
99+
100+
// Finds the minimum of a sequence. If called with a field name, finds the element
101+
// of that sequence with the smallest value in that field. If called with a function,
102+
// calls that function on every element of the sequence and returns the element
103+
// which produced the smallest value, ignoring any elements where the function
104+
// returns null or produces a non-existence error.
105+
func (t RqlTerm) Min(fieldOrFunctions ...interface{}) RqlTerm {
106+
for k, v := range fieldOrFunctions {
107+
fieldOrFunctions[k] = funcWrap(v)
108+
}
109+
110+
return newRqlTermFromPrevVal(t, "Min", p.Term_MIN, fieldOrFunctions, map[string]interface{}{})
111+
}
112+
113+
// Finds the maximum of a sequence. If called with a field name, finds the element
114+
// of that sequence with the largest value in that field. If called with a function,
115+
// calls that function on every element of the sequence and returns the element
116+
// which produced the largest value, ignoring any elements where the function
117+
// returns null or produces a non-existence error.
118+
func (t RqlTerm) Max(fieldOrFunctions ...interface{}) RqlTerm {
119+
for k, v := range fieldOrFunctions {
120+
fieldOrFunctions[k] = funcWrap(v)
121+
}
122+
123+
return newRqlTermFromPrevVal(t, "Max", p.Term_MAX, fieldOrFunctions, map[string]interface{}{})
70124
}

0 commit comments

Comments
 (0)