Skip to content

Commit adce99c

Browse files
authored
feat: Update concurrency formula. (#1907)
## The Problem There's a "bug" in the concurrency formula on the scheduler (all strategies). This is the formula: ```go tableConcurrency := max(s.concurrency/minResourceConcurrency, minTableConcurrency) resourceConcurrency := tableConcurrency * minResourceConcurrency ``` But these values are hardcoded: ```go minResourceConcurrency := 100 minTableConcurrency := 1 ``` So if you replace: ```go tableConcurrency := max(s.concurrency/100, 1) resourceConcurrency := tableConcurrency * 100 ``` This means that any plugin whose default concurrency is `<= 100` will have a table concurrency of `1`, even if it's the only table being synced (assuming no one changes the default). ## The Fix I made a very subtle change in the formula. Only if concurrency is `<= 100`, I change the `minResourceConcurrency` to `concurrency/10`. This decreases the resource concurrency up to 10x (that we don't seem to be hitting anyway), and increases the table concurrency up to 10x. ## Plugins affected (on default concurrency) - bamboo-hr - bigquery - ~clickhouse~ (doesn't use scheduler) - confluence - crowddev - ~file~ (doesn't use scheduler) - leanix - oracledb - ~s3~ (doesn't use scheduler) - sentinelone - servicenow - shopify - sonarqube - statuspage ## The Results I'm still working on the results (it's trickier than it seems). In principle, they are very encouraging: ### BigQuery **Before** ``` $ cli sync bigquery_to_postgresql.yaml Loading spec(s) from bigquery_to_postgresql.yaml Starting sync for: bigquery (cloudquery/[email protected]) -> [postgresql (cloudquery/[email protected])] Sync completed successfully. Resources: 26139, Errors: 0, Warnings: 0, Time: 2m4s ``` **After** ``` $ cli sync bigquery_to_postgresql.yaml Loading spec(s) from bigquery_to_postgresql.yaml Starting sync for: bigquery (cloudquery/[email protected]) -> [postgresql (cloudquery/[email protected])] Sync completed successfully. Resources: 26139, Errors: 0, Warnings: 0, Time: 1m27s ``` **Result** 1.43x of regular speed (43% faster) ### Sentinelone **Before** ``` $ cli sync . Loading spec(s) from . Starting sync for: sentinelone (grpc@localhost:7777) -> [postgresql (cloudquery/[email protected])] Sync completed successfully. Resources: 1231, Errors: 0, Warnings: 0, Time: 1m4s ``` **After** ``` $ cli sync . Loading spec(s) from . Starting sync for: sentinelone (grpc@localhost:7777) -> [postgresql (cloudquery/[email protected])] Sync completed successfully. Resources: 1231, Errors: 0, Warnings: 0, Time: 15s ``` **Result** 4.27x of regular speed (327% faster) ### Sonarqube **Before** ``` $ cli sync sonarqube_to_postgresql.yaml Loading spec(s) from sonarqube_to_postgresql.yaml Starting sync for: sonarqube (grpc@localhost:7777) -> [postgresql (cloudquery/[email protected])] Sync completed successfully. Resources: 4594, Errors: 0, Warnings: 0, Time: 39s ``` **After** ``` $ cli sync sonarqube_to_postgresql.yaml Loading spec(s) from sonarqube_to_postgresql.yaml Starting sync for: sonarqube (grpc@localhost:7777) -> [postgresql (cloudquery/[email protected])] Sync completed successfully. Resources: 4594, Errors: 0, Warnings: 0, Time: 22s ``` **Result** 1.77x of regular speed (77% faster)
1 parent bea3b00 commit adce99c

File tree

1 file changed

+8
-2
lines changed

1 file changed

+8
-2
lines changed

scheduler/scheduler.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -159,10 +159,16 @@ func NewScheduler(opts ...Option) *Scheduler {
159159
for _, opt := range opts {
160160
opt(&s)
161161
}
162+
163+
actualMinResourceConcurrency := minResourceConcurrency
164+
if s.concurrency <= minResourceConcurrency {
165+
actualMinResourceConcurrency = max(s.concurrency/10, 1)
166+
}
167+
162168
// This is very similar to the concurrent web crawler problem with some minor changes.
163169
// We are using DFS/Round-Robin to make sure memory usage is capped at O(h) where h is the height of the tree.
164-
tableConcurrency := max(s.concurrency/minResourceConcurrency, minTableConcurrency)
165-
resourceConcurrency := tableConcurrency * minResourceConcurrency
170+
tableConcurrency := max(s.concurrency/actualMinResourceConcurrency, minTableConcurrency)
171+
resourceConcurrency := tableConcurrency * actualMinResourceConcurrency
166172
s.tableSems = make([]*semaphore.Weighted, s.maxDepth)
167173
for i := uint64(0); i < s.maxDepth; i++ {
168174
s.tableSems[i] = semaphore.NewWeighted(int64(tableConcurrency))

0 commit comments

Comments
 (0)