Skip to content

Commit 450bd5c

Browse files
authored
refactor(meterexport): break up param signature (#3743)
1 parent 596415b commit 450bd5c

File tree

4 files changed

+134
-81
lines changed

4 files changed

+134
-81
lines changed

openmeter/meterexport/service.go

Lines changed: 28 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"errors"
66
"iter"
7+
"time"
78

89
"github.com/openmeterio/openmeter/openmeter/meter"
910
"github.com/openmeterio/openmeter/openmeter/streaming"
@@ -29,7 +30,7 @@ type Service interface {
2930
// NOTE: Currently only SUM and COUNT meters are supported.
3031
// NOTE: GroupBy values are not yet supported.
3132
// NOTE: Customers are not honored in the exported data.
32-
ExportSyntheticMeterData(ctx context.Context, config DataExportConfig, result chan<- streaming.RawEvent, err chan<- error) error
33+
ExportSyntheticMeterData(ctx context.Context, params DataExportParams, result chan<- streaming.RawEvent, err chan<- error) error
3334

3435
// ExportSyntheticMeterDataIter is an iterator-based wrapper around ExportSyntheticMeterData.
3536
// It returns an iter.Seq2 that yields events and errors. The iterator handles channel management internally.
@@ -43,7 +44,7 @@ type Service interface {
4344
// if err != nil { handle error }
4445
// process(event)
4546
// }
46-
ExportSyntheticMeterDataIter(ctx context.Context, config DataExportConfig) (iter.Seq2[streaming.RawEvent, error], error)
47+
ExportSyntheticMeterDataIter(ctx context.Context, params DataExportParams) (iter.Seq2[streaming.RawEvent, error], error)
4748
}
4849

4950
// TargetMeterDescriptor is a minimal MeterCreateInput which can accurately represent the exported data.
@@ -57,11 +58,11 @@ type DataExportConfig struct {
5758
// Defines in what pre-aggregated windows the synthetic data will be exported in
5859
ExportWindowSize meter.WindowSize
5960

61+
// The time zone used when exporting the synthetic data
62+
ExportWindowTimeZone *time.Location
63+
6064
// The source meter to export data from
6165
MeterID models.NamespacedID
62-
63-
// The period to export data for
64-
Period timeutil.StartBoundedPeriod
6566
}
6667

6768
func (c DataExportConfig) Validate() error {
@@ -71,6 +72,10 @@ func (c DataExportConfig) Validate() error {
7172
errs = append(errs, errors.New("export window size is required"))
7273
}
7374

75+
if c.ExportWindowTimeZone == nil {
76+
errs = append(errs, errors.New("export window time zone is required"))
77+
}
78+
7479
if c.MeterID.Namespace == "" {
7580
errs = append(errs, errors.New("meter namespace is required"))
7681
}
@@ -79,7 +84,24 @@ func (c DataExportConfig) Validate() error {
7984
errs = append(errs, errors.New("meter id is required"))
8085
}
8186

82-
if err := c.Period.Validate(); err != nil {
87+
return errors.Join(errs...)
88+
}
89+
90+
type DataExportParams struct {
91+
DataExportConfig
92+
93+
// The period to export data for
94+
Period timeutil.StartBoundedPeriod
95+
}
96+
97+
func (p DataExportParams) Validate() error {
98+
var errs []error
99+
100+
if err := p.DataExportConfig.Validate(); err != nil {
101+
errs = append(errs, err)
102+
}
103+
104+
if err := p.Period.Validate(); err != nil {
83105
errs = append(errs, err)
84106
}
85107

openmeter/meterexport/service/service_test.go

Lines changed: 97 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ func TestExportSyntheticMeterData(t *testing.T) {
5353
tests := []struct {
5454
name string
5555
meter meter.Meter
56-
config meterexport.DataExportConfig
56+
params meterexport.DataExportParams
5757
events []testutils.SimpleEvent
5858
wantErr bool
5959
wantErrMsg string
@@ -76,11 +76,14 @@ func TestExportSyntheticMeterData(t *testing.T) {
7676
EventType: "test-event",
7777
ValueProperty: lo.ToPtr("$.value"),
7878
},
79-
config: meterexport.DataExportConfig{
80-
ExportWindowSize: meter.WindowSizeMinute,
81-
MeterID: models.NamespacedID{
82-
Namespace: "test-ns",
83-
ID: "meter-1",
79+
params: meterexport.DataExportParams{
80+
DataExportConfig: meterexport.DataExportConfig{
81+
ExportWindowSize: meter.WindowSizeMinute,
82+
ExportWindowTimeZone: time.UTC,
83+
MeterID: models.NamespacedID{
84+
Namespace: "test-ns",
85+
ID: "meter-1",
86+
},
8487
},
8588
Period: timeutil.StartBoundedPeriod{
8689
From: now.Add(-10 * time.Minute),
@@ -110,11 +113,14 @@ func TestExportSyntheticMeterData(t *testing.T) {
110113
Aggregation: meter.MeterAggregationCount,
111114
EventType: "count-event",
112115
},
113-
config: meterexport.DataExportConfig{
114-
ExportWindowSize: meter.WindowSizeMinute,
115-
MeterID: models.NamespacedID{
116-
Namespace: "test-ns",
117-
ID: "meter-count",
116+
params: meterexport.DataExportParams{
117+
DataExportConfig: meterexport.DataExportConfig{
118+
ExportWindowSize: meter.WindowSizeMinute,
119+
ExportWindowTimeZone: time.UTC,
120+
MeterID: models.NamespacedID{
121+
Namespace: "test-ns",
122+
ID: "meter-count",
123+
},
118124
},
119125
Period: timeutil.StartBoundedPeriod{
120126
From: now.Add(-5 * time.Minute),
@@ -144,11 +150,14 @@ func TestExportSyntheticMeterData(t *testing.T) {
144150
EventType: "avg-event",
145151
ValueProperty: lo.ToPtr("$.value"),
146152
},
147-
config: meterexport.DataExportConfig{
148-
ExportWindowSize: meter.WindowSizeMinute,
149-
MeterID: models.NamespacedID{
150-
Namespace: "test-ns",
151-
ID: "meter-avg",
153+
params: meterexport.DataExportParams{
154+
DataExportConfig: meterexport.DataExportConfig{
155+
ExportWindowSize: meter.WindowSizeMinute,
156+
ExportWindowTimeZone: time.UTC,
157+
MeterID: models.NamespacedID{
158+
Namespace: "test-ns",
159+
ID: "meter-avg",
160+
},
152161
},
153162
Period: timeutil.StartBoundedPeriod{
154163
From: now.Add(-5 * time.Minute),
@@ -161,11 +170,14 @@ func TestExportSyntheticMeterData(t *testing.T) {
161170
{
162171
name: "should fail validation with missing meter ID",
163172
meter: meter.Meter{},
164-
config: meterexport.DataExportConfig{
165-
ExportWindowSize: meter.WindowSizeMinute,
166-
MeterID: models.NamespacedID{
167-
Namespace: "test-ns",
168-
ID: "",
173+
params: meterexport.DataExportParams{
174+
DataExportConfig: meterexport.DataExportConfig{
175+
ExportWindowSize: meter.WindowSizeMinute,
176+
ExportWindowTimeZone: time.UTC,
177+
MeterID: models.NamespacedID{
178+
Namespace: "test-ns",
179+
ID: "",
180+
},
169181
},
170182
Period: timeutil.StartBoundedPeriod{
171183
From: now.Add(-5 * time.Minute),
@@ -178,11 +190,14 @@ func TestExportSyntheticMeterData(t *testing.T) {
178190
{
179191
name: "should fail when meter not found",
180192
meter: meter.Meter{},
181-
config: meterexport.DataExportConfig{
182-
ExportWindowSize: meter.WindowSizeMinute,
183-
MeterID: models.NamespacedID{
184-
Namespace: "test-ns",
185-
ID: "non-existent",
193+
params: meterexport.DataExportParams{
194+
DataExportConfig: meterexport.DataExportConfig{
195+
ExportWindowSize: meter.WindowSizeMinute,
196+
ExportWindowTimeZone: time.UTC,
197+
MeterID: models.NamespacedID{
198+
Namespace: "test-ns",
199+
ID: "non-existent",
200+
},
186201
},
187202
Period: timeutil.StartBoundedPeriod{
188203
From: now.Add(-5 * time.Minute),
@@ -221,7 +236,7 @@ func TestExportSyntheticMeterData(t *testing.T) {
221236

222237
// Execute
223238
ctx := context.Background()
224-
err = svc.ExportSyntheticMeterData(ctx, tt.config, resultCh, errCh)
239+
err = svc.ExportSyntheticMeterData(ctx, tt.params, resultCh, errCh)
225240

226241
if tt.wantErr {
227242
require.Error(t, err)
@@ -232,7 +247,7 @@ func TestExportSyntheticMeterData(t *testing.T) {
232247
require.NoError(t, err)
233248

234249
// Get descriptor separately
235-
descriptor, err := svc.GetTargetMeterDescriptor(ctx, tt.config)
250+
descriptor, err := svc.GetTargetMeterDescriptor(ctx, tt.params.DataExportConfig)
236251
require.NoError(t, err)
237252

238253
// Collect results
@@ -319,11 +334,14 @@ func TestExportSyntheticMeterData_ContextCancellation(t *testing.T) {
319334

320335
ctx, cancel := context.WithCancel(context.Background())
321336

322-
config := meterexport.DataExportConfig{
323-
ExportWindowSize: meter.WindowSizeMinute,
324-
MeterID: models.NamespacedID{
325-
Namespace: "test-ns",
326-
ID: "meter-1",
337+
params := meterexport.DataExportParams{
338+
DataExportConfig: meterexport.DataExportConfig{
339+
ExportWindowSize: meter.WindowSizeMinute,
340+
ExportWindowTimeZone: time.UTC,
341+
MeterID: models.NamespacedID{
342+
Namespace: "test-ns",
343+
ID: "meter-1",
344+
},
327345
},
328346
Period: timeutil.StartBoundedPeriod{
329347
From: now.Add(-1000 * time.Minute),
@@ -335,7 +353,7 @@ func TestExportSyntheticMeterData_ContextCancellation(t *testing.T) {
335353
done := make(chan struct{})
336354
go func() {
337355
defer close(done)
338-
_ = svc.ExportSyntheticMeterData(ctx, config, resultCh, errCh)
356+
_ = svc.ExportSyntheticMeterData(ctx, params, resultCh, errCh)
339357
}()
340358

341359
// Receive first event to confirm operation started
@@ -395,19 +413,22 @@ func TestExportSyntheticMeterData_ContextCancellation(t *testing.T) {
395413
ctx, cancel := context.WithCancel(context.Background())
396414
cancel()
397415

398-
config := meterexport.DataExportConfig{
399-
ExportWindowSize: meter.WindowSizeMinute,
400-
MeterID: models.NamespacedID{
401-
Namespace: "test-ns",
402-
ID: "meter-1",
416+
params := meterexport.DataExportParams{
417+
DataExportConfig: meterexport.DataExportConfig{
418+
ExportWindowSize: meter.WindowSizeMinute,
419+
ExportWindowTimeZone: time.UTC,
420+
MeterID: models.NamespacedID{
421+
Namespace: "test-ns",
422+
ID: "meter-1",
423+
},
403424
},
404425
Period: timeutil.StartBoundedPeriod{
405426
From: now.Add(-10 * time.Minute),
406427
To: lo.ToPtr(now),
407428
},
408429
}
409430

410-
exportErr := svc.ExportSyntheticMeterData(ctx, config, resultCh, errCh)
431+
exportErr := svc.ExportSyntheticMeterData(ctx, params, resultCh, errCh)
411432

412433
// The function itself doesn't return an error for context cancellation
413434
// (it's a streaming operation - errors go to channel)
@@ -505,11 +526,14 @@ func TestExportSyntheticMeterDataIter(t *testing.T) {
505526
})
506527
require.NoError(t, err)
507528

508-
config := meterexport.DataExportConfig{
509-
ExportWindowSize: meter.WindowSizeMinute,
510-
MeterID: models.NamespacedID{
511-
Namespace: "test-ns",
512-
ID: "meter-1",
529+
params := meterexport.DataExportParams{
530+
DataExportConfig: meterexport.DataExportConfig{
531+
ExportWindowSize: meter.WindowSizeMinute,
532+
ExportWindowTimeZone: time.UTC,
533+
MeterID: models.NamespacedID{
534+
Namespace: "test-ns",
535+
ID: "meter-1",
536+
},
513537
},
514538
Period: timeutil.StartBoundedPeriod{
515539
From: now.Add(-10 * time.Minute),
@@ -518,15 +542,15 @@ func TestExportSyntheticMeterDataIter(t *testing.T) {
518542
}
519543

520544
// Get descriptor first
521-
descriptor, err := svc.GetTargetMeterDescriptor(context.Background(), config)
545+
descriptor, err := svc.GetTargetMeterDescriptor(context.Background(), params.DataExportConfig)
522546
require.NoError(t, err)
523547

524548
// Verify descriptor
525549
assert.Equal(t, meter.MeterAggregationSum, descriptor.Aggregation)
526550
assert.Equal(t, testMeter.EventType, descriptor.EventType)
527551
assert.NotNil(t, descriptor.ValueProperty)
528552

529-
seq, err := svc.ExportSyntheticMeterDataIter(context.Background(), config)
553+
seq, err := svc.ExportSyntheticMeterDataIter(context.Background(), params)
530554
require.NoError(t, err)
531555

532556
// Collect events from iterator
@@ -568,19 +592,22 @@ func TestExportSyntheticMeterDataIter(t *testing.T) {
568592
})
569593
require.NoError(t, err)
570594

571-
config := meterexport.DataExportConfig{
572-
ExportWindowSize: meter.WindowSizeMinute,
573-
MeterID: models.NamespacedID{
574-
Namespace: "test-ns",
575-
ID: "meter-1",
595+
params := meterexport.DataExportParams{
596+
DataExportConfig: meterexport.DataExportConfig{
597+
ExportWindowSize: meter.WindowSizeMinute,
598+
ExportWindowTimeZone: time.UTC,
599+
MeterID: models.NamespacedID{
600+
Namespace: "test-ns",
601+
ID: "meter-1",
602+
},
576603
},
577604
Period: timeutil.StartBoundedPeriod{
578605
From: now.Add(-100 * time.Minute),
579606
To: lo.ToPtr(now),
580607
},
581608
}
582609

583-
seq, err := svc.ExportSyntheticMeterDataIter(context.Background(), config)
610+
seq, err := svc.ExportSyntheticMeterDataIter(context.Background(), params)
584611
require.NoError(t, err)
585612

586613
// Only consume first 3 events then break
@@ -612,19 +639,21 @@ func TestExportSyntheticMeterDataIter(t *testing.T) {
612639
})
613640
require.NoError(t, err)
614641

615-
config := meterexport.DataExportConfig{
616-
ExportWindowSize: meter.WindowSizeMinute,
617-
MeterID: models.NamespacedID{
618-
Namespace: "test-ns",
619-
ID: "", // Missing ID
642+
params := meterexport.DataExportParams{
643+
DataExportConfig: meterexport.DataExportConfig{
644+
ExportWindowSize: meter.WindowSizeMinute,
645+
MeterID: models.NamespacedID{
646+
Namespace: "test-ns",
647+
ID: "", // Missing ID
648+
},
620649
},
621650
Period: timeutil.StartBoundedPeriod{
622651
From: now.Add(-10 * time.Minute),
623652
To: lo.ToPtr(now),
624653
},
625654
}
626655

627-
_, err = svc.ExportSyntheticMeterDataIter(context.Background(), config)
656+
_, err = svc.ExportSyntheticMeterDataIter(context.Background(), params)
628657
require.Error(t, err)
629658
assert.Contains(t, err.Error(), "meter id is required")
630659
})
@@ -658,19 +687,22 @@ func TestExportSyntheticMeterDataIter(t *testing.T) {
658687
})
659688
require.NoError(t, err)
660689

661-
config := meterexport.DataExportConfig{
662-
ExportWindowSize: meter.WindowSizeMinute,
663-
MeterID: models.NamespacedID{
664-
Namespace: "test-ns",
665-
ID: "meter-avg",
690+
params := meterexport.DataExportParams{
691+
DataExportConfig: meterexport.DataExportConfig{
692+
ExportWindowSize: meter.WindowSizeMinute,
693+
ExportWindowTimeZone: time.UTC,
694+
MeterID: models.NamespacedID{
695+
Namespace: "test-ns",
696+
ID: "meter-avg",
697+
},
666698
},
667699
Period: timeutil.StartBoundedPeriod{
668700
From: now.Add(-10 * time.Minute),
669701
To: lo.ToPtr(now),
670702
},
671703
}
672704

673-
_, err = svc.ExportSyntheticMeterDataIter(context.Background(), config)
705+
_, err = svc.ExportSyntheticMeterDataIter(context.Background(), params)
674706
require.Error(t, err)
675707
assert.Contains(t, err.Error(), "unsupported meter aggregation")
676708
})

0 commit comments

Comments
 (0)