
Commit af8ac87

feat: Implement RandomQueue scheduler strategy (#1914)
This PR implements a new scheduler strategy based on a _Concurrent Random Queue_. It is based on @erezrokah's Priority Queue scheduler strategy.

## How does it work

This is hopefully a much simpler scheduling strategy. It doesn't have any semaphores; it just uses the existing concurrency setting. Table resolvers (and their relations) get `Push`ed into a work queue, and `concurrency` workers `Pull` from this queue, but they pull a random element from it (see the sketch after this description).

## Why it should work better

**The key benefit of this strategy is this:**

- Assumption 1: most slow syncs are slow because of rate limits, not because of I/O limits or too much data.
- Assumption 2: the meaty part of the sync is syncing relations, because each child table has a resolver per parent.
- Benefit: because the next work unit is picked uniformly at random from the queue, the relation API calls of all tables are evenly spread throughout the sync, which minimises rate limiting.

## Does it work better?

Still working on results. Notably, AWS & Azure yield mixed results; I still have to look into why.

### GCP

**Before**

```
$ cli sync .
Loading spec(s) from .
Starting sync for: gcp (grpc@localhost:7777) -> [postgresql (cloudquery/[email protected])]
Sync completed successfully. Resources: 25799, Errors: 0, Warnings: 0, Time: 2m23s
```

UPDATE: GCP is moving to the Round Robin strategy, which is already much faster:

```
$ cli sync .
Loading spec(s) from .
Starting sync for: gcp (grpc@localhost:7777) -> [postgresql (cloudquery/[email protected])]
Sync completed successfully. Resources: 26355, Errors: 0, Warnings: 0, Time: 40s
```

**After**

```
$ cli sync .
Loading spec(s) from .
Starting sync for: gcp (grpc@localhost:7777) -> [postgresql (cloudquery/[email protected])]
Sync completed successfully. Resources: 26186, Errors: 0, Warnings: 0, Time: 34s
```

**Result: 76.22% reduction in time, or 3.21 times faster.**

**Result against Round Robin: 15% reduction in time, or 0.18 times faster (probably within the margin of error).**

### BigQuery

**Before**

```
$ cli sync bigquery_to_postgresql.yaml
Loading spec(s) from bigquery_to_postgresql.yaml
Starting sync for: bigquery (cloudquery/[email protected]) -> [postgresql (cloudquery/[email protected])]
Sync completed successfully. Resources: 26139, Errors: 0, Warnings: 0, Time: 2m7s
```

**After**

```
$ cli sync bigquery_to_postgresql.yaml
Loading spec(s) from bigquery_to_postgresql.yaml
Starting sync for: bigquery (cloudquery/[email protected]) -> [postgresql (cloudquery/[email protected])]
Sync completed successfully. Resources: 26139, Errors: 0, Warnings: 0, Time: 1m26s
```

**Result: 32.28% reduction in time, or 0.48 times faster.**

### SentinelOne

**Before** (it was already quite fast due to a previously merged improvement)

```
$ cli sync .
Loading spec(s) from .
Starting sync for: sentinelone (grpc@localhost:7777) -> [postgresql (cloudquery/[email protected])]
Sync completed successfully. Resources: 1295, Errors: 0, Warnings: 0, Time: 15s
```

**After**

```
$ cli sync .
Loading spec(s) from .
Starting sync for: sentinelone (grpc@localhost:7777) -> [postgresql (cloudquery/[email protected])]
Sync completed successfully. Resources: 1295, Errors: 0, Warnings: 0, Time: 8s
```

**Result: 46.67% reduction in time, or 0.875 times faster.**

## How to test

- Add a `go.mod` replace for the SDK: `replace github.com/cloudquery/plugin-sdk/v4 => github.com/cloudquery/plugin-sdk/v4 v4.63.1-0.20241002131015-243705c940c6` (check the last commit on this PR).
- Run the source plugin via gRPC locally; make sure to configure the scheduler strategy to `scheduler.StrategyRandomQueue`.

## How scary is it to merge

- This scheduler strategy is not used by any plugins by default, so in principle this should be safe to merge.

---------

Co-authored-by: erezrokah <[email protected]>
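To make the shuffle idea concrete, here is a standalone Go sketch (illustrative only: the `workUnit` type and table names are made up, this is not the SDK's code) showing how popping a uniformly random element interleaves the child resolvers of different tables instead of draining one table at a time:

```go
package main

import (
	"fmt"
	"math/rand"
)

// workUnit stands in for one table resolver bound to a parent resource
// (illustrative only; not the SDK's type).
type workUnit struct {
	table  string
	parent int
}

func main() {
	r := rand.New(rand.NewSource(1))
	queue := make([]workUnit, 0, 8)

	// Two hypothetical child tables, each with one resolver per parent resource.
	for parent := 0; parent < 4; parent++ {
		queue = append(queue, workUnit{table: "example_instance_tags", parent: parent})
		queue = append(queue, workUnit{table: "example_bucket_policies", parent: parent})
	}

	// Popping a uniformly random element (swap with last, then truncate)
	// interleaves the two tables' API calls instead of draining one table first.
	for len(queue) > 0 {
		idx := r.Intn(len(queue))
		last := len(queue) - 1
		queue[idx], queue[last] = queue[last], queue[idx]
		item := queue[last]
		queue = queue[:last]
		fmt.Printf("resolve %s (parent %d)\n", item.table, item.parent)
	}
}
```

Run with different seeds the printed order changes, but on average the two tables stay interleaved, which is what spreads rate-limited calls out over the whole sync.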
1 parent 38b4bfd commit af8ac87

15 files changed: +681 −96 lines
Lines changed: 12 additions & 8 deletions
```diff
@@ -1,4 +1,4 @@
-package scheduler
+package metrics
 
 import (
 	"context"
@@ -12,6 +12,10 @@ import (
 	"go.opentelemetry.io/otel/metric"
 )
 
+const (
+	OtelName = "io.cloudquery"
+)
+
 // Metrics is deprecated as we move toward open telemetry for tracing and metrics
 type Metrics struct {
 	TableClient map[string]map[string]*TableClientMetrics
@@ -82,39 +86,39 @@ func (s *Metrics) Equal(other *Metrics) bool {
 }
 
 func getOtelMeters(tableName string, clientID string) *OtelMeters {
-	resources, err := otel.Meter(otelName).Int64Counter("sync.table.resources",
+	resources, err := otel.Meter(OtelName).Int64Counter("sync.table.resources",
 		metric.WithDescription("Number of resources synced for a table"),
 		metric.WithUnit("/{tot}"),
 	)
 	if err != nil {
 		return nil
 	}
 
-	errors, err := otel.Meter(otelName).Int64Counter("sync.table.errors",
+	errors, err := otel.Meter(OtelName).Int64Counter("sync.table.errors",
 		metric.WithDescription("Number of errors encountered while syncing a table"),
 		metric.WithUnit("/{tot}"),
 	)
 	if err != nil {
 		return nil
 	}
 
-	panics, err := otel.Meter(otelName).Int64Counter("sync.table.panics",
+	panics, err := otel.Meter(OtelName).Int64Counter("sync.table.panics",
 		metric.WithDescription("Number of panics encountered while syncing a table"),
 		metric.WithUnit("/{tot}"),
 	)
 	if err != nil {
 		return nil
 	}
 
-	startTime, err := otel.Meter(otelName).Int64Counter("sync.table.start_time",
+	startTime, err := otel.Meter(OtelName).Int64Counter("sync.table.start_time",
 		metric.WithDescription("Start time of syncing a table"),
 		metric.WithUnit("ns"),
 	)
 	if err != nil {
 		return nil
 	}
 
-	endTime, err := otel.Meter(otelName).Int64Counter("sync.table.end_time",
+	endTime, err := otel.Meter(OtelName).Int64Counter("sync.table.end_time",
 		metric.WithDescription("End time of syncing a table"),
 		metric.WithUnit("ns"),
 	)
@@ -136,7 +140,7 @@ func getOtelMeters(tableName string, clientID string) *OtelMeters {
 	}
 }
 
-func (s *Metrics) initWithClients(table *schema.Table, clients []schema.ClientMeta) {
+func (s *Metrics) InitWithClients(table *schema.Table, clients []schema.ClientMeta) {
 	s.TableClient[table.Name] = make(map[string]*TableClientMetrics, len(clients))
 	for _, client := range clients {
 		tableName := table.Name
@@ -146,7 +150,7 @@ func (s *Metrics) initWithClients(table *schema.Table, clients []schema.ClientMe
 		}
 	}
 	for _, relation := range table.Relations {
-		s.initWithClients(relation, clients)
+		s.InitWithClients(relation, clients)
 	}
 }
 
```
Lines changed: 1 addition & 1 deletion
```diff
@@ -1,4 +1,4 @@
-package scheduler
+package metrics
 
 import "testing"
 
```
Lines changed: 67 additions & 0 deletions
```go
package queue

import (
	"sync"
	"sync/atomic"
)

// activeWorkSignal is a thread-safe coordinator for awaiting a worker pool
// that relies on a queue that might be temporarily empty.
//
// If the queue is empty and workers are idle, we are done.
//
// If the queue is empty but a worker is working on a task, we must wait and check
// again after that worker finishes. That's why we need this.
//
// Use it like this:
//
//   - When a worker picks up a task, call `Add()` (like a WaitGroup)
//   - When a worker finishes a task, call `Done()` (like a WaitGroup)
//
//   - If the queue is empty, call `IsIdle()` to check whether no workers are active.
//   - If workers are still active, call `Wait()` to block until the state changes.
type activeWorkSignal struct {
	countChangeSignal   *sync.Cond
	activeWorkUnitCount *atomic.Int32
	isStarted           *atomic.Bool
}

func newActiveWorkSignal() *activeWorkSignal {
	return &activeWorkSignal{
		countChangeSignal:   sync.NewCond(&sync.Mutex{}),
		activeWorkUnitCount: &atomic.Int32{},
		isStarted:           &atomic.Bool{},
	}
}

// Add means a worker has started working on a task.
//
// Wake up the work queuing goroutine.
func (s *activeWorkSignal) Add() {
	s.activeWorkUnitCount.Add(1)
	s.isStarted.Store(true)
	s.countChangeSignal.Signal()
}

// Done means a worker has finished working on a task.
//
// If the count became zero, wake up the work queuing goroutine (it might have finished).
func (s *activeWorkSignal) Done() {
	s.activeWorkUnitCount.Add(-1)
	s.countChangeSignal.Signal()
}

// IsIdle returns true if no workers are active. If the queue is empty and workers are idle, we are done.
func (s *activeWorkSignal) IsIdle() bool {
	return s.isStarted.Load() && s.activeWorkUnitCount.Load() <= 0
}

// Wait blocks until the count of active workers changes.
func (s *activeWorkSignal) Wait() {
	if s.activeWorkUnitCount.Load() <= 0 {
		return
	}
	s.countChangeSignal.L.Lock()
	defer s.countChangeSignal.L.Unlock()
	s.countChangeSignal.Wait()
}
```
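The doc comment above describes the protocol in prose; the following self-contained sketch (a standalone `signal` type mirroring the unexported `activeWorkSignal`, with timings invented for the demo) shows why the distributor must `Wait()` instead of treating an empty queue as the end of the sync:

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// signal mirrors the Add/Done/IsIdle/Wait protocol of activeWorkSignal
// with standalone types so the sketch runs outside the queue package.
type signal struct {
	cond    *sync.Cond
	active  atomic.Int32
	started atomic.Bool
}

func (s *signal) Add()  { s.active.Add(1); s.started.Store(true); s.cond.Signal() }
func (s *signal) Done() { s.active.Add(-1); s.cond.Signal() }

// IsIdle is only true once some work has started and none is in flight.
func (s *signal) IsIdle() bool { return s.started.Load() && s.active.Load() <= 0 }

func (s *signal) Wait() {
	if s.active.Load() <= 0 {
		return
	}
	s.cond.L.Lock()
	defer s.cond.L.Unlock()
	s.cond.Wait()
}

func main() {
	s := &signal{cond: sync.NewCond(&sync.Mutex{})}

	// Worker: picks up the last queued task and works on it for a while.
	go func() {
		s.Add()
		time.Sleep(50 * time.Millisecond) // the task may push more work while running
		s.Done()
	}()

	time.Sleep(10 * time.Millisecond) // let the worker start

	// Distributor: the queue looks empty, but a worker is still active,
	// so block on Wait() instead of declaring the sync finished.
	for !s.IsIdle() {
		s.Wait()
	}
	fmt.Println("queue drained and no active workers: sync finished")
}
```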
Lines changed: 41 additions & 0 deletions
```go
package queue

import (
	"math/rand"
	"sync"
)

// ConcurrentRandomQueue is a generic, thread-safe queue
// that pops random elements in O(1) time.
type ConcurrentRandomQueue[T any] struct {
	mu     sync.Mutex
	queue  []T
	random *rand.Rand
}

func NewConcurrentRandomQueue[T any](seed int64, capacityHint int) *ConcurrentRandomQueue[T] {
	return &ConcurrentRandomQueue[T]{queue: make([]T, 0, capacityHint), random: rand.New(rand.NewSource(seed))}
}

func (q *ConcurrentRandomQueue[T]) Push(item T) {
	q.mu.Lock()
	defer q.mu.Unlock()

	q.queue = append(q.queue, item)
}

func (q *ConcurrentRandomQueue[T]) Pop() *T {
	q.mu.Lock()
	defer q.mu.Unlock()

	if len(q.queue) == 0 {
		return nil
	}
	idx := q.random.Intn(len(q.queue))
	lastIdx := len(q.queue) - 1
	q.queue[idx], q.queue[lastIdx] = q.queue[lastIdx], q.queue[idx]
	item := q.queue[lastIdx]
	q.queue = q.queue[:lastIdx]

	return &item
}
```
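Usage is Push, then Pop until `nil`; a minimal sketch, assuming the package path implied by `scheduler/queue/scheduler.go` below (`github.com/cloudquery/plugin-sdk/v4/scheduler/queue`):

```go
package main

import (
	"fmt"

	"github.com/cloudquery/plugin-sdk/v4/scheduler/queue"
)

func main() {
	// Fixed seed for a reproducible pop order; the capacity hint just pre-sizes the slice.
	q := queue.NewConcurrentRandomQueue[string](42, 4)
	for _, t := range []string{"table_a", "table_b", "table_c", "table_d"} {
		q.Push(t)
	}

	// Pop returns a pointer to a random remaining element, or nil once the queue is empty.
	for item := q.Pop(); item != nil; item = q.Pop() {
		fmt.Println(*item)
	}
}
```

The swap-with-last trick in `Pop` is what makes random removal O(1): insertion order is not preserved, which is exactly the point of this queue.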

scheduler/queue/scheduler.go

Lines changed: 138 additions & 0 deletions
```go
package queue

import (
	"context"

	"github.com/cloudquery/plugin-sdk/v4/caser"
	"github.com/cloudquery/plugin-sdk/v4/scheduler/metrics"
	"github.com/cloudquery/plugin-sdk/v4/schema"
	"github.com/google/uuid"
	"github.com/rs/zerolog"
	"golang.org/x/sync/errgroup"
)

const DefaultWorkerCount = 1000

// WorkUnit is an atomic unit of work that the scheduler syncs.
//
// It is one table resolver (same as all other scheduler strategies).
//
// But if it is a non-top-level table, it is bound to a single parent resource.
type WorkUnit struct {
	Table  *schema.Table
	Client schema.ClientMeta
	Parent *schema.Resource
}

type Scheduler struct {
	workerCount       int
	logger            zerolog.Logger
	caser             *caser.Caser
	deterministicCQID bool
	metrics           *metrics.Metrics
	invocationID      string
	seed              int64
}

type Option func(*Scheduler)

func WithWorkerCount(workerCount int) Option {
	return func(d *Scheduler) {
		d.workerCount = workerCount
	}
}

func WithCaser(c *caser.Caser) Option {
	return func(d *Scheduler) {
		d.caser = c
	}
}

func WithDeterministicCQID(deterministicCQID bool) Option {
	return func(d *Scheduler) {
		d.deterministicCQID = deterministicCQID
	}
}

func WithInvocationID(invocationID string) Option {
	return func(d *Scheduler) {
		d.invocationID = invocationID
	}
}

func NewShuffleQueueScheduler(logger zerolog.Logger, m *metrics.Metrics, seed int64, opts ...Option) *Scheduler {
	scheduler := &Scheduler{
		logger:       logger,
		metrics:      m,
		workerCount:  DefaultWorkerCount,
		caser:        caser.New(),
		invocationID: uuid.New().String(),
		seed:         seed,
	}

	for _, opt := range opts {
		opt(scheduler)
	}

	return scheduler
}

func (d *Scheduler) Sync(ctx context.Context, tableClients []WorkUnit, resolvedResources chan<- *schema.Resource) {
	if len(tableClients) == 0 {
		return
	}
	queue := NewConcurrentRandomQueue[WorkUnit](d.seed, len(tableClients))
	for _, tc := range tableClients {
		queue.Push(tc)
	}

	jobs := make(chan *WorkUnit)
	activeWorkSignal := newActiveWorkSignal()

	// Worker pool
	workerPool, _ := errgroup.WithContext(ctx)
	for w := 0; w < d.workerCount; w++ {
		workerPool.Go(func() error {
			newWorker(
				jobs,
				queue,
				resolvedResources,
				d.logger,
				d.caser,
				d.invocationID,
				d.deterministicCQID,
				d.metrics,
			).work(ctx, activeWorkSignal)
			return nil
		})
	}

	// Work distribution
	go func() {
		defer close(jobs)
		for {
			select {
			case <-ctx.Done():
				return
			default:
				item := queue.Pop()

				// There is work to do
				if item != nil {
					jobs <- item
					continue
				}

				// Queue is empty and no active work, done!
				if activeWorkSignal.IsIdle() {
					return
				}

				// Queue is empty and there is active work, wait for changes
				activeWorkSignal.Wait()
			}
		}
	}()

	_ = workerPool.Wait()
}
```
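Stripped of the SDK types, `Sync` is a fixed pool of workers draining a `jobs` channel plus one distributor goroutine that closes the channel when it decides the work is done; a minimal sketch of that shape (plain `int` jobs, with a fixed job count standing in for the queue-plus-idle-signal termination check):

```go
package main

import (
	"context"
	"fmt"

	"golang.org/x/sync/errgroup"
)

func main() {
	ctx := context.Background()
	jobs := make(chan int)

	// Worker pool: every worker drains the jobs channel until it is closed.
	workerPool, _ := errgroup.WithContext(ctx)
	for w := 0; w < 4; w++ {
		workerPool.Go(func() error {
			for job := range jobs {
				fmt.Println("resolved job", job)
			}
			return nil
		})
	}

	// Work distribution: feed jobs, then close the channel so the workers exit.
	go func() {
		defer close(jobs)
		for i := 0; i < 10; i++ {
			jobs <- i
		}
	}()

	_ = workerPool.Wait()
}
```

In the real `Sync`, the distributor closes `jobs` only once `queue.Pop()` returns nil and `activeWorkSignal.IsIdle()` reports no active work, because a running worker may still push relation work units back onto the queue.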
