Skip to content

Commit 646d451

Browse files
authored
refactor(ticker): Don't reset manually (#1058)
Follow-up for #1056
1 parent 36b65cb commit 646d451

File tree

7 files changed

+53
-52
lines changed

7 files changed

+53
-52
lines changed

writers/batchwriter/batchwriter.go

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,8 @@ func (w *BatchWriter) Close(context.Context) error {
122122
func (w *BatchWriter) worker(ctx context.Context, tableName string, ch <-chan *message.WriteInsert, flush <-chan chan bool) {
123123
sizeBytes := int64(0)
124124
resources := make([]*message.WriteInsert, 0, w.batchSize)
125-
tick := timer(w.batchTimeout)
125+
tick, done := writers.NewTicker(w.batchTimeout)
126+
defer done()
126127
for {
127128
select {
128129
case r, ok := <-ch:
@@ -145,7 +146,6 @@ func (w *BatchWriter) worker(ctx context.Context, tableName string, ch <-chan *m
145146
w.flushTable(ctx, tableName, resources)
146147
resources, sizeBytes = resources[:0], 0
147148
}
148-
tick = timer(w.batchTimeout)
149149
case done := <-flush:
150150
if len(resources) > 0 {
151151
w.flushTable(ctx, tableName, resources)
@@ -324,10 +324,3 @@ func (w *BatchWriter) startWorker(ctx context.Context, msg *message.WriteInsert)
324324
ch <- msg
325325
return nil
326326
}
327-
328-
func timer(timeout time.Duration) <-chan time.Time {
329-
if timeout == 0 {
330-
return nil
331-
}
332-
return time.After(timeout)
333-
}

writers/mixedbatchwriter/mixedbatchwriter.go

Lines changed: 6 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -17,15 +17,13 @@ type Client interface {
1717
DeleteStaleBatch(ctx context.Context, messages message.WriteDeleteStales) error
1818
}
1919

20-
type timerFn func(timeout time.Duration) <-chan time.Time
21-
2220
type MixedBatchWriter struct {
2321
client Client
2422
logger zerolog.Logger
2523
batchSize int
2624
batchSizeBytes int
2725
batchTimeout time.Duration
28-
timerFn timerFn
26+
tickerFn writers.TickerFunc
2927
}
3028

3129
// Assert at compile-time that MixedBatchWriter implements the Writer interface
@@ -57,9 +55,9 @@ func WithBatchTimeout(timeout time.Duration) Option {
5755
}
5856
}
5957

60-
func withTimerFn(timer timerFn) Option {
58+
func withTickerFn(tickerFn writers.TickerFunc) Option {
6159
return func(p *MixedBatchWriter) {
62-
p.timerFn = timer
60+
p.tickerFn = tickerFn
6361
}
6462
}
6563

@@ -76,7 +74,7 @@ func New(client Client, opts ...Option) (*MixedBatchWriter, error) {
7674
batchSize: defaultBatchSize,
7775
batchSizeBytes: defaultBatchSizeBytes,
7876
batchTimeout: defaultBatchTimeout,
79-
timerFn: timer,
77+
tickerFn: writers.NewTicker,
8078
}
8179
for _, opt := range opts {
8280
opt(c)
@@ -116,7 +114,8 @@ func (w *MixedBatchWriter) Write(ctx context.Context, msgChan <-chan message.Wri
116114
}
117115
prevMsgType := writers.MsgTypeUnset
118116
var err error
119-
tick := w.timerFn(w.batchTimeout)
117+
tick, done := w.tickerFn(w.batchTimeout)
118+
defer done()
120119
loop:
121120
for {
122121
select {
@@ -149,7 +148,6 @@ loop:
149148
return err
150149
}
151150
prevMsgType = writers.MsgTypeUnset
152-
tick = w.timerFn(w.batchTimeout)
153151
}
154152
}
155153
return flush(prevMsgType)
@@ -215,10 +213,3 @@ func (m *insertBatchManager) flush(ctx context.Context) error {
215213
m.batch = m.batch[:0]
216214
return nil
217215
}
218-
219-
func timer(timeout time.Duration) <-chan time.Time {
220-
if timeout == 0 {
221-
return nil
222-
}
223-
return time.After(timeout)
224-
}

writers/mixedbatchwriter/mixedbatchwriter_test.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -243,13 +243,14 @@ func TestMixedBatchWriterTimeout(t *testing.T) {
243243
wr, err := New(client,
244244
WithBatchSize(1000),
245245
WithBatchSizeBytes(1000000),
246-
withTimerFn(func(_ time.Duration) <-chan time.Time {
246+
withTickerFn(func(_ time.Duration) (<-chan time.Time, func()) {
247247
c := make(chan time.Time)
248248
go func() {
249-
<-triggerTimeout
250-
c <- time.Now()
249+
for range triggerTimeout {
250+
c <- time.Now()
251+
}
251252
}()
252-
return c
253+
return c, func() { close(c) }
253254
}),
254255
)
255256
if err != nil {

writers/streamingbatchwriter/mocktimer_test.go

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,24 @@
11
package streamingbatchwriter
22

3-
import "time"
3+
import (
4+
"time"
5+
6+
"github.com/cloudquery/plugin-sdk/v4/writers"
7+
)
48

59
type mockTimer struct {
610
expire chan time.Time
711
}
812

9-
func (t *mockTimer) timer(time.Duration) <-chan time.Time {
10-
return t.expire
13+
func (t *mockTimer) timer(time.Duration) (<-chan time.Time, func()) {
14+
return t.expire, t.close
15+
}
16+
17+
func (t *mockTimer) close() {
18+
close(t.expire)
1119
}
1220

13-
func newMockTimer() (timerFn, chan time.Time) {
21+
func newMockTimer() (writers.TickerFunc, chan time.Time) {
1422
expire := make(chan time.Time)
1523
t := &mockTimer{
1624
expire: expire,

writers/streamingbatchwriter/streamingbatchwriter.go

Lines changed: 9 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -61,11 +61,9 @@ type StreamingBatchWriter struct {
6161
batchSizeRows int64
6262
batchSizeBytes int64
6363

64-
timerFn timerFn
64+
tickerFn writers.TickerFunc
6565
}
6666

67-
type timerFn func(timeout time.Duration) <-chan time.Time
68-
6967
// Assert at compile-time that StreamingBatchWriter implements the Writer interface
7068
var _ writers.Writer = (*StreamingBatchWriter)(nil)
7169

@@ -95,9 +93,9 @@ func WithBatchSizeBytes(size int64) Option {
9593
}
9694
}
9795

98-
func withTimerFn(timer timerFn) Option {
96+
func withTickerFn(tickerFn writers.TickerFunc) Option {
9997
return func(p *StreamingBatchWriter) {
100-
p.timerFn = timer
98+
p.tickerFn = tickerFn
10199
}
102100
}
103101

@@ -115,7 +113,7 @@ func New(client Client, opts ...Option) (*StreamingBatchWriter, error) {
115113
batchTimeout: defaultBatchTimeoutSeconds * time.Second,
116114
batchSizeRows: defaultBatchSize,
117115
batchSizeBytes: defaultBatchSizeBytes,
118-
timerFn: timer,
116+
tickerFn: writers.NewTicker,
119117
}
120118
for _, opt := range opts {
121119
opt(c)
@@ -225,7 +223,7 @@ func (w *StreamingBatchWriter) startWorker(ctx context.Context, errCh chan<- err
225223

226224
batchSizeRows: w.batchSizeRows,
227225
batchTimeout: w.batchTimeout,
228-
timerFn: w.timerFn,
226+
tickerFn: w.tickerFn,
229227
}
230228

231229
w.workersWaitGroup.Add(1)
@@ -277,7 +275,7 @@ func (w *StreamingBatchWriter) startWorker(ctx context.Context, errCh chan<- err
277275
batchSizeRows: w.batchSizeRows,
278276
batchSizeBytes: w.batchSizeBytes,
279277
batchTimeout: w.batchTimeout,
280-
timerFn: w.timerFn,
278+
tickerFn: w.tickerFn,
281279
}
282280
w.workersLock.Lock()
283281
w.insertWorkers[tableName] = wr
@@ -303,7 +301,7 @@ type streamingWorkerManager[T message.WriteMessage] struct {
303301
batchSizeRows int64
304302
batchSizeBytes int64
305303
batchTimeout time.Duration
306-
timerFn timerFn
304+
tickerFn writers.TickerFunc
307305
}
308306

309307
func (s *streamingWorkerManager[T]) run(ctx context.Context, wg *sync.WaitGroup, tableName string) {
@@ -345,7 +343,8 @@ func (s *streamingWorkerManager[T]) run(ctx context.Context, wg *sync.WaitGroup,
345343
}
346344
defer closeFlush()
347345

348-
tick := s.timerFn(s.batchTimeout)
346+
tick, done := s.tickerFn(s.batchTimeout)
347+
defer done()
349348
for {
350349
select {
351350
case r, ok := <-s.ch:
@@ -370,7 +369,6 @@ func (s *streamingWorkerManager[T]) run(ctx context.Context, wg *sync.WaitGroup,
370369
if sizeRows > 0 {
371370
closeFlush()
372371
}
373-
tick = s.timerFn(s.batchTimeout)
374372
case done := <-s.flush:
375373
if sizeRows > 0 {
376374
closeFlush()
@@ -379,10 +377,3 @@ func (s *streamingWorkerManager[T]) run(ctx context.Context, wg *sync.WaitGroup,
379377
}
380378
}
381379
}
382-
383-
func timer(timeout time.Duration) <-chan time.Time {
384-
if timeout == 0 {
385-
return nil
386-
}
387-
return time.After(timeout)
388-
}

writers/streamingbatchwriter/streamingbatchwriter_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -229,7 +229,7 @@ func TestStreamingBatchTimeout(t *testing.T) {
229229
testClient := newClient()
230230
timerFn, timerExpire := newMockTimer()
231231

232-
wr, err := New(testClient, withTimerFn(timerFn))
232+
wr, err := New(testClient, withTickerFn(timerFn))
233233
if err != nil {
234234
t.Fatal(err)
235235
}
@@ -333,7 +333,7 @@ func TestStreamingBatchUpserts(t *testing.T) {
333333

334334
testClient := newClient()
335335
timerFn, timerExpire := newMockTimer()
336-
wr, err := New(testClient, WithBatchSizeRows(2), withTimerFn(timerFn))
336+
wr, err := New(testClient, WithBatchSizeRows(2), withTickerFn(timerFn))
337337
if err != nil {
338338
t.Fatal(err)
339339
}

writers/ticker.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
package writers
2+
3+
import (
4+
"time"
5+
)
6+
7+
type TickerFunc func(interval time.Duration) (ch <-chan time.Time, done func())
8+
9+
func NewTicker(interval time.Duration) (<-chan time.Time, func()) {
10+
if interval <= 0 {
11+
return nil, nop
12+
}
13+
t := time.NewTicker(interval)
14+
return t.C, t.Stop
15+
}
16+
17+
func nop() {}

0 commit comments

Comments
 (0)