Skip to content

Commit 0301ed7

Browse files
authored
feat: Adding a batch updater to allow usage updates to be batched (#1326)
This allows a client to update the usage asynchronously. The updater only calls the API when a configurable number of rows has been updated or a timeout is reached. In addition, the updater flushes any remaining rows before closing. If an error is encountered, an exponential backoff is followed, up to a maximum number of retries or a maximum wait time (relative to the start time of the last query). In addition, if the server replies with a status of `429` and includes a `Retry-After` header, the client waits that number of seconds before retrying. It might also be worth considering a retry library, e.g. https://github.com/avast/retry-go.
1 parent ee24384 commit 0301ed7

File tree

4 files changed

+658
-0
lines changed

4 files changed

+658
-0
lines changed

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ require (
3838
github.com/CloudyKit/jet/v6 v6.2.0 // indirect
3939
github.com/Joker/jade v1.1.3 // indirect
4040
github.com/Shopify/goreferrer v0.0.0-20220729165902-8cddb4f5de06 // indirect
41+
github.com/adrg/xdg v0.4.0 // indirect
4142
github.com/andybalholm/brotli v1.0.5 // indirect
4243
github.com/apache/arrow/go/v13 v13.0.0-20230731205701-112f94971882 // indirect
4344
github.com/apapsch/go-jsonmerge/v2 v2.0.0 // indirect

go.sum

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,8 @@ github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAE
4747
github.com/RaveNoX/go-jsoncommentstrip v1.0.0/go.mod h1:78ihd09MekBnJnxpICcwzCMzGrKSKYe4AqU6PDYYpjk=
4848
github.com/Shopify/goreferrer v0.0.0-20220729165902-8cddb4f5de06 h1:KkH3I3sJuOLP3TjA/dfr4NAY8bghDwnXiU7cTKxQqo0=
4949
github.com/Shopify/goreferrer v0.0.0-20220729165902-8cddb4f5de06/go.mod h1:7erjKLwalezA0k99cWs5L11HWOAPNjdUZ6RxH1BXbbM=
50+
github.com/adrg/xdg v0.4.0 h1:RzRqFcjH4nE5C6oTAxhBtoE2IRyjBSa62SCbyPidvls=
51+
github.com/adrg/xdg v0.4.0/go.mod h1:N6ag73EX4wyxeaoeHctc1mas01KZgsj5tYiAIwqJE/E=
5052
github.com/ajg/form v1.5.1 h1:t9c7v8JUKu/XxOGBU0yjNpaMloxGEJhUkqFRq0ibGeU=
5153
github.com/ajg/form v1.5.1/go.mod h1:uL1WgH+h2mgNtvBq0339dVnzXdBETtL2LeUXaIv25UY=
5254
github.com/andybalholm/brotli v1.0.5 h1:8uQZIdzKmjc/iuPu7O2ioW48L81FgatrcpfFmiq/cCs=
@@ -556,6 +558,7 @@ golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBc
556558
golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
557559
golang.org/x/sys v0.0.0-20210927094055-39ccf1dd6fa6/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
558560
golang.org/x/sys v0.0.0-20211019181941-9d821ace8654/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
561+
golang.org/x/sys v0.0.0-20211025201205-69cdffdb9359/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
559562
golang.org/x/sys v0.0.0-20211103235746-7861aae1554b/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
560563
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
561564
golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=

premium/usage.go

Lines changed: 272 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,272 @@
1+
package premium
2+
3+
import (
4+
"context"
5+
"fmt"
6+
cqapi "github.com/cloudquery/cloudquery-api-go"
7+
"github.com/google/uuid"
8+
"github.com/rs/zerolog/log"
9+
"math/rand"
10+
"net/http"
11+
"sync/atomic"
12+
"time"
13+
)
14+
15+
// Default settings applied by NewUsageClient before any UpdaterOptions run.
const (
	// defaultBatchLimit is the number of pending rows required before a
	// triggered update is sent.
	defaultBatchLimit = 1000
	// defaultMaxRetries is the number of attempts made for a single usage update.
	defaultMaxRetries = 5
	// defaultMaxWaitTime caps the exponential backoff delay (before jitter).
	defaultMaxWaitTime = time.Minute
	// defaultMinTimeBetweenFlushes is the minimum spacing between two
	// successful updates sent by the background loop.
	defaultMinTimeBetweenFlushes = 10 * time.Second
	// defaultMaxTimeBetweenFlushes is the period of the ticker that flushes
	// pending rows even when the batch limit has not been reached.
	defaultMaxTimeBetweenFlushes = 30 * time.Second
)
22+
23+
// UsageClient is the contract for reporting row usage and checking quota.
//
// NOTE(review): BatchUpdater in this file declares Increase with an error
// result and Close with a context.Context parameter, so it does not satisfy
// this interface as written - confirm which set of signatures is intended.
type UsageClient interface {
	// Increase adds the given number of rows to the usage.
	Increase(context.Context, uint32)
	// HasQuota reports true while the quota has not been exceeded.
	HasQuota(context.Context) (bool, error)
	// Close flushes any rows that are still pending and shuts the client down.
	Close() error
}
31+
32+
// UpdaterOptions is a functional option that mutates a BatchUpdater's
// configuration; NewUsageClient applies the options in the order given.
type UpdaterOptions func(updater *BatchUpdater)
33+
34+
// WithBatchLimit sets the maximum number of rows to update in a single request
35+
func WithBatchLimit(batchLimit uint32) UpdaterOptions {
36+
return func(updater *BatchUpdater) {
37+
updater.batchLimit = batchLimit
38+
}
39+
}
40+
41+
// WithMaxTimeBetweenFlushes sets the flush duration - the time at which an update will be triggered even if the batch limit is not reached
42+
func WithMaxTimeBetweenFlushes(maxTimeBetweenFlushes time.Duration) UpdaterOptions {
43+
return func(updater *BatchUpdater) {
44+
updater.maxTimeBetweenFlushes = maxTimeBetweenFlushes
45+
}
46+
}
47+
48+
// WithMinTimeBetweenFlushes sets the minimum time between updates
49+
func WithMinTimeBetweenFlushes(minTimeBetweenFlushes time.Duration) UpdaterOptions {
50+
return func(updater *BatchUpdater) {
51+
updater.minTimeBetweenFlushes = minTimeBetweenFlushes
52+
}
53+
}
54+
55+
// WithMaxRetries sets the maximum number of retries to update the usage in case of an API error
56+
func WithMaxRetries(maxRetries int) UpdaterOptions {
57+
return func(updater *BatchUpdater) {
58+
updater.maxRetries = maxRetries
59+
}
60+
}
61+
62+
// WithMaxWaitTime sets the maximum time to wait before retrying a failed update
63+
func WithMaxWaitTime(maxWaitTime time.Duration) UpdaterOptions {
64+
return func(updater *BatchUpdater) {
65+
updater.maxWaitTime = maxWaitTime
66+
}
67+
}
68+
69+
type BatchUpdater struct {
70+
apiClient *cqapi.ClientWithResponses
71+
72+
// Plugin details
73+
teamName string
74+
pluginTeam string
75+
pluginKind string
76+
pluginName string
77+
78+
// Configuration
79+
batchLimit uint32
80+
maxRetries int
81+
maxWaitTime time.Duration
82+
minTimeBetweenFlushes time.Duration
83+
maxTimeBetweenFlushes time.Duration
84+
85+
// State
86+
lastUpdateTime time.Time
87+
rowsToUpdate atomic.Uint32
88+
triggerUpdate chan struct{}
89+
done chan struct{}
90+
closeError chan error
91+
isClosed bool
92+
}
93+
94+
func NewUsageClient(ctx context.Context, apiClient *cqapi.ClientWithResponses, teamName, pluginTeam, pluginKind, pluginName string, ops ...UpdaterOptions) *BatchUpdater {
95+
u := &BatchUpdater{
96+
apiClient: apiClient,
97+
98+
teamName: teamName,
99+
pluginTeam: pluginTeam,
100+
pluginKind: pluginKind,
101+
pluginName: pluginName,
102+
103+
batchLimit: defaultBatchLimit,
104+
minTimeBetweenFlushes: defaultMinTimeBetweenFlushes,
105+
maxTimeBetweenFlushes: defaultMaxTimeBetweenFlushes,
106+
maxRetries: defaultMaxRetries,
107+
maxWaitTime: defaultMaxWaitTime,
108+
triggerUpdate: make(chan struct{}),
109+
done: make(chan struct{}),
110+
closeError: make(chan error),
111+
}
112+
for _, op := range ops {
113+
op(u)
114+
}
115+
116+
u.backgroundUpdater(ctx)
117+
118+
return u
119+
}
120+
121+
func (u *BatchUpdater) Increase(_ context.Context, rows uint32) error {
122+
if rows <= 0 {
123+
return fmt.Errorf("rows must be greater than zero got %d", rows)
124+
}
125+
126+
if u.isClosed {
127+
return fmt.Errorf("usage updater is closed")
128+
}
129+
130+
u.rowsToUpdate.Add(rows)
131+
132+
// Trigger an update unless an update is already in process
133+
select {
134+
case u.triggerUpdate <- struct{}{}:
135+
default:
136+
return nil
137+
}
138+
139+
return nil
140+
}
141+
142+
func (u *BatchUpdater) HasQuota(ctx context.Context) (bool, error) {
143+
usage, err := u.apiClient.GetTeamPluginUsageWithResponse(ctx, u.teamName, u.pluginTeam, cqapi.PluginKind(u.pluginKind), u.pluginName)
144+
if err != nil {
145+
return false, fmt.Errorf("failed to get usage: %w", err)
146+
}
147+
if usage.StatusCode() != http.StatusOK {
148+
return false, fmt.Errorf("failed to get usage: %s", usage.Status())
149+
}
150+
return *usage.JSON200.RemainingRows > 0, nil
151+
}
152+
153+
func (u *BatchUpdater) Close(_ context.Context) error {
154+
u.isClosed = true
155+
156+
close(u.done)
157+
158+
return <-u.closeError
159+
}
160+
161+
func (u *BatchUpdater) backgroundUpdater(ctx context.Context) {
162+
started := make(chan struct{})
163+
164+
flushDuration := time.NewTicker(u.maxTimeBetweenFlushes)
165+
166+
go func() {
167+
started <- struct{}{}
168+
for {
169+
select {
170+
case <-u.triggerUpdate:
171+
if time.Since(u.lastUpdateTime) < u.minTimeBetweenFlushes {
172+
// Not enough time since last update
173+
continue
174+
}
175+
176+
rowsToUpdate := u.rowsToUpdate.Load()
177+
if rowsToUpdate < u.batchLimit {
178+
// Not enough rows to update
179+
continue
180+
}
181+
if err := u.updateUsageWithRetryAndBackoff(ctx, rowsToUpdate); err != nil {
182+
log.Warn().Err(err).Msg("failed to update usage")
183+
continue
184+
}
185+
u.rowsToUpdate.Add(-rowsToUpdate)
186+
case <-flushDuration.C:
187+
if time.Since(u.lastUpdateTime) < u.minTimeBetweenFlushes {
188+
// Not enough time since last update
189+
continue
190+
}
191+
rowsToUpdate := u.rowsToUpdate.Load()
192+
if rowsToUpdate == 0 {
193+
continue
194+
}
195+
if err := u.updateUsageWithRetryAndBackoff(ctx, rowsToUpdate); err != nil {
196+
log.Warn().Err(err).Msg("failed to update usage")
197+
continue
198+
}
199+
u.rowsToUpdate.Add(-rowsToUpdate)
200+
case <-u.done:
201+
remainingRows := u.rowsToUpdate.Load()
202+
if remainingRows != 0 {
203+
if err := u.updateUsageWithRetryAndBackoff(ctx, remainingRows); err != nil {
204+
u.closeError <- err
205+
return
206+
}
207+
u.rowsToUpdate.Add(-remainingRows)
208+
}
209+
u.closeError <- nil
210+
return
211+
}
212+
}
213+
}()
214+
<-started
215+
}
216+
217+
func (u *BatchUpdater) updateUsageWithRetryAndBackoff(ctx context.Context, numberToUpdate uint32) error {
218+
for retry := 0; retry < u.maxRetries; retry++ {
219+
queryStartTime := time.Now()
220+
221+
resp, err := u.apiClient.IncreaseTeamPluginUsageWithResponse(ctx, u.teamName, cqapi.IncreaseTeamPluginUsageJSONRequestBody{
222+
RequestId: uuid.New(),
223+
PluginTeam: u.pluginTeam,
224+
PluginKind: cqapi.PluginKind(u.pluginKind),
225+
PluginName: u.pluginName,
226+
Rows: int(numberToUpdate),
227+
})
228+
if err != nil {
229+
return fmt.Errorf("failed to update usage: %w", err)
230+
}
231+
if resp.StatusCode() >= 200 && resp.StatusCode() < 300 {
232+
u.lastUpdateTime = time.Now().UTC()
233+
return nil
234+
}
235+
236+
retryDuration, err := u.calculateRetryDuration(resp.StatusCode(), resp.HTTPResponse.Header, queryStartTime, retry)
237+
if err != nil {
238+
return fmt.Errorf("failed to calculate retry duration: %w", err)
239+
}
240+
if retryDuration > 0 {
241+
time.Sleep(retryDuration)
242+
}
243+
}
244+
return fmt.Errorf("failed to update usage: max retries exceeded")
245+
}
246+
247+
// calculateRetryDuration calculates the duration to sleep relative to the query start time before retrying an update
248+
func (u *BatchUpdater) calculateRetryDuration(statusCode int, headers http.Header, queryStartTime time.Time, retry int) (time.Duration, error) {
249+
if !retryableStatusCode(statusCode) {
250+
return 0, fmt.Errorf("non-retryable status code: %d", statusCode)
251+
}
252+
253+
// Check if we have a retry-after header
254+
retryAfter := headers.Get("Retry-After")
255+
if retryAfter != "" {
256+
retryDelay, err := time.ParseDuration(retryAfter + "s")
257+
if err != nil {
258+
return 0, fmt.Errorf("failed to parse retry-after header: %w", err)
259+
}
260+
return retryDelay, nil
261+
}
262+
263+
// Calculate exponential backoff
264+
baseRetry := min(time.Duration(1<<retry)*time.Second, u.maxWaitTime)
265+
jitter := time.Duration(rand.Intn(1000)) * time.Millisecond
266+
retryDelay := baseRetry + jitter
267+
return retryDelay - time.Since(queryStartTime), nil
268+
}
269+
270+
// retryableStatusCode reports whether a response with the given HTTP status
// should be retried: only 429 Too Many Requests and 503 Service Unavailable are.
func retryableStatusCode(statusCode int) bool {
	switch statusCode {
	case http.StatusTooManyRequests, http.StatusServiceUnavailable:
		return true
	default:
		return false
	}
}

0 commit comments

Comments
 (0)