Skip to content

Commit 46c4297

Browse files
authored
feat: add callback support for tenant and tenant member updates (#3201)
* feat: add callback support for tenant and tenant member updates * add callbacks into tenant_invite.go
1 parent 56a5f26 commit 46c4297

File tree

2 files changed

+192
-15
lines changed

2 files changed

+192
-15
lines changed

pkg/repository/tenant.go

Lines changed: 125 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,12 @@ type GetQueueMetricsResponse struct {
8686

8787
type TenantRepository interface {
8888
RegisterCreateCallback(callback UnscopedCallback[*sqlcv1.Tenant])
89+
RegisterUpdateCallback(callback UnscopedCallback[*sqlcv1.Tenant])
90+
RegisterDeleteCallback(callback UnscopedCallback[*sqlcv1.Tenant])
91+
92+
RegisterCreateMemberCallback(callback UnscopedCallback[*sqlcv1.PopulateTenantMembersRow])
93+
RegisterUpdateMemberCallback(callback UnscopedCallback[*sqlcv1.PopulateTenantMembersRow])
94+
RegisterDeleteMemberCallback(callback UnscopedCallback[*sqlcv1.PopulateTenantMembersRow])
8995

9096
// CreateTenant creates a new tenant.
9197
CreateTenant(ctx context.Context, opts *CreateTenantOpts) (*sqlcv1.Tenant, error)
@@ -181,6 +187,12 @@ type tenantRepository struct {
181187
cache cache.Cacheable
182188
defaultTenantVersion sqlcv1.TenantMajorEngineVersion
183189
createCallbacks []UnscopedCallback[*sqlcv1.Tenant]
190+
updateCallbacks []UnscopedCallback[*sqlcv1.Tenant]
191+
deleteCallbacks []UnscopedCallback[*sqlcv1.Tenant]
192+
193+
createMemberCallbacks []UnscopedCallback[*sqlcv1.PopulateTenantMembersRow]
194+
updateMemberCallbacks []UnscopedCallback[*sqlcv1.PopulateTenantMembersRow]
195+
deleteMemberCallbacks []UnscopedCallback[*sqlcv1.PopulateTenantMembersRow]
184196
}
185197

186198
func newTenantRepository(shared *sharedRepository, cacheDuration time.Duration) TenantRepository {
@@ -199,6 +211,46 @@ func (r *tenantRepository) RegisterCreateCallback(callback UnscopedCallback[*sql
199211
r.createCallbacks = append(r.createCallbacks, callback)
200212
}
201213

214+
func (r *tenantRepository) RegisterUpdateCallback(callback UnscopedCallback[*sqlcv1.Tenant]) {
215+
if r.updateCallbacks == nil {
216+
r.updateCallbacks = make([]UnscopedCallback[*sqlcv1.Tenant], 0)
217+
}
218+
219+
r.updateCallbacks = append(r.updateCallbacks, callback)
220+
}
221+
222+
func (r *tenantRepository) RegisterDeleteCallback(callback UnscopedCallback[*sqlcv1.Tenant]) {
223+
if r.deleteCallbacks == nil {
224+
r.deleteCallbacks = make([]UnscopedCallback[*sqlcv1.Tenant], 0)
225+
}
226+
227+
r.deleteCallbacks = append(r.deleteCallbacks, callback)
228+
}
229+
230+
func (r *tenantRepository) RegisterCreateMemberCallback(callback UnscopedCallback[*sqlcv1.PopulateTenantMembersRow]) {
231+
if r.createMemberCallbacks == nil {
232+
r.createMemberCallbacks = make([]UnscopedCallback[*sqlcv1.PopulateTenantMembersRow], 0)
233+
}
234+
235+
r.createMemberCallbacks = append(r.createMemberCallbacks, callback)
236+
}
237+
238+
func (r *tenantRepository) RegisterUpdateMemberCallback(callback UnscopedCallback[*sqlcv1.PopulateTenantMembersRow]) {
239+
if r.updateMemberCallbacks == nil {
240+
r.updateMemberCallbacks = make([]UnscopedCallback[*sqlcv1.PopulateTenantMembersRow], 0)
241+
}
242+
243+
r.updateMemberCallbacks = append(r.updateMemberCallbacks, callback)
244+
}
245+
246+
func (r *tenantRepository) RegisterDeleteMemberCallback(callback UnscopedCallback[*sqlcv1.PopulateTenantMembersRow]) {
247+
if r.deleteMemberCallbacks == nil {
248+
r.deleteMemberCallbacks = make([]UnscopedCallback[*sqlcv1.PopulateTenantMembersRow], 0)
249+
}
250+
251+
r.deleteMemberCallbacks = append(r.deleteMemberCallbacks, callback)
252+
}
253+
202254
func (r *tenantRepository) CreateTenant(ctx context.Context, opts *CreateTenantOpts) (*sqlcv1.Tenant, error) {
203255
if err := r.v.Validate(opts); err != nil {
204256
return nil, err
@@ -310,11 +362,21 @@ func (r *tenantRepository) UpdateTenant(ctx context.Context, id uuid.UUID, opts
310362
params.Version = *opts.Version
311363
}
312364

313-
return r.queries.UpdateTenant(
365+
updated, err := r.queries.UpdateTenant(
314366
ctx,
315367
r.pool,
316368
params,
317369
)
370+
371+
if err != nil {
372+
return nil, err
373+
}
374+
375+
for _, cb := range r.updateCallbacks {
376+
cb.Do(r.l, updated)
377+
}
378+
379+
return updated, nil
318380
}
319381

320382
func (r *tenantRepository) GetTenantByID(ctx context.Context, id uuid.UUID) (*sqlcv1.Tenant, error) {
@@ -350,7 +412,17 @@ func (r *tenantRepository) CreateTenantMember(ctx context.Context, tenantId uuid
350412
return nil, err
351413
}
352414

353-
return r.populateSingleTenantMember(ctx, createdMember.ID)
415+
populated, err := r.populateSingleTenantMember(ctx, createdMember.ID)
416+
417+
if err != nil {
418+
return nil, err
419+
}
420+
421+
for _, cb := range r.createMemberCallbacks {
422+
cb.Do(r.l, populated)
423+
}
424+
425+
return populated, nil
354426
}
355427

356428
func (r *tenantRepository) GetTenantMemberByID(ctx context.Context, memberId uuid.UUID) (*sqlcv1.PopulateTenantMembersRow, error) {
@@ -447,7 +519,17 @@ func (r *tenantRepository) UpdateTenantMember(ctx context.Context, memberId uuid
447519
return nil, err
448520
}
449521

450-
return r.populateSingleTenantMember(ctx, updatedMember.ID)
522+
populated, err := r.populateSingleTenantMember(ctx, updatedMember.ID)
523+
524+
if err != nil {
525+
return nil, err
526+
}
527+
528+
for _, cb := range r.updateMemberCallbacks {
529+
cb.Do(r.l, populated)
530+
}
531+
532+
return populated, nil
451533
}
452534

453535
func (r *sharedRepository) populateSingleTenantMember(ctx context.Context, ids uuid.UUID) (*sqlcv1.PopulateTenantMembersRow, error) {
@@ -473,11 +555,26 @@ func (r *sharedRepository) populateTenantMembers(ctx context.Context, ids []uuid
473555
}
474556

475557
func (r *tenantRepository) DeleteTenantMember(ctx context.Context, memberId uuid.UUID) error {
476-
return r.queries.DeleteTenantMember(
477-
ctx,
478-
r.pool,
479-
memberId,
480-
)
558+
var populated *sqlcv1.PopulateTenantMembersRow
559+
560+
if len(r.deleteMemberCallbacks) > 0 {
561+
var err error
562+
populated, err = r.populateSingleTenantMember(ctx, memberId)
563+
564+
if err != nil {
565+
return err
566+
}
567+
}
568+
569+
if err := r.queries.DeleteTenantMember(ctx, r.pool, memberId); err != nil {
570+
return err
571+
}
572+
573+
for _, cb := range r.deleteMemberCallbacks {
574+
cb.Do(r.l, populated)
575+
}
576+
577+
return nil
481578
}
482579

483580
func (r *tenantRepository) GetQueueMetrics(ctx context.Context, tenantId uuid.UUID, opts *GetQueueMetricsOpts) (*GetQueueMetricsResponse, error) {
@@ -789,7 +886,26 @@ func (r *tenantRepository) RebalanceInactiveSchedulerPartitions(ctx context.Cont
789886
}
790887

791888
func (r *tenantRepository) DeleteTenant(ctx context.Context, id uuid.UUID) error {
792-
return r.queries.DeleteTenant(ctx, r.pool, id)
889+
var tenant *sqlcv1.Tenant
890+
891+
if len(r.deleteCallbacks) > 0 {
892+
var err error
893+
tenant, err = r.queries.GetTenantByID(ctx, r.pool, id)
894+
895+
if err != nil {
896+
return err
897+
}
898+
}
899+
900+
if err := r.queries.DeleteTenant(ctx, r.pool, id); err != nil {
901+
return err
902+
}
903+
904+
for _, cb := range r.deleteCallbacks {
905+
cb.Do(r.l, tenant)
906+
}
907+
908+
return nil
793909
}
794910

795911
func (r *tenantRepository) GetTenantUsageData(ctx context.Context, tenantId uuid.UUID) (*sqlcv1.GetTenantUsageDataRow, error) {

pkg/repository/tenant_invite.go

Lines changed: 67 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,10 @@ type ListTenantInvitesOpts struct {
4848
}
4949

5050
type TenantInviteRepository interface {
51+
RegisterCreateCallback(callback UnscopedCallback[*sqlcv1.TenantInviteLink])
52+
RegisterUpdateCallback(callback UnscopedCallback[*sqlcv1.TenantInviteLink])
53+
RegisterDeleteCallback(callback UnscopedCallback[*sqlcv1.TenantInviteLink])
54+
5155
// CreateTenantInvite creates a new tenant invite with the given options
5256
CreateTenantInvite(ctx context.Context, tenantId uuid.UUID, opts *CreateTenantInviteOpts) (*sqlcv1.TenantInviteLink, error)
5357

@@ -70,6 +74,10 @@ type TenantInviteRepository interface {
7074

7175
type tenantInviteRepository struct {
7276
*sharedRepository
77+
78+
createCallbacks []UnscopedCallback[*sqlcv1.TenantInviteLink]
79+
updateCallbacks []UnscopedCallback[*sqlcv1.TenantInviteLink]
80+
deleteCallbacks []UnscopedCallback[*sqlcv1.TenantInviteLink]
7381
}
7482

7583
func newTenantInviteRepository(shared *sharedRepository) TenantInviteRepository {
@@ -78,6 +86,30 @@ func newTenantInviteRepository(shared *sharedRepository) TenantInviteRepository
7886
}
7987
}
8088

89+
func (r *tenantInviteRepository) RegisterCreateCallback(callback UnscopedCallback[*sqlcv1.TenantInviteLink]) {
90+
if r.createCallbacks == nil {
91+
r.createCallbacks = make([]UnscopedCallback[*sqlcv1.TenantInviteLink], 0)
92+
}
93+
94+
r.createCallbacks = append(r.createCallbacks, callback)
95+
}
96+
97+
func (r *tenantInviteRepository) RegisterUpdateCallback(callback UnscopedCallback[*sqlcv1.TenantInviteLink]) {
98+
if r.updateCallbacks == nil {
99+
r.updateCallbacks = make([]UnscopedCallback[*sqlcv1.TenantInviteLink], 0)
100+
}
101+
102+
r.updateCallbacks = append(r.updateCallbacks, callback)
103+
}
104+
105+
func (r *tenantInviteRepository) RegisterDeleteCallback(callback UnscopedCallback[*sqlcv1.TenantInviteLink]) {
106+
if r.deleteCallbacks == nil {
107+
r.deleteCallbacks = make([]UnscopedCallback[*sqlcv1.TenantInviteLink], 0)
108+
}
109+
110+
r.deleteCallbacks = append(r.deleteCallbacks, callback)
111+
}
112+
81113
func (r *tenantInviteRepository) CreateTenantInvite(ctx context.Context, tenantId uuid.UUID, opts *CreateTenantInviteOpts) (*sqlcv1.TenantInviteLink, error) {
82114
if err := r.v.Validate(opts); err != nil {
83115
return nil, err
@@ -142,6 +174,10 @@ func (r *tenantInviteRepository) CreateTenantInvite(ctx context.Context, tenantI
142174
return nil, err
143175
}
144176

177+
for _, cb := range r.createCallbacks {
178+
cb.Do(r.l, invite)
179+
}
180+
145181
return invite, nil
146182
}
147183

@@ -214,17 +250,42 @@ func (r *tenantInviteRepository) UpdateTenantInvite(ctx context.Context, id uuid
214250
}
215251
}
216252

217-
return r.queries.UpdateTenantInvite(
253+
updated, err := r.queries.UpdateTenantInvite(
218254
ctx,
219255
r.pool,
220256
params,
221257
)
258+
259+
if err != nil {
260+
return nil, err
261+
}
262+
263+
for _, cb := range r.updateCallbacks {
264+
cb.Do(r.l, updated)
265+
}
266+
267+
return updated, nil
222268
}
223269

224270
func (r *tenantInviteRepository) DeleteTenantInvite(ctx context.Context, id uuid.UUID) error {
225-
return r.queries.DeleteTenantInvite(
226-
ctx,
227-
r.pool,
228-
id,
229-
)
271+
var invite *sqlcv1.TenantInviteLink
272+
273+
if len(r.deleteCallbacks) > 0 {
274+
var err error
275+
invite, err = r.queries.GetInviteById(ctx, r.pool, id)
276+
277+
if err != nil {
278+
return err
279+
}
280+
}
281+
282+
if err := r.queries.DeleteTenantInvite(ctx, r.pool, id); err != nil {
283+
return err
284+
}
285+
286+
for _, cb := range r.deleteCallbacks {
287+
cb.Do(r.l, invite)
288+
}
289+
290+
return nil
230291
}

0 commit comments

Comments
 (0)