Skip to content

Commit b95f7ee

Browse files
committed
change logic for batch size checking
1 parent c82d006 commit b95f7ee

File tree

1 file changed

+3
-3
lines changed

1 file changed

+3
-3
lines changed

writers/batchwriter/batchwriter.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -278,7 +278,7 @@ func (w *BatchWriter) Write(ctx context.Context, msgs <-chan message.WriteMessag
278278
w.deleteStaleMessages = append(w.deleteStaleMessages, m)
279279
l := int64(len(w.deleteStaleMessages))
280280
w.deleteStaleLock.Unlock()
281-
if w.batchSize > 0 && l > w.batchSize {
281+
if w.batchSize > 0 && l >= w.batchSize {
282282
if err := w.flushDeleteStaleTables(ctx); err != nil {
283283
return err
284284
}
@@ -298,7 +298,7 @@ func (w *BatchWriter) Write(ctx context.Context, msgs <-chan message.WriteMessag
298298
w.deleteRecordMessages = append(w.deleteRecordMessages, m)
299299
l := int64(len(w.deleteRecordMessages))
300300
w.deleteRecordLock.Unlock()
301-
if w.batchSize > 0 && l > w.batchSize {
301+
if w.batchSize > 0 && l >= w.batchSize {
302302
if err := w.flushDeleteRecordTables(ctx); err != nil {
303303
return err
304304
}
@@ -322,7 +322,7 @@ func (w *BatchWriter) Write(ctx context.Context, msgs <-chan message.WriteMessag
322322
w.migrateTableMessages = append(w.migrateTableMessages, m)
323323
l := int64(len(w.migrateTableMessages))
324324
w.migrateTableLock.Unlock()
325-
if w.batchSize > 0 && l > w.batchSize {
325+
if w.batchSize > 0 && l >= w.batchSize {
326326
if err := w.flushMigrateTables(ctx); err != nil {
327327
return err
328328
}

0 commit comments

Comments
 (0)