Skip to content

Commit 596415b

Browse files
authored
fix(meterexport): allow groubpys (#3689)
1 parent b20b349 commit 596415b

File tree

5 files changed

+4
-32
lines changed

5 files changed

+4
-32
lines changed

openmeter/meterexport/service.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ type Service interface {
2828
//
2929
// NOTE: Currently only SUM and COUNT meters are supported.
3030
// NOTE: GroupBy values are not yet supported.
31-
// NOTE: Subjects and Customers are not honored in the exported data.
31+
// NOTE: Customers are not honored in the exported data.
3232
ExportSyntheticMeterData(ctx context.Context, config DataExportConfig, result chan<- streaming.RawEvent, err chan<- error) error
3333

3434
// ExportSyntheticMeterDataIter is an iterator-based wrapper around ExportSyntheticMeterData.

openmeter/meterexport/service/funnel.go

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -49,18 +49,10 @@ func (p funnelParams) validateUnsupportedParams() []error {
4949
errs = append(errs, errors.New("filter customer is not supported"))
5050
}
5151

52-
if len(p.queryParams.FilterSubject) > 0 {
53-
errs = append(errs, errors.New("filter subject is not supported"))
54-
}
55-
5652
if len(p.queryParams.FilterGroupBy) > 0 {
5753
errs = append(errs, errors.New("filter group by is not supported"))
5854
}
5955

60-
if len(p.queryParams.GroupBy) > 0 {
61-
errs = append(errs, errors.New("group by is not supported"))
62-
}
63-
6456
return errs
6557
}
6658

@@ -101,6 +93,7 @@ func (s *service) funnel(ctx context.Context, params funnelParams, resultCh chan
10193
To: &queryTo,
10294
WindowSize: params.queryParams.WindowSize,
10395
WindowTimeZone: params.queryParams.WindowTimeZone,
96+
GroupBy: params.queryParams.GroupBy,
10497
}
10598

10699
rows, err := s.StreamingConnector.QueryMeter(ctx, params.meter.Namespace, params.meter, queryParams)

openmeter/meterexport/service/service.go

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ import (
1111
type Config struct {
1212
// Configuration
1313
EventSourceGroup string
14-
ExportSubject string
1514

1615
// Dependencies
1716
StreamingConnector streaming.Connector
@@ -21,10 +20,6 @@ type Config struct {
2120
func (c Config) validate() error {
2221
var errs []error
2322

24-
if c.ExportSubject == "" {
25-
errs = append(errs, errors.New("export subject is required"))
26-
}
27-
2823
if c.EventSourceGroup == "" {
2924
errs = append(errs, errors.New("event source group is required"))
3025
}

openmeter/meterexport/service/service_test.go

Lines changed: 0 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -209,7 +209,6 @@ func TestExportSyntheticMeterData(t *testing.T) {
209209

210210
// Create service
211211
svc, err := New(Config{
212-
ExportSubject: "test-subject",
213212
EventSourceGroup: "test-source",
214213
StreamingConnector: mockStreaming,
215214
MeterService: mockMeterService,
@@ -308,7 +307,6 @@ func TestExportSyntheticMeterData_ContextCancellation(t *testing.T) {
308307
}
309308

310309
svc, err := New(Config{
311-
ExportSubject: "test-subject",
312310
EventSourceGroup: "test-source",
313311
StreamingConnector: mockStreaming,
314312
MeterService: mockMeterService,
@@ -384,7 +382,6 @@ func TestExportSyntheticMeterData_ContextCancellation(t *testing.T) {
384382
mockStreaming.AddSimpleEvent("test-meter", 10.0, now.Add(-5*time.Minute))
385383

386384
svc, err := New(Config{
387-
ExportSubject: "test-subject",
388385
EventSourceGroup: "test-source",
389386
StreamingConnector: mockStreaming,
390387
MeterService: mockMeterService,
@@ -453,15 +450,6 @@ func TestServiceNew(t *testing.T) {
453450
assert.Contains(t, err.Error(), "meter service is required")
454451
})
455452

456-
t.Run("should fail without export subject", func(t *testing.T) {
457-
_, err := New(Config{
458-
StreamingConnector: testutils.NewMockStreamingConnector(t),
459-
MeterService: NewMockMeterService(),
460-
})
461-
require.Error(t, err)
462-
assert.Contains(t, err.Error(), "export subject is required")
463-
})
464-
465453
t.Run("should fail without event source group", func(t *testing.T) {
466454
_, err := New(Config{
467455
StreamingConnector: testutils.NewMockStreamingConnector(t),
@@ -475,7 +463,6 @@ func TestServiceNew(t *testing.T) {
475463
svc, err := New(Config{
476464
StreamingConnector: testutils.NewMockStreamingConnector(t),
477465
MeterService: NewMockMeterService(),
478-
ExportSubject: "test-subject",
479466
EventSourceGroup: "test-source",
480467
})
481468
require.NoError(t, err)
@@ -512,7 +499,6 @@ func TestExportSyntheticMeterDataIter(t *testing.T) {
512499
mockStreaming.AddSimpleEvent("test-meter", 30.0, now.Add(-2*time.Minute))
513500

514501
svc, err := New(Config{
515-
ExportSubject: "test-subject",
516502
EventSourceGroup: "test-source",
517503
StreamingConnector: mockStreaming,
518504
MeterService: mockMeterService,
@@ -576,7 +562,6 @@ func TestExportSyntheticMeterDataIter(t *testing.T) {
576562
}
577563

578564
svc, err := New(Config{
579-
ExportSubject: "test-subject",
580565
EventSourceGroup: "test-source",
581566
StreamingConnector: mockStreaming,
582567
MeterService: mockMeterService,
@@ -621,7 +606,6 @@ func TestExportSyntheticMeterDataIter(t *testing.T) {
621606
mockStreaming := testutils.NewMockStreamingConnector(t)
622607

623608
svc, err := New(Config{
624-
ExportSubject: "test-subject",
625609
EventSourceGroup: "test-source",
626610
StreamingConnector: mockStreaming,
627611
MeterService: mockMeterService,
@@ -668,7 +652,6 @@ func TestExportSyntheticMeterDataIter(t *testing.T) {
668652
mockStreaming := testutils.NewMockStreamingConnector(t)
669653

670654
svc, err := New(Config{
671-
ExportSubject: "test-subject",
672655
EventSourceGroup: "test-source",
673656
StreamingConnector: mockStreaming,
674657
MeterService: mockMeterService,

openmeter/meterexport/service/syntheticdata.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,7 @@ func (s *service) ExportSyntheticMeterData(ctx context.Context, config meterexpo
126126
To: config.Period.To,
127127
WindowSize: &config.ExportWindowSize,
128128
WindowTimeZone: time.UTC,
129+
GroupBy: []string{"subject"},
129130
},
130131
}, meterRowCh, meterRowErrCh)
131132
})
@@ -146,7 +147,7 @@ func (s *service) createEventFromMeterRow(m meter.Meter, row meter.MeterQueryRow
146147
ID: ulid.Make().String(),
147148
Type: m.EventType, // We reuse the same type as the source meter
148149
Source: fmt.Sprintf("%s:%s/%s", s.EventSourceGroup, m.Namespace, m.ID),
149-
Subject: s.ExportSubject,
150+
Subject: lo.FromPtr(row.Subject),
150151
IngestedAt: clock.Now(),
151152
Time: row.WindowStart,
152153
CustomerID: nil,

0 commit comments

Comments
 (0)