Skip to content

Commit 58c8a1e

Browse files
authored
fix: Change logic for batch writing to write when batch size is reached, not exceeded (#2153)
The current logic for batch writing prevents us from cleanly testing the `DeleteRecord` handling in the plugin. Currently we have to set `BatchSize: 1` and send two `DeleteRecord` messages in the test, which is not the cleanest logic. This PR changes the logic so the batch is flushed when the batch size is reached, not exceeded (so for `BatchSize: 1` it will flush after one `DeleteRecord` is received). This also matches the logic we have in https://github.com/cloudquery/plugin-sdk/blob/main/internal/batch/cap.go#L7.
1 parent 791c865 commit 58c8a1e

File tree

1 file changed

+9
-3
lines changed

1 file changed

+9
-3
lines changed

writers/batchwriter/batchwriter.go

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -281,7 +281,7 @@ func (w *BatchWriter) Write(ctx context.Context, msgs <-chan message.WriteMessag
281281
w.deleteStaleMessages = append(w.deleteStaleMessages, m)
282282
l := int64(len(w.deleteStaleMessages))
283283
w.deleteStaleLock.Unlock()
284-
if w.batchSize > 0 && l > w.batchSize {
284+
if w.isLimitReached(l) {
285285
if err := w.flushDeleteStaleTables(ctx); err != nil {
286286
return err
287287
}
@@ -301,7 +301,7 @@ func (w *BatchWriter) Write(ctx context.Context, msgs <-chan message.WriteMessag
301301
w.deleteRecordMessages = append(w.deleteRecordMessages, m)
302302
l := int64(len(w.deleteRecordMessages))
303303
w.deleteRecordLock.Unlock()
304-
if w.batchSize > 0 && l > w.batchSize {
304+
if w.isLimitReached(l) {
305305
if err := w.flushDeleteRecordTables(ctx); err != nil {
306306
return err
307307
}
@@ -328,7 +328,7 @@ func (w *BatchWriter) Write(ctx context.Context, msgs <-chan message.WriteMessag
328328
w.migrateTableMessages = append(w.migrateTableMessages, m)
329329
l := int64(len(w.migrateTableMessages))
330330
w.migrateTableLock.Unlock()
331-
if w.batchSize > 0 && l > w.batchSize {
331+
if w.isLimitReached(l) {
332332
if err := w.flushMigrateTables(ctx); err != nil {
333333
return err
334334
}
@@ -338,6 +338,12 @@ func (w *BatchWriter) Write(ctx context.Context, msgs <-chan message.WriteMessag
338338
return nil
339339
}
340340

341+
func (w *BatchWriter) isLimitReached(rowCount int64) bool {
342+
limit := batch.CappedAt(0, w.batchSize)
343+
limit.AddRows(rowCount)
344+
return limit.ReachedLimit()
345+
}
346+
341347
func (w *BatchWriter) startWorker(_ context.Context, msg *message.WriteInsert) error {
342348
w.workersLock.RLock()
343349
md := msg.Record.Schema().Metadata()

0 commit comments

Comments
 (0)