Skip to content

Commit 45044cb

Browse files
Merge pull request #602 from prometheus/feat/multi-valued-event
feat(event): Add MultiValueEvent interface and MultiObserverEvent implementation
2 parents ce77e77 + cc1fcb4 commit 45044cb

File tree

2 files changed

+295
-0
lines changed

2 files changed

+295
-0
lines changed

pkg/event/event.go

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
"time"
1919

2020
"github.com/prometheus/client_golang/prometheus"
21+
2122
"github.com/prometheus/statsd_exporter/pkg/clock"
2223
"github.com/prometheus/statsd_exporter/pkg/mapper"
2324
)
@@ -39,6 +40,7 @@ func (c *CounterEvent) MetricName() string { return c.CMetricName }
3940
func (c *CounterEvent) Value() float64 { return c.CValue }
4041
func (c *CounterEvent) Labels() map[string]string { return c.CLabels }
4142
func (c *CounterEvent) MetricType() mapper.MetricType { return mapper.MetricTypeCounter }
43+
func (c *CounterEvent) Values() []float64 { return []float64{c.CValue} }
4244

4345
type GaugeEvent struct {
4446
GMetricName string
@@ -51,6 +53,7 @@ func (g *GaugeEvent) MetricName() string { return g.GMetricName }
5153
func (g *GaugeEvent) Value() float64 { return g.GValue }
5254
func (g *GaugeEvent) Labels() map[string]string { return g.GLabels }
5355
func (g *GaugeEvent) MetricType() mapper.MetricType { return mapper.MetricTypeGauge }
56+
func (g *GaugeEvent) Values() []float64 { return []float64{g.GValue} }
5457

5558
type ObserverEvent struct {
5659
OMetricName string
@@ -62,6 +65,7 @@ func (o *ObserverEvent) MetricName() string { return o.OMetricName }
6265
func (o *ObserverEvent) Value() float64 { return o.OValue }
6366
func (o *ObserverEvent) Labels() map[string]string { return o.OLabels }
6467
func (o *ObserverEvent) MetricType() mapper.MetricType { return mapper.MetricTypeObserver }
68+
func (o *ObserverEvent) Values() []float64 { return []float64{o.OValue} }
6569

6670
type Events []Event
6771

@@ -136,3 +140,87 @@ type UnbufferedEventHandler struct {
136140
func (ueh *UnbufferedEventHandler) Queue(events Events) {
137141
ueh.C <- events
138142
}
143+
144+
// MultiValueEvent is an event that contains multiple values, it is going to replace the existing Event interface.
145+
type MultiValueEvent interface {
146+
MetricName() string
147+
Labels() map[string]string
148+
MetricType() mapper.MetricType
149+
Values() []float64
150+
}
151+
152+
type MultiObserverEvent struct {
153+
OMetricName string
154+
OValues []float64 // DataDog extensions allow multiple values in a single sample
155+
OLabels map[string]string
156+
SampleRate float64
157+
}
158+
159+
type ExpandableEvent interface {
160+
Expand() []Event
161+
}
162+
163+
func (m *MultiObserverEvent) MetricName() string { return m.OMetricName }
164+
func (m *MultiObserverEvent) Value() float64 { return m.OValues[0] }
165+
func (m *MultiObserverEvent) Labels() map[string]string { return m.OLabels }
166+
func (m *MultiObserverEvent) MetricType() mapper.MetricType { return mapper.MetricTypeObserver }
167+
func (m *MultiObserverEvent) Values() []float64 { return m.OValues }
168+
169+
// Expand returns a list of events that are the result of expanding the multi-value event.
170+
// This will be used as a middle-step in the pipeline to convert multi-value events to single-value events.
171+
// And keep the exporter code compatible with previous versions.
172+
func (m *MultiObserverEvent) Expand() []Event {
173+
if len(m.OValues) == 1 && m.SampleRate == 0 {
174+
return []Event{
175+
&ObserverEvent{
176+
OMetricName: m.OMetricName,
177+
OValue: m.OValues[0],
178+
OLabels: copyLabels(m.OLabels),
179+
},
180+
}
181+
}
182+
183+
events := make([]Event, 0, len(m.OValues))
184+
for _, value := range m.OValues {
185+
events = append(events, &ObserverEvent{
186+
OMetricName: m.OMetricName,
187+
OValue: value,
188+
OLabels: copyLabels(m.OLabels),
189+
})
190+
}
191+
192+
if m.SampleRate > 0 && m.SampleRate < 1 {
193+
multiplier := int(1 / m.SampleRate)
194+
multipliedEvents := make([]Event, 0, len(events)*multiplier)
195+
for i := 0; i < multiplier; i++ {
196+
for _, event := range events {
197+
e := event.(*ObserverEvent)
198+
multipliedEvents = append(multipliedEvents, &ObserverEvent{
199+
OMetricName: e.OMetricName,
200+
OValue: e.OValue,
201+
OLabels: copyLabels(e.OLabels),
202+
})
203+
}
204+
}
205+
return multipliedEvents
206+
}
207+
208+
return events
209+
}
210+
211+
// Helper function to copy labels map
212+
func copyLabels(labels map[string]string) map[string]string {
213+
newLabels := make(map[string]string, len(labels))
214+
for k, v := range labels {
215+
newLabels[k] = v
216+
}
217+
return newLabels
218+
}
219+
220+
var (
221+
_ ExpandableEvent = &MultiObserverEvent{}
222+
_ MultiValueEvent = &MultiObserverEvent{}
223+
_ MultiValueEvent = &CounterEvent{}
224+
_ MultiValueEvent = &GaugeEvent{}
225+
_ MultiValueEvent = &ObserverEvent{}
226+
)

pkg/event/event_test.go

Lines changed: 207 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,15 @@
1414
package event
1515

1616
import (
17+
"fmt"
18+
"reflect"
1719
"testing"
1820
"time"
1921

2022
"github.com/prometheus/client_golang/prometheus"
23+
2124
"github.com/prometheus/statsd_exporter/pkg/clock"
25+
"github.com/prometheus/statsd_exporter/pkg/mapper"
2226
)
2327

2428
var eventsFlushed = prometheus.NewCounter(
@@ -85,3 +89,206 @@ func TestEventIntervalFlush(t *testing.T) {
8589
t.Fatal("Expected 10 events in the event channel, but got", len(events))
8690
}
8791
}
92+
93+
func TestMultiValueEvent(t *testing.T) {
94+
tests := []struct {
95+
name string
96+
event MultiValueEvent
97+
wantValues []float64
98+
wantName string
99+
wantType mapper.MetricType
100+
wantLabels map[string]string
101+
}{
102+
{
103+
name: "MultiObserverEvent with single value",
104+
event: &MultiObserverEvent{
105+
OMetricName: "test_metric",
106+
OValues: []float64{1.0},
107+
OLabels: map[string]string{"label": "value"},
108+
SampleRate: 0,
109+
},
110+
wantValues: []float64{1.0},
111+
wantName: "test_metric",
112+
wantType: mapper.MetricTypeObserver,
113+
wantLabels: map[string]string{"label": "value"},
114+
},
115+
{
116+
name: "MultiObserverEvent with multiple values",
117+
event: &MultiObserverEvent{
118+
OMetricName: "test_metric",
119+
OValues: []float64{1.0, 2.0, 3.0},
120+
OLabels: map[string]string{"label": "value"},
121+
SampleRate: 0.5,
122+
},
123+
wantValues: []float64{1.0, 2.0, 3.0},
124+
wantName: "test_metric",
125+
wantType: mapper.MetricTypeObserver,
126+
wantLabels: map[string]string{"label": "value"},
127+
},
128+
{
129+
name: "CounterEvent implements MultiValueEvent",
130+
event: &CounterEvent{
131+
CMetricName: "test_counter",
132+
CValue: 42.0,
133+
CLabels: map[string]string{"label": "value"},
134+
},
135+
wantValues: []float64{42.0},
136+
wantName: "test_counter",
137+
wantType: mapper.MetricTypeCounter,
138+
wantLabels: map[string]string{"label": "value"},
139+
},
140+
{
141+
name: "GaugeEvent implements MultiValueEvent",
142+
event: &GaugeEvent{
143+
GMetricName: "test_gauge",
144+
GValue: 123.0,
145+
GLabels: map[string]string{"label": "value"},
146+
},
147+
wantValues: []float64{123.0},
148+
wantName: "test_gauge",
149+
wantType: mapper.MetricTypeGauge,
150+
wantLabels: map[string]string{"label": "value"},
151+
},
152+
{
153+
name: "ObserverEvent implements MultiValueEvent",
154+
event: &ObserverEvent{
155+
OMetricName: "test_observer",
156+
OValue: 99.0,
157+
OLabels: map[string]string{"label": "value"},
158+
},
159+
wantValues: []float64{99.0},
160+
wantName: "test_observer",
161+
wantType: mapper.MetricTypeObserver,
162+
wantLabels: map[string]string{"label": "value"},
163+
},
164+
}
165+
166+
for _, tt := range tests {
167+
t.Run(tt.name, func(t *testing.T) {
168+
if got := tt.event.Values(); !reflect.DeepEqual(got, tt.wantValues) {
169+
t.Errorf("MultiValueEvent.Values() = %v, want %v", got, tt.wantValues)
170+
}
171+
if got := tt.event.MetricName(); got != tt.wantName {
172+
t.Errorf("MultiValueEvent.MetricName() = %v, want %v", got, tt.wantName)
173+
}
174+
if got := tt.event.MetricType(); got != tt.wantType {
175+
t.Errorf("MultiValueEvent.MetricType() = %v, want %v", got, tt.wantType)
176+
}
177+
if got := tt.event.Labels(); !reflect.DeepEqual(got, tt.wantLabels) {
178+
t.Errorf("MultiValueEvent.Labels() = %v, want %v", got, tt.wantLabels)
179+
}
180+
})
181+
}
182+
}
183+
184+
func TestMultiObserverEvent_Expand(t *testing.T) {
185+
t.Parallel()
186+
tests := []struct {
187+
name string
188+
event *MultiObserverEvent
189+
wantEvents []Event
190+
}{
191+
{
192+
name: "single value no sampling",
193+
event: &MultiObserverEvent{
194+
OMetricName: "test_metric",
195+
OValues: []float64{1.0},
196+
OLabels: map[string]string{"label": "value"},
197+
SampleRate: 0,
198+
},
199+
wantEvents: []Event{
200+
&ObserverEvent{
201+
OMetricName: "test_metric",
202+
OValue: 1.0,
203+
OLabels: map[string]string{"label": "value"},
204+
},
205+
},
206+
},
207+
{
208+
name: "multiple values no sampling",
209+
event: &MultiObserverEvent{
210+
OMetricName: "test_metric",
211+
OValues: []float64{1.0, 2.0, 3.0},
212+
OLabels: map[string]string{"label": "value"},
213+
SampleRate: 0,
214+
},
215+
wantEvents: []Event{
216+
&ObserverEvent{
217+
OMetricName: "test_metric",
218+
OValue: 1.0,
219+
OLabels: map[string]string{"label": "value"},
220+
},
221+
&ObserverEvent{
222+
OMetricName: "test_metric",
223+
OValue: 2.0,
224+
OLabels: map[string]string{"label": "value"},
225+
},
226+
&ObserverEvent{
227+
OMetricName: "test_metric",
228+
OValue: 3.0,
229+
OLabels: map[string]string{"label": "value"},
230+
},
231+
},
232+
},
233+
{
234+
name: "multiple values with sampling",
235+
event: &MultiObserverEvent{
236+
OMetricName: "test_metric",
237+
OValues: []float64{1.0, 2.0},
238+
OLabels: map[string]string{"label": "value"},
239+
SampleRate: 0.5,
240+
},
241+
wantEvents: []Event{
242+
&ObserverEvent{
243+
OMetricName: "test_metric",
244+
OValue: 1.0,
245+
OLabels: map[string]string{"label": "value"},
246+
},
247+
&ObserverEvent{
248+
OMetricName: "test_metric",
249+
OValue: 2.0,
250+
OLabels: map[string]string{"label": "value"},
251+
},
252+
&ObserverEvent{
253+
OMetricName: "test_metric",
254+
OValue: 1.0,
255+
OLabels: map[string]string{"label": "value"},
256+
},
257+
&ObserverEvent{
258+
OMetricName: "test_metric",
259+
OValue: 2.0,
260+
OLabels: map[string]string{"label": "value"},
261+
},
262+
},
263+
},
264+
}
265+
266+
for _, tt := range tests {
267+
t.Run(tt.name, func(t *testing.T) {
268+
t.Parallel()
269+
got := tt.event.Expand()
270+
if len(tt.wantEvents) != len(got) {
271+
t.Fatalf("Expected %d events, but got %d", len(tt.wantEvents), len(got))
272+
}
273+
274+
eventCount := func(events []Event) map[string]int {
275+
counts := make(map[string]int)
276+
for _, event := range events {
277+
oe := event.(*ObserverEvent)
278+
key := fmt.Sprintf("%s%f%v", oe.OMetricName, oe.OValue, oe.OLabels)
279+
counts[key]++
280+
}
281+
return counts
282+
}
283+
284+
wantMap := eventCount(tt.wantEvents)
285+
gotMap := eventCount(got)
286+
287+
for key, count := range wantMap {
288+
if gotMap[key] != count {
289+
t.Fatalf("Event mismatch for key %v: expected %d, got %d", key, count, gotMap[key])
290+
}
291+
}
292+
})
293+
}
294+
}

0 commit comments

Comments
 (0)