Skip to content

Commit 4d13b18

Browse files
authored
feat: Individual Table and Client rate limit (#1411)
#### Summary This PR introduces a rate-limiting semaphore that caps the number of goroutines that can simultaneously resolve the same table + client pair at just 10. Prior to this, a single table could consume all of the available concurrency on its own. This alleviates the problem where nested tables get throttled because the load is concentrated on a single client and table instead of being spread across all client/table pairs. I will update this PR with a benchmark case once #1410 gets merged.
1 parent f5a0d47 commit 4d13b18

File tree

2 files changed

+29
-0
lines changed

2 files changed

+29
-0
lines changed

scheduler/scheduler.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"errors"
66
"fmt"
77
"runtime/debug"
8+
"sync"
89
"sync/atomic"
910
"time"
1011

@@ -65,6 +66,12 @@ func WithStrategy(strategy Strategy) Option {
6566
}
6667
}
6768

69+
func WithSingleNestedTableMaxConcurrency(concurrency int64) Option {
70+
return func(s *Scheduler) {
71+
s.singleNestedTableMaxConcurrency = concurrency
72+
}
73+
}
74+
6875
type SyncOption func(*syncClient)
6976

7077
func WithSyncDeterministicCQID(deterministicCQID bool) SyncOption {
@@ -88,6 +95,10 @@ type Scheduler struct {
8895
// Logger to call, this logger is passed to the serve.Serve Client, if not defined Serve will create one instead.
8996
logger zerolog.Logger
9097
concurrency int
98+
// This Map holds all of the concurrency semaphores for each table+client pair.
99+
singleTableConcurrency sync.Map
100+
// The maximum number of goroutines that can be spawned for a single table+client pair.
101+
singleNestedTableMaxConcurrency int64
91102
}
92103

93104
type syncClient struct {
@@ -120,6 +131,11 @@ func NewScheduler(opts ...Option) *Scheduler {
120131
tableConcurrency = max(tableConcurrency/2, minTableConcurrency)
121132
}
122133
s.resourceSem = semaphore.NewWeighted(int64(resourceConcurrency))
134+
135+
// To preserve backwards compatibility, if singleNestedTableMaxConcurrency is not set, default it to the table concurrency
136+
if s.singleNestedTableMaxConcurrency == 0 {
137+
s.singleNestedTableMaxConcurrency = int64(tableConcurrency)
138+
}
123139
return &s
124140
}
125141

scheduler/scheduler_dfs.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
"github.com/getsentry/sentry-go"
1414
"go.opentelemetry.io/otel"
1515
"go.opentelemetry.io/otel/attribute"
16+
"golang.org/x/sync/semaphore"
1617
)
1718

1819
func (s *syncClient) syncDfs(ctx context.Context, resolvedResources chan<- *schema.Resource) {
@@ -193,14 +194,26 @@ func (s *syncClient) resolveResourcesDfs(ctx context.Context, table *schema.Tabl
193194
resolvedResources <- resource
194195
for _, relation := range resource.Table.Relations {
195196
relation := relation
197+
tableConcurrencyKey := table.Name + "-" + client.ID()
198+
// Acquire the semaphore for this table+client pair
199+
tableSemVal, _ := s.scheduler.singleTableConcurrency.LoadOrStore(tableConcurrencyKey, semaphore.NewWeighted(s.scheduler.singleNestedTableMaxConcurrency))
200+
tableSem := tableSemVal.(*semaphore.Weighted)
201+
if err := tableSem.Acquire(ctx, 1); err != nil {
202+
// This means context was cancelled
203+
wg.Wait()
204+
return
205+
}
206+
// Once table semaphore is acquired we can acquire the global semaphore
196207
if err := s.scheduler.tableSems[depth].Acquire(ctx, 1); err != nil {
197208
// This means context was cancelled
209+
tableSem.Release(1)
198210
wg.Wait()
199211
return
200212
}
201213
wg.Add(1)
202214
go func() {
203215
defer wg.Done()
216+
defer tableSem.Release(1)
204217
defer s.scheduler.tableSems[depth].Release(1)
205218
s.resolveTableDfs(ctx, relation, client, resource, resolvedResources, depth+1)
206219
}()

0 commit comments

Comments
 (0)