Skip to content

Commit 2918402

Browse files
authored
feat: Resource rate limit (#1433)
BEGIN_COMMIT_OVERRIDE feat: Introduce a per-resource rate limit in addition to the global resource rate limit. feat: Set a default rate limit of `5` for `SingleResourceMaxConcurrency` and `SingleNestedTableMaxConcurrency`. END_COMMIT_OVERRIDE This introduces a rate limit on a resource + table + client basis. Prior to this we only had rate limits on a per table + client basis, and all resources shared the same concurrency bucket. This also adds a default value of 5 for both `singleResourceMaxConcurrency` and `singleNestedTableMaxConcurrency`.
1 parent 9cd25eb commit 2918402

File tree

2 files changed

+36
-13
lines changed

2 files changed

+36
-13
lines changed

scheduler/scheduler.go

Lines changed: 21 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,13 @@ import (
2525
)
2626

2727
const (
28-
DefaultConcurrency = 50000
29-
DefaultMaxDepth = 4
30-
minTableConcurrency = 1
31-
minResourceConcurrency = 100
32-
otelName = "schedule"
28+
DefaultSingleResourceMaxConcurrency = 5
29+
DefaultSingleNestedTableMaxConcurrency = 5
30+
DefaultConcurrency = 50000
31+
DefaultMaxDepth = 4
32+
minTableConcurrency = 1
33+
minResourceConcurrency = 100
34+
otelName = "schedule"
3335
)
3436

3537
var ErrNoTables = errors.New("no tables specified for syncing, review `tables` and `skip_tables` in your config and specify at least one table to sync")
@@ -72,6 +74,12 @@ func WithSingleNestedTableMaxConcurrency(concurrency int64) Option {
7274
}
7375
}
7476

77+
func WithSingleResourceMaxConcurrency(concurrency int64) Option {
78+
return func(s *Scheduler) {
79+
s.singleResourceMaxConcurrency = concurrency
80+
}
81+
}
82+
7583
type SyncOption func(*syncClient)
7684

7785
func WithSyncDeterministicCQID(deterministicCQID bool) SyncOption {
@@ -99,6 +107,9 @@ type Scheduler struct {
99107
singleTableConcurrency sync.Map
100108
// The maximum number of go routines that can be spawned for a single table+client pair
101109
singleNestedTableMaxConcurrency int64
110+
111+
// The maximum number of go routines that can be spawned for a specific resource
112+
singleResourceMaxConcurrency int64
102113
}
103114

104115
type syncClient struct {
@@ -113,9 +124,11 @@ type syncClient struct {
113124

114125
func NewScheduler(opts ...Option) *Scheduler {
115126
s := Scheduler{
116-
caser: caser.New(),
117-
concurrency: DefaultConcurrency,
118-
maxDepth: DefaultMaxDepth,
127+
caser: caser.New(),
128+
concurrency: DefaultConcurrency,
129+
maxDepth: DefaultMaxDepth,
130+
singleResourceMaxConcurrency: DefaultSingleResourceMaxConcurrency,
131+
singleNestedTableMaxConcurrency: DefaultSingleNestedTableMaxConcurrency,
119132
}
120133
for _, opt := range opts {
121134
opt(&s)
@@ -132,10 +145,6 @@ func NewScheduler(opts ...Option) *Scheduler {
132145
}
133146
s.resourceSem = semaphore.NewWeighted(int64(resourceConcurrency))
134147

135-
// To preserve backwards compatibility, if singleTableMaxConcurrency is not set, set it to the max concurrency
136-
if s.singleNestedTableMaxConcurrency == 0 {
137-
s.singleNestedTableMaxConcurrency = int64(tableConcurrency)
138-
}
139148
return &s
140149
}
141150

scheduler/scheduler_dfs.go

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -135,14 +135,28 @@ func (s *syncClient) resolveResourcesDfs(ctx context.Context, table *schema.Tabl
135135
sentValidationErrors := sync.Map{}
136136
for i := range resourcesSlice {
137137
i := i
138-
if err := s.scheduler.resourceSem.Acquire(ctx, 1); err != nil {
138+
resourceConcurrencyKey := table.Name + "-" + client.ID() + "-" + "resource"
139+
resourceSemVal, _ := s.scheduler.singleTableConcurrency.LoadOrStore(resourceConcurrencyKey, semaphore.NewWeighted(s.scheduler.singleResourceMaxConcurrency))
140+
resourceSem := resourceSemVal.(*semaphore.Weighted)
141+
if err := resourceSem.Acquire(ctx, 1); err != nil {
139142
s.logger.Warn().Err(err).Msg("failed to acquire semaphore. context cancelled")
143+
// This means context was cancelled
144+
wg.Wait()
145+
// we have to continue emptying the channel to exit gracefully
146+
return
147+
}
148+
149+
// Once Resource semaphore is acquired we can acquire the global semaphore
150+
if err := s.scheduler.resourceSem.Acquire(ctx, 1); err != nil {
151+
// This means context was cancelled
152+
resourceSem.Release(1)
140153
wg.Wait()
141154
// we have to continue emptying the channel to exit gracefully
142155
return
143156
}
144157
wg.Add(1)
145158
go func() {
159+
defer resourceSem.Release(1)
146160
defer s.scheduler.resourceSem.Release(1)
147161
defer wg.Done()
148162
//nolint:all

0 commit comments

Comments
 (0)