Skip to content

Commit 861409e

Browse files
authored
feat(streaming): list subjects (#3395)
1 parent 7e18ee1 commit 861409e

File tree

22 files changed

+409
-64
lines changed

22 files changed

+409
-64
lines changed

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -481,7 +481,7 @@ require (
481481
github.com/wk8/go-ordered-map/v2 v2.1.8 // indirect
482482
github.com/x448/float16 v0.8.4 // indirect
483483
github.com/xdg-go/pbkdf2 v1.0.0 // indirect
484-
github.com/xdg-go/scram v1.1.2 // indirect
484+
github.com/xdg-go/scram v1.1.2
485485
github.com/xdg-go/stringprep v1.0.4 // indirect
486486
github.com/xeipuuv/gojsonpointer v0.0.0-20190905194746-02993c407bfb // indirect
487487
github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415 // indirect

go.sum

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1999,8 +1999,6 @@ github.com/oklog/ulid v1.3.1 h1:EGfNDEx6MqHz8B3uNV6QAib1UR2Lm97sHi3ocA6ESJ4=
19991999
github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U=
20002000
github.com/oklog/ulid/v2 v2.1.1 h1:suPZ4ARWLOJLegGFiZZ1dFAkqzhMjL3J1TzI+5wHz8s=
20012001
github.com/oklog/ulid/v2 v2.1.1/go.mod h1:rcEKHmBBKfef9DhnvX7y1HZBYxjXb0cP5ExxNsTT1QQ=
2002-
github.com/olekukonko/tablewriter v0.0.5 h1:P2Ga83D34wi1o9J6Wh1mRuqd4mF/x/lgBS7N7AbDhec=
2003-
github.com/olekukonko/tablewriter v0.0.5/go.mod h1:hPp6KlRPjbx+hW8ykQs1w3UBbZlj6HuIJcUGPhkA7kY=
20042002
github.com/oliveagle/jsonpath v0.0.0-20180606110733-2e52cf6e6852 h1:Yl0tPBa8QPjGmesFh1D0rDy+q1Twx6FyU7VWHi8wZbI=
20052003
github.com/oliveagle/jsonpath v0.0.0-20180606110733-2e52cf6e6852/go.mod h1:eqOVx5Vwu4gd2mmMZvVZsgIqNSaW3xxRThUJ0k/TPk4=
20062004
github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=

openmeter/customer/adapter.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package customer
33
import (
44
"context"
55

6+
"github.com/openmeterio/openmeter/openmeter/streaming"
67
"github.com/openmeterio/openmeter/pkg/framework/entutils"
78
"github.com/openmeterio/openmeter/pkg/pagination"
89
)
@@ -15,6 +16,7 @@ type Adapter interface {
1516

1617
type CustomerAdapter interface {
1718
ListCustomers(ctx context.Context, params ListCustomersInput) (pagination.Result[Customer], error)
19+
ListCustomerUsageAttributions(ctx context.Context, input ListCustomerUsageAttributionsInput) (pagination.Result[streaming.CustomerUsageAttribution], error)
1820
CreateCustomer(ctx context.Context, params CreateCustomerInput) (*Customer, error)
1921
DeleteCustomer(ctx context.Context, customer DeleteCustomerInput) error
2022
GetCustomer(ctx context.Context, customer GetCustomerInput) (*Customer, error)

openmeter/customer/adapter/customer.go

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import (
1616
plandb "github.com/openmeterio/openmeter/openmeter/ent/db/plan"
1717
"github.com/openmeterio/openmeter/openmeter/ent/db/predicate"
1818
subscriptiondb "github.com/openmeterio/openmeter/openmeter/ent/db/subscription"
19+
"github.com/openmeterio/openmeter/openmeter/streaming"
1920
"github.com/openmeterio/openmeter/pkg/clock"
2021
"github.com/openmeterio/openmeter/pkg/framework/entutils"
2122
"github.com/openmeterio/openmeter/pkg/models"
@@ -127,6 +128,80 @@ func (a *adapter) ListCustomers(ctx context.Context, input customer.ListCustomer
127128
})
128129
}
129130

131+
// ListCustomerUsageAttributions lists customers usage attributions
132+
func (a *adapter) ListCustomerUsageAttributions(ctx context.Context, input customer.ListCustomerUsageAttributionsInput) (pagination.Result[streaming.CustomerUsageAttribution], error) {
133+
if err := input.Validate(); err != nil {
134+
return pagination.Result[streaming.CustomerUsageAttribution]{}, models.NewGenericValidationError(err)
135+
}
136+
137+
return entutils.TransactingRepo(ctx, a, func(ctx context.Context, repo *adapter) (pagination.Result[streaming.CustomerUsageAttribution], error) {
138+
// Build the database query
139+
now := clock.Now().UTC()
140+
141+
query := repo.db.Customer.Query().
142+
// We only need to select the fields we need for the usage attribution to optimize the query
143+
Select(
144+
customerdb.FieldID,
145+
customerdb.FieldKey,
146+
).
147+
Where(customerdb.Namespace(input.Namespace)).
148+
Order(customerdb.ByID(sql.OrderAsc()))
149+
query = WithSubjects(query, now)
150+
151+
// Filters
152+
if len(input.CustomerIDs) > 0 {
153+
query = query.Where(customerdb.IDIn(input.CustomerIDs...))
154+
}
155+
156+
// Do not return deleted customers by default
157+
if !input.IncludeDeleted {
158+
query = query.Where(customerdb.Or(
159+
customerdb.DeletedAtIsNil(),
160+
customerdb.DeletedAtGTE(now),
161+
))
162+
}
163+
164+
// Response
165+
response := pagination.Result[streaming.CustomerUsageAttribution]{
166+
Page: input.Page,
167+
}
168+
169+
paged, err := query.Paginate(ctx, input.Page)
170+
if err != nil {
171+
return response, err
172+
}
173+
174+
result := make([]streaming.CustomerUsageAttribution, 0, len(paged.Items))
175+
for _, item := range paged.Items {
176+
if item == nil {
177+
a.logger.WarnContext(ctx, "invalid query result: nil customer received")
178+
continue
179+
}
180+
181+
subjectKeys, err := subjectKeysFromDBEntity(*item)
182+
if err != nil {
183+
return response, err
184+
}
185+
186+
usageAttribution := streaming.CustomerUsageAttribution{
187+
ID: item.ID,
188+
SubjectKeys: subjectKeys,
189+
}
190+
191+
if item.Key != "" {
192+
usageAttribution.Key = &item.Key
193+
}
194+
195+
result = append(result, usageAttribution)
196+
}
197+
198+
response.TotalCount = paged.TotalCount
199+
response.Items = result
200+
201+
return response, nil
202+
})
203+
}
204+
130205
// CreateCustomer creates a new customer
131206
func (a *adapter) CreateCustomer(ctx context.Context, input customer.CreateCustomerInput) (*customer.Customer, error) {
132207
if err := input.Validate(); err != nil {

openmeter/customer/adapter/entitymapping.go

Lines changed: 26 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package adapter
22

33
import (
44
"errors"
5+
"slices"
56

67
"github.com/samber/lo"
78

@@ -11,23 +12,11 @@ import (
1112
)
1213

1314
func CustomerFromDBEntity(e db.Customer) (*customer.Customer, error) {
14-
subjects, err := e.Edges.SubjectsOrErr()
15+
subjectKeys, err := subjectKeysFromDBEntity(e)
1516
if err != nil {
16-
if db.IsNotLoaded(err) {
17-
return nil, errors.New("subjects must be loaded for customer")
18-
}
19-
2017
return nil, err
2118
}
2219

23-
subjectKeys := lo.FilterMap(subjects, func(item *db.CustomerSubjects, _ int) (string, bool) {
24-
if item == nil {
25-
return "", false
26-
}
27-
28-
return item.SubjectKey, true
29-
})
30-
3120
subscriptions, err := e.Edges.SubscriptionOrErr()
3221
if err != nil {
3322
if db.IsNotLoaded(err) {
@@ -96,3 +85,27 @@ func CustomerFromDBEntity(e db.Customer) (*customer.Customer, error) {
9685

9786
return result, nil
9887
}
88+
89+
func subjectKeysFromDBEntity(customerEntity db.Customer) ([]string, error) {
90+
subjectEntities, err := customerEntity.Edges.SubjectsOrErr()
91+
if err != nil {
92+
if db.IsNotLoaded(err) {
93+
return nil, errors.New("subjects must be loaded for customer")
94+
}
95+
96+
return nil, err
97+
}
98+
99+
subjectKeys := lo.FilterMap(subjectEntities, func(item *db.CustomerSubjects, _ int) (string, bool) {
100+
if item == nil {
101+
return "", false
102+
}
103+
104+
return item.SubjectKey, true
105+
})
106+
107+
// Sort the subject keys to make sure the order is consistent
108+
slices.Sort(subjectKeys)
109+
110+
return subjectKeys, nil
111+
}

openmeter/customer/customer.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -211,6 +211,25 @@ func (i ListCustomersInput) Validate() error {
211211
return nil
212212
}
213213

214+
// ListCustomerUsageAttributionsInput represents the input for the ListCustomerUsageAttributions method
215+
216+
type ListCustomerUsageAttributionsInput struct {
217+
Namespace string
218+
pagination.Page
219+
220+
// Filters
221+
IncludeDeleted bool
222+
CustomerIDs []string
223+
}
224+
225+
func (i ListCustomerUsageAttributionsInput) Validate() error {
226+
if i.Namespace == "" {
227+
return models.NewGenericValidationError(errors.New("namespace is required"))
228+
}
229+
230+
return nil
231+
}
232+
214233
// CreateCustomerInput represents the input for the CreateCustomer method
215234
type CreateCustomerInput struct {
216235
Namespace string

openmeter/customer/service.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package customer
33
import (
44
"context"
55

6+
"github.com/openmeterio/openmeter/openmeter/streaming"
67
"github.com/openmeterio/openmeter/pkg/models"
78
"github.com/openmeterio/openmeter/pkg/pagination"
89
)
@@ -20,6 +21,7 @@ type RequestValidatorService interface {
2021

2122
type CustomerService interface {
2223
ListCustomers(ctx context.Context, params ListCustomersInput) (pagination.Result[Customer], error)
24+
ListCustomerUsageAttributions(ctx context.Context, input ListCustomerUsageAttributionsInput) (pagination.Result[streaming.CustomerUsageAttribution], error)
2325
CreateCustomer(ctx context.Context, params CreateCustomerInput) (*Customer, error)
2426
DeleteCustomer(ctx context.Context, customer DeleteCustomerInput) error
2527
GetCustomer(ctx context.Context, customer GetCustomerInput) (*Customer, error)

openmeter/customer/service/customer.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"fmt"
66

77
"github.com/openmeterio/openmeter/openmeter/customer"
8+
"github.com/openmeterio/openmeter/openmeter/streaming"
89
"github.com/openmeterio/openmeter/pkg/framework/transaction"
910
"github.com/openmeterio/openmeter/pkg/models"
1011
"github.com/openmeterio/openmeter/pkg/pagination"
@@ -17,6 +18,11 @@ func (s *Service) ListCustomers(ctx context.Context, input customer.ListCustomer
1718
return s.adapter.ListCustomers(ctx, input)
1819
}
1920

21+
// ListCustomerUsageAttributions lists customer usage attributions
22+
func (s *Service) ListCustomerUsageAttributions(ctx context.Context, input customer.ListCustomerUsageAttributionsInput) (pagination.Result[streaming.CustomerUsageAttribution], error) {
23+
return s.adapter.ListCustomerUsageAttributions(ctx, input)
24+
}
25+
2026
// CreateCustomer creates a customer
2127
func (s *Service) CreateCustomer(ctx context.Context, input customer.CreateCustomerInput) (*customer.Customer, error) {
2228
// Validate the input

0 commit comments

Comments
 (0)