Skip to content

Commit 2b1ba30

Browse files
authored
feat: Add shuffle scheduler (#1218)
Falling somewhere between DFS and Round Robin, the `shuffle` scheduler uses randomization with a fixed seed to spread the load across tables and clients more evenly.
1 parent 44956d9 commit 2b1ba30

File tree

2 files changed

+81
-1
lines changed

2 files changed

+81
-1
lines changed

scheduler/scheduler.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ const (
3434
const (
3535
StrategyDFS Strategy = iota
3636
StrategyRoundRobin
37+
StrategyShuffle
3738
)
3839

3940
type Strategy int
@@ -84,10 +85,11 @@ func (s *Strategy) Validate() error {
8485
return fmt.Errorf("unknown scheduler strategy: %d", s)
8586
}
8687

87-
var AllStrategies = Strategies{StrategyDFS, StrategyRoundRobin}
88+
var AllStrategies = Strategies{StrategyDFS, StrategyRoundRobin, StrategyShuffle}
8889
var AllStrategyNames = [...]string{
8990
StrategyDFS: "dfs",
9091
StrategyRoundRobin: "round-robin",
92+
StrategyShuffle: "shuffle",
9193
}
9294

9395
func StrategyForName(s string) (Strategy, error) {
@@ -250,6 +252,8 @@ func (s *Scheduler) Sync(ctx context.Context, client schema.ClientMeta, tables s
250252
syncClient.syncDfs(ctx, resources)
251253
case StrategyRoundRobin:
252254
syncClient.syncRoundRobin(ctx, resources)
255+
case StrategyShuffle:
256+
syncClient.syncShuffle(ctx, resources)
253257
default:
254258
panic(fmt.Errorf("unknown scheduler %s", s.strategy.String()))
255259
}

scheduler/scheduler_shuffle.go

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
package scheduler
2+
3+
import (
4+
"context"
5+
"hash/fnv"
6+
"math/rand"
7+
"strings"
8+
"sync"
9+
10+
"github.com/cloudquery/plugin-sdk/v4/schema"
11+
)
12+
13+
func (s *syncClient) syncShuffle(ctx context.Context, resolvedResources chan<- *schema.Resource) {
14+
// We have this because plugins can return sometimes clients in a random way which will cause
15+
// differences between this run and the next one.
16+
preInitialisedClients := make([][]schema.ClientMeta, len(s.tables))
17+
tableNames := make([]string, len(s.tables))
18+
for i, table := range s.tables {
19+
tableNames[i] = table.Name
20+
clients := []schema.ClientMeta{s.client}
21+
if table.Multiplex != nil {
22+
clients = table.Multiplex(s.client)
23+
}
24+
preInitialisedClients[i] = clients
25+
// we do this here to avoid locks so we initial the metrics structure once in the main goroutines
26+
// and then we can just read from it in the other goroutines concurrently given we are not writing to it.
27+
s.metrics.initWithClients(table, clients)
28+
}
29+
30+
// First interleave the tables like in round-robin
31+
tableClients := roundRobinInterleave(s.tables, preInitialisedClients)
32+
33+
// Then shuffle the tableClients to randomize the order in which they are retrieved.
34+
// We use a fixed seed so that runs with the same tables and clients perform similarly across syncs
35+
// however, if the table order changes, the seed will change and the shuffle order will be different,
36+
// so users have a little bit of control over the randomization.
37+
seed := hashTableNames(tableNames)
38+
shuffle(tableClients, seed)
39+
40+
var wg sync.WaitGroup
41+
for _, tc := range tableClients {
42+
table := tc.table
43+
cl := tc.client
44+
if err := s.scheduler.tableSems[0].Acquire(ctx, 1); err != nil {
45+
// This means context was cancelled
46+
wg.Wait()
47+
return
48+
}
49+
wg.Add(1)
50+
go func() {
51+
defer wg.Done()
52+
defer s.scheduler.tableSems[0].Release(1)
53+
// Not checking for error here as nothing much to do.
54+
// the error is logged and this happens when context is cancelled.
55+
// This currently uses the DFS algorithm to resolve the tables, but this
56+
// may change in the future.
57+
s.resolveTableDfs(ctx, table, cl, nil, resolvedResources, 1)
58+
}()
59+
}
60+
61+
// Wait for all the worker goroutines to finish
62+
wg.Wait()
63+
}
64+
65+
func hashTableNames(tableNames []string) int64 {
66+
h := fnv.New32a()
67+
h.Write([]byte(strings.Join(tableNames, ",")))
68+
return int64(h.Sum32())
69+
}
70+
71+
func shuffle(tableClients []tableClient, seed int64) {
72+
r := rand.New(rand.NewSource(seed))
73+
r.Shuffle(len(tableClients), func(i, j int) {
74+
tableClients[i], tableClients[j] = tableClients[j], tableClients[i]
75+
})
76+
}

0 commit comments

Comments
 (0)