Skip to content

Commit d108111

Browse files
authored
feat: allow triggering subscription sync via events (#3695)
1 parent cbb1e28 commit d108111

File tree

3 files changed

+62
-0
lines changed

3 files changed

+62
-0
lines changed

openmeter/billing/worker/subscription/sync.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,19 @@ func (h *Handler) invoicePendingLines(ctx context.Context, customer customer.Cus
121121
})
122122
}
123123

124+
func (h *Handler) HandleSubscriptionSyncEvent(ctx context.Context, event *subscription.SubscriptionSyncEvent) error {
125+
if event == nil {
126+
return nil
127+
}
128+
129+
subsView, err := h.subscriptionService.GetView(ctx, event.Subscription.NamespacedID)
130+
if err != nil {
131+
return fmt.Errorf("getting subscription view: %w", err)
132+
}
133+
134+
return h.SyncronizeSubscriptionAndInvoiceCustomer(ctx, subsView, time.Now())
135+
}
136+
124137
func (h *Handler) SyncronizeSubscriptionAndInvoiceCustomer(ctx context.Context, subs subscription.SubscriptionView, asOf time.Time) error {
125138
span := tracex.StartWithNoValue(ctx, h.tracer, "billing.worker.subscription.sync.SynchronizeSubscriptionAndInvoiceCustomer", trace.WithAttributes(
126139
attribute.String("subscription_id", subs.Subscription.ID),

openmeter/billing/worker/worker.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,9 @@ func (w *Worker) eventHandler(opts WorkerOptions) (*grouphandler.NoPublishingHan
147147
grouphandler.NewGroupEventHandler(func(ctx context.Context, event *subscription.UpdatedEvent) error {
148148
return w.subscriptionSyncHandler.SyncronizeSubscriptionAndInvoiceCustomer(ctx, event.UpdatedView, time.Now())
149149
}),
150+
grouphandler.NewGroupEventHandler(func(ctx context.Context, event *subscription.SubscriptionSyncEvent) error {
151+
return w.subscriptionSyncHandler.HandleSubscriptionSyncEvent(ctx, event)
152+
}),
150153
grouphandler.NewGroupEventHandler(func(ctx context.Context, event *billing.AdvanceInvoiceEvent) error {
151154
return w.asyncAdvanceHandler.Handle(ctx, event)
152155
}),

openmeter/subscription/events.go

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package subscription
22

33
import (
44
"context"
5+
"errors"
56

67
"github.com/openmeterio/openmeter/openmeter/event/metadata"
78
"github.com/openmeterio/openmeter/openmeter/session"
@@ -194,3 +195,48 @@ func (s UpdatedEvent) EventMetadata() metadata.EventMetadata {
194195
func (s UpdatedEvent) Validate() error {
195196
return s.UpdatedView.Validate(true)
196197
}
198+
199+
type SubscriptionSyncEvent struct {
200+
Subscription Subscription `json:"subscription"`
201+
}
202+
203+
func NewSubscriptionSyncEvent(ctx context.Context, sub Subscription) SubscriptionSyncEvent {
204+
return SubscriptionSyncEvent{
205+
Subscription: sub,
206+
}
207+
}
208+
209+
var (
210+
_ marshaler.Event = SubscriptionSyncEvent{}
211+
212+
subscriptionSyncEventName = metadata.GetEventName(metadata.EventType{
213+
Subsystem: EventSubsystem,
214+
Name: "subscription.sync",
215+
Version: "v1",
216+
})
217+
)
218+
219+
func (s SubscriptionSyncEvent) EventName() string {
220+
return subscriptionSyncEventName
221+
}
222+
223+
func (s SubscriptionSyncEvent) Validate() error {
224+
var errs []error
225+
226+
if err := s.Subscription.NamespacedID.Validate(); err != nil {
227+
errs = append(errs, err)
228+
}
229+
230+
if s.Subscription.CustomerId == "" {
231+
errs = append(errs, errors.New("customer id is required"))
232+
}
233+
234+
return errors.Join(errs...)
235+
}
236+
237+
func (s SubscriptionSyncEvent) EventMetadata() metadata.EventMetadata {
238+
return metadata.EventMetadata{
239+
Source: metadata.ComposeResourcePath(s.Subscription.Namespace, metadata.EntitySubscription, s.Subscription.ID),
240+
Subject: metadata.ComposeResourcePath(s.Subscription.Namespace, metadata.EntityCustomer, s.Subscription.CustomerId),
241+
}
242+
}

0 commit comments

Comments
 (0)