Skip to content

Commit feb165e

Browse files
committed
Closes batchsender by sending the last batch immediately.
1 parent 29e50aa commit feb165e

File tree

2 files changed

+12
-0
lines changed

2 files changed

+12
-0
lines changed

scheduler/batchsender/batch_sender.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,17 @@ func (bs *BatchSender) flush(items ...any) {
6969

7070
bs.items = append(bs.items, items...)
7171

72+
if len(bs.items) == 0 {
73+
return
74+
}
75+
7276
bs.sendFn(bs.items)
7377
bs.items = nil
7478
}
79+
80+
func (bs *BatchSender) Close() {
81+
if bs.timer != nil {
82+
bs.timer.Stop()
83+
}
84+
bs.flush()
85+
}

scheduler/scheduler_dfs.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,7 @@ func (s *syncClient) resolveTableDfs(ctx context.Context, table *schema.Table, c
128128
for r := range res {
129129
batchSender.Send(r)
130130
}
131+
batchSender.Close()
131132

132133
// we don't need any waitgroups here because we are waiting for the channel to close
133134
endTime := time.Now()

0 commit comments

Comments
 (0)