Skip to content

Commit 9179e7f

Browse files
authored
fix: Fix timer logic in batch writers (#1056)
I believe that the batch timeout should be a guarantee of a message being delayed by no longer than X amount of time. This means that we shouldn't reset the ticker every time a message is received, because then a message could be delayed for a very long time if a slow trickle of messages is coming in. This change fixes this issue and guarantees a message to be flushed within at most X duration. For more context, see this PR and message #1055 (comment)
1 parent 7fe7c64 commit 9179e7f

File tree

2 files changed

+6
-2
lines changed

2 files changed

+6
-2
lines changed

writers/batchwriter/batchwriter.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,7 @@ func (w *BatchWriter) Close(context.Context) error {
122122
func (w *BatchWriter) worker(ctx context.Context, tableName string, ch <-chan *message.WriteInsert, flush <-chan chan bool) {
123123
sizeBytes := int64(0)
124124
resources := make([]*message.WriteInsert, 0, w.batchSize)
125+
tick := timer(w.batchTimeout)
125126
for {
126127
select {
127128
case r, ok := <-ch:
@@ -139,11 +140,12 @@ func (w *BatchWriter) worker(ctx context.Context, tableName string, ch <-chan *m
139140

140141
resources = append(resources, r)
141142
sizeBytes += util.TotalRecordSize(r.Record)
142-
case <-timer(w.batchTimeout):
143+
case <-tick:
143144
if len(resources) > 0 {
144145
w.flushTable(ctx, tableName, resources)
145146
resources, sizeBytes = resources[:0], 0
146147
}
148+
tick = timer(w.batchTimeout)
147149
case done := <-flush:
148150
if len(resources) > 0 {
149151
w.flushTable(ctx, tableName, resources)

writers/streamingbatchwriter/streamingbatchwriter.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -345,6 +345,7 @@ func (s *streamingWorkerManager[T]) run(ctx context.Context, wg *sync.WaitGroup,
345345
}
346346
defer closeFlush()
347347

348+
tick := s.timerFn(s.batchTimeout)
348349
for {
349350
select {
350351
case r, ok := <-s.ch:
@@ -365,10 +366,11 @@ func (s *streamingWorkerManager[T]) run(ctx context.Context, wg *sync.WaitGroup,
365366
clientCh <- r
366367
sizeRows++
367368
sizeBytes += recSize
368-
case <-s.timerFn(s.batchTimeout):
369+
case <-tick:
369370
if sizeRows > 0 {
370371
closeFlush()
371372
}
373+
tick = s.timerFn(s.batchTimeout)
372374
case done := <-s.flush:
373375
if sizeRows > 0 {
374376
closeFlush()

0 commit comments

Comments
 (0)