Skip to content

Commit 33b6e39

Browse files
committed
Always close workers, even on error
1 parent 2de7a3a commit 33b6e39

File tree

1 file changed

+7
-3
lines changed

1 file changed

+7
-3
lines changed

writers/streamingbatchwriter/streamingbatchwriter.go

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ package streamingbatchwriter
2121

2222
import (
2323
"context"
24+
"errors"
2425
"fmt"
2526
"sync"
2627
"time"
@@ -178,15 +179,19 @@ func (w *StreamingBatchWriter) Close(context.Context) error {
178179
return nil
179180
}
180181

181-
func (w *StreamingBatchWriter) Write(ctx context.Context, msgs <-chan message.WriteMessage) error {
182+
func (w *StreamingBatchWriter) Write(ctx context.Context, msgs <-chan message.WriteMessage) (retErr error) {
182183
errCh := make(chan error)
183184
defer close(errCh)
185+
defer func() {
186+
err := w.Close(ctx)
187+
retErr = errors.Join(retErr, err)
188+
}()
184189

185190
for {
186191
select {
187192
case msg, ok := <-msgs:
188193
if !ok {
189-
return w.Close(ctx)
194+
return nil
190195
}
191196

192197
msgType := writers.MsgID(msg)
@@ -409,7 +414,6 @@ func (s *streamingWorkerManager[T]) run(ctx context.Context, wg *sync.WaitGroup,
409414
if !ok {
410415
return
411416
}
412-
413417
if ins, ok := any(r).(*message.WriteInsert); ok {
414418
add, toFlush, rest := batch.SliceRecord(ins.Record, s.limit)
415419
if add != nil {

0 commit comments

Comments
 (0)