Skip to content

Commit 2078e84

Browse files
authored
fix(writers): Require Close() for StreamingBatchWriter (#1045)
So that workers aren't stopped when a single client's data ends. Calling `Close()` multiple times (or calling `Write` afterwards) would end up in a panic.
1 parent d683eff commit 2078e84

File tree

1 file changed

+4
-10
lines changed

1 file changed

+4
-10
lines changed

writers/streamingbatchwriter/streamingbatchwriter.go

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,7 @@ func (w *StreamingBatchWriter) Flush(_ context.Context) error {
144144
return nil
145145
}
146146

147-
func (w *StreamingBatchWriter) stopWorkers() {
147+
func (w *StreamingBatchWriter) Close(context.Context) error {
148148
w.workersLock.Lock()
149149
defer w.workersLock.Unlock()
150150
for _, w := range w.insertWorkers {
@@ -158,9 +158,9 @@ func (w *StreamingBatchWriter) stopWorkers() {
158158
}
159159
w.workersWaitGroup.Wait()
160160

161-
w.insertWorkers = make(map[string]*streamingWorkerManager[*message.WriteInsert])
162-
w.migrateWorker = nil
163-
w.deleteWorker = nil
161+
w.insertWorkers = nil
162+
163+
return nil
164164
}
165165

166166
func (w *StreamingBatchWriter) Write(ctx context.Context, msgs <-chan message.WriteMessage) error {
@@ -172,8 +172,6 @@ func (w *StreamingBatchWriter) Write(ctx context.Context, msgs <-chan message.Wr
172172
}
173173
}()
174174

175-
hasWorkers := false
176-
177175
for msg := range msgs {
178176
msgType := writers.MsgID(msg)
179177
if w.lastMsgType != msgType {
@@ -182,7 +180,6 @@ func (w *StreamingBatchWriter) Write(ctx context.Context, msgs <-chan message.Wr
182180
}
183181
}
184182
w.lastMsgType = msgType
185-
hasWorkers = true
186183
if err := w.startWorker(ctx, errCh, msg); err != nil {
187184
return err
188185
}
@@ -192,9 +189,6 @@ func (w *StreamingBatchWriter) Write(ctx context.Context, msgs <-chan message.Wr
192189
return err
193190
}
194191

195-
if hasWorkers {
196-
w.stopWorkers()
197-
}
198192
close(errCh)
199193
return nil
200194
}

0 commit comments

Comments
 (0)