Skip to content

Commit 282ee45

Browse files
authored
fix(writers): Allow zero timeout, remove unused timeout options from mixedbatchwriter (#1020)
1 parent 6f459c6 commit 282ee45

File tree

5 files changed

+143
-51
lines changed

5 files changed

+143
-51
lines changed

writers/batchwriter/batchwriter.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,7 @@ func (w *BatchWriter) worker(ctx context.Context, tableName string, ch <-chan *m
133133

134134
resources = append(resources, r)
135135
sizeBytes += util.TotalRecordSize(r.Record)
136-
case <-time.After(w.batchTimeout):
136+
case <-timer(w.batchTimeout):
137137
if len(resources) > 0 {
138138
w.flushTable(ctx, tableName, resources)
139139
resources, sizeBytes = resources[:0], 0
@@ -316,3 +316,10 @@ func (w *BatchWriter) startWorker(ctx context.Context, msg *message.WriteInsert)
316316
ch <- msg
317317
return nil
318318
}
319+
320+
func timer(timeout time.Duration) <-chan time.Time {
321+
if timeout == 0 {
322+
return nil
323+
}
324+
return time.After(timeout)
325+
}

writers/mixedbatchwriter/mixedbatchwriter.go

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ package mixedbatchwriter
22

33
import (
44
"context"
5-
"time"
65

76
"github.com/apache/arrow/go/v13/arrow/util"
87
"github.com/cloudquery/plugin-sdk/v4/message"
@@ -20,7 +19,6 @@ type Client interface {
2019
type MixedBatchWriter struct {
2120
client Client
2221
logger zerolog.Logger
23-
batchTimeout time.Duration
2422
batchSize int
2523
batchSizeBytes int
2624
}
@@ -36,12 +34,6 @@ func WithLogger(logger zerolog.Logger) Option {
3634
}
3735
}
3836

39-
func WithBatchTimeout(timeout time.Duration) Option {
40-
return func(p *MixedBatchWriter) {
41-
p.batchTimeout = timeout
42-
}
43-
}
44-
4537
func WithBatchSize(size int) Option {
4638
return func(p *MixedBatchWriter) {
4739
p.batchSize = size
@@ -58,7 +50,6 @@ func New(client Client, opts ...Option) (*MixedBatchWriter, error) {
5850
c := &MixedBatchWriter{
5951
client: client,
6052
logger: zerolog.Nop(),
61-
batchTimeout: writers.DefaultBatchTimeoutSeconds * time.Second,
6253
batchSize: writers.DefaultBatchSize,
6354
batchSizeBytes: writers.DefaultBatchSizeBytes,
6455
}
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
package streamingbatchwriter
2+
3+
import "time"
4+
5+
type mockTimer struct {
6+
expire chan time.Time
7+
}
8+
9+
func (t *mockTimer) timer(time.Duration) <-chan time.Time {
10+
return t.expire
11+
}
12+
13+
func newMockTimer() (timerFn, chan time.Time) {
14+
expire := make(chan time.Time)
15+
t := &mockTimer{
16+
expire: expire,
17+
}
18+
return t.timer, expire
19+
}

writers/streamingbatchwriter/streamingbatchwriter.go

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,8 +60,12 @@ type StreamingBatchWriter struct {
6060
batchTimeout time.Duration
6161
batchSizeRows int64
6262
batchSizeBytes int64
63+
64+
timerFn timerFn
6365
}
6466

67+
type timerFn func(timeout time.Duration) <-chan time.Time
68+
6569
// Assert at compile-time that StreamingBatchWriter implements the Writer interface
6670
var _ writers.Writer = (*StreamingBatchWriter)(nil)
6771

@@ -91,6 +95,12 @@ func WithBatchSizeBytes(size int64) Option {
9195
}
9296
}
9397

98+
func withTimerFn(timer timerFn) Option {
99+
return func(p *StreamingBatchWriter) {
100+
p.timerFn = timer
101+
}
102+
}
103+
94104
func New(client Client, opts ...Option) (*StreamingBatchWriter, error) {
95105
c := &StreamingBatchWriter{
96106
client: client,
@@ -99,6 +109,7 @@ func New(client Client, opts ...Option) (*StreamingBatchWriter, error) {
99109
batchTimeout: writers.DefaultBatchTimeoutSeconds * time.Second,
100110
batchSizeRows: writers.DefaultBatchSize,
101111
batchSizeBytes: writers.DefaultBatchSizeBytes,
112+
timerFn: timer,
102113
}
103114
for _, opt := range opts {
104115
opt(c)
@@ -214,6 +225,7 @@ func (w *StreamingBatchWriter) startWorker(ctx context.Context, errCh chan<- err
214225

215226
batchSizeRows: w.batchSizeRows,
216227
batchTimeout: w.batchTimeout,
228+
timerFn: w.timerFn,
217229
}
218230

219231
w.workersWaitGroup.Add(1)
@@ -265,6 +277,7 @@ func (w *StreamingBatchWriter) startWorker(ctx context.Context, errCh chan<- err
265277
batchSizeRows: w.batchSizeRows,
266278
batchSizeBytes: w.batchSizeBytes,
267279
batchTimeout: w.batchTimeout,
280+
timerFn: w.timerFn,
268281
}
269282
w.workersLock.Lock()
270283
w.insertWorkers[tableName] = wr
@@ -290,6 +303,7 @@ type streamingWorkerManager[T message.WriteMessage] struct {
290303
batchSizeRows int64
291304
batchSizeBytes int64
292305
batchTimeout time.Duration
306+
timerFn timerFn
293307
}
294308

295309
func (s *streamingWorkerManager[T]) run(ctx context.Context, wg *sync.WaitGroup, tableName string) {
@@ -351,7 +365,7 @@ func (s *streamingWorkerManager[T]) run(ctx context.Context, wg *sync.WaitGroup,
351365
clientCh <- r
352366
sizeRows++
353367
sizeBytes += recSize
354-
case <-time.After(s.batchTimeout):
368+
case <-s.timerFn(s.batchTimeout):
355369
if sizeRows > 0 {
356370
closeFlush()
357371
}
@@ -370,3 +384,10 @@ func DummyHandler[T message.WriteMessage](ch <-chan T) {
370384
for range ch {
371385
}
372386
}
387+
388+
func timer(timeout time.Duration) <-chan time.Time {
389+
if timeout == 0 {
390+
return nil
391+
}
392+
return time.After(timeout)
393+
}

0 commit comments

Comments
 (0)