Skip to content

Commit 00a12c7

Browse files
authored
Merge branch 'main' into fix/reintroduce-streamingbatchwriterfix
2 parents 33b6e39 + abfbd1b commit 00a12c7

18 files changed

+707
-98
lines changed

.release-please-manifest.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
11
{
2-
".": "4.64.1"
2+
".": "4.65.0"
33
}

CHANGELOG.md

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,19 @@ All notable changes to this project will be documented in this file.
55
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
66
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
77

8+
## [4.65.0](https://github.com/cloudquery/plugin-sdk/compare/v4.64.1...v4.65.0) (2024-10-04)
9+
10+
11+
### Features
12+
13+
* Implement RandomQueue scheduler strategy ([#1914](https://github.com/cloudquery/plugin-sdk/issues/1914)) ([af8ac87](https://github.com/cloudquery/plugin-sdk/commit/af8ac87178cc318d2f31cd17efc7c921d6d52e6b))
14+
15+
16+
### Bug Fixes
17+
18+
* Revert "fix: Error handling in StreamingBatchWriter" ([#1918](https://github.com/cloudquery/plugin-sdk/issues/1918)) ([38b4bfd](https://github.com/cloudquery/plugin-sdk/commit/38b4bfd20e17a00d5a2c83e1d48b8b16270592ba))
19+
* **tests:** WriterTestSuite.handleNulls should not overwrite columns ([#1920](https://github.com/cloudquery/plugin-sdk/issues/1920)) ([08e18e2](https://github.com/cloudquery/plugin-sdk/commit/08e18e265dfb7e6e77c32244f56acd0f63bf4ead))
20+
821
## [4.64.1](https://github.com/cloudquery/plugin-sdk/compare/v4.64.0...v4.64.1) (2024-10-02)
922

1023

examples/simple_plugin/go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ go 1.22.0
44

55
require (
66
github.com/apache/arrow/go/v17 v17.0.0
7-
github.com/cloudquery/plugin-sdk/v4 v4.64.1
7+
github.com/cloudquery/plugin-sdk/v4 v4.65.0
88
github.com/rs/zerolog v1.33.0
99
)
1010

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package scheduler
1+
package metrics
22

33
import (
44
"context"
@@ -12,6 +12,10 @@ import (
1212
"go.opentelemetry.io/otel/metric"
1313
)
1414

15+
const (
16+
OtelName = "io.cloudquery"
17+
)
18+
1519
// Metrics is deprecated as we move toward open telemetry for tracing and metrics
1620
type Metrics struct {
1721
TableClient map[string]map[string]*TableClientMetrics
@@ -82,39 +86,39 @@ func (s *Metrics) Equal(other *Metrics) bool {
8286
}
8387

8488
func getOtelMeters(tableName string, clientID string) *OtelMeters {
85-
resources, err := otel.Meter(otelName).Int64Counter("sync.table.resources",
89+
resources, err := otel.Meter(OtelName).Int64Counter("sync.table.resources",
8690
metric.WithDescription("Number of resources synced for a table"),
8791
metric.WithUnit("/{tot}"),
8892
)
8993
if err != nil {
9094
return nil
9195
}
9296

93-
errors, err := otel.Meter(otelName).Int64Counter("sync.table.errors",
97+
errors, err := otel.Meter(OtelName).Int64Counter("sync.table.errors",
9498
metric.WithDescription("Number of errors encountered while syncing a table"),
9599
metric.WithUnit("/{tot}"),
96100
)
97101
if err != nil {
98102
return nil
99103
}
100104

101-
panics, err := otel.Meter(otelName).Int64Counter("sync.table.panics",
105+
panics, err := otel.Meter(OtelName).Int64Counter("sync.table.panics",
102106
metric.WithDescription("Number of panics encountered while syncing a table"),
103107
metric.WithUnit("/{tot}"),
104108
)
105109
if err != nil {
106110
return nil
107111
}
108112

109-
startTime, err := otel.Meter(otelName).Int64Counter("sync.table.start_time",
113+
startTime, err := otel.Meter(OtelName).Int64Counter("sync.table.start_time",
110114
metric.WithDescription("Start time of syncing a table"),
111115
metric.WithUnit("ns"),
112116
)
113117
if err != nil {
114118
return nil
115119
}
116120

117-
endTime, err := otel.Meter(otelName).Int64Counter("sync.table.end_time",
121+
endTime, err := otel.Meter(OtelName).Int64Counter("sync.table.end_time",
118122
metric.WithDescription("End time of syncing a table"),
119123
metric.WithUnit("ns"),
120124
)
@@ -136,7 +140,7 @@ func getOtelMeters(tableName string, clientID string) *OtelMeters {
136140
}
137141
}
138142

139-
func (s *Metrics) initWithClients(table *schema.Table, clients []schema.ClientMeta) {
143+
func (s *Metrics) InitWithClients(table *schema.Table, clients []schema.ClientMeta) {
140144
s.TableClient[table.Name] = make(map[string]*TableClientMetrics, len(clients))
141145
for _, client := range clients {
142146
tableName := table.Name
@@ -146,7 +150,7 @@ func (s *Metrics) initWithClients(table *schema.Table, clients []schema.ClientMe
146150
}
147151
}
148152
for _, relation := range table.Relations {
149-
s.initWithClients(relation, clients)
153+
s.InitWithClients(relation, clients)
150154
}
151155
}
152156

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package scheduler
1+
package metrics
22

33
import "testing"
44

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
package queue
2+
3+
import (
4+
"sync"
5+
"sync/atomic"
6+
"time"
7+
)
8+
9+
// activeWorkSignal is a thread-safe coordinator for awaiting a worker pool
// that relies on a queue that might be temporarily empty.
//
// If the queue is empty and workers are idle, the sync is done.
//
// If the queue is empty but a worker is working on a task, we must wait and
// check if it's empty after that worker finishes. That's why we need this.
//
// Use it like this:
//
//   - When a worker picks up a task, call Add() (like a WaitGroup).
//   - When a worker finishes a task, call Done() (like a WaitGroup).
//
//   - If the queue is empty, check IsIdle() to check if no workers are active.
//   - If workers are still active, call Wait() to block until state changes.
type activeWorkSignal struct {
	// countChangeSignal wakes the work-queuing goroutine when the active-work
	// count changes. Its lock also orders count updates relative to Wait's
	// check, preventing lost wakeups (see Add and Wait).
	countChangeSignal *sync.Cond
	// activeWorkUnitCount is the number of tasks currently being worked on.
	activeWorkUnitCount *atomic.Int32
	// isStarted becomes true once any work has ever been picked up, so that
	// IsIdle reports false before the first task starts.
	isStarted *atomic.Bool
}

func newActiveWorkSignal() *activeWorkSignal {
	return &activeWorkSignal{
		countChangeSignal:   sync.NewCond(&sync.Mutex{}),
		activeWorkUnitCount: &atomic.Int32{},
		isStarted:           &atomic.Bool{},
	}
}

// Add means a worker has started working on a task.
//
// Wake up the work queuing goroutine.
func (s *activeWorkSignal) Add() {
	// Update the count while holding the cond's lock: a concurrent Wait holds
	// the same lock around its check-then-wait, so this change either happens
	// before the check (the waiter sees it) or the Signal below is delivered
	// after the waiter is parked (it wakes up). Signaling without this
	// ordering can drop the wakeup entirely (classic lost-wakeup race).
	s.countChangeSignal.L.Lock()
	s.activeWorkUnitCount.Add(1)
	s.isStarted.Store(true)
	s.countChangeSignal.L.Unlock()
	s.countChangeSignal.Signal()
}

// Done means a worker has finished working on a task.
//
// If the count became zero, wake up the work queuing goroutine (might have finished).
func (s *activeWorkSignal) Done() {
	// See Add for why the count is updated under the cond's lock.
	s.countChangeSignal.L.Lock()
	s.activeWorkUnitCount.Add(-1)
	s.countChangeSignal.L.Unlock()
	s.countChangeSignal.Signal()
}

// IsIdle returns true if work has started at some point and no workers are
// currently active. If the queue is empty and workers are idle, done!
func (s *activeWorkSignal) IsIdle() bool {
	return s.isStarted.Load() && s.activeWorkUnitCount.Load() <= 0
}

// Wait blocks until the count of active workers changes, returning
// immediately if no workers are active.
func (s *activeWorkSignal) Wait() {
	// A race condition is possible when the last active table asynchronously
	// queues a relation. The table finishes (calling Done) a moment
	// before the queue receives the Push. At this point, the queue is
	// empty and there are no active workers.
	//
	// A moment later, the queue receives the Push and queues a new task.
	//
	// This is a very infrequent case according to tests, but it happens.
	time.Sleep(10 * time.Millisecond)

	// Check the count and park while holding the cond's lock. The original
	// code checked the count before locking, leaving a window in which a
	// Done's Signal could fire after the check but before cond.Wait parked —
	// the signal was lost and this goroutine stayed blocked until the next
	// unrelated signal.
	s.countChangeSignal.L.Lock()
	defer s.countChangeSignal.L.Unlock()
	if s.activeWorkUnitCount.Load() <= 0 {
		return
	}
	s.countChangeSignal.Wait()
}
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
package queue
2+
3+
import (
4+
"math/rand"
5+
"sync"
6+
)
7+
8+
// ConcurrentRandomQueue is a generic, thread-safe queue
// that pops random elements in O(1) time.
type ConcurrentRandomQueue[T any] struct {
	mu     sync.Mutex
	queue  []T
	random *rand.Rand
}

// NewConcurrentRandomQueue returns an empty queue whose pop order is driven
// by a PRNG seeded with seed (reproducible for a fixed seed and operation
// sequence). capacityHint pre-sizes the backing slice to avoid growth copies.
func NewConcurrentRandomQueue[T any](seed int64, capacityHint int) *ConcurrentRandomQueue[T] {
	return &ConcurrentRandomQueue[T]{queue: make([]T, 0, capacityHint), random: rand.New(rand.NewSource(seed))}
}

// Push appends item to the queue.
func (q *ConcurrentRandomQueue[T]) Push(item T) {
	q.mu.Lock()
	defer q.mu.Unlock()

	q.queue = append(q.queue, item)
}

// Pop removes and returns a uniformly random element in O(1) time via
// swap-with-last, or nil if the queue is empty.
func (q *ConcurrentRandomQueue[T]) Pop() *T {
	q.mu.Lock()
	defer q.mu.Unlock()

	if len(q.queue) == 0 {
		return nil
	}
	idx := q.random.Intn(len(q.queue))
	lastIdx := len(q.queue) - 1
	q.queue[idx], q.queue[lastIdx] = q.queue[lastIdx], q.queue[idx]
	item := q.queue[lastIdx]
	// Zero the vacated slot so the backing array does not keep the popped
	// element reachable; the original left it in place, which retains the
	// value for the GC when T contains pointers.
	var zero T
	q.queue[lastIdx] = zero
	q.queue = q.queue[:lastIdx]

	return &item
}

scheduler/queue/scheduler.go

Lines changed: 138 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,138 @@
1+
package queue
2+
3+
import (
4+
"context"
5+
6+
"github.com/cloudquery/plugin-sdk/v4/caser"
7+
"github.com/cloudquery/plugin-sdk/v4/scheduler/metrics"
8+
"github.com/cloudquery/plugin-sdk/v4/schema"
9+
"github.com/google/uuid"
10+
"github.com/rs/zerolog"
11+
"golang.org/x/sync/errgroup"
12+
)
13+
14+
// DefaultWorkerCount is the number of pool workers used when the
// WithWorkerCount option is not supplied.
const DefaultWorkerCount = 1000

// WorkUnit is an atomic unit of work that the scheduler syncs.
//
// It is one table resolver (same as all other scheduler strategies).
//
// But if it is a non-top-level table, it is bound to a single parent resource.
type WorkUnit struct {
	Table  *schema.Table
	Client schema.ClientMeta
	// Parent is the single parent resource this unit is bound to
	// (presumably nil for top-level tables — confirm against newWorker).
	Parent *schema.Resource
}
26+
27+
// Scheduler syncs tables by handing WorkUnits to a worker pool that pops
// them from a seeded random queue (see Sync).
type Scheduler struct {
	// workerCount is the pool size; DefaultWorkerCount unless overridden
	// via WithWorkerCount.
	workerCount int
	logger      zerolog.Logger
	caser       *caser.Caser
	// deterministicCQID flag, settable via WithDeterministicCQID; passed
	// through to each worker.
	deterministicCQID bool
	metrics           *metrics.Metrics
	// invocationID identifies this sync run; defaults to a random UUID
	// in NewShuffleQueueScheduler.
	invocationID string
	// seed seeds the random queue, making a run's pop order reproducible.
	seed int64
}
36+
37+
type Option func(*Scheduler)
38+
39+
func WithWorkerCount(workerCount int) Option {
40+
return func(d *Scheduler) {
41+
d.workerCount = workerCount
42+
}
43+
}
44+
45+
func WithCaser(c *caser.Caser) Option {
46+
return func(d *Scheduler) {
47+
d.caser = c
48+
}
49+
}
50+
51+
func WithDeterministicCQID(deterministicCQID bool) Option {
52+
return func(d *Scheduler) {
53+
d.deterministicCQID = deterministicCQID
54+
}
55+
}
56+
57+
func WithInvocationID(invocationID string) Option {
58+
return func(d *Scheduler) {
59+
d.invocationID = invocationID
60+
}
61+
}
62+
63+
func NewShuffleQueueScheduler(logger zerolog.Logger, m *metrics.Metrics, seed int64, opts ...Option) *Scheduler {
64+
scheduler := &Scheduler{
65+
logger: logger,
66+
metrics: m,
67+
workerCount: DefaultWorkerCount,
68+
caser: caser.New(),
69+
invocationID: uuid.New().String(),
70+
seed: seed,
71+
}
72+
73+
for _, opt := range opts {
74+
opt(scheduler)
75+
}
76+
77+
return scheduler
78+
}
79+
80+
func (d *Scheduler) Sync(ctx context.Context, tableClients []WorkUnit, resolvedResources chan<- *schema.Resource) {
81+
if len(tableClients) == 0 {
82+
return
83+
}
84+
queue := NewConcurrentRandomQueue[WorkUnit](d.seed, len(tableClients))
85+
for _, tc := range tableClients {
86+
queue.Push(tc)
87+
}
88+
89+
jobs := make(chan *WorkUnit)
90+
activeWorkSignal := newActiveWorkSignal()
91+
92+
// Worker pool
93+
workerPool, _ := errgroup.WithContext(ctx)
94+
for w := 0; w < d.workerCount; w++ {
95+
workerPool.Go(func() error {
96+
newWorker(
97+
jobs,
98+
queue,
99+
resolvedResources,
100+
d.logger,
101+
d.caser,
102+
d.invocationID,
103+
d.deterministicCQID,
104+
d.metrics,
105+
).work(ctx, activeWorkSignal)
106+
return nil
107+
})
108+
}
109+
110+
// Work distribution
111+
go func() {
112+
defer close(jobs)
113+
for {
114+
select {
115+
case <-ctx.Done():
116+
return
117+
default:
118+
item := queue.Pop()
119+
120+
// There is work to do
121+
if item != nil {
122+
jobs <- item
123+
continue
124+
}
125+
126+
// Queue is empty and no active work, done!
127+
if activeWorkSignal.IsIdle() {
128+
return
129+
}
130+
131+
// Queue is empty and there is active work, wait for changes
132+
activeWorkSignal.Wait()
133+
}
134+
}
135+
}()
136+
137+
_ = workerPool.Wait()
138+
}

0 commit comments

Comments
 (0)