Skip to content

Commit 226d3a0

Browse files
authored
Merge branch 'main' into feat/arrow_v18
2 parents 56bf438 + 88a7833 commit 226d3a0

File tree

5 files changed

+100
-3
lines changed

5 files changed

+100
-3
lines changed

.release-please-manifest.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
11
{
2-
".": "4.70.2"
2+
".": "4.71.0"
33
}

CHANGELOG.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,13 @@ All notable changes to this project will be documented in this file.
55
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
66
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
77

8+
## [4.71.0](https://github.com/cloudquery/plugin-sdk/compare/v4.70.2...v4.71.0) (2024-12-09)
9+
10+
11+
### Features
12+
13+
* Implement batch sender. ([#1995](https://github.com/cloudquery/plugin-sdk/issues/1995)) ([371b20f](https://github.com/cloudquery/plugin-sdk/commit/371b20fd192e69681e07c79302e7a06fc89b4a71))
14+
815
## [4.70.2](https://github.com/cloudquery/plugin-sdk/compare/v4.70.1...v4.70.2) (2024-12-05)
916

1017

examples/simple_plugin/go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ go 1.22.7
44

55
require (
66
github.com/apache/arrow-go/v18 v18.0.0
7-
github.com/cloudquery/plugin-sdk/v4 v4.70.2
7+
github.com/cloudquery/plugin-sdk/v4 v4.71.0
88
github.com/rs/zerolog v1.33.0
99
)
1010

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
package batchsender
2+
3+
import (
4+
"sync"
5+
"time"
6+
7+
"github.com/cloudquery/plugin-sdk/v4/helpers"
8+
)
9+
10+
const (
11+
batchSize = 100
12+
batchTimeout = 100 * time.Millisecond
13+
)
14+
15+
// BatchSender is a helper struct that batches items and sends them in batches of batchSize or after batchTimeout.
16+
//
17+
// - If item is already a slice, it will be sent directly
18+
// - Otherwise, it will be added to the current batch
19+
// - If the current batch has reached the batch size, it will be sent immediately
20+
// - Otherwise, a timer will be started to send the current batch after the batch timeout
21+
type BatchSender struct {
22+
sendFn func(any)
23+
items []any
24+
timer *time.Timer
25+
itemsLock sync.Mutex
26+
}
27+
28+
func NewBatchSender(sendFn func(any)) *BatchSender {
29+
return &BatchSender{sendFn: sendFn}
30+
}
31+
32+
func (bs *BatchSender) Send(item any) {
33+
if bs.timer != nil {
34+
bs.timer.Stop()
35+
}
36+
37+
items := helpers.InterfaceSlice(item)
38+
39+
// If item is already a slice, send it directly
40+
// together with the current batch
41+
if len(items) > 1 {
42+
bs.flush(items...)
43+
return
44+
}
45+
46+
// Otherwise, add item to the current batch
47+
bs.appendToBatch(items...)
48+
49+
// If the current batch has reached the batch size, send it
50+
if len(bs.items) >= batchSize {
51+
bs.flush()
52+
return
53+
}
54+
55+
// Otherwise, start a timer to send the current batch after the batch timeout
56+
bs.timer = time.AfterFunc(batchTimeout, func() { bs.flush() })
57+
}
58+
59+
func (bs *BatchSender) appendToBatch(items ...any) {
60+
bs.itemsLock.Lock()
61+
defer bs.itemsLock.Unlock()
62+
63+
bs.items = append(bs.items, items...)
64+
}
65+
66+
func (bs *BatchSender) flush(items ...any) {
67+
bs.itemsLock.Lock()
68+
defer bs.itemsLock.Unlock()
69+
70+
bs.items = append(bs.items, items...)
71+
72+
if len(bs.items) == 0 {
73+
return
74+
}
75+
76+
bs.sendFn(bs.items)
77+
bs.items = nil
78+
}
79+
80+
func (bs *BatchSender) Close() {
81+
if bs.timer != nil {
82+
bs.timer.Stop()
83+
}
84+
bs.flush()
85+
}

scheduler/scheduler_dfs.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"time"
1010

1111
"github.com/cloudquery/plugin-sdk/v4/helpers"
12+
"github.com/cloudquery/plugin-sdk/v4/scheduler/batchsender"
1213
"github.com/cloudquery/plugin-sdk/v4/scheduler/metrics"
1314
"github.com/cloudquery/plugin-sdk/v4/scheduler/resolvers"
1415
"github.com/cloudquery/plugin-sdk/v4/schema"
@@ -121,9 +122,13 @@ func (s *syncClient) resolveTableDfs(ctx context.Context, table *schema.Table, c
121122
}
122123
}()
123124

125+
batchSender := batchsender.NewBatchSender(func(item any) {
126+
s.resolveResourcesDfs(ctx, table, client, parent, item, resolvedResources, depth)
127+
})
124128
for r := range res {
125-
s.resolveResourcesDfs(ctx, table, client, parent, r, resolvedResources, depth)
129+
batchSender.Send(r)
126130
}
131+
batchSender.Close()
127132

128133
// we don't need any waitgroups here because we are waiting for the channel to close
129134
endTime := time.Now()

0 commit comments

Comments
 (0)