Skip to content

Commit 371b20f

Browse files
authored
feat: Implement batch sender. (#1995)
## Problem The DFS scheduler implements concurrent resolution of resources, but only if resources are sent in batches to the top-level `res` channel. Thus, the same plugin syncs a lot quicker simply by batching resources before sending them through the channel. ## Solution This PR implements ``` // BatchSender is a helper struct that batches items and sends them in batches of batchSize or after batchTimeout. // // - If item is already a slice, it will be sent directly // - Otherwise, it will be added to the current batch // - If the current batch has reached the batch size, it will be sent immediately // - Otherwise, a timer will be started to send the current batch after the batch timeout type BatchSender struct { ``` Currently, with a batch timeout of `100ms` and a batch size of `100`, it achieves a 5x improvement on a sync that reproduces the issue brought forward by the community (https://github.com/jeromewir/cq-source-concurrency-childtable-example): <img width="982" alt="Screenshot 2024-12-03 at 17 17 23" src="https://github.com/user-attachments/assets/7a1f7dc5-4aca-4263-8aa6-0ec10d73cca3"> The PR touches as little scheduler code as possible and stays away from tricky language constructs.
1 parent 58a1635 commit 371b20f

File tree

2 files changed

+91
-1
lines changed

2 files changed

+91
-1
lines changed
Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
package batchsender
2+
3+
import (
4+
"sync"
5+
"time"
6+
7+
"github.com/cloudquery/plugin-sdk/v4/helpers"
8+
)
9+
10+
const (
	// batchSize is the maximum number of items buffered before the batch is
	// flushed to sendFn immediately.
	batchSize = 100
	// batchTimeout bounds how long a partial batch may sit in the buffer
	// before being flushed by the timer.
	batchTimeout = 100 * time.Millisecond
)
14+
15+
// BatchSender is a helper struct that batches items and sends them in batches of batchSize or after batchTimeout.
//
// - If item is already a slice, it will be sent directly
// - Otherwise, it will be added to the current batch
// - If the current batch has reached the batch size, it will be sent immediately
// - Otherwise, a timer will be started to send the current batch after the batch timeout
type BatchSender struct {
	// sendFn receives each completed batch (passed as a []any).
	sendFn func(any)
	// items is the batch accumulated so far. It is shared between the
	// caller of Send/Close and the timeout goroutine started by
	// time.AfterFunc, so all access must hold itemsLock.
	items []any
	// timer fires a timeout flush for a partial batch. It is read/written
	// without synchronization, which assumes Send and Close are invoked
	// from a single goroutine — TODO confirm against callers.
	timer *time.Timer
	// itemsLock protects items.
	itemsLock sync.Mutex
}
27+
28+
func NewBatchSender(sendFn func(any)) *BatchSender {
29+
return &BatchSender{sendFn: sendFn}
30+
}
31+
32+
func (bs *BatchSender) Send(item any) {
33+
if bs.timer != nil {
34+
bs.timer.Stop()
35+
}
36+
37+
items := helpers.InterfaceSlice(item)
38+
39+
// If item is already a slice, send it directly
40+
// together with the current batch
41+
if len(items) > 1 {
42+
bs.flush(items...)
43+
return
44+
}
45+
46+
// Otherwise, add item to the current batch
47+
bs.appendToBatch(items...)
48+
49+
// If the current batch has reached the batch size, send it
50+
if len(bs.items) >= batchSize {
51+
bs.flush()
52+
return
53+
}
54+
55+
// Otherwise, start a timer to send the current batch after the batch timeout
56+
bs.timer = time.AfterFunc(batchTimeout, func() { bs.flush() })
57+
}
58+
59+
func (bs *BatchSender) appendToBatch(items ...any) {
60+
bs.itemsLock.Lock()
61+
defer bs.itemsLock.Unlock()
62+
63+
bs.items = append(bs.items, items...)
64+
}
65+
66+
func (bs *BatchSender) flush(items ...any) {
67+
bs.itemsLock.Lock()
68+
defer bs.itemsLock.Unlock()
69+
70+
bs.items = append(bs.items, items...)
71+
72+
if len(bs.items) == 0 {
73+
return
74+
}
75+
76+
bs.sendFn(bs.items)
77+
bs.items = nil
78+
}
79+
80+
func (bs *BatchSender) Close() {
81+
if bs.timer != nil {
82+
bs.timer.Stop()
83+
}
84+
bs.flush()
85+
}

scheduler/scheduler_dfs.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"time"
1010

1111
"github.com/cloudquery/plugin-sdk/v4/helpers"
12+
"github.com/cloudquery/plugin-sdk/v4/scheduler/batchsender"
1213
"github.com/cloudquery/plugin-sdk/v4/scheduler/metrics"
1314
"github.com/cloudquery/plugin-sdk/v4/scheduler/resolvers"
1415
"github.com/cloudquery/plugin-sdk/v4/schema"
@@ -121,9 +122,13 @@ func (s *syncClient) resolveTableDfs(ctx context.Context, table *schema.Table, c
121122
}
122123
}()
123124

125+
batchSender := batchsender.NewBatchSender(func(item any) {
126+
s.resolveResourcesDfs(ctx, table, client, parent, item, resolvedResources, depth)
127+
})
124128
for r := range res {
125-
s.resolveResourcesDfs(ctx, table, client, parent, r, resolvedResources, depth)
129+
batchSender.Send(r)
126130
}
131+
batchSender.Close()
127132

128133
// we don't need any waitgroups here because we are waiting for the channel to close
129134
endTime := time.Now()

0 commit comments

Comments
 (0)