Skip to content

Commit b7a2ca5

Browse files
authored
feat: Adding quota monitoring for premium plugins (#1333)
This adds a background monitoring process that will periodically check the remaining quota for a premium plugin. If the quota is exceeded then a context cancellation is triggered, forcing the sync process to stop. fixes: cloudquery/cloudquery-issues#749
1 parent 6d7be0b commit b7a2ca5

File tree

9 files changed

+449
-70
lines changed

9 files changed

+449
-70
lines changed

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ require (
3838
github.com/CloudyKit/jet/v6 v6.2.0 // indirect
3939
github.com/Joker/jade v1.1.3 // indirect
4040
github.com/Shopify/goreferrer v0.0.0-20220729165902-8cddb4f5de06 // indirect
41+
github.com/adrg/xdg v0.4.0 // indirect
4142
github.com/andybalholm/brotli v1.0.5 // indirect
4243
github.com/apache/arrow/go/v13 v13.0.0-20230731205701-112f94971882 // indirect
4344
github.com/apapsch/go-jsonmerge/v2 v2.0.0 // indirect

go.sum

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,8 @@ github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAE
4747
github.com/RaveNoX/go-jsoncommentstrip v1.0.0/go.mod h1:78ihd09MekBnJnxpICcwzCMzGrKSKYe4AqU6PDYYpjk=
4848
github.com/Shopify/goreferrer v0.0.0-20220729165902-8cddb4f5de06 h1:KkH3I3sJuOLP3TjA/dfr4NAY8bghDwnXiU7cTKxQqo0=
4949
github.com/Shopify/goreferrer v0.0.0-20220729165902-8cddb4f5de06/go.mod h1:7erjKLwalezA0k99cWs5L11HWOAPNjdUZ6RxH1BXbbM=
50+
github.com/adrg/xdg v0.4.0 h1:RzRqFcjH4nE5C6oTAxhBtoE2IRyjBSa62SCbyPidvls=
51+
github.com/adrg/xdg v0.4.0/go.mod h1:N6ag73EX4wyxeaoeHctc1mas01KZgsj5tYiAIwqJE/E=
5052
github.com/ajg/form v1.5.1 h1:t9c7v8JUKu/XxOGBU0yjNpaMloxGEJhUkqFRq0ibGeU=
5153
github.com/ajg/form v1.5.1/go.mod h1:uL1WgH+h2mgNtvBq0339dVnzXdBETtL2LeUXaIv25UY=
5254
github.com/andybalholm/brotli v1.0.5 h1:8uQZIdzKmjc/iuPu7O2ioW48L81FgatrcpfFmiq/cCs=
@@ -556,6 +558,7 @@ golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBc
556558
golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
557559
golang.org/x/sys v0.0.0-20210927094055-39ccf1dd6fa6/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
558560
golang.org/x/sys v0.0.0-20211019181941-9d821ace8654/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
561+
golang.org/x/sys v0.0.0-20211025201205-69cdffdb9359/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
559562
golang.org/x/sys v0.0.0-20211103235746-7861aae1554b/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
560563
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
561564
golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=

premium/monitor.go

Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
package premium
2+
3+
import (
4+
"context"
5+
"errors"
6+
"time"
7+
)
8+
9+
var ErrNoQuota = errors.New("no remaining quota for the month, please increase your usage limit if you want to continue syncing this plugin")
10+
11+
const DefaultQuotaCheckInterval = 30 * time.Second
12+
const DefaultMaxQuotaFailures = 10 // 5 minutes
13+
14+
type quotaChecker struct {
15+
qm QuotaMonitor
16+
duration time.Duration
17+
maxConsecutiveFailures int
18+
}
19+
20+
type QuotaCheckOption func(*quotaChecker)
21+
22+
// WithQuotaCheckPeriod controls the time interval between quota checks
23+
func WithQuotaCheckPeriod(duration time.Duration) QuotaCheckOption {
24+
return func(m *quotaChecker) {
25+
m.duration = duration
26+
}
27+
}
28+
29+
// WithQuotaMaxConsecutiveFailures controls the number of consecutive failed quota checks before the context is cancelled
30+
func WithQuotaMaxConsecutiveFailures(n int) QuotaCheckOption {
31+
return func(m *quotaChecker) {
32+
m.maxConsecutiveFailures = n
33+
}
34+
}
35+
36+
// WithCancelOnQuotaExceeded monitors the quota usage at intervals defined by duration and cancels the context if the quota is exceeded
37+
func WithCancelOnQuotaExceeded(ctx context.Context, qm QuotaMonitor, ops ...QuotaCheckOption) (context.Context, error) {
38+
m := quotaChecker{
39+
qm: qm,
40+
duration: DefaultQuotaCheckInterval,
41+
maxConsecutiveFailures: DefaultMaxQuotaFailures,
42+
}
43+
for _, op := range ops {
44+
op(&m)
45+
}
46+
47+
if err := m.checkInitialQuota(ctx); err != nil {
48+
return ctx, err
49+
}
50+
51+
newCtx := m.startQuotaMonitor(ctx)
52+
53+
return newCtx, nil
54+
}
55+
56+
func (qc quotaChecker) checkInitialQuota(ctx context.Context) error {
57+
hasQuota, err := qc.qm.HasQuota(ctx)
58+
if err != nil {
59+
return err
60+
}
61+
62+
if !hasQuota {
63+
return ErrNoQuota
64+
}
65+
66+
return nil
67+
}
68+
69+
func (qc quotaChecker) startQuotaMonitor(ctx context.Context) context.Context {
70+
newCtx, cancelWithCause := context.WithCancelCause(ctx)
71+
go func() {
72+
ticker := time.NewTicker(qc.duration)
73+
consecutiveFailures := 0
74+
var hasQuotaErrors error
75+
for {
76+
select {
77+
case <-newCtx.Done():
78+
return
79+
case <-ticker.C:
80+
hasQuota, err := qc.qm.HasQuota(newCtx)
81+
if err != nil {
82+
consecutiveFailures++
83+
hasQuotaErrors = errors.Join(hasQuotaErrors, err)
84+
if consecutiveFailures >= qc.maxConsecutiveFailures {
85+
cancelWithCause(hasQuotaErrors)
86+
return
87+
}
88+
continue
89+
}
90+
consecutiveFailures = 0
91+
hasQuotaErrors = nil
92+
if !hasQuota {
93+
cancelWithCause(ErrNoQuota)
94+
return
95+
}
96+
}
97+
}
98+
}()
99+
return newCtx
100+
}

premium/monitor_test.go

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
package premium
2+
3+
import (
4+
"context"
5+
"errors"
6+
"testing"
7+
"time"
8+
9+
"github.com/stretchr/testify/require"
10+
)
11+
12+
type quotaResponse struct {
13+
hasQuota bool
14+
err error
15+
}
16+
17+
func newFakeQuotaMonitor(hasQuota ...quotaResponse) *fakeQuotaMonitor {
18+
return &fakeQuotaMonitor{responses: hasQuota}
19+
}
20+
21+
type fakeQuotaMonitor struct {
22+
responses []quotaResponse
23+
calls int
24+
}
25+
26+
func (f *fakeQuotaMonitor) HasQuota(_ context.Context) (bool, error) {
27+
resp := f.responses[f.calls]
28+
if f.calls < len(f.responses)-1 {
29+
f.calls++
30+
}
31+
return resp.hasQuota, resp.err
32+
}
33+
34+
func TestWithCancelOnQuotaExceeded_NoInitialQuota(t *testing.T) {
35+
ctx := context.Background()
36+
37+
responses := []quotaResponse{
38+
{false, nil},
39+
}
40+
_, err := WithCancelOnQuotaExceeded(ctx, newFakeQuotaMonitor(responses...))
41+
42+
require.Error(t, err)
43+
}
44+
45+
func TestWithCancelOnQuotaExceeded_NoQuota(t *testing.T) {
46+
ctx := context.Background()
47+
48+
responses := []quotaResponse{
49+
{true, nil},
50+
{false, nil},
51+
}
52+
ctx, err := WithCancelOnQuotaExceeded(ctx, newFakeQuotaMonitor(responses...), WithQuotaCheckPeriod(1*time.Millisecond))
53+
require.NoError(t, err)
54+
55+
<-ctx.Done()
56+
cause := context.Cause(ctx)
57+
require.Equal(t, ErrNoQuota, cause)
58+
}
59+
60+
func TestWithCancelOnQuotaCheckConsecutiveFailures(t *testing.T) {
61+
ctx := context.Background()
62+
63+
responses := []quotaResponse{
64+
{true, nil},
65+
{false, errors.New("test2")},
66+
{false, errors.New("test3")},
67+
}
68+
ctx, err := WithCancelOnQuotaExceeded(ctx,
69+
newFakeQuotaMonitor(responses...),
70+
WithQuotaCheckPeriod(1*time.Millisecond),
71+
WithQuotaMaxConsecutiveFailures(2),
72+
)
73+
require.NoError(t, err)
74+
<-ctx.Done()
75+
cause := context.Cause(ctx)
76+
require.Equal(t, "test2\ntest3", cause.Error())
77+
}

premium/tables.go

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
package premium
2+
3+
import "github.com/cloudquery/plugin-sdk/v4/schema"
4+
5+
// ContainsPaidTables returns true if any of the tables are paid
6+
func ContainsPaidTables(tables schema.Tables) bool {
7+
for _, t := range tables {
8+
if t.IsPaid {
9+
return true
10+
}
11+
}
12+
return false
13+
}
14+
15+
// MakeAllTablesPaid sets all tables to paid
16+
func MakeAllTablesPaid(tables schema.Tables) schema.Tables {
17+
for _, table := range tables {
18+
MakeTablePaid(table)
19+
}
20+
return tables
21+
}
22+
23+
// MakeTablePaid sets the table to paid
24+
func MakeTablePaid(table *schema.Table) *schema.Table {
25+
table.IsPaid = true
26+
return table
27+
}

premium/tables_test.go

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
package premium
2+
3+
import (
4+
"github.com/cloudquery/plugin-sdk/v4/schema"
5+
"github.com/stretchr/testify/assert"
6+
"testing"
7+
)
8+
9+
func TestContainsPaidTables(t *testing.T) {
10+
noPaidTables := schema.Tables{
11+
&schema.Table{Name: "table1", IsPaid: false},
12+
&schema.Table{Name: "table2", IsPaid: false},
13+
&schema.Table{Name: "table3", IsPaid: false},
14+
}
15+
16+
paidTables := schema.Tables{
17+
&schema.Table{Name: "table1", IsPaid: false},
18+
&schema.Table{Name: "table2", IsPaid: true},
19+
&schema.Table{Name: "table3", IsPaid: false},
20+
}
21+
22+
assert.False(t, ContainsPaidTables(noPaidTables), "no paid tables")
23+
assert.True(t, ContainsPaidTables(paidTables), "paid tables")
24+
}
25+
26+
func TestMakeAllTablesPaid(t *testing.T) {
27+
noPaidTables := schema.Tables{
28+
&schema.Table{Name: "table1", IsPaid: false},
29+
&schema.Table{Name: "table2", IsPaid: false},
30+
&schema.Table{Name: "table3", IsPaid: false},
31+
}
32+
33+
paidTables := MakeAllTablesPaid(noPaidTables)
34+
35+
assert.Equal(t, 3, len(paidTables))
36+
for _, table := range paidTables {
37+
assert.True(t, table.IsPaid)
38+
}
39+
}

0 commit comments

Comments
 (0)