Skip to content

Commit fb2c455

Browse files
committed
feat: honor namespace lockdown in event processing
1 parent d108111 commit fb2c455

File tree

3 files changed

+37
-1
lines changed

3 files changed

+37
-1
lines changed

app/common/openmeter_billingworker.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@ func NewBillingWorkerOptions(
7878
billingAdapter billing.Adapter,
7979
subscriptionServices SubscriptionServiceWithWorkflow,
8080
subsSyncHandler *billingworkersubscription.Handler,
81+
billingFsConfig config.BillingFeatureSwitchesConfiguration,
8182
logger *slog.Logger,
8283
) billingworker.WorkerOptions {
8384
return billingworker.WorkerOptions{
@@ -90,6 +91,9 @@ func NewBillingWorkerOptions(
9091
SubscriptionService: subscriptionServices.Service,
9192
BillingSubscriptionSyncHandler: subsSyncHandler,
9293
Logger: logger,
94+
95+
// Feature switches
96+
LockdownNamespaces: billingFsConfig.NamespaceLockdown,
9397
}
9498
}
9599

cmd/billing-worker/wire_gen.go

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

openmeter/billing/worker/worker.go

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"fmt"
66
"log/slog"
7+
"slices"
78
"time"
89

910
"github.com/ThreeDotsLabs/watermill/message"
@@ -31,6 +32,7 @@ type WorkerOptions struct {
3132
// External connectors
3233

3334
SubscriptionService subscription.Service
35+
LockdownNamespaces []string
3436
}
3537

3638
func (w WorkerOptions) Validate() error {
@@ -77,6 +79,7 @@ type Worker struct {
7779
asyncAdvanceHandler *asyncadvance.Handler
7880

7981
nonPublishingHandler *grouphandler.NoPublishingHandler
82+
lockdownNamespaces []string
8083
}
8184

8285
func New(opts WorkerOptions) (*Worker, error) {
@@ -97,6 +100,7 @@ func New(opts WorkerOptions) (*Worker, error) {
97100
billingService: opts.BillingService,
98101
subscriptionSyncHandler: opts.BillingSubscriptionSyncHandler,
99102
asyncAdvanceHandler: asyncAdvancer,
103+
lockdownNamespaces: opts.LockdownNamespaces,
100104
}
101105

102106
router, err := router.NewDefaultRouter(opts.Router)
@@ -136,24 +140,52 @@ func (w *Worker) eventHandler(opts WorkerOptions) (*grouphandler.NoPublishingHan
136140
opts.Router.MetricMeter,
137141

138142
grouphandler.NewGroupEventHandler(func(ctx context.Context, event *subscription.CreatedEvent) error {
143+
if event != nil && slices.Contains(w.lockdownNamespaces, event.Subscription.Namespace) {
144+
return nil
145+
}
146+
139147
return w.subscriptionSyncHandler.SyncronizeSubscriptionAndInvoiceCustomer(ctx, event.SubscriptionView, time.Now())
140148
}),
141149
grouphandler.NewGroupEventHandler(func(ctx context.Context, event *subscription.CancelledEvent) error {
150+
if event != nil && slices.Contains(w.lockdownNamespaces, event.Subscription.Namespace) {
151+
return nil
152+
}
153+
142154
return w.subscriptionSyncHandler.HandleCancelledEvent(ctx, event)
143155
}),
144156
grouphandler.NewGroupEventHandler(func(ctx context.Context, event *subscription.ContinuedEvent) error {
157+
if event != nil && slices.Contains(w.lockdownNamespaces, event.Subscription.Namespace) {
158+
return nil
159+
}
160+
145161
return w.subscriptionSyncHandler.SyncronizeSubscriptionAndInvoiceCustomer(ctx, event.SubscriptionView, time.Now())
146162
}),
147163
grouphandler.NewGroupEventHandler(func(ctx context.Context, event *subscription.UpdatedEvent) error {
164+
if event != nil && slices.Contains(w.lockdownNamespaces, event.UpdatedView.Subscription.Namespace) {
165+
return nil
166+
}
167+
148168
return w.subscriptionSyncHandler.SyncronizeSubscriptionAndInvoiceCustomer(ctx, event.UpdatedView, time.Now())
149169
}),
150170
grouphandler.NewGroupEventHandler(func(ctx context.Context, event *subscription.SubscriptionSyncEvent) error {
171+
if event != nil && slices.Contains(w.lockdownNamespaces, event.Subscription.Namespace) {
172+
return nil
173+
}
174+
151175
return w.subscriptionSyncHandler.HandleSubscriptionSyncEvent(ctx, event)
152176
}),
153177
grouphandler.NewGroupEventHandler(func(ctx context.Context, event *billing.AdvanceInvoiceEvent) error {
178+
if event != nil && slices.Contains(w.lockdownNamespaces, event.Invoice.Namespace) {
179+
return nil
180+
}
181+
154182
return w.asyncAdvanceHandler.Handle(ctx, event)
155183
}),
156184
grouphandler.NewGroupEventHandler(func(ctx context.Context, event *billing.InvoiceCreatedEvent) error {
185+
if event != nil && slices.Contains(w.lockdownNamespaces, event.EventInvoice.Invoice.Namespace) {
186+
return nil
187+
}
188+
157189
return w.subscriptionSyncHandler.HandleInvoiceCreation(ctx, event.EventInvoice)
158190
}),
159191
)

0 commit comments

Comments
 (0)