Skip to content

Commit b0454e7

Browse files
committed
shutdown logic fix
1 parent 76410f8 commit b0454e7

File tree

2 files changed

+50
-6
lines changed

2 files changed

+50
-6
lines changed

writers/streamingbatchwriter/streamingbatchwriter.go

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -180,14 +180,13 @@ func (w *StreamingBatchWriter) Close(context.Context) error {
180180
}
181181

182182
func (w *StreamingBatchWriter) Write(ctx context.Context, msgs <-chan message.WriteMessage) error {
183-
errCh := make(chan error)
184-
defer w.Close(ctx)
185-
186183
ctx, cancel := context.WithCancel(ctx)
187184
defer cancel()
188185

186+
errCh := make(chan error)
189187
go func() {
190188
defer close(errCh)
189+
defer w.Close(ctx)
191190
for msg := range msgs {
192191
msgType := writers.MsgID(msg)
193192
if w.lastMsgType != writers.MsgTypeUnset && w.lastMsgType != msgType {
@@ -370,6 +369,7 @@ type streamingWorkerManager[T message.WriteMessage] struct {
370369
batchTimeout time.Duration
371370
tickerFn writers.TickerFunc
372371
failed *atomic.Bool
372+
workerWg sync.WaitGroup
373373

374374
inputCh chan T
375375
mu sync.Mutex // protects inputCh
@@ -404,9 +404,11 @@ func (s *streamingWorkerManager[T]) send(ctx context.Context, data T) {
404404
}
405405

406406
s.inputCh = make(chan T)
407+
s.workerWg.Add(1)
407408

408409
// start consuming our new channel
409410
go func(ch chan T) {
411+
defer s.workerWg.Done()
410412
defer func() {
411413
if msg := recover(); msg != nil {
412414
switch v := msg.(type) {
@@ -453,6 +455,7 @@ func (s *streamingWorkerManager[T]) send(ctx context.Context, data T) {
453455

454456
func (s *streamingWorkerManager[T]) run(ctx context.Context, wg *sync.WaitGroup) {
455457
defer wg.Done()
458+
defer s.workerWg.Wait()
456459
defer s.closeFlush()
457460

458461
ticker := s.tickerFn(s.batchTimeout)

writers/streamingbatchwriter/streamingbatchwriter_test.go

Lines changed: 44 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,10 @@ type testStreamingBatchClient struct {
3030
committed map[messageType]int
3131
open map[messageType][]string
3232

33-
writeErr error
34-
writeErrAfter int64
35-
writeCounter map[string]int64 // table name to write counter
33+
writeErr error
34+
writeErrAfter int64
35+
writeCounter map[string]int64 // table name to write counter
36+
writeCommitErr error
3637
}
3738

3839
func newClient() *testStreamingBatchClient {
@@ -84,6 +85,11 @@ func (c *testStreamingBatchClient) WriteTable(ctx context.Context, msgs <-chan *
8485
return c.writeErr // leave msgs open
8586
}
8687
}
88+
89+
if c.writeCommitErr != nil {
90+
return c.writeCommitErr
91+
}
92+
8793
return c.handleTypeCommit(ctx, messageTypeInsert, key)
8894
}
8995

@@ -528,6 +534,41 @@ func TestErrorCleanUpSecondMessage(t *testing.T) {
528534
waitForLength(t, testClient.MessageLen, messageTypeInsert, 1) // batch size 1
529535
}
530536

537+
func TestErrorCleanUpAfterClose(t *testing.T) {
538+
t.Parallel()
539+
ctx := context.Background()
540+
ch := make(chan message.WriteMessage)
541+
542+
testClient := newClient()
543+
testClient.writeCommitErr = errors.New("test error")
544+
545+
wr, err := New(testClient, WithBatchTimeout(0), WithBatchSizeRows(100))
546+
if err != nil {
547+
t.Fatal(err)
548+
}
549+
550+
errCh := make(chan error)
551+
go func() {
552+
errCh <- wr.Write(ctx, ch)
553+
}()
554+
555+
table := schema.Table{Name: "table1", Columns: []schema.Column{{Name: "id", Type: arrow.PrimitiveTypes.Int64}}}
556+
record := getRecord(table.ToArrowSchema(), 1)
557+
558+
for i := 0; i < 10; i++ {
559+
ch <- &message.WriteInsert{
560+
Record: record,
561+
}
562+
}
563+
564+
waitForLength(t, testClient.InflightLen, messageTypeInsert, 10)
565+
close(ch)
566+
567+
requireErrorCount(t, 1, errCh)
568+
569+
waitForLength(t, testClient.MessageLen, messageTypeInsert, 0) // batch size 1
570+
}
571+
531572
func waitForLength(t *testing.T, checkLen func(messageType) int, msgType messageType, want int) {
532573
t.Helper()
533574
lastValue := -1

0 commit comments

Comments
 (0)