From e2331d9e0b1dd2ea0c406e58a31bdb715e026ac3 Mon Sep 17 00:00:00 2001 From: Peter Turi Date: Fri, 23 Jan 2026 14:14:43 +0100 Subject: [PATCH 1/6] refactor: simplify line service --- .../billing/service/gatheringinvoicependinglines.go | 12 +++++++++--- openmeter/billing/service/invoice.go | 4 ++-- openmeter/billing/service/lineservice/linebase.go | 11 ----------- openmeter/billing/service/lineservice/service.go | 6 +++--- .../billing/service/lineservice/usagebasedline.go | 10 ---------- .../service/lineservice/usagebasedlineflat.go | 4 ---- 6 files changed, 14 insertions(+), 33 deletions(-) diff --git a/openmeter/billing/service/gatheringinvoicependinglines.go b/openmeter/billing/service/gatheringinvoicependinglines.go index 2c271b2c3..5fbc4e279 100644 --- a/openmeter/billing/service/gatheringinvoicependinglines.go +++ b/openmeter/billing/service/gatheringinvoicependinglines.go @@ -563,7 +563,7 @@ func (s *Service) splitGatheringInvoiceLine(ctx context.Context, in splitGatheri if !postSplitAtLineSvc.IsPeriodEmptyConsideringTruncations() { gatheringInvoice.Lines.Append(postSplitAtLine) - if err := postSplitAtLineSvc.Validate(ctx, &gatheringInvoice); err != nil { + if err := postSplitAtLineSvc.Validate(ctx); err != nil { return res, fmt.Errorf("validating post split line: %w", err) } } @@ -583,7 +583,7 @@ func (s *Service) splitGatheringInvoiceLine(ctx context.Context, in splitGatheri if preSplitAtLineSvc.IsPeriodEmptyConsideringTruncations() { line.DeletedAt = lo.ToPtr(clock.Now()) } else { - if err := preSplitAtLineSvc.Validate(ctx, &gatheringInvoice); err != nil { + if err := preSplitAtLineSvc.Validate(ctx); err != nil { return res, fmt.Errorf("validating pre split line: %w", err) } } @@ -824,6 +824,12 @@ func (s *Service) moveLinesToInvoice(ctx context.Context, in moveLinesToInvoiceI return slices.Contains(in.LineIDsToMove, line.ID) }) + for _, line := range linesToMove { + if line.Currency != dstInvoice.Currency { + return nil, fmt.Errorf("line[%s]: currency[%s] is not equal to target invoice currency[%s]", line.ID, line.Currency, dstInvoice.Currency) + } + } + if len(linesToMove) != len(in.LineIDsToMove) { return nil, fmt.Errorf("lines to move[%d] must contain the same number of lines as line IDs to move[%d]", len(linesToMove), len(in.LineIDsToMove)) } @@ -833,7 +839,7 @@ func (s *Service) moveLinesToInvoice(ctx context.Context, in moveLinesToInvoiceI return nil, fmt.Errorf("creating line services for lines to move: %w", err) } - if err := linesToAssociate.ValidateForInvoice(ctx, &dstInvoice); err != nil { + if err := linesToAssociate.Validate(ctx); err != nil { return nil, fmt.Errorf("validating lines to move: %w", err) } diff --git a/openmeter/billing/service/invoice.go b/openmeter/billing/service/invoice.go index 1f0306a1b..81128e75f 100644 --- a/openmeter/billing/service/invoice.go +++ b/openmeter/billing/service/invoice.go @@ -674,7 +674,7 @@ func (s Service) checkIfLinesAreInvoicable(ctx context.Context, invoice *billing return errors.Join( lo.Map(inScopeLineServices, func(lineSvc lineservice.Line, _ int) error { - if err := lineSvc.Validate(ctx, invoice); err != nil { + if err := lineSvc.Validate(ctx); err != nil { return fmt.Errorf("validating line[%s]: %w", lineSvc.ID(), err) } @@ -806,7 +806,7 @@ func (s Service) SimulateInvoice(ctx context.Context, input billing.SimulateInvo // Let's update the lines and the detailed lines for _, lineSvc := range inScopeLineSvcs { - if err := lineSvc.Validate(ctx, &invoice); err != nil { + if err := lineSvc.Validate(ctx); err != nil { return billing.StandardInvoice{}, billing.ValidationError{ Err: err, } diff --git a/openmeter/billing/service/lineservice/linebase.go b/openmeter/billing/service/lineservice/linebase.go index 1046b9c33..362efdc37 100644 --- a/openmeter/billing/service/lineservice/linebase.go +++ b/openmeter/billing/service/lineservice/linebase.go @@ -1,7 +1,6 @@ package lineservice import ( - "context" "time" "github.com/openmeterio/openmeter/openmeter/billing" @@ -70,16 +69,6 @@ func (l lineBase) Period() billing.Period { return l.line.Period } -func (l lineBase) Validate(ctx context.Context, invoice *billing.StandardInvoice) error { - if l.line.Currency != invoice.Currency || l.line.Currency == "" { - return billing.ValidationError{ - Err: billing.ErrInvoiceLineCurrencyMismatch, - } - } - - return nil -} - func (l lineBase) IsLastInPeriod() bool { if l.line.SplitLineGroupID == nil { return true diff --git a/openmeter/billing/service/lineservice/service.go b/openmeter/billing/service/lineservice/service.go index c9a26a15b..18c7e9349 100644 --- a/openmeter/billing/service/lineservice/service.go +++ b/openmeter/billing/service/lineservice/service.go @@ -128,7 +128,7 @@ type Line interface { // this method does any truncation for usage based lines. IsPeriodEmptyConsideringTruncations() bool - Validate(context.Context, *billing.StandardInvoice) error + Validate(context.Context) error CanBeInvoicedAsOf(context.Context, CanBeInvoicedAsOfInput) (*billing.Period, error) CalculateDetailedLines() error UpdateTotals() error @@ -136,13 +136,13 @@ type Line interface { type Lines []Line -func (s Lines) ValidateForInvoice(ctx context.Context, invoice *billing.StandardInvoice) error { +func (s Lines) Validate(ctx context.Context) error { return errors.Join(lo.Map(s, func(line Line, idx int) error { if line == nil { return fmt.Errorf("line[%d] is nil", idx) } - if err := line.Validate(ctx, invoice); err != nil { + if err := line.Validate(ctx); err != nil { id := line.ID() if id == "" { id = fmt.Sprintf("line[%d]", idx) diff --git a/openmeter/billing/service/lineservice/usagebasedline.go b/openmeter/billing/service/lineservice/usagebasedline.go index f11c4c5ad..0ebc0d474 100644 --- a/openmeter/billing/service/lineservice/usagebasedline.go +++ b/openmeter/billing/service/lineservice/usagebasedline.go @@ -48,16 +48,6 @@ func (l usageBasedLine) Validate(ctx context.Context, targetInvoice *billing.Sta return err } - if err := l.lineBase.Validate(ctx, targetInvoice); err != nil { - return err - } - - if err := targetInvoice.Customer.Validate(); err != nil { - return billing.ValidationError{ - Err: billing.ErrInvoiceCreateUBPLineCustomerUsageAttributionInvalid, - } - } - if l.line.Period.Truncate(streaming.MinimumWindowSizeDuration).IsEmpty() { return billing.ValidationError{ Err: billing.ErrInvoiceCreateUBPLinePeriodIsEmpty, diff --git a/openmeter/billing/service/lineservice/usagebasedlineflat.go b/openmeter/billing/service/lineservice/usagebasedlineflat.go index 1a8d14f2e..7a912e80d 100644 --- a/openmeter/billing/service/lineservice/usagebasedlineflat.go +++ b/openmeter/billing/service/lineservice/usagebasedlineflat.go @@ -24,10 +24,6 @@ func (l ubpFlatFeeLine) Validate(ctx context.Context, targetInvoice *billing.Sta } } - if err := l.lineBase.Validate(ctx, targetInvoice); err != nil { - outErr = append(outErr, err) - } - // Usage discounts are not allowed // TODO[later]: Once we have cleaned up the line types, let's move as much as possible to the line's validation if l.line.RateCardDiscounts.Usage != nil { From 63724259703e68b4c8cff167deef9b72d126b450 Mon Sep 17 00:00:00 2001 From: Peter Turi Date: Tue, 27 Jan 2026 16:05:54 +0100 Subject: [PATCH 2/6] chore: simplify lineservice --- .../service/gatheringinvoicependinglines.go | 38 +++-- openmeter/billing/service/invoice.go | 67 +++++---- .../billing/service/invoicecalc/calculator.go | 28 ++-- .../billing/service/invoicecalc/details.go | 5 +- openmeter/billing/service/invoicecalc/mock.go | 8 +- .../billing/service/lineservice/linebase.go | 5 - .../billing/service/lineservice/meters.go | 130 ------------------ .../billing/service/lineservice/service.go | 68 ++------- .../service/lineservice/usagebasedline.go | 19 +-- .../lineservice/usagebasedline_test.go | 5 + .../service/lineservice/usagebasedlineflat.go | 32 +---- openmeter/billing/service/service.go | 13 +- openmeter/billing/service/stdinvoiceline.go | 4 + openmeter/billing/service/stdinvoicestate.go | 3 +- openmeter/billing/stdinvoice.go | 8 ++ openmeter/billing/stdinvoiceline.go | 74 ++++++++-- openmeter/meter/mockadapter/meter.go | 1 + 17 files changed, 172 insertions(+), 336 deletions(-) delete mode 100644 openmeter/billing/service/lineservice/meters.go diff --git a/openmeter/billing/service/gatheringinvoicependinglines.go b/openmeter/billing/service/gatheringinvoicependinglines.go index 5fbc4e279..86546b3eb 100644 --- a/openmeter/billing/service/gatheringinvoicependinglines.go +++ b/openmeter/billing/service/gatheringinvoicependinglines.go @@ -197,6 +197,10 @@ func (in handleInvoicePendingLinesForCurrencyInput) Validate() error { return fmt.Errorf("gathering invoice: %w", err) } + if in.FeatureMeters == nil { + return fmt.Errorf("feature meters cannot be nil") + } + if len(in.InScopeLines) == 0 { return fmt.Errorf("in scope lines must contain at least one line") } @@ -275,6 +279,7 @@ type gatheringInvoiceWithFeatureMeters struct { Invoice billing.StandardInvoice FeatureMeters billing.FeatureMeters } + type gatherInScopeLineInput struct { GatheringInvoicesByCurrency map[currencyx.Code]gatheringInvoiceWithFeatureMeters // If set restricts the lines to be included to these IDs, otherwise the AsOf is used @@ -292,12 +297,12 @@ func (s *Service) gatherInScopeLines(ctx context.Context, in gatherInScopeLineIn billableLineIDs := make(map[string]interface{}) for currency, invoice := range in.GatheringInvoicesByCurrency { - lineSrvs, err := s.lineService.FromEntities(invoice.Invoice.Lines.OrEmpty(), invoice.FeatureMeters) + lineSrvs, err := lineservice.FromEntities(invoice.Invoice.Lines.OrEmpty(), invoice.FeatureMeters) if err != nil { return nil, err } - linesWithResolvedPeriods, err := lineSrvs.ResolveBillablePeriod(ctx, lineservice.ResolveBillablePeriodInput{ + linesWithResolvedPeriods, err := lineSrvs.ResolveBillablePeriod(lineservice.ResolveBillablePeriodInput{ AsOf: in.AsOf, ProgressiveBilling: in.ProgressiveBilling, }) @@ -370,6 +375,10 @@ func (i prepareLinesToBillInput) Validate() error { errs = append(errs, fmt.Errorf("no lines to bill")) } + if i.FeatureMeters == nil { + errs = append(errs, fmt.Errorf("feature meters cannot be nil")) + } + if i.GatheringInvoice.Lines.IsAbsent() { errs = append(errs, fmt.Errorf("gathering invoice must have lines expanded")) } @@ -555,17 +564,17 @@ func (s *Service) splitGatheringInvoiceLine(ctx context.Context, in splitGatheri l.ChildUniqueReferenceID = nil }) - postSplitAtLineSvc, err := s.lineService.FromEntity(postSplitAtLine, in.FeatureMeters) + postSplitAtLineSvc, err := lineservice.FromEntity(postSplitAtLine, in.FeatureMeters) if err != nil { return res, fmt.Errorf("creating line service: %w", err) } if !postSplitAtLineSvc.IsPeriodEmptyConsideringTruncations() { - gatheringInvoice.Lines.Append(postSplitAtLine) - - if err := postSplitAtLineSvc.Validate(ctx); err != nil { + if err := postSplitAtLine.Validate(); err != nil { return res, fmt.Errorf("validating post split line: %w", err) } + + gatheringInvoice.Lines.Append(postSplitAtLine) } // Let's update the original line to only contain the period up to the splitAt time @@ -574,7 +583,9 @@ func (s *Service) splitGatheringInvoiceLine(ctx context.Context, in splitGatheri line.SplitLineGroupID = lo.ToPtr(splitLineGroupID) line.ChildUniqueReferenceID = nil - preSplitAtLineSvc, err := s.lineService.FromEntity(line, in.FeatureMeters) + preSplitAtLine := line + + preSplitAtLineSvc, err := lineservice.FromEntity(line, in.FeatureMeters) if err != nil { return res, fmt.Errorf("creating line service: %w", err) } @@ -583,7 +594,7 @@ func (s *Service) splitGatheringInvoiceLine(ctx context.Context, in splitGatheri if preSplitAtLineSvc.IsPeriodEmptyConsideringTruncations() { line.DeletedAt = lo.ToPtr(clock.Now()) } else { - if err := preSplitAtLineSvc.Validate(ctx); err != nil { + if err := preSplitAtLine.Validate(); err != nil { return res, fmt.Errorf("validating pre split line: %w", err) } } @@ -619,6 +630,10 @@ func (in createStandardInvoiceFromGatheringLinesInput) Validate() error { errs = append(errs, fmt.Errorf("effective billing profile: %w", err)) } + if in.FeatureMeters == nil { + errs = append(errs, fmt.Errorf("feature meters cannot be nil")) + } + if len(in.Lines) == 0 { errs = append(errs, fmt.Errorf("lines must contain at least one line")) } @@ -834,12 +849,7 @@ func (s *Service) moveLinesToInvoice(ctx context.Context, in moveLinesToInvoiceI return nil, fmt.Errorf("lines to move[%d] must contain the same number of lines as line IDs to move[%d]", len(linesToMove), len(in.LineIDsToMove)) } - linesToAssociate, err := s.lineService.FromEntities(linesToMove, in.FeatureMeters) - if err != nil { - return nil, fmt.Errorf("creating line services for lines to move: %w", err) - } - - if err := linesToAssociate.Validate(ctx); err != nil { + if err := linesToMove.Validate(); err != nil { return nil, fmt.Errorf("validating lines to move: %w", err) } diff --git a/openmeter/billing/service/invoice.go b/openmeter/billing/service/invoice.go index 81128e75f..e96012b9b 100644 --- a/openmeter/billing/service/invoice.go +++ b/openmeter/billing/service/invoice.go @@ -150,6 +150,10 @@ func (s *Service) recalculateGatheringInvoice(ctx context.Context, in recalculat return invoice, fmt.Errorf("fetching profile: %w", err) } + if customerProfile.Customer == nil { + return invoice, fmt.Errorf("customer profile is nil") + } + featureMeters, err := s.resolveFeatureMeters(ctx, invoice.Lines.OrEmpty()) if err != nil { return invoice, fmt.Errorf("resolving feature meters: %w", err) @@ -174,7 +178,7 @@ func (s *Service) recalculateGatheringInvoice(ctx context.Context, in recalculat hasInvoicableLines := mo.Option[bool]{} for _, lineSvc := range inScopeLineSvcs { - period, err := lineSvc.CanBeInvoicedAsOf(ctx, lineservice.CanBeInvoicedAsOfInput{ + period, err := lineSvc.CanBeInvoicedAsOf(lineservice.CanBeInvoicedAsOfInput{ AsOf: now, ProgressiveBilling: customerProfile.MergedProfile.WorkflowConfig.Invoicing.ProgressiveBilling, }) @@ -189,8 +193,7 @@ func (s *Service) recalculateGatheringInvoice(ctx context.Context, in recalculat invoice.QuantitySnapshotedAt = lo.ToPtr(now) - if err := s.invoiceCalculator.CalculateGatheringInvoiceWithLiveData(&invoice, invoicecalc.CalculatorDependencies{ - LineService: s.lineService, + if err := s.invoiceCalculator.CalculateGatheringInvoiceWithLiveData(&invoice, invoicecalc.CalculationDependencies{ FeatureMeters: featureMeters, }); err != nil { return invoice, fmt.Errorf("calculating invoice: %w", err) @@ -570,18 +573,11 @@ func (s *Service) UpdateInvoice(ctx context.Context, input billing.UpdateInvoice return billing.StandardInvoice{}, fmt.Errorf("editing invoice: %w", err) } - featureMeters, err := s.resolveFeatureMeters(ctx, invoice.Lines.OrEmpty()) - if err != nil { - return billing.StandardInvoice{}, fmt.Errorf("resolving feature meters: %w", err) - } - - normalizedLines, err := invoice.Lines.WithNormalizedValues() + invoice.Lines, err = invoice.Lines.WithNormalizedValues() if err != nil { return billing.StandardInvoice{}, fmt.Errorf("normalizing lines: %w", err) } - invoice.Lines = normalizedLines - if err := s.invoiceCalculator.CalculateGatheringInvoice(&invoice); err != nil { return billing.StandardInvoice{}, fmt.Errorf("calculating invoice[%s]: %w", invoice.ID, err) } @@ -661,24 +657,28 @@ func (s Service) updateInvoice(ctx context.Context, in billing.UpdateInvoiceAdap return invoice, nil } -func (s Service) checkIfLinesAreInvoicable(ctx context.Context, invoice *billing.StandardInvoice, progressiveBilling bool, featureMeters billing.FeatureMeters) error { - inScopeLineServices, err := s.lineService.FromEntities( - lo.Filter(invoice.Lines.OrEmpty(), func(line *billing.StandardLine, _ int) bool { - return line.DeletedAt == nil - }), - featureMeters, - ) +func (s Service) checkIfLinesAreInvoicable(ctx context.Context, invoice *billing.StandardInvoice, progressiveBilling bool) error { + featureMeters, err := s.resolveFeatureMeters(ctx, invoice.Lines.OrEmpty()) if err != nil { - return fmt.Errorf("creating line services: %w", err) + return fmt.Errorf("resolving feature meters: %w", err) } + linesToCheck := lo.Filter(invoice.Lines.OrEmpty(), func(line *billing.StandardLine, _ int) bool { + return line.DeletedAt == nil + }) + return errors.Join( - lo.Map(inScopeLineServices, func(lineSvc lineservice.Line, _ int) error { - if err := lineSvc.Validate(ctx); err != nil { - return fmt.Errorf("validating line[%s]: %w", lineSvc.ID(), err) + lo.Map(linesToCheck, func(line *billing.StandardLine, _ int) error { + if err := line.Validate(); err != nil { + return fmt.Errorf("validating line[%s]: %w", line.ID, err) + } + + lineSvc, err := lineservice.FromEntity(line, featureMeters) + if err != nil { + return fmt.Errorf("creating line service: %w", err) } - period, err := lineSvc.CanBeInvoicedAsOf(ctx, lineservice.CanBeInvoicedAsOfInput{ + period, err := lineSvc.CanBeInvoicedAsOf(lineservice.CanBeInvoicedAsOfInput{ AsOf: lineSvc.InvoiceAt(), ProgressiveBilling: progressiveBilling, }) @@ -799,17 +799,15 @@ func (s Service) SimulateInvoice(ctx context.Context, input billing.SimulateInvo return billing.StandardInvoice{}, fmt.Errorf("resolving feature meters: %w", err) } - inScopeLineSvcs, err := s.lineService.FromEntities(invoice.Lines.OrEmpty(), featureMeters) - if err != nil { - return billing.StandardInvoice{}, fmt.Errorf("creating line services: %w", err) - } - // Let's update the lines and the detailed lines - for _, lineSvc := range inScopeLineSvcs { - if err := lineSvc.Validate(ctx); err != nil { - return billing.StandardInvoice{}, billing.ValidationError{ - Err: err, - } + for _, line := range invoice.Lines.OrEmpty() { + if err := line.Validate(); err != nil { + return billing.StandardInvoice{}, fmt.Errorf("validating line[%s]: %w", line.ID, err) + } + + lineSvc, err := lineservice.FromEntity(line, featureMeters) + if err != nil { + return billing.StandardInvoice{}, fmt.Errorf("creating line service: %w", err) } if err := lineSvc.CalculateDetailedLines(); err != nil { @@ -822,8 +820,7 @@ func (s Service) SimulateInvoice(ctx context.Context, input billing.SimulateInvo } // Let's simulate a recalculation of the invoice - if err := s.invoiceCalculator.Calculate(&invoice, invoicecalc.CalculatorDependencies{ - LineService: s.lineService, + if err := s.invoiceCalculator.Calculate(&invoice, invoicecalc.CalculationDependencies{ FeatureMeters: featureMeters, }); err != nil { return billing.StandardInvoice{}, err diff --git a/openmeter/billing/service/invoicecalc/calculator.go b/openmeter/billing/service/invoicecalc/calculator.go index 5a6b1a5f3..5d8ae2e2e 100644 --- a/openmeter/billing/service/invoicecalc/calculator.go +++ b/openmeter/billing/service/invoicecalc/calculator.go @@ -1,10 +1,10 @@ package invoicecalc import ( + "context" "errors" "github.com/openmeterio/openmeter/openmeter/billing" - "github.com/openmeterio/openmeter/openmeter/billing/service/lineservice" ) type invoiceCalculatorsByType struct { @@ -40,18 +40,22 @@ var InvoiceCalculations = invoiceCalculatorsByType{ } type ( - Calculation func(*billing.StandardInvoice, CalculatorDependencies) error + Calculation func(*billing.StandardInvoice, CalculationDependencies) error ) +type CalculationDependencies struct { + FeatureMeters billing.FeatureMeters +} + type Calculator interface { - Calculate(*billing.StandardInvoice, CalculatorDependencies) error + Calculate(*billing.StandardInvoice, CalculationDependencies) error CalculateGatheringInvoice(*billing.StandardInvoice) error - CalculateGatheringInvoiceWithLiveData(*billing.StandardInvoice, CalculatorDependencies) error + CalculateGatheringInvoiceWithLiveData(*billing.StandardInvoice, CalculationDependencies) error } -type CalculatorDependencies struct { - LineService *lineservice.Service - FeatureMeters billing.FeatureMeters +type ServiceDependencies interface { + // RecalculateInvoiceTotals recalculates the totals of an invoice + RecalculateInvoiceTotals(ctx context.Context, invoice *billing.StandardInvoice) error } type calculator struct{} @@ -60,11 +64,11 @@ func New() Calculator { return &calculator{} } -func (c *calculator) Calculate(invoice *billing.StandardInvoice, deps CalculatorDependencies) error { +func (c *calculator) Calculate(invoice *billing.StandardInvoice, deps CalculationDependencies) error { return c.applyCalculations(invoice, InvoiceCalculations.Invoice, deps) } -func (c *calculator) applyCalculations(invoice *billing.StandardInvoice, calculators []Calculation, deps CalculatorDependencies) error { +func (c *calculator) applyCalculations(invoice *billing.StandardInvoice, calculators []Calculation, deps CalculationDependencies) error { var outErr error for _, calc := range calculators { err := calc(invoice, deps) @@ -85,10 +89,10 @@ func (c *calculator) CalculateGatheringInvoice(invoice *billing.StandardInvoice) return errors.New("invoice is not a gathering invoice") } - return c.applyCalculations(invoice, InvoiceCalculations.GatheringInvoice, CalculatorDependencies{}) + return c.applyCalculations(invoice, InvoiceCalculations.GatheringInvoice, CalculationDependencies{}) } -func (c *calculator) CalculateGatheringInvoiceWithLiveData(invoice *billing.StandardInvoice, deps CalculatorDependencies) error { +func (c *calculator) CalculateGatheringInvoiceWithLiveData(invoice *billing.StandardInvoice, deps CalculationDependencies) error { if invoice.Status != billing.StandardInvoiceStatusGathering { return errors.New("invoice is not a gathering invoice") } @@ -97,7 +101,7 @@ func (c *calculator) CalculateGatheringInvoiceWithLiveData(invoice *billing.Stan } func WithNoDependencies(cb func(inv *billing.StandardInvoice) error) Calculation { - return func(inv *billing.StandardInvoice, _ CalculatorDependencies) error { + return func(inv *billing.StandardInvoice, _ CalculationDependencies) error { return cb(inv) } } diff --git a/openmeter/billing/service/invoicecalc/details.go b/openmeter/billing/service/invoicecalc/details.go index d1f39820b..499bb5225 100644 --- a/openmeter/billing/service/invoicecalc/details.go +++ b/openmeter/billing/service/invoicecalc/details.go @@ -7,14 +7,15 @@ import ( "github.com/samber/lo" "github.com/openmeterio/openmeter/openmeter/billing" + "github.com/openmeterio/openmeter/openmeter/billing/service/lineservice" ) -func RecalculateDetailedLinesAndTotals(invoice *billing.StandardInvoice, deps CalculatorDependencies) error { +func RecalculateDetailedLinesAndTotals(invoice *billing.StandardInvoice, deps CalculationDependencies) error { if invoice.Lines.IsAbsent() { return errors.New("cannot recaulculate invoice without expanded lines") } - lines, err := deps.LineService.FromEntities(invoice.Lines.OrEmpty(), deps.FeatureMeters) + lines, err := lineservice.FromEntities(invoice.Lines.OrEmpty(), deps.FeatureMeters) if err != nil { return fmt.Errorf("creating line services: %w", err) } diff --git a/openmeter/billing/service/invoicecalc/mock.go b/openmeter/billing/service/invoicecalc/mock.go index 32afbd6be..df5575a0a 100644 --- a/openmeter/billing/service/invoicecalc/mock.go +++ b/openmeter/billing/service/invoicecalc/mock.go @@ -26,7 +26,7 @@ type mockCalculator struct { calculateGatheringInvoiceWithLiveDataResultCalled bool } -func (m *mockCalculator) Calculate(i *billing.StandardInvoice, deps CalculatorDependencies) error { +func (m *mockCalculator) Calculate(i *billing.StandardInvoice, deps CalculationDependencies) error { m.calculateResultCalled = true res := m.calculateResult.MustGet() @@ -56,7 +56,7 @@ func (m *mockCalculator) CalculateGatheringInvoice(i *billing.StandardInvoice) e billing.ValidationComponentOpenMeter) } -func (m *mockCalculator) CalculateGatheringInvoiceWithLiveData(i *billing.StandardInvoice, deps CalculatorDependencies) error { +func (m *mockCalculator) CalculateGatheringInvoiceWithLiveData(i *billing.StandardInvoice, deps CalculationDependencies) error { m.calculateGatheringInvoiceWithLiveDataResultCalled = true res := m.calculateGatheringInvoiceWithLiveDataResult.MustGet() @@ -120,7 +120,7 @@ func NewMockableCalculator(_ *testing.T, upstream Calculator) *MockableInvoiceCa } } -func (m *MockableInvoiceCalculator) Calculate(i *billing.StandardInvoice, deps CalculatorDependencies) error { +func (m *MockableInvoiceCalculator) Calculate(i *billing.StandardInvoice, deps CalculationDependencies) error { outErr := m.upstream.Calculate(i, deps) if m.mock != nil { @@ -146,7 +146,7 @@ func (m *MockableInvoiceCalculator) CalculateGatheringInvoice(i *billing.Standar return outErr } -func (m *MockableInvoiceCalculator) CalculateGatheringInvoiceWithLiveData(i *billing.StandardInvoice, deps CalculatorDependencies) error { +func (m *MockableInvoiceCalculator) CalculateGatheringInvoiceWithLiveData(i *billing.StandardInvoice, deps CalculationDependencies) error { outErr := m.upstream.CalculateGatheringInvoiceWithLiveData(i, deps) if m.mock != nil { diff --git a/openmeter/billing/service/lineservice/linebase.go b/openmeter/billing/service/lineservice/linebase.go index 362efdc37..12de5a59b 100644 --- a/openmeter/billing/service/lineservice/linebase.go +++ b/openmeter/billing/service/lineservice/linebase.go @@ -40,7 +40,6 @@ var _ LineBase = (*lineBase)(nil) type lineBase struct { line *billing.StandardLine - service *Service featureMeters billing.FeatureMeters currency currencyx.Calculator } @@ -105,10 +104,6 @@ func (l lineBase) IsDeleted() bool { return l.line.DeletedAt != nil } -func (l lineBase) Service() *Service { - return l.service -} - func (l lineBase) ResetTotals() { l.line.Totals = billing.Totals{} } diff --git a/openmeter/billing/service/lineservice/meters.go b/openmeter/billing/service/lineservice/meters.go deleted file mode 100644 index 1b1ecfc0d..000000000 --- a/openmeter/billing/service/lineservice/meters.go +++ /dev/null @@ -1,130 +0,0 @@ -package lineservice - -import ( - "context" - "fmt" - "slices" - - "github.com/alpacahq/alpacadecimal" - - "github.com/openmeterio/openmeter/openmeter/billing" - "github.com/openmeterio/openmeter/openmeter/meter" - "github.com/openmeterio/openmeter/openmeter/productcatalog/feature" - "github.com/openmeterio/openmeter/openmeter/streaming" -) - -type getFeatureUsageInput struct { - Line *billing.StandardLine - Meter meter.Meter - Feature feature.Feature - Customer billing.InvoiceCustomer -} - -func (i getFeatureUsageInput) Validate() error { - if i.Line == nil { - return fmt.Errorf("line is required") - } - - if slices.Contains([]meter.MeterAggregation{ - meter.MeterAggregationAvg, - meter.MeterAggregationMin, - }, i.Meter.Aggregation) { - if i.Line.SplitLineHierarchy != nil { - return fmt.Errorf("aggregation %s is not supported for split lines", i.Meter.Aggregation) - } - } - - if err := i.Customer.Validate(); err != nil { - return fmt.Errorf("customer: %w", err) - } - - return nil -} - -type featureUsageResponse struct { - // LinePeriodQty is the quantity of the usage for the line for the period - LinePeriodQty alpacadecimal.Decimal - // PreLinePeriodQty is the quantity of the usage for the line for the period before the current period - PreLinePeriodQty alpacadecimal.Decimal -} - -func (s *Service) getFeatureUsage(ctx context.Context, in getFeatureUsageInput) (*featureUsageResponse, error) { - // Validation - if err := in.Validate(); err != nil { - return nil, err - } - - meterQueryParams := streaming.QueryParams{ - FilterCustomer: []streaming.Customer{in.Customer}, - From: &in.Line.Period.Start, - To: &in.Line.Period.End, - FilterGroupBy: in.Feature.MeterGroupByFilters, - } - - lineHierarchy := in.Line.SplitLineHierarchy - - // If we are the first line in the split, we don't need to calculate the pre period - if lineHierarchy == nil || lineHierarchy.Group.ServicePeriod.Start.Equal(in.Line.Period.Start) { - meterValues, err := s.StreamingConnector.QueryMeter( - ctx, - in.Line.Namespace, - in.Meter, - meterQueryParams, - ) - if err != nil { - return nil, fmt.Errorf("querying line[%s] meter[%s]: %w", in.Line.ID, in.Meter.Key, err) - } - - return &featureUsageResponse{ - LinePeriodQty: summarizeMeterQueryRow(meterValues), - }, nil - } - - // Let's calculate [parent.start ... line.start] values - preLineQuery := meterQueryParams - preLineQuery.From = &lineHierarchy.Group.ServicePeriod.Start - preLineQuery.To = &in.Line.Period.Start - - preLineResult, err := s.StreamingConnector.QueryMeter( - ctx, - in.Line.Namespace, - in.Meter, - preLineQuery, - ) - if err != nil { - return nil, fmt.Errorf("querying pre line[%s] period meter[%s]: %w", in.Line.ID, in.Meter.Key, err) - } - - preLineQty := summarizeMeterQueryRow(preLineResult) - - // Let's calculate [parent.start ... line.end] values - upToLineEnd := meterQueryParams - upToLineEnd.From = &lineHierarchy.Group.ServicePeriod.Start - upToLineEnd.To = &in.Line.Period.End - - upToLineEndResult, err := s.StreamingConnector.QueryMeter( - ctx, - in.Line.Namespace, - in.Meter, - upToLineEnd, - ) - if err != nil { - return nil, fmt.Errorf("querying up to line[%s] end meter[%s]: %w", in.Line.ID, in.Meter.Key, err) - } - - upToLineQty := summarizeMeterQueryRow(upToLineEndResult) - - return &featureUsageResponse{ - LinePeriodQty: upToLineQty.Sub(preLineQty), - PreLinePeriodQty: preLineQty, - }, nil -} - -func summarizeMeterQueryRow(in []meter.MeterQueryRow) alpacadecimal.Decimal { - sum := alpacadecimal.Decimal{} - for _, row := range in { - sum = sum.Add(alpacadecimal.NewFromFloat(row.Value)) - } - - return sum -} diff --git a/openmeter/billing/service/lineservice/service.go b/openmeter/billing/service/lineservice/service.go index 18c7e9349..dcf694ec8 100644 --- a/openmeter/billing/service/lineservice/service.go +++ b/openmeter/billing/service/lineservice/service.go @@ -1,8 +1,6 @@ package lineservice import ( - "context" - "errors" "fmt" "time" @@ -10,47 +8,19 @@ import ( "github.com/openmeterio/openmeter/openmeter/billing" "github.com/openmeterio/openmeter/openmeter/productcatalog" - "github.com/openmeterio/openmeter/openmeter/streaming" "github.com/openmeterio/openmeter/pkg/slicesx" ) -type Service struct { - Config -} - -type Config struct { - StreamingConnector streaming.Connector -} - -func (c Config) Validate() error { - if c.StreamingConnector == nil { - return fmt.Errorf("streaming connector is required") - } - - return nil -} - -func New(in Config) (*Service, error) { - if err := in.Validate(); err != nil { - return nil, err - } - - return &Service{ - Config: in, - }, nil -} - -func (s *Service) FromEntity(line *billing.StandardLine, featureMeters billing.FeatureMeters) (Line, error) { +func FromEntity(line *billing.StandardLine, featureMeters billing.FeatureMeters) (Line, error) { currencyCalc, err := line.Currency.Calculator() if err != nil { return nil, fmt.Errorf("creating currency calculator: %w", err) } base := lineBase{ - service: s, - featureMeters: featureMeters, line: line, currency: currencyCalc, + featureMeters: featureMeters, } if line.UsageBased.Price.Type() == productcatalog.FlatPriceType { @@ -64,14 +34,14 @@ func (s *Service) FromEntity(line *billing.StandardLine, featureMeters billing.F }, nil } -func (s *Service) FromEntities(line []*billing.StandardLine, featureMeters billing.FeatureMeters) (Lines, error) { +func FromEntities(line []*billing.StandardLine, featureMeters billing.FeatureMeters) (Lines, error) { return slicesx.MapWithErr(line, func(l *billing.StandardLine) (Line, error) { - return s.FromEntity(l, featureMeters) + return FromEntity(l, featureMeters) }) } // UpdateTotalsFromDetailedLines is a helper method to update the totals of a line from its detailed lines. -func (s *Service) UpdateTotalsFromDetailedLines(line *billing.StandardLine) error { +func UpdateTotalsFromDetailedLines(line *billing.StandardLine) error { // Calculate the line totals for idx, detailedLine := range line.DetailedLines { if detailedLine.DeletedAt != nil { @@ -122,39 +92,17 @@ type ( type Line interface { LineBase - Service() *Service - // IsPeriodEmptyConsideringTruncations returns true if the line has an empty period. This is different from Period.IsEmpty() as // this method does any truncation for usage based lines. IsPeriodEmptyConsideringTruncations() bool - Validate(context.Context) error - CanBeInvoicedAsOf(context.Context, CanBeInvoicedAsOfInput) (*billing.Period, error) + CanBeInvoicedAsOf(CanBeInvoicedAsOfInput) (*billing.Period, error) CalculateDetailedLines() error UpdateTotals() error } type Lines []Line -func (s Lines) Validate(ctx context.Context) error { - return errors.Join(lo.Map(s, func(line Line, idx int) error { - if line == nil { - return fmt.Errorf("line[%d] is nil", idx) - } - - if err := line.Validate(ctx); err != nil { - id := line.ID() - if id == "" { - id = fmt.Sprintf("line[%d]", idx) - } - - return fmt.Errorf("line[%s]: %w", id, err) - } - - return nil - })...) -} - func (s Lines) ToEntities() []*billing.StandardLine { return lo.Map(s, func(service Line, _ int) *billing.StandardLine { return service.ToEntity() @@ -166,10 +114,10 @@ type LineWithBillablePeriod struct { BillablePeriod billing.Period } -func (s Lines) ResolveBillablePeriod(ctx context.Context, in ResolveBillablePeriodInput) ([]LineWithBillablePeriod, error) { +func (s Lines) ResolveBillablePeriod(in ResolveBillablePeriodInput) ([]LineWithBillablePeriod, error) { out := make([]LineWithBillablePeriod, 0, len(s)) for _, lineSrv := range s { - billablePeriod, err := lineSrv.CanBeInvoicedAsOf(ctx, in) + billablePeriod, err := lineSrv.CanBeInvoicedAsOf(in) if err != nil { return nil, fmt.Errorf("checking if line can be invoiced: %w", err) } diff --git a/openmeter/billing/service/lineservice/usagebasedline.go b/openmeter/billing/service/lineservice/usagebasedline.go index 0ebc0d474..023c0d24d 100644 --- a/openmeter/billing/service/lineservice/usagebasedline.go +++ b/openmeter/billing/service/lineservice/usagebasedline.go @@ -1,7 +1,6 @@ package lineservice import ( - "context" "fmt" "github.com/alpacahq/alpacadecimal" @@ -43,21 +42,7 @@ type usageBasedLine struct { lineBase } -func (l usageBasedLine) Validate(ctx context.Context, targetInvoice *billing.StandardInvoice) error { - if _, err := l.featureMeters.Get(l.line.UsageBased.FeatureKey, true); err != nil { - return err - } - - if l.line.Period.Truncate(streaming.MinimumWindowSizeDuration).IsEmpty() { - return billing.ValidationError{ - Err: billing.ErrInvoiceCreateUBPLinePeriodIsEmpty, - } - } - - return nil -} - -func (l usageBasedLine) CanBeInvoicedAsOf(ctx context.Context, in CanBeInvoicedAsOfInput) (*billing.Period, error) { +func (l usageBasedLine) CanBeInvoicedAsOf(in CanBeInvoicedAsOfInput) (*billing.Period, error) { if !in.ProgressiveBilling { // If we are not doing progressive billing, we can only bill the line if asof >= line.period.end if in.AsOf.Before(l.line.Period.End) { @@ -132,7 +117,7 @@ func (l usageBasedLine) CanBeInvoicedAsOf(ctx context.Context, in CanBeInvoicedA } func (l *usageBasedLine) UpdateTotals() error { - return l.service.UpdateTotalsFromDetailedLines(l.line) + return UpdateTotalsFromDetailedLines(l.line) } func (l *usageBasedLine) CalculateDetailedLines() error { diff --git a/openmeter/billing/service/lineservice/usagebasedline_test.go b/openmeter/billing/service/lineservice/usagebasedline_test.go index e3aeebdff..1e3f79608 100644 --- a/openmeter/billing/service/lineservice/usagebasedline_test.go +++ b/openmeter/billing/service/lineservice/usagebasedline_test.go @@ -29,6 +29,11 @@ var ubpTestFullPeriod = billing.Period{ End: lo.Must(time.Parse(time.RFC3339, "2021-01-02T00:00:00Z")), } +type featureUsageResponse struct { + LinePeriodQty alpacadecimal.Decimal + PreLinePeriodQty alpacadecimal.Decimal +} + type ubpCalculationTestCase struct { price productcatalog.Price discounts billing.Discounts diff --git a/openmeter/billing/service/lineservice/usagebasedlineflat.go b/openmeter/billing/service/lineservice/usagebasedlineflat.go index 7a912e80d..df3f96d0e 100644 --- a/openmeter/billing/service/lineservice/usagebasedlineflat.go +++ b/openmeter/billing/service/lineservice/usagebasedlineflat.go @@ -1,8 +1,6 @@ package lineservice import ( - "context" - "errors" "fmt" "github.com/openmeterio/openmeter/openmeter/billing" @@ -14,33 +12,7 @@ type ubpFlatFeeLine struct { lineBase } -func (l ubpFlatFeeLine) Validate(ctx context.Context, targetInvoice *billing.StandardInvoice) error { - var outErr []error - - if l.line.UsageBased.FeatureKey != "" { - _, err := l.featureMeters.Get(l.line.UsageBased.FeatureKey, false) - if err != nil { - outErr = append(outErr, fmt.Errorf("fetching feature[%s]: %w", l.line.UsageBased.FeatureKey, err)) - } - } - - // Usage discounts are not allowed - // TODO[later]: Once we have cleaned up the line types, let's move as much as possible to the line's validation - if l.line.RateCardDiscounts.Usage != nil { - outErr = append(outErr, errors.New("usage discounts are not supported for usage based flat fee lines")) - } - - // Percentage discounts are allowed - if l.line.RateCardDiscounts.Percentage != nil { - if err := l.line.RateCardDiscounts.Percentage.Validate(); err != nil { - outErr = append(outErr, err) - } - } - - return errors.Join(outErr...) -} - -func (l ubpFlatFeeLine) CanBeInvoicedAsOf(_ context.Context, in CanBeInvoicedAsOfInput) (*billing.Period, error) { +func (l ubpFlatFeeLine) CanBeInvoicedAsOf(in CanBeInvoicedAsOfInput) (*billing.Period, error) { if !in.AsOf.Before(l.line.InvoiceAt) { return &l.line.Period, nil } @@ -73,7 +45,7 @@ func (l ubpFlatFeeLine) CalculateDetailedLines() error { } func (l *ubpFlatFeeLine) UpdateTotals() error { - return l.service.UpdateTotalsFromDetailedLines(l.line) + return UpdateTotalsFromDetailedLines(l.line) } func (l ubpFlatFeeLine) IsPeriodEmptyConsideringTruncations() bool { diff --git a/openmeter/billing/service/service.go b/openmeter/billing/service/service.go index 63cf163c0..26ec5e96f 100644 --- a/openmeter/billing/service/service.go +++ b/openmeter/billing/service/service.go @@ -9,7 +9,6 @@ import ( "github.com/openmeterio/openmeter/openmeter/app" "github.com/openmeterio/openmeter/openmeter/billing" "github.com/openmeterio/openmeter/openmeter/billing/service/invoicecalc" - "github.com/openmeterio/openmeter/openmeter/billing/service/lineservice" "github.com/openmeterio/openmeter/openmeter/customer" "github.com/openmeterio/openmeter/openmeter/meter" "github.com/openmeterio/openmeter/openmeter/productcatalog/feature" @@ -30,8 +29,7 @@ type Service struct { meterService meter.Service streamingConnector streaming.Connector - lineService *lineservice.Service - publisher eventbus.Publisher + publisher eventbus.Publisher advancementStrategy billing.AdvancementStrategy fsNamespaceLockdown []string @@ -116,15 +114,6 @@ func New(config Config) (*Service, error) { invoiceCalculator: invoicecalc.New(), } - lineSvc, err := lineservice.New(lineservice.Config{ - StreamingConnector: config.StreamingConnector, - }) - if err != nil { - return nil, fmt.Errorf("creating line service: %w", err) - } - - svc.lineService = lineSvc - return svc, nil } diff --git a/openmeter/billing/service/stdinvoiceline.go b/openmeter/billing/service/stdinvoiceline.go index b39a6b48e..d4e194dfc 100644 --- a/openmeter/billing/service/stdinvoiceline.go +++ b/openmeter/billing/service/stdinvoiceline.go @@ -116,6 +116,10 @@ func (s *Service) CreatePendingInvoiceLines(ctx context.Context, input billing.C gatheringInvoiceID := gatheringInvoice.ID + if err := gatheringInvoice.Validate(); err != nil { + return nil, fmt.Errorf("validating gathering invoice: %w", err) + } + if err := s.invoiceCalculator.CalculateGatheringInvoice(&gatheringInvoice); err != nil { return nil, fmt.Errorf("calculating invoice[%s]: %w", gatheringInvoiceID, err) } diff --git a/openmeter/billing/service/stdinvoicestate.go b/openmeter/billing/service/stdinvoicestate.go index 0bac1f1a7..c06ddf9eb 100644 --- a/openmeter/billing/service/stdinvoicestate.go +++ b/openmeter/billing/service/stdinvoicestate.go @@ -611,8 +611,7 @@ func (m *InvoiceStateMachine) calculateInvoice(ctx context.Context) error { return fmt.Errorf("resolving feature meters: %w", err) } - return m.Calculator.Calculate(&m.Invoice, invoicecalc.CalculatorDependencies{ - LineService: m.Service.lineService, + return m.Calculator.Calculate(&m.Invoice, invoicecalc.CalculationDependencies{ FeatureMeters: featureMeters, }) } diff --git a/openmeter/billing/stdinvoice.go b/openmeter/billing/stdinvoice.go index 270a87ac2..36b1b788b 100644 --- a/openmeter/billing/stdinvoice.go +++ b/openmeter/billing/stdinvoice.go @@ -291,6 +291,14 @@ func (i StandardInvoice) Validate() error { outErr = errors.Join(outErr, ValidationWithFieldPrefix("lines", err)) } + if i.Lines.IsPresent() { + for _, line := range i.Lines.OrEmpty() { + if line.Currency != i.Currency { + outErr = errors.Join(outErr, fmt.Errorf("line[%s]: currency[%s] is not equal to invoice currency[%s]", line.ID, line.Currency, i.Currency)) + } + } + } + return outErr } diff --git a/openmeter/billing/stdinvoiceline.go b/openmeter/billing/stdinvoiceline.go index 43f8b9f99..73074976d 100644 --- a/openmeter/billing/stdinvoiceline.go +++ b/openmeter/billing/stdinvoiceline.go @@ -86,6 +86,18 @@ func (i StandardLineBase) Validate() error { errs = append(errs, fmt.Errorf("invalid managed by %s", i.ManagedBy)) } + if i.RateCardDiscounts.Percentage != nil { + if err := i.RateCardDiscounts.Percentage.Validate(); err != nil { + errs = append(errs, fmt.Errorf("percentage discounts: %w", err)) + } + } + + if i.RateCardDiscounts.Usage != nil { + if err := i.RateCardDiscounts.Usage.Validate(); err != nil { + errs = append(errs, fmt.Errorf("usage discounts: %w", err)) + } + } + return errors.Join(errs...) } @@ -242,6 +254,32 @@ func (i StandardLine) Clone() *StandardLine { return i.clone(cloneOptions{}) } +// NormalizeValues normalizes the values of the line to ensure they are matching the expected invariants: +// - Period is truncated to the minimum window size duration +// - InvoiceAt is truncated to the minimum window size duration +// - UsageBased.Price is normalized to have the default inAdvance payment term for flat prices +func (i StandardLine) WithNormalizedValues() (*StandardLine, error) { + out := i.Clone() + + out.Period = out.Period.Truncate(streaming.MinimumWindowSizeDuration) + out.InvoiceAt = out.InvoiceAt.Truncate(streaming.MinimumWindowSizeDuration) + + if out.UsageBased.Price.Type() == productcatalog.FlatPriceType { + // Let's apply the default inAdvance payment term for flat prices + flatPrice, err := out.UsageBased.Price.AsFlat() + if err != nil { + return nil, fmt.Errorf("converting price to flat price: %w", err) + } + + if flatPrice.PaymentTerm == "" { + flatPrice.PaymentTerm = productcatalog.InAdvancePaymentTerm + out.UsageBased.Price = productcatalog.NewPriceFrom(flatPrice) + } + } + + return out, nil +} + type cloneOptions struct { skipDBState bool skipChildren bool @@ -297,26 +335,36 @@ func (i StandardLine) Validate() error { errs = append(errs, fmt.Errorf("detailed lines: %w", err)) } - if err := i.ValidateUsageBased(); err != nil { - errs = append(errs, err) + for _, detailedLine := range i.DetailedLines { + if detailedLine.Currency != i.Currency { + errs = append(errs, fmt.Errorf("detailed line[%s]: currency[%s] is not equal to line currency[%s]", detailedLine.ID, detailedLine.Currency, i.Currency)) + } } - if err := i.RateCardDiscounts.ValidateForPrice(i.UsageBased.Price); err != nil { - errs = append(errs, fmt.Errorf("rateCardDiscounts: %w", err)) + if err := i.UsageBased.Validate(); err != nil { + errs = append(errs, err) } - return errors.Join(errs...) -} - -func (i StandardLine) ValidateUsageBased() error { - var errs []error + if i.UsageBased.Price.Type() != productcatalog.FlatPriceType { + if i.InvoiceAt. + Truncate(streaming.MinimumWindowSizeDuration). + Before(i.Period.Truncate(streaming.MinimumWindowSizeDuration).End) { + errs = append(errs, fmt.Errorf("invoice at (%s) must be after period end (%s) for usage based line", i.InvoiceAt, i.Period.Truncate(streaming.MinimumWindowSizeDuration).End)) + } - if err := i.UsageBased.Validate(); err != nil { - errs = append(errs, err) + if i.Period.Truncate(streaming.MinimumWindowSizeDuration).IsEmpty() { + errs = append(errs, ValidationError{ + Err: ErrInvoiceCreateUBPLinePeriodIsEmpty, + }) + } + } else { + if i.RateCardDiscounts.Usage != nil { + errs = append(errs, fmt.Errorf("usage discounts are not allowed for flat price lines")) + } } - if i.DependsOnMeteredQuantity() && i.InvoiceAt.Before(i.Period.Truncate(streaming.MinimumWindowSizeDuration).End) { - errs = append(errs, fmt.Errorf("invoice at (%s) must be after period end (%s) for usage based line", i.InvoiceAt, i.Period.Truncate(streaming.MinimumWindowSizeDuration).End)) + if err := i.RateCardDiscounts.ValidateForPrice(i.UsageBased.Price); err != nil { + errs = append(errs, fmt.Errorf("rateCardDiscounts: %w", err)) } return errors.Join(errs...) diff --git a/openmeter/meter/mockadapter/meter.go b/openmeter/meter/mockadapter/meter.go index 67a2beea1..7e08e14cb 100644 --- a/openmeter/meter/mockadapter/meter.go +++ b/openmeter/meter/mockadapter/meter.go @@ -41,6 +41,7 @@ func (c *adapter) ListMeters(_ context.Context, params meter.ListMetersParams) ( } // In memory pagination: case #2 if there is pagination settings return the paginated dataset + pageNumberIndex := params.PageNumber - 1 if pageNumberIndex*params.PageSize > len(meters) { From af00cc9761f67c75f9fe5dbc39d8cd75a42f849e Mon Sep 17 00:00:00 2001 From: Peter Turi Date: Wed, 28 Jan 2026 10:14:28 +0100 Subject: [PATCH 3/6] fix: rebase errors --- openmeter/billing/service/invoice.go | 16 ++++++------ .../billing/service/invoicecalc/calculator.go | 26 +++++++------------ .../billing/service/invoicecalc/details.go | 2 +- openmeter/billing/service/invoicecalc/mock.go | 8 +++--- openmeter/billing/service/stdinvoicestate.go | 2 +- openmeter/meter/mockadapter/meter.go | 1 - 6 files changed, 24 insertions(+), 31 deletions(-) diff --git a/openmeter/billing/service/invoice.go b/openmeter/billing/service/invoice.go index e96012b9b..25d36fec6 100644 --- a/openmeter/billing/service/invoice.go +++ b/openmeter/billing/service/invoice.go @@ -193,7 +193,7 @@ func (s *Service) recalculateGatheringInvoice(ctx context.Context, in recalculat invoice.QuantitySnapshotedAt = lo.ToPtr(now) - if err := s.invoiceCalculator.CalculateGatheringInvoiceWithLiveData(&invoice, invoicecalc.CalculationDependencies{ + if err := s.invoiceCalculator.CalculateGatheringInvoiceWithLiveData(&invoice, invoicecalc.CalculatorDependencies{ FeatureMeters: featureMeters, }); err != nil { return invoice, fmt.Errorf("calculating invoice: %w", err) @@ -588,6 +588,11 @@ func (s *Service) UpdateInvoice(ctx context.Context, input billing.UpdateInvoice } } + featureMeters, err := s.resolveFeatureMeters(ctx, invoice.Lines.OrEmpty()) + if err != nil { + return billing.StandardInvoice{}, fmt.Errorf("resolving feature meters: %w", err) + } + // Check if the new lines are still invoicable if err := s.checkIfLinesAreInvoicable(ctx, &invoice, customerProfile.MergedProfile.WorkflowConfig.Invoicing.ProgressiveBilling, featureMeters); err != nil { return billing.StandardInvoice{}, err @@ -657,12 +662,7 @@ func (s Service) updateInvoice(ctx context.Context, in billing.UpdateInvoiceAdap return invoice, nil } -func (s Service) checkIfLinesAreInvoicable(ctx context.Context, invoice *billing.StandardInvoice, progressiveBilling bool) error { - featureMeters, err := s.resolveFeatureMeters(ctx, invoice.Lines.OrEmpty()) - if err != nil { - return fmt.Errorf("resolving feature meters: %w", err) - } - +func (s Service) checkIfLinesAreInvoicable(ctx context.Context, invoice *billing.StandardInvoice, progressiveBilling bool, featureMeters billing.FeatureMeters) error { linesToCheck := lo.Filter(invoice.Lines.OrEmpty(), func(line *billing.StandardLine, _ int) bool { return line.DeletedAt == nil }) @@ -820,7 +820,7 @@ func (s Service) SimulateInvoice(ctx context.Context, input billing.SimulateInvo } // Let's simulate a recalculation of the invoice - if err := s.invoiceCalculator.Calculate(&invoice, invoicecalc.CalculationDependencies{ + if err := s.invoiceCalculator.Calculate(&invoice, invoicecalc.CalculatorDependencies{ FeatureMeters: featureMeters, }); err != nil { return billing.StandardInvoice{}, err diff --git a/openmeter/billing/service/invoicecalc/calculator.go b/openmeter/billing/service/invoicecalc/calculator.go index 5d8ae2e2e..ccd39411f 100644 --- a/openmeter/billing/service/invoicecalc/calculator.go +++ b/openmeter/billing/service/invoicecalc/calculator.go @@ -1,7 +1,6 @@ package invoicecalc import ( - "context" "errors" "github.com/openmeterio/openmeter/openmeter/billing" @@ -40,22 +39,17 @@ var InvoiceCalculations = invoiceCalculatorsByType{ } type ( - Calculation func(*billing.StandardInvoice, CalculationDependencies) error + Calculation func(*billing.StandardInvoice, CalculatorDependencies) error ) -type CalculationDependencies struct { - FeatureMeters billing.FeatureMeters -} - type Calculator interface { - Calculate(*billing.StandardInvoice, CalculationDependencies) error + Calculate(*billing.StandardInvoice, CalculatorDependencies) error CalculateGatheringInvoice(*billing.StandardInvoice) error - CalculateGatheringInvoiceWithLiveData(*billing.StandardInvoice, CalculationDependencies) error + CalculateGatheringInvoiceWithLiveData(*billing.StandardInvoice, CalculatorDependencies) error } -type ServiceDependencies interface { - // RecalculateInvoiceTotals recalculates the totals of an invoice - RecalculateInvoiceTotals(ctx context.Context, invoice *billing.StandardInvoice) error +type CalculatorDependencies struct { + FeatureMeters billing.FeatureMeters } type calculator struct{} @@ -64,11 +58,11 @@ func New() Calculator { return &calculator{} } -func (c *calculator) Calculate(invoice *billing.StandardInvoice, deps CalculationDependencies) error { +func (c *calculator) Calculate(invoice *billing.StandardInvoice, deps CalculatorDependencies) error { return c.applyCalculations(invoice, InvoiceCalculations.Invoice, deps) } -func (c *calculator) applyCalculations(invoice *billing.StandardInvoice, calculators []Calculation, deps CalculationDependencies) error { +func (c *calculator) applyCalculations(invoice *billing.StandardInvoice, calculators []Calculation, deps CalculatorDependencies) error { var outErr error for _, calc := range calculators { err := calc(invoice, deps) @@ -89,10 +83,10 @@ func (c *calculator) CalculateGatheringInvoice(invoice *billing.StandardInvoice) return errors.New("invoice is not a gathering invoice") } - return c.applyCalculations(invoice, InvoiceCalculations.GatheringInvoice, CalculationDependencies{}) + return c.applyCalculations(invoice, InvoiceCalculations.GatheringInvoice, CalculatorDependencies{}) } -func (c *calculator) CalculateGatheringInvoiceWithLiveData(invoice *billing.StandardInvoice, deps CalculationDependencies) error { +func (c *calculator) CalculateGatheringInvoiceWithLiveData(invoice *billing.StandardInvoice, deps CalculatorDependencies) error { if invoice.Status != billing.StandardInvoiceStatusGathering { return errors.New("invoice is not a gathering invoice") } @@ -101,7 +95,7 @@ func (c *calculator) CalculateGatheringInvoiceWithLiveData(invoice *billing.Stan } func WithNoDependencies(cb func(inv *billing.StandardInvoice) error) Calculation { - return func(inv *billing.StandardInvoice, _ CalculationDependencies) error { + return func(inv *billing.StandardInvoice, _ CalculatorDependencies) error { return cb(inv) } } diff --git a/openmeter/billing/service/invoicecalc/details.go b/openmeter/billing/service/invoicecalc/details.go index 499bb5225..2edc5b375 100644 --- a/openmeter/billing/service/invoicecalc/details.go +++ b/openmeter/billing/service/invoicecalc/details.go @@ -10,7 +10,7 @@ import ( "github.com/openmeterio/openmeter/openmeter/billing/service/lineservice" ) -func RecalculateDetailedLinesAndTotals(invoice *billing.StandardInvoice, deps CalculationDependencies) error { +func RecalculateDetailedLinesAndTotals(invoice *billing.StandardInvoice, deps CalculatorDependencies) error { if invoice.Lines.IsAbsent() { return errors.New("cannot recaulculate invoice without expanded lines") } diff --git a/openmeter/billing/service/invoicecalc/mock.go b/openmeter/billing/service/invoicecalc/mock.go index df5575a0a..32afbd6be 100644 --- a/openmeter/billing/service/invoicecalc/mock.go +++ b/openmeter/billing/service/invoicecalc/mock.go @@ -26,7 +26,7 @@ type mockCalculator struct { calculateGatheringInvoiceWithLiveDataResultCalled bool } -func (m *mockCalculator) Calculate(i *billing.StandardInvoice, deps CalculationDependencies) error { +func (m *mockCalculator) Calculate(i *billing.StandardInvoice, deps CalculatorDependencies) error { m.calculateResultCalled = true res := m.calculateResult.MustGet() @@ -56,7 +56,7 @@ func (m *mockCalculator) CalculateGatheringInvoice(i *billing.StandardInvoice) e billing.ValidationComponentOpenMeter) } -func (m *mockCalculator) CalculateGatheringInvoiceWithLiveData(i *billing.StandardInvoice, deps CalculationDependencies) error { +func (m *mockCalculator) CalculateGatheringInvoiceWithLiveData(i *billing.StandardInvoice, deps CalculatorDependencies) error { m.calculateGatheringInvoiceWithLiveDataResultCalled = true res := m.calculateGatheringInvoiceWithLiveDataResult.MustGet() @@ -120,7 +120,7 @@ func NewMockableCalculator(_ *testing.T, upstream Calculator) *MockableInvoiceCa } } -func (m *MockableInvoiceCalculator) Calculate(i *billing.StandardInvoice, deps CalculationDependencies) error { +func (m *MockableInvoiceCalculator) Calculate(i *billing.StandardInvoice, deps CalculatorDependencies) error { outErr := m.upstream.Calculate(i, deps) if m.mock != nil { @@ -146,7 +146,7 @@ func (m *MockableInvoiceCalculator) CalculateGatheringInvoice(i *billing.Standar return outErr } -func (m *MockableInvoiceCalculator) CalculateGatheringInvoiceWithLiveData(i *billing.StandardInvoice, deps CalculationDependencies) error { +func (m *MockableInvoiceCalculator) CalculateGatheringInvoiceWithLiveData(i *billing.StandardInvoice, deps CalculatorDependencies) error { outErr := m.upstream.CalculateGatheringInvoiceWithLiveData(i, deps) if m.mock != nil { diff --git a/openmeter/billing/service/stdinvoicestate.go b/openmeter/billing/service/stdinvoicestate.go index c06ddf9eb..1aeb95c5a 100644 --- a/openmeter/billing/service/stdinvoicestate.go +++ b/openmeter/billing/service/stdinvoicestate.go @@ -611,7 +611,7 @@ func (m *InvoiceStateMachine) calculateInvoice(ctx context.Context) error { return fmt.Errorf("resolving feature meters: %w", err) } - return m.Calculator.Calculate(&m.Invoice, invoicecalc.CalculationDependencies{ + return m.Calculator.Calculate(&m.Invoice, invoicecalc.CalculatorDependencies{ FeatureMeters: featureMeters, }) } diff --git a/openmeter/meter/mockadapter/meter.go b/openmeter/meter/mockadapter/meter.go index 7e08e14cb..67a2beea1 100644 --- a/openmeter/meter/mockadapter/meter.go +++ b/openmeter/meter/mockadapter/meter.go @@ -41,7 +41,6 @@ func (c *adapter) ListMeters(_ context.Context, params meter.ListMetersParams) ( } // In memory pagination: case #2 if there is pagination settings return the paginated dataset - pageNumberIndex := params.PageNumber - 1 if pageNumberIndex*params.PageSize > len(meters) { From 04a6f9607a472cc6a3291d19bd3788cad09f42dc Mon Sep 17 00:00:00 2001 From: Peter Turi Date: Thu, 29 Jan 2026 13:12:32 +0100 Subject: [PATCH 4/6] fix: self review --- openmeter/billing/service/invoice.go | 2 +- openmeter/billing/service/stdinvoiceline.go | 26 ++++++++------------- openmeter/billing/stdinvoiceline.go | 26 --------------------- 3 files changed, 11 insertions(+), 43 deletions(-) diff --git a/openmeter/billing/service/invoice.go b/openmeter/billing/service/invoice.go index 25d36fec6..22f502d25 100644 --- a/openmeter/billing/service/invoice.go +++ b/openmeter/billing/service/invoice.go @@ -171,7 +171,7 @@ func (s *Service) recalculateGatheringInvoice(ctx context.Context, in recalculat return invoice, fmt.Errorf("snapshotting lines: %w", err) } - inScopeLineSvcs, err := s.lineService.FromEntities(inScopeLines, featureMeters) + inScopeLineSvcs, err := lineservice.FromEntities(inScopeLines, featureMeters) if err != nil { return invoice, fmt.Errorf("creating line services: %w", err) } diff --git a/openmeter/billing/service/stdinvoiceline.go b/openmeter/billing/service/stdinvoiceline.go index d4e194dfc..8dc9d116c 100644 --- a/openmeter/billing/service/stdinvoiceline.go +++ b/openmeter/billing/service/stdinvoiceline.go @@ -61,11 +61,6 @@ func (s *Service) CreatePendingInvoiceLines(ctx context.Context, input billing.C return nil, nil } - featureMeters, err := s.resolveFeatureMeters(ctx, input.Lines) - if err != nil { - return nil, fmt.Errorf("resolving feature meters: %w", err) - } - // let's resolve the customer's settings customerProfile, err := s.GetCustomerOverride(ctx, billing.GetCustomerOverrideInput{ Customer: input.Customer, @@ -97,23 +92,22 @@ func (s *Service) CreatePendingInvoiceLines(ctx context.Context, input billing.C l.ID = ulid.Make().String() l.InvoiceID = gatheringInvoice.ID - return l.WithNormalizedValues() + normalizedLine, err := l.WithNormalizedValues() + if err != nil { + return nil, fmt.Errorf("normalizing line[%s]: %w", l.ID, err) + } + + if err := normalizedLine.Validate(); err != nil { + return nil, fmt.Errorf("validating line[%s]: %w", l.ID, err) + } + + return normalizedLine, nil }) if err != nil { return nil, fmt.Errorf("mapping lines: %w", err) } - lineServices, err := s.lineService.FromEntities(linesToCreate, featureMeters) - if err != nil { - return nil, fmt.Errorf("creating line services: %w", err) - } - - if err := lineServices.ValidateForInvoice(ctx, &gatheringInvoice); err != nil { - return nil, fmt.Errorf("validating lines: %w", err) - } - gatheringInvoice.Lines.Append(linesToCreate...) - gatheringInvoiceID := gatheringInvoice.ID if err := gatheringInvoice.Validate(); err != nil { diff --git a/openmeter/billing/stdinvoiceline.go b/openmeter/billing/stdinvoiceline.go index 73074976d..a4addd5ff 100644 --- a/openmeter/billing/stdinvoiceline.go +++ b/openmeter/billing/stdinvoiceline.go @@ -254,32 +254,6 @@ func (i StandardLine) Clone() *StandardLine { return i.clone(cloneOptions{}) } -// NormalizeValues normalizes the values of the line to ensure they are matching the expected invariants: -// - Period is truncated to the minimum window size duration -// - InvoiceAt is truncated to the minimum window size duration -// - UsageBased.Price is normalized to have the default inAdvance payment term for flat prices -func (i StandardLine) WithNormalizedValues() (*StandardLine, error) { - out := i.Clone() - - out.Period = out.Period.Truncate(streaming.MinimumWindowSizeDuration) - out.InvoiceAt = out.InvoiceAt.Truncate(streaming.MinimumWindowSizeDuration) - - if out.UsageBased.Price.Type() == productcatalog.FlatPriceType { - // Let's apply the default inAdvance payment term for flat prices - flatPrice, err := out.UsageBased.Price.AsFlat() - if err != nil { - return nil, fmt.Errorf("converting price to flat price: %w", err) - } - - if flatPrice.PaymentTerm == "" { - flatPrice.PaymentTerm = productcatalog.InAdvancePaymentTerm - out.UsageBased.Price = productcatalog.NewPriceFrom(flatPrice) - } - } - - return out, nil -} - type cloneOptions struct { skipDBState bool skipChildren bool From 3fc7750863104528b6543417633dde40495707d6 Mon Sep 17 00:00:00 2001 From: Peter Turi Date: Thu, 29 Jan 2026 13:22:17 +0100 Subject: [PATCH 5/6] fix: rabbit review --- openmeter/billing/service/lineservice/service.go | 2 +- openmeter/billing/stdinvoiceline.go | 10 ++++++++++ 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/openmeter/billing/service/lineservice/service.go b/openmeter/billing/service/lineservice/service.go index dcf694ec8..32e91948f 100644 --- a/openmeter/billing/service/lineservice/service.go +++ b/openmeter/billing/service/lineservice/service.go @@ -57,7 +57,7 @@ func UpdateTotalsFromDetailedLines(line *billing.StandardLine) error { } // WARNING: Even if tempting to add discounts etc. here to the totals, we should always keep the logic as is. - // The usageBasedLine will never be syncorinzed directly to stripe or other apps, only the detailed lines. + // The usageBasedLine will never be synchronized directly to stripe or other apps, only the detailed lines. // // Given that the external systems will have their own logic for calculating the totals, we cannot expect // any custom logic implemented here to be carried over to the external systems. diff --git a/openmeter/billing/stdinvoiceline.go b/openmeter/billing/stdinvoiceline.go index a4addd5ff..d79c56b14 100644 --- a/openmeter/billing/stdinvoiceline.go +++ b/openmeter/billing/stdinvoiceline.go @@ -297,6 +297,16 @@ func (i *StandardLine) SaveDBSnapshot() { func (i StandardLine) Validate() error { var errs []error + + // Fail fast cases (most of the validation logic uses these) + if i.UsageBased == nil { + return errors.New("usage based line is required") + } + + if i.UsageBased.Price == nil { + return errors.New("usage based line price is required") + } + if err := i.StandardLineBase.Validate(); err != nil { errs = append(errs, err) } From 30e29be9670a2ce652eb038fe06e5e13f258b6f5 Mon Sep 17 00:00:00 2001 From: Peter Turi Date: Thu, 29 Jan 2026 13:24:34 +0100 Subject: [PATCH 6/6] fix: more rabbit reviews --- .../billing/service/gatheringinvoicependinglines.go | 12 ------------ openmeter/billing/service/invoice.go | 4 ---- 2 files changed, 16 deletions(-) diff --git a/openmeter/billing/service/gatheringinvoicependinglines.go b/openmeter/billing/service/gatheringinvoicependinglines.go index 86546b3eb..ceb48eec8 100644 --- a/openmeter/billing/service/gatheringinvoicependinglines.go +++ b/openmeter/billing/service/gatheringinvoicependinglines.go @@ -197,10 +197,6 @@ func (in handleInvoicePendingLinesForCurrencyInput) Validate() error { return fmt.Errorf("gathering invoice: %w", err) } - if in.FeatureMeters == nil { - return fmt.Errorf("feature meters cannot be nil") - } - if len(in.InScopeLines) == 0 { return fmt.Errorf("in scope lines must contain at least one line") } @@ -375,10 +371,6 @@ func (i prepareLinesToBillInput) Validate() error { errs = append(errs, fmt.Errorf("no lines to bill")) } - if i.FeatureMeters == nil { - errs = append(errs, fmt.Errorf("feature meters cannot be nil")) - } - if i.GatheringInvoice.Lines.IsAbsent() { errs = append(errs, fmt.Errorf("gathering invoice must have lines expanded")) } @@ -630,10 +622,6 @@ func (in createStandardInvoiceFromGatheringLinesInput) Validate() error { errs = append(errs, fmt.Errorf("effective billing profile: %w", err)) } - if in.FeatureMeters == nil { - errs = append(errs, fmt.Errorf("feature meters cannot be nil")) - } - if len(in.Lines) == 0 { errs = append(errs, fmt.Errorf("lines must contain at least one line")) } diff --git a/openmeter/billing/service/invoice.go b/openmeter/billing/service/invoice.go index 22f502d25..75674d5f9 100644 --- a/openmeter/billing/service/invoice.go +++ b/openmeter/billing/service/invoice.go @@ -159,10 +159,6 @@ func (s *Service) recalculateGatheringInvoice(ctx context.Context, in recalculat return invoice, fmt.Errorf("resolving feature meters: %w", err) } - if customerProfile.Customer == nil { - return invoice, fmt.Errorf("customer profile is nil") - } - inScopeLines := lo.Filter(invoice.Lines.OrEmpty(), func(line *billing.StandardLine, _ int) bool { return line.DeletedAt == nil })