Skip to content

Commit 791c865

Browse files
authored
fix: Flush DeleteRecord messages when batch writer is flushed (#2154)
The current logic for `BatchWriter` does not flush `DeleteRecord` messages when the writer is flushed. This PR adds that flushing. I'm also wondering whether we should add a call to `flushDeleteRecordTables` in https://github.com/cloudquery/plugin-sdk/blob/main/writers/batchwriter/batchwriter.go#L307-L315. WDYT?
1 parent c82d006 commit 791c865

File tree

1 file changed

+7
-1
lines changed

1 file changed

+7
-1
lines changed

writers/batchwriter/batchwriter.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,10 @@ func (w *BatchWriter) Flush(ctx context.Context) error {
107107
if err := w.flushMigrateTables(ctx); err != nil {
108108
return err
109109
}
110-
return w.flushDeleteStaleTables(ctx)
110+
if err := w.flushDeleteStaleTables(ctx); err != nil {
111+
return err
112+
}
113+
return w.flushDeleteRecordTables(ctx)
111114
}
112115

113116
func (w *BatchWriter) Close(context.Context) error {
@@ -310,6 +313,9 @@ func (w *BatchWriter) Write(ctx context.Context, msgs <-chan message.WriteMessag
310313
if err := w.flushDeleteStaleTables(ctx); err != nil {
311314
return err
312315
}
316+
if err := w.flushDeleteRecordTables(ctx); err != nil {
317+
return err
318+
}
313319
if err := w.startWorker(ctx, m); err != nil {
314320
return err
315321
}

0 commit comments

Comments
 (0)