Skip to content

Commit 40a0095

Browse files
authored
feat: Add basic testing large syncs support (#1862)
#### Summary A very crude and simple way to address cloudquery/cloudquery-issues#1846. Adds a new hidden ~~fuzz~~ test scheduler that only multiplies the clients (at the moment). The code is based on the shuffle scheduler, and then duplicates each client based on the multiplier. See example: ``` cloudquery sync examples/pagerduty-postgres.yml Loading spec(s) from examples/pagerduty-postgres.yml Starting sync for: pagerduty (local@/Users/erezrokah/code/github/cloudquery/cloudquery-private/plugins/source/pagerduty/pagerduty) -> [postgresql (cloudquery/postgresql@v8.2.7)] Sync completed successfully. Resources: 525, Errors: 0, Warnings: 0, Time: 7s ``` ``` CQ_DEBUG_SYNC_MULTIPLIER=50 cloudquery sync examples/pagerduty-postgres.yml Loading spec(s) from examples/pagerduty-postgres.yml Starting sync for: pagerduty (local@/Users/erezrokah/code/github/cloudquery/cloudquery-private/plugins/source/pagerduty/pagerduty) -> [postgresql (cloudquery/postgresql@v8.2.7)] Sync completed successfully. Resources: 26385, Errors: 0, Warnings: 0, Time: 2m12s ``` This has a couple of downsides/tradeoffs: 1. There will be clients with duplicate IDs, which breaks the metrics counts https://github.com/cloudquery/plugin-sdk/blob/25ed3d25a529a22f351ab92e22fb03a19c9557d4/scheduler/metrics.go#L144 2. If a plugin uses the client ID to ensure uniqueness for state client keys, that logic will break too 3. If a table doesn't have any resources, the impact of the multiplier will be lower However I think this is still useful if we want to artificially make a sync large (e.g. simulate a sync on many AWS accounts) ---
1 parent 25ed3d2 commit 40a0095

File tree

2 files changed

+102
-0
lines changed

2 files changed

+102
-0
lines changed

scheduler/scheduler.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -217,6 +217,14 @@ func (s *Scheduler) Sync(ctx context.Context, client schema.ClientMeta, tables s
217217
resources := make(chan *schema.Resource)
218218
go func() {
219219
defer close(resources)
220+
testMultiplier, err := getTestMultiplier()
221+
if err != nil {
222+
panic(err)
223+
}
224+
if testMultiplier > 0 {
225+
syncClient.syncTest(ctx, testMultiplier, resources)
226+
return
227+
}
220228
switch s.strategy {
221229
case StrategyDFS:
222230
syncClient.syncDfs(ctx, resources)

scheduler/scheduler_tezt.go

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
package scheduler
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"os"
7+
"strconv"
8+
"sync"
9+
10+
"github.com/cloudquery/plugin-sdk/v4/schema"
11+
)
12+
13+
const (
	// cqDebugSyncMultiplier is the environment variable that enables the hidden
	// "test" scheduler. This is an environment variable and not a spec option in
	// each plugin to make it easier to enable.
	cqDebugSyncMultiplier = "CQ_DEBUG_SYNC_MULTIPLIER"
)

// getTestMultiplier reads the sync multiplier from the CQ_DEBUG_SYNC_MULTIPLIER
// environment variable. It returns 0 when the variable is unset or empty
// (i.e. the test scheduler is disabled), and a non-nil error when the variable
// is set to a value that is not a valid integer.
func getTestMultiplier() (int, error) {
	strValue, ok := os.LookupEnv(cqDebugSyncMultiplier)
	if !ok || strValue == "" {
		// Treat an empty value (e.g. `CQ_DEBUG_SYNC_MULTIPLIER= cloudquery sync ...`)
		// the same as unset. os.LookupEnv reports ok=true for a set-but-empty
		// variable, and strconv.Atoi("") would error, making the caller panic for
		// a variable that was never meaningfully configured.
		return 0, nil
	}
	intValue, err := strconv.Atoi(strValue)
	if err != nil {
		return 0, fmt.Errorf("failed to parse %s=%s as integer: %w", cqDebugSyncMultiplier, strValue, err)
	}
	return intValue, nil
}
29+
30+
func (s *syncClient) syncTest(ctx context.Context, syncMultiplier int, resolvedResources chan<- *schema.Resource) {
31+
// we have this because plugins can return sometimes clients in a random way which will cause
32+
// differences between this run and the next one.
33+
preInitialisedClients := make([][]schema.ClientMeta, len(s.tables))
34+
tableNames := make([]string, len(s.tables))
35+
for i, table := range s.tables {
36+
tableNames[i] = table.Name
37+
clients := []schema.ClientMeta{s.client}
38+
if table.Multiplex != nil {
39+
clients = table.Multiplex(s.client)
40+
}
41+
// Detect duplicate clients while multiplexing
42+
seenClients := make(map[string]bool)
43+
for _, c := range clients {
44+
if _, ok := seenClients[c.ID()]; !ok {
45+
seenClients[c.ID()] = true
46+
} else {
47+
s.logger.Warn().Str("client", c.ID()).Str("table", table.Name).Msg("multiplex returned duplicate client")
48+
}
49+
}
50+
preInitialisedClients[i] = clients
51+
// we do this here to avoid locks so we initialize the metrics structure once in the main goroutine
52+
// and then we can just read from it in the other goroutines concurrently given we are not writing to it.
53+
s.metrics.initWithClients(table, clients)
54+
}
55+
56+
// First interleave the tables like in round-robin
57+
tableClients := roundRobinInterleave(s.tables, preInitialisedClients)
58+
// Then shuffle the tableClients to randomize the order in which they are retrieved.
59+
// We use a fixed seed so that runs with the same tables and clients perform similarly across syncs
60+
// however, if the table order changes, the seed will change and the shuffle order will be different,
61+
// so users have a little bit of control over the randomization.
62+
seed := hashTableNames(tableNames)
63+
allClients := make([]tableClient, 0, len(tableClients)*syncMultiplier)
64+
for _, tc := range tableClients {
65+
for i := 0; i < syncMultiplier; i++ {
66+
allClients = append(allClients, tc)
67+
}
68+
}
69+
shuffle(allClients, seed)
70+
71+
var wg sync.WaitGroup
72+
for _, tc := range allClients {
73+
table := tc.table
74+
cl := tc.client
75+
if err := s.scheduler.tableSems[0].Acquire(ctx, 1); err != nil {
76+
// This means context was cancelled
77+
wg.Wait()
78+
return
79+
}
80+
wg.Add(1)
81+
go func() {
82+
defer wg.Done()
83+
defer s.scheduler.tableSems[0].Release(1)
84+
// Not checking for error here as nothing much to do.
85+
// the error is logged and this happens when context is cancelled.
86+
// This currently uses the DFS algorithm to resolve the tables, but this
87+
// may change in the future.
88+
s.resolveTableDfs(ctx, table, cl, nil, resolvedResources, 1)
89+
}()
90+
}
91+
92+
// Wait for all the worker goroutines to finish
93+
wg.Wait()
94+
}

0 commit comments

Comments
 (0)