Skip to content

Commit bd5eae7

Browse files
authored
chore: remove redis based highwatermark cache (#3605)
1 parent 2620162 commit bd5eae7

File tree

10 files changed

+52
-449
lines changed

10 files changed

+52
-449
lines changed

app/common/openmeter_balanceworker.go

Lines changed: 2 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@ var BalanceWorker = wire.NewSet(
3232
NewBalanceWorkerOptions,
3333
NewBalanceWorker,
3434
BalanceWorkerGroup,
35-
NewBalanceWorkerFilterStateStorage,
3635
)
3736

3837
var BalanceWorkerAdapter = wire.NewSet(
@@ -94,7 +93,7 @@ func NewBalanceWorkerOptions(
9493
subjectService subject.Service,
9594
customerService customer.Service,
9695
logger *slog.Logger,
97-
filterStateStorage balanceworker.FilterStateStorage,
96+
balanceWorkerConfiguration config.BalanceWorkerConfiguration,
9897
) balanceworker.WorkerOptions {
9998
return balanceworker.WorkerOptions{
10099
SystemEventsTopic: eventConfig.SystemEvents.Topic,
@@ -109,7 +108,7 @@ func NewBalanceWorkerOptions(
109108
Logger: logger,
110109
MetricMeter: routerOptions.MetricMeter,
111110
NotificationService: notificationService,
112-
FilterStateStorage: filterStateStorage,
111+
HighWatermarkCacheSize: balanceWorkerConfiguration.StateStorage.HighWatermarkCache.LRUCacheSize,
113112
}
114113
}
115114

@@ -122,30 +121,6 @@ func NewBalanceWorker(workerOptions balanceworker.WorkerOptions) (*balanceworker
122121
return worker, nil
123122
}
124123

125-
func NewBalanceWorkerFilterStateStorage(conf config.BalanceWorkerConfiguration) (balanceworker.FilterStateStorage, error) {
126-
switch conf.StateStorage.Driver {
127-
case config.BalanceWorkerStateStorageDriverRedis:
128-
redis, err := conf.StateStorage.GetRedisBackendConfiguration()
129-
if err != nil {
130-
return balanceworker.FilterStateStorage{}, fmt.Errorf("failed to get redis backend configuration: %w", err)
131-
}
132-
133-
client, err := redis.NewClient()
134-
if err != nil {
135-
return balanceworker.FilterStateStorage{}, fmt.Errorf("failed to create redis client: %w", err)
136-
}
137-
138-
return balanceworker.NewFilterStateStorage(balanceworker.FilterStateStorageRedis{
139-
Client: client,
140-
Expiration: redis.Expiration,
141-
})
142-
case config.BalanceWorkerStateStorageDriverInMemory:
143-
return balanceworker.NewFilterStateStorage(balanceworker.FilterStateStorageInMemory{})
144-
default:
145-
return balanceworker.FilterStateStorage{}, fmt.Errorf("unsupported state storage driver: %s", conf.StateStorage.Driver)
146-
}
147-
}
148-
149124
func BalanceWorkerGroup(
150125
ctx context.Context,
151126
worker *balanceworker.Worker,

app/config/balanceworker.go

Lines changed: 11 additions & 97 deletions
Original file line numberDiff line numberDiff line change
@@ -2,16 +2,10 @@ package config
22

33
import (
44
"errors"
5-
"fmt"
6-
"slices"
7-
"time"
85

9-
"github.com/mitchellh/mapstructure"
106
"github.com/spf13/viper"
117

128
"github.com/openmeterio/openmeter/pkg/errorsx"
13-
"github.com/openmeterio/openmeter/pkg/models"
14-
"github.com/openmeterio/openmeter/pkg/redis"
159
)
1610

1711
type BalanceWorkerConfiguration struct {
@@ -46,115 +40,35 @@ type rawBalanceWorkerStateStorageConfiguration struct {
4640
}
4741

4842
type BalanceWorkerStateStorageConfiguration struct {
49-
Driver BalanceWorkerStateStorageDriver
50-
51-
BalanceWorkerStateStorageBackendConfiguration
52-
}
53-
54-
func (c *BalanceWorkerStateStorageConfiguration) DecodeMap(v map[string]any) error {
55-
var raw rawBalanceWorkerStateStorageConfiguration
56-
57-
if err := mapstructure.Decode(v, &raw); err != nil {
58-
return err
59-
}
60-
61-
c.Driver = raw.Driver
62-
63-
switch c.Driver {
64-
case BalanceWorkerStateStorageDriverRedis:
65-
var redisConfig BalanceWorkerStateStorageRedisBackendConfiguration
66-
67-
decoder, err := mapstructure.NewDecoder(&mapstructure.DecoderConfig{
68-
Metadata: nil,
69-
Result: &redisConfig,
70-
WeaklyTypedInput: true,
71-
DecodeHook: mapstructure.ComposeDecodeHookFunc(
72-
mapstructure.StringToTimeDurationHookFunc(),
73-
),
74-
})
75-
if err != nil {
76-
return err
77-
}
78-
79-
if err := decoder.Decode(raw.Config); err != nil {
80-
return err
81-
}
82-
83-
c.BalanceWorkerStateStorageBackendConfiguration = redisConfig
84-
case BalanceWorkerStateStorageDriverInMemory:
85-
// no config
86-
}
87-
88-
return nil
43+
HighWatermarkCache BalanceWorkerHighWatermarkCacheConfiguration
8944
}
9045

9146
func (c BalanceWorkerStateStorageConfiguration) Validate() error {
92-
errs := []error{}
93-
if !slices.Contains([]BalanceWorkerStateStorageDriver{
94-
BalanceWorkerStateStorageDriverRedis,
95-
BalanceWorkerStateStorageDriverInMemory,
96-
}, c.Driver) {
97-
errs = append(errs, fmt.Errorf("invalid driver: %s", c.Driver))
98-
}
47+
var errs []error
9948

100-
if c.Driver == BalanceWorkerStateStorageDriverRedis {
101-
if c.BalanceWorkerStateStorageBackendConfiguration == nil {
102-
errs = append(errs, errors.New("state storage backend configuration is required"))
103-
} else {
104-
if err := c.BalanceWorkerStateStorageBackendConfiguration.Validate(); err != nil {
105-
errs = append(errs, fmt.Errorf("state storage backend: %w", err))
106-
}
107-
}
49+
if err := c.HighWatermarkCache.Validate(); err != nil {
50+
errs = append(errs, errorsx.WithPrefix(err, "high watermark cache"))
10851
}
10952

11053
return errors.Join(errs...)
11154
}
11255

113-
func (c BalanceWorkerStateStorageConfiguration) GetRedisBackendConfiguration() (BalanceWorkerStateStorageRedisBackendConfiguration, error) {
114-
if c.Driver != BalanceWorkerStateStorageDriverRedis {
115-
return BalanceWorkerStateStorageRedisBackendConfiguration{}, fmt.Errorf("driver is not redis")
116-
}
117-
118-
if c.BalanceWorkerStateStorageBackendConfiguration == nil {
119-
return BalanceWorkerStateStorageRedisBackendConfiguration{}, errors.New("state storage backend configuration is required")
120-
}
121-
122-
redisConfig, ok := c.BalanceWorkerStateStorageBackendConfiguration.(BalanceWorkerStateStorageRedisBackendConfiguration)
123-
if !ok {
124-
return BalanceWorkerStateStorageRedisBackendConfiguration{}, fmt.Errorf("state storage backend configuration is not a redis configuration")
125-
}
126-
127-
return redisConfig, nil
56+
type BalanceWorkerHighWatermarkCacheConfiguration struct {
57+
LRUCacheSize int
12858
}
12959

130-
type BalanceWorkerStateStorageBackendConfiguration interface {
131-
models.Validator
132-
}
133-
134-
type BalanceWorkerStateStorageRedisBackendConfiguration struct {
135-
redis.Config `mapstructure:",squash"`
136-
Expiration time.Duration
137-
}
138-
139-
func (c BalanceWorkerStateStorageRedisBackendConfiguration) Validate() error {
140-
errs := []error{}
141-
142-
if c.Expiration <= 0 {
143-
errs = append(errs, errors.New("expiration should be greater than 0"))
144-
}
145-
146-
if err := c.Config.Validate(); err != nil {
147-
errs = append(errs, fmt.Errorf("redis: %w", err))
60+
func (c BalanceWorkerHighWatermarkCacheConfiguration) Validate() error {
61+
if c.LRUCacheSize <= 0 {
62+
return errors.New("LRU cache size must be positive")
14863
}
14964

150-
return errors.Join(errs...)
65+
return nil
15166
}
15267

15368
func ConfigureBalanceWorker(v *viper.Viper) {
15469
ConfigureConsumer(v, "balanceWorker")
15570
v.SetDefault("balanceWorker.dlq.topic", "om_sys.balance_worker_dlq")
15671
v.SetDefault("balanceWorker.consumerGroupName", "om_balance_worker")
15772

158-
v.SetDefault("balanceWorker.stateStorage.driver", BalanceWorkerStateStorageDriverInMemory)
159-
v.SetDefault("balanceWorker.stateStorage.config.expiration", 24*time.Hour)
73+
v.SetDefault("balanceWorker.stateStorage.highWatermarkCache.lruCacheSize", 100_000)
16074
}

app/config/config_test.go

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -341,12 +341,8 @@ func TestComplete(t *testing.T) {
341341
ConsumerGroupName: "om_balance_worker",
342342
},
343343
StateStorage: BalanceWorkerStateStorageConfiguration{
344-
Driver: BalanceWorkerStateStorageDriverRedis,
345-
BalanceWorkerStateStorageBackendConfiguration: BalanceWorkerStateStorageRedisBackendConfiguration{
346-
Expiration: 23 * time.Hour,
347-
Config: redis.Config{
348-
Address: "127.0.0.1:6379",
349-
},
344+
HighWatermarkCache: BalanceWorkerHighWatermarkCacheConfiguration{
345+
LRUCacheSize: 100_000,
350346
},
351347
},
352348
},

cmd/balance-worker/wire_gen.go

Lines changed: 1 addition & 12 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

cmd/jobs/entitlement/recalculatesnapshots.go

Lines changed: 8 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -14,20 +14,15 @@ func NewRecalculateBalanceSnapshotsCommand() *cobra.Command {
1414
Use: "recalculate-balance-snapshots",
1515
Short: "Recalculate balance snapshots and send the resulting events into the eventbus",
1616
RunE: func(cmd *cobra.Command, args []string) error {
17-
filterStateStorage, err := balanceworker.NewFilterStateStorage(balanceworker.FilterStateStorageInMemory{})
18-
if err != nil {
19-
return err
20-
}
21-
2217
recalculator, err := balanceworker.NewRecalculator(balanceworker.RecalculatorOptions{
23-
Entitlement: internal.App.EntitlementRegistry,
24-
EventBus: internal.App.EventPublisher,
25-
MetricMeter: internal.App.Meter,
26-
NotificationService: internal.App.NotificationService,
27-
FilterStateStorage: filterStateStorage,
28-
Logger: internal.App.Logger,
29-
Customer: internal.App.Customer,
30-
Subject: internal.App.Subject,
18+
Entitlement: internal.App.EntitlementRegistry,
19+
EventBus: internal.App.EventPublisher,
20+
MetricMeter: internal.App.Meter,
21+
NotificationService: internal.App.NotificationService,
22+
HighWatermarkCacheSize: 100_000,
23+
Logger: internal.App.Logger,
24+
Customer: internal.App.Customer,
25+
Subject: internal.App.Subject,
3126
})
3227
if err != nil {
3328
return err

openmeter/entitlement/balanceworker/filters.go

Lines changed: 9 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -38,8 +38,9 @@ var _ models.Validator = (*EntitlementFiltersConfig)(nil)
3838
type EntitlementFiltersConfig struct {
3939
NotificationService notification.Service
4040
MetricMeter metric.Meter
41-
StateStorage FilterStateStorage
4241
Logger *slog.Logger
42+
43+
HighWatermarkCacheSize int
4344
}
4445

4546
func (c EntitlementFiltersConfig) Validate() error {
@@ -53,8 +54,8 @@ func (c EntitlementFiltersConfig) Validate() error {
5354
errs = append(errs, fmt.Errorf("metric meter is required"))
5455
}
5556

56-
if err := c.StateStorage.Validate(); err != nil {
57-
errs = append(errs, fmt.Errorf("state storage: %w", err))
57+
if c.HighWatermarkCacheSize <= 0 {
58+
errs = append(errs, fmt.Errorf("high watermark cache size must be positive"))
5859
}
5960

6061
if c.Logger == nil {
@@ -76,6 +77,10 @@ type EntitlementFilters struct {
7677
}
7778

7879
func NewEntitlementFilters(cfg EntitlementFiltersConfig) (*EntitlementFilters, error) {
80+
if err := cfg.Validate(); err != nil {
81+
return nil, err
82+
}
83+
7984
notificationFilter, err := filters.NewNotificationsFilter(filters.NotificationsFilterConfig{
8085
NotificationService: cfg.NotificationService,
8186
CacheTTL: defaultCacheTTL,
@@ -85,33 +90,7 @@ func NewEntitlementFilters(cfg EntitlementFiltersConfig) (*EntitlementFilters, e
8590
return nil, err
8691
}
8792

88-
var highWatermarkCacheBackend filters.HighWatermarkBackend
89-
90-
switch cfg.StateStorage.Driver() {
91-
case FilterStateStorageDriverRedis:
92-
redis, err := cfg.StateStorage.Redis()
93-
if err != nil {
94-
return nil, err
95-
}
96-
97-
highWatermarkCacheBackend, err = filters.NewHighWatermarkRedisBackend(filters.HighWatermarkRedisBackendConfig{
98-
Redis: redis.Client,
99-
Logger: cfg.Logger,
100-
Expiration: redis.Expiration,
101-
})
102-
if err != nil {
103-
return nil, err
104-
}
105-
case FilterStateStorageDriverInMemory:
106-
highWatermarkCacheBackend, err = filters.NewHighWatermarkInMemoryBackend(defaultLRUCacheSize)
107-
if err != nil {
108-
return nil, err
109-
}
110-
default:
111-
return nil, fmt.Errorf("unsupported state storage driver: %s", cfg.StateStorage.Driver())
112-
}
113-
114-
highWatermarkCache, err := filters.NewHighWatermarkCache(highWatermarkCacheBackend)
93+
highWatermarkCache, err := filters.NewHighWatermarkCache(cfg.HighWatermarkCacheSize)
11594
if err != nil {
11695
return nil, err
11796
}

0 commit comments

Comments
 (0)