Skip to content

Commit 9e3eafb

Browse files
authored
Add count operation (useful if multiple metrics match the labels) (#45)
Signed-off-by: Jirka Kremser <jiri.kremser@gmail.com>
1 parent 0aa9f26 commit 9e3eafb

File tree

5 files changed

+76
-8
lines changed

5 files changed

+76
-8
lines changed

metric/mem_store.go

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -40,12 +40,12 @@ func (m ms) Get(unescapedName types.MetricName, searchLabels types.Labels, timeO
4040
return value, found, err
4141
}
4242

43-
func (m ms) get(name types.MetricName, searchLabels types.Labels, timeOp types.OperationOverTime, defaultAggregation types.AggregationOverVectors) (float64, types.Found, error) {
43+
func (m ms) get(name types.MetricName, searchLabels types.Labels, timeOp types.OperationOverTime, vecOp types.AggregationOverVectors) (float64, types.Found, error) {
4444
now := time.Now().Unix()
4545
if err := util.CheckTimeOp(timeOp); err != nil {
4646
return -1., false, err
4747
}
48-
if err := checkDefaultAggregation(defaultAggregation); err != nil {
48+
if err := checkVectorAggregation(vecOp); err != nil {
4949
return -1., false, err
5050
}
5151
storedMetrics, found := m.store.Load(string(name))
@@ -55,6 +55,9 @@ func (m ms) get(name types.MetricName, searchLabels types.Labels, timeOp types.O
5555
}
5656
if md, f := storedMetrics.Load(hashOfMap(searchLabels)); f {
5757
// found exact label match
58+
if vecOp == types.VecCount {
59+
return 1, true, nil
60+
}
5861
if !m.isStale(md.LastUpdate, now) {
5962
ret, f := md.AggregatesOverTime.Load(timeOp)
6063
if !f {
@@ -87,7 +90,7 @@ func (m ms) get(name types.MetricName, searchLabels types.Labels, timeOp types.O
8790
return true
8891
}
8992
counter += 1
90-
accumulator = m.calculateAggregate(val, counter, accumulator, defaultAggregation)
93+
accumulator = m.calculateAggregate(val, counter, accumulator, vecOp)
9194
} else {
9295
defer func() {
9396
storedMetrics.Delete(hashOfMap(searchLabels))
@@ -99,9 +102,9 @@ func (m ms) get(name types.MetricName, searchLabels types.Labels, timeOp types.O
99102
return accumulator, true, nil
100103
}
101104

102-
func checkDefaultAggregation(aggregation types.AggregationOverVectors) error {
105+
func checkVectorAggregation(aggregation types.AggregationOverVectors) error {
103106
switch aggregation {
104-
case types.VecSum, types.VecAvg, types.VecMin, types.VecMax:
107+
case types.VecSum, types.VecAvg, types.VecMin, types.VecMax, types.VecCount:
105108
return nil
106109
default:
107110
return fmt.Errorf("unknown AggregationOverVectors:%s", aggregation)
@@ -173,7 +176,11 @@ func (m ms) isStale(datapoint uint32, now int64) bool {
173176

174177
func (m ms) calculateAggregate(value float64, counter int, accumulator float64, aggregation types.AggregationOverVectors) float64 {
175178
if counter == 1 {
176-
return value
179+
if aggregation == types.VecCount {
180+
return 1
181+
} else {
182+
return value
183+
}
177184
}
178185
switch aggregation {
179186
case types.VecSum:
@@ -188,6 +195,8 @@ func (m ms) calculateAggregate(value float64, counter int, accumulator float64,
188195
return math.Min(accumulator, value)
189196
case types.VecMax:
190197
return math.Max(accumulator, value)
198+
case types.VecCount:
199+
return accumulator + 1
191200
default:
192201
panic("unknown aggregation function: " + aggregation)
193202
}

metric/mem_store_test.go

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -521,6 +521,49 @@ func TestMemStoreSumOverAverages(t *testing.T) {
521521
assertMetricFound(t, val, found, err, 3.5+2.333)
522522
}
523523

524+
func TestMemStoreCount(t *testing.T) {
525+
// setup
526+
ms := NewMetricStore(60)
527+
labels1 := map[string]any{
528+
"a": "1",
529+
"b": "2",
530+
}
531+
labels2 := map[string]any{
532+
"a": "1",
533+
"b": "3",
534+
}
535+
labels3 := map[string]any{
536+
"a": "1",
537+
"b": "4",
538+
}
539+
labels4 := map[string]any{
540+
"a": "2",
541+
"b": "2",
542+
}
543+
name1 := "metric_name"
544+
setupMetrics(ms, name1, 1, labels1, 1., 2.)
545+
setupMetrics(ms, name1, 1, labels2, 1., 2., 3.)
546+
setupMetrics(ms, name1, 1, labels3, 1., 2., 3., 4.)
547+
setupMetrics(ms, name1, 1, labels4, 1., 2., 3., 4., 5.)
548+
setupMetrics(ms, "noise", 1, labels2, 1., 2., 3., 4., 5.) // this shouldn't be included
549+
val1, found1, err1 := ms.Get(types.MetricName(name1), map[string]any{
550+
"a": "1",
551+
}, types.OpAvg, types.VecCount)
552+
assertMetricFound(t, val1, found1, err1, 3.)
553+
554+
val2, found2, err2 := ms.Get(types.MetricName(name1), map[string]any{
555+
"b": "2",
556+
}, types.OpAvg, types.VecCount)
557+
assertMetricFound(t, val2, found2, err2, 2.)
558+
val3, found3, err3 := ms.Get(types.MetricName(name1), map[string]any{}, types.OpAvg, types.VecCount)
559+
assertMetricFound(t, val3, found3, err3, 4.)
560+
val4, found4, err4 := ms.Get(types.MetricName(name1), map[string]any{
561+
"a": "1",
562+
"b": "2",
563+
}, types.OpAvg, types.VecCount)
564+
assertMetricFound(t, val4, found4, err4, 1.)
565+
}
566+
524567
func setupMetrics(store types.MemStore, name string, secondsStep int64, labels map[string]any, vals ...float64) {
525568
now := time.Now().Unix()
526569
for i, v := range vals {

metric/simple_parser.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ func (p p) Parse(metricQuery string) (types.MetricName, types.Labels, types.Aggr
2323
}
2424
mq := strings.TrimSpace(metricQuery)
2525
aggregateFunction := types.VecSum // default
26-
for _, aggFn := range []types.AggregationOverVectors{types.VecSum, types.VecAvg, types.VecMin, types.VecMax} {
26+
for _, aggFn := range []types.AggregationOverVectors{types.VecSum, types.VecAvg, types.VecMin, types.VecMax, types.VecCount} {
2727
if strings.HasPrefix(mq, string(aggFn)+"(") && strings.HasSuffix(mq, ")") {
2828
aggregateFunction = aggFn
2929
mq = strings.TrimPrefix(mq, string(aggFn)+"(")

metric/simple_parser_test.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,17 @@ func TestSimpleParserMin(t *testing.T) {
8080
}
8181
}
8282

83+
func TestSimpleParserCount(t *testing.T) {
84+
// setup
85+
p := NewParser()
86+
87+
// check
88+
name, labels, agg, err := p.Parse("count(metric_foo{ahoj=cau})")
89+
if name != "metric_foo" || fmt.Sprint(labels) != fmt.Sprint(map[string]any{"ahoj": "cau"}) || agg != types.VecCount || err != nil {
90+
t.Errorf("expected: [metric_foo, map[ahoj:cau], min, <nil>], got: [%s, %v, %v, %v]", name, labels, agg, err)
91+
}
92+
}
93+
8394
func TestSimpleParserNoLabels(t *testing.T) {
8495
// setup
8596
p := NewParser()

types/metrics.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,11 +39,16 @@ const (
3939
// and on the resulting set of numbers where each represents last_one, rate, min, max, avg of the time serie, we apply
4040
// this function
4141

42-
// VecSum sums the number
42+
// VecSum sums the numbers
4343
VecSum AggregationOverVectors = "sum"
44+
// VecAvg calculate the mean value
4445
VecAvg AggregationOverVectors = "avg"
46+
// VecMin calculate the minimum value
4547
VecMin AggregationOverVectors = "min"
48+
// VecMax calculate the max value
4649
VecMax AggregationOverVectors = "max"
50+
// VecCount calculate the number of occurrences
51+
VecCount AggregationOverVectors = "count"
4752

4853
// following operations can be applied on one time serie vector that was captured over time
4954
// returning just one number

0 commit comments

Comments
 (0)