Skip to content

Commit 8dde17a

Browse files
authored
feat: reserve event types (#3679)
1 parent 0c4ee47 commit 8dde17a

File tree

10 files changed

+148
-37
lines changed

10 files changed

+148
-37
lines changed

app/common/meter.go

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,10 @@ package common
22

33
import (
44
"context"
5+
"errors"
56
"fmt"
67
"log/slog"
8+
"regexp"
79

810
"github.com/google/wire"
911
"github.com/samber/lo"
@@ -27,13 +29,15 @@ var Meter = wire.NewSet(
2729
var MeterManage = wire.NewSet(
2830
Meter,
2931
NewMeterManageService,
32+
NewReservedEventTypePatterns,
3033
)
3134

3235
var MeterManageWithConfigMeters = wire.NewSet(
3336
wire.FieldsOf(new(config.Configuration), "Meters"),
3437

3538
Meter,
3639
NewMeterManageService,
40+
NewReservedEventTypePatterns,
3741
NewMeterConfigInitializer,
3842
)
3943

@@ -43,15 +47,36 @@ func NewMeterService(
4347
return service.New(meterAdapter)
4448
}
4549

50+
func NewReservedEventTypePatterns(reserved []config.ReservedEventTypePattern) ([]*meter.EventTypePattern, error) {
51+
var errs []error
52+
53+
patterns := make([]*meter.EventTypePattern, 0, len(reserved))
54+
55+
for _, r := range reserved {
56+
pattern, err := regexp.Compile(r)
57+
if err != nil {
58+
errs = append(errs, fmt.Errorf("invalid reserved event type pattern %q: %w", r, err))
59+
60+
continue
61+
}
62+
63+
patterns = append(patterns, pattern)
64+
}
65+
66+
return patterns, errors.Join(errs...)
67+
}
68+
4669
func NewMeterManageService(
4770
meterAdapter *adapter.Adapter,
4871
namespaceManager *namespace.Manager,
4972
publisher eventbus.Publisher,
73+
reservedEventTypes []*meter.EventTypePattern,
5074
) meter.ManageService {
5175
return service.NewManage(
5276
meterAdapter,
5377
publisher,
5478
namespaceManager,
79+
reservedEventTypes,
5580
)
5681
}
5782

app/config/config.go

Lines changed: 29 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@ package config
33

44
import (
55
"errors"
6+
"fmt"
7+
"regexp"
68
"strings"
79
"time"
810

@@ -15,6 +17,8 @@ import (
1517
"github.com/openmeterio/openmeter/pkg/models"
1618
)
1719

20+
type ReservedEventTypePattern = string
21+
1822
// Configuration holds any kind of Configuration that comes from the outside world and
1923
// is necessary for running the application.
2024
type Configuration struct {
@@ -25,24 +29,25 @@ type Configuration struct {
2529

2630
Termination TerminationConfig
2731

28-
Aggregation AggregationConfiguration
29-
Entitlements EntitlementsConfiguration
30-
Customer CustomerConfiguration
31-
Dedupe DedupeConfiguration
32-
Events EventsConfiguration
33-
Ingest IngestConfiguration
34-
Meters []*meter.Meter
35-
Namespace NamespaceConfiguration
36-
Portal PortalConfiguration
37-
Postgres PostgresConfig
38-
Sink SinkConfiguration
39-
BalanceWorker BalanceWorkerConfiguration
40-
Notification NotificationConfiguration
41-
ProductCatalog ProductCatalogConfiguration
42-
ProgressManager ProgressManagerConfiguration
43-
Billing BillingConfiguration
44-
Apps AppsConfiguration
45-
Svix SvixConfig
32+
Aggregation AggregationConfiguration
33+
Entitlements EntitlementsConfiguration
34+
Customer CustomerConfiguration
35+
Dedupe DedupeConfiguration
36+
Events EventsConfiguration
37+
Ingest IngestConfiguration
38+
Meters []*meter.Meter
39+
ReservedEventTypes []ReservedEventTypePattern
40+
Namespace NamespaceConfiguration
41+
Portal PortalConfiguration
42+
Postgres PostgresConfig
43+
Sink SinkConfiguration
44+
BalanceWorker BalanceWorkerConfiguration
45+
Notification NotificationConfiguration
46+
ProductCatalog ProductCatalogConfiguration
47+
ProgressManager ProgressManagerConfiguration
48+
Billing BillingConfiguration
49+
Apps AppsConfiguration
50+
Svix SvixConfig
4651
}
4752

4853
// Validate validates the configuration.
@@ -107,6 +112,12 @@ func (c Configuration) Validate() error {
107112
}
108113
}
109114

115+
for _, pattern := range c.ReservedEventTypes {
116+
if _, err := regexp.Compile(pattern); err != nil {
117+
errs = append(errs, fmt.Errorf("reserved event type pattern %q: invalid regular expression", pattern))
118+
}
119+
}
120+
110121
if err := c.BalanceWorker.Validate(); err != nil {
111122
errs = append(errs, errorsx.WithPrefix(err, "balance worker"))
112123
}

app/config/config_test.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -411,6 +411,11 @@ func TestComplete(t *testing.T) {
411411
EnableSubjectHook: true,
412412
IgnoreErrors: true,
413413
},
414+
ReservedEventTypes: []string{
415+
`^reserved\..*$`,
416+
`^_\..*$`,
417+
`^openmeter\..*$`,
418+
},
414419
}
415420

416421
assert.Equal(t, expected, actual)

app/config/testdata/complete.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,11 @@ meters:
139139
method: $.method
140140
path: $.path
141141

142+
reservedEventTypes:
143+
- ^reserved\..*$
144+
- ^_\..*$
145+
- ^openmeter\..*$
146+
142147
notification:
143148
enabled: true
144149
reconcileInterval: 1m

cmd/server/wire.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,8 @@ type Application struct {
8686

8787
func initializeApplication(ctx context.Context, conf config.Configuration) (Application, func(), error) {
8888
wire.Build(
89+
wire.FieldsOf(new(config.Configuration), "ReservedEventTypes"),
90+
8991
metadata,
9092
common.App,
9193
common.Billing,

cmd/server/wire_gen.go

Lines changed: 21 additions & 9 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

openmeter/meter/adapter/manage.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ import (
2121
// CreateMeter creates a new meter.
2222
func (a *Adapter) CreateMeter(ctx context.Context, input meterpkg.CreateMeterInput) (meterpkg.Meter, error) {
2323
if err := input.Validate(); err != nil {
24-
return meterpkg.Meter{}, models.NewGenericValidationError(err)
24+
return meterpkg.Meter{}, err
2525
}
2626

2727
return transaction.Run(ctx, a, func(ctx context.Context) (meterpkg.Meter, error) {

openmeter/meter/meter.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@ import (
1212

1313
var groupByKeyRegExp = regexp.MustCompile(`^[a-zA-Z_][0-9a-zA-Z_]*$`)
1414

15+
type EventTypePattern = regexp.Regexp
16+
1517
type MeterAggregation string
1618

1719
// Note: keep values up to date in the meter package

openmeter/meter/service.go

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,15 @@ func (p ListMetersParams) Validate() error {
103103
return errors.Join(errs...)
104104
}
105105

106+
type inputOptions struct {
107+
AllowReservedEventTypes bool
108+
}
109+
110+
var (
111+
_ models.Validator = (*CreateMeterInput)(nil)
112+
_ models.CustomValidator[CreateMeterInput] = (*CreateMeterInput)(nil)
113+
)
114+
106115
// CreateMeterInput is a parameter object for creating a meter.
107116
type CreateMeterInput struct {
108117
Namespace string
@@ -116,6 +125,32 @@ type CreateMeterInput struct {
116125
GroupBy map[string]string
117126
Metadata models.Metadata
118127
Annotations models.Annotations
128+
129+
inputOptions
130+
}
131+
132+
func (i CreateMeterInput) ValidateWith(validators ...models.ValidatorFunc[CreateMeterInput]) error {
133+
return models.Validate(i, validators...)
134+
}
135+
136+
func ValidateCreateMeterInputWithReservedEventTypes(reserved []*EventTypePattern) models.ValidatorFunc[CreateMeterInput] {
137+
return func(input CreateMeterInput) error {
138+
if input.AllowReservedEventTypes {
139+
return nil
140+
}
141+
142+
for _, pattern := range reserved {
143+
if pattern == nil {
144+
continue
145+
}
146+
147+
if ok := pattern.MatchString(input.EventType); ok {
148+
return fmt.Errorf("event type '%s' is reserved: matched pattern '%s'", input.EventType, pattern.String())
149+
}
150+
}
151+
152+
return nil
153+
}
119154
}
120155

121156
// Validate validates the create meter input.
@@ -160,7 +195,7 @@ func (i CreateMeterInput) Validate() error {
160195
errs = append(errs, fmt.Errorf("invalid meter group by: %w", err))
161196
}
162197

163-
return errors.Join(errs...)
198+
return models.NewNillableGenericValidationError(errors.Join(errs...))
164199
}
165200

166201
// UpdateMeterInput is a parameter object for creating a meter.

openmeter/meter/service/manage.go

Lines changed: 22 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,22 +17,25 @@ var _ meter.ManageService = (*ManageService)(nil)
1717

1818
type ManageService struct {
1919
meter.Service
20-
preUpdateHooks []meter.PreUpdateMeterHook
21-
adapter *adapter.Adapter
22-
publisher eventbus.Publisher
23-
namespaceManager *namespace.Manager
20+
preUpdateHooks []meter.PreUpdateMeterHook
21+
adapter *adapter.Adapter
22+
publisher eventbus.Publisher
23+
namespaceManager *namespace.Manager
24+
reservedEventTypes []*meter.EventTypePattern
2425
}
2526

2627
func NewManage(
2728
adapter *adapter.Adapter,
2829
publisher eventbus.Publisher,
2930
namespaceManager *namespace.Manager,
31+
reservedEventTypes []*meter.EventTypePattern,
3032
) *ManageService {
3133
return &ManageService{
32-
Service: New(adapter),
33-
adapter: adapter,
34-
publisher: publisher,
35-
namespaceManager: namespaceManager,
34+
Service: New(adapter),
35+
adapter: adapter,
36+
publisher: publisher,
37+
namespaceManager: namespaceManager,
38+
reservedEventTypes: reservedEventTypes,
3639
}
3740
}
3841

@@ -44,6 +47,17 @@ func (s *ManageService) RegisterPreUpdateMeterHook(hook meter.PreUpdateMeterHook
4447

4548
// CreateMeter creates a meter
4649
func (s *ManageService) CreateMeter(ctx context.Context, input meter.CreateMeterInput) (meter.Meter, error) {
50+
if err := input.Validate(); err != nil {
51+
return meter.Meter{}, fmt.Errorf("invalid create meter params: %w", err)
52+
}
53+
54+
if err := input.ValidateWith(
55+
// Validate with reserved event types
56+
meter.ValidateCreateMeterInputWithReservedEventTypes(s.reservedEventTypes),
57+
); err != nil {
58+
return meter.Meter{}, fmt.Errorf("invalid create meter params: %w", err)
59+
}
60+
4761
// Create the meter
4862
createdMeter, err := s.adapter.CreateMeter(ctx, input)
4963
if err != nil {

0 commit comments

Comments
 (0)