Skip to content

Commit 5a02e27

Browse files
authored
feat: Support DeleteRecord in all writers (#1295)
<!-- Explain what problem this PR addresses --> ---
1 parent db72682 commit 5a02e27

File tree

8 files changed

+122
-13
lines changed

8 files changed

+122
-13
lines changed

writers/batchwriter/batchwriter.go

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ type Client interface {
1717
MigrateTables(context.Context, message.WriteMigrateTables) error
1818
WriteTableBatch(ctx context.Context, name string, messages message.WriteInserts) error
1919
DeleteStale(context.Context, message.WriteDeleteStales) error
20+
DeleteRecord(context.Context, message.WriteDeleteRecords) error
2021
}
2122

2223
type BatchWriter struct {
@@ -29,6 +30,8 @@ type BatchWriter struct {
2930
migrateTableMessages message.WriteMigrateTables
3031
deleteStaleLock sync.Mutex
3132
deleteStaleMessages message.WriteDeleteStales
33+
deleteRecordLock sync.Mutex
34+
deleteRecordMessages message.WriteDeleteRecords
3235

3336
logger zerolog.Logger
3437
batchTimeout time.Duration
@@ -199,6 +202,19 @@ func (w *BatchWriter) flushDeleteStaleTables(ctx context.Context) error {
199202
return nil
200203
}
201204

205+
func (w *BatchWriter) flushDeleteRecordTables(ctx context.Context) error {
206+
w.deleteRecordLock.Lock()
207+
defer w.deleteRecordLock.Unlock()
208+
if len(w.deleteRecordMessages) == 0 {
209+
return nil
210+
}
211+
if err := w.client.DeleteRecord(ctx, w.deleteRecordMessages); err != nil {
212+
return err
213+
}
214+
w.deleteRecordMessages = w.deleteRecordMessages[:0]
215+
return nil
216+
}
217+
202218
func (w *BatchWriter) flushInsert(tableName string) {
203219
w.workersLock.RLock()
204220
worker, ok := w.workers[tableName]
@@ -239,6 +255,26 @@ func (w *BatchWriter) Write(ctx context.Context, msgs <-chan message.WriteMessag
239255
return err
240256
}
241257
}
258+
case *message.WriteDeleteRecord:
259+
if err := w.flushMigrateTables(ctx); err != nil {
260+
return err
261+
}
262+
if err := w.flushDeleteStaleTables(ctx); err != nil {
263+
return err
264+
}
265+
// Ensure all related workers are flushed
266+
for _, rel := range m.TableRelations {
267+
w.flushInsert(rel.TableName)
268+
}
269+
w.deleteRecordLock.Lock()
270+
w.deleteRecordMessages = append(w.deleteRecordMessages, m)
271+
l := len(w.deleteRecordMessages)
272+
w.deleteRecordLock.Unlock()
273+
if w.batchSize > 0 && l > w.batchSize {
274+
if err := w.flushDeleteRecordTables(ctx); err != nil {
275+
return err
276+
}
277+
}
242278
case *message.WriteInsert:
243279
if err := w.flushMigrateTables(ctx); err != nil {
244280
return err

writers/batchwriter/batchwriter_test.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ type testBatchClient struct {
1818
migrateTables message.WriteMigrateTables
1919
inserts message.WriteInserts
2020
deleteStales message.WriteDeleteStales
21+
deleteRecords message.WriteDeleteRecords
2122
}
2223

2324
func (c *testBatchClient) MigrateTablesLen() int {
@@ -58,6 +59,13 @@ func (c *testBatchClient) DeleteStale(_ context.Context, messages message.WriteD
5859
return nil
5960
}
6061

62+
func (c *testBatchClient) DeleteRecord(_ context.Context, messages message.WriteDeleteRecords) error {
63+
c.mutex.Lock()
64+
defer c.mutex.Unlock()
65+
c.deleteRecords = append(c.deleteRecords, messages...)
66+
return nil
67+
}
68+
6169
var batchTestTables = schema.Tables{
6270
{
6371
Name: "table1",

writers/batchwriter/unimplemented.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,3 +19,9 @@ type UnimplementedDeleteStale struct{}
1919
func (UnimplementedDeleteStale) DeleteStale(context.Context, message.WriteDeleteStales) error {
2020
return fmt.Errorf("DeleteStale: %w", plugin.ErrNotImplemented)
2121
}
22+
23+
type UnimplementedDeleteRecord struct{}
24+
25+
func (UnimplementedDeleteRecord) DeleteRecord(context.Context, message.WriteDeleteRecords) error {
26+
return fmt.Errorf("DeleteRecord: %w", plugin.ErrNotImplemented)
27+
}

writers/batchwriter/unimplemented_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
type testDummyClient struct {
1111
batchwriter.IgnoreMigrateTables
1212
batchwriter.UnimplementedDeleteStale
13+
batchwriter.UnimplementedDeleteRecord
1314
}
1415

1516
func (testDummyClient) WriteTableBatch(context.Context, string, message.WriteInserts) error {

writers/streamingbatchwriter/streamingbatchwriter.go

Lines changed: 52 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,9 @@ type Client interface {
4040
// DeleteStale should block and handle WriteDeleteStale messages until the channel is closed.
4141
DeleteStale(context.Context, <-chan *message.WriteDeleteStale) error
4242

43+
// DeleteRecords should block and handle WriteDeleteRecord messages until the channel is closed.
44+
DeleteRecords(context.Context, <-chan *message.WriteDeleteRecord) error
45+
4346
// WriteTable should block and handle writes to a single table until the channel is closed. Table metadata can be found in the first WriteInsert message.
4447
// The channel is closed when all inserts in the batch have been sent. New batches, if any, will be sent on a new call to WriteTable.
4548
WriteTable(context.Context, <-chan *message.WriteInsert) error
@@ -48,9 +51,11 @@ type Client interface {
4851
type StreamingBatchWriter struct {
4952
client Client
5053

51-
insertWorkers map[string]*streamingWorkerManager[*message.WriteInsert]
52-
migrateWorker *streamingWorkerManager[*message.WriteMigrateTable]
53-
deleteWorker *streamingWorkerManager[*message.WriteDeleteStale]
54+
insertWorkers map[string]*streamingWorkerManager[*message.WriteInsert]
55+
migrateWorker *streamingWorkerManager[*message.WriteMigrateTable]
56+
deleteStaleWorker *streamingWorkerManager[*message.WriteDeleteStale]
57+
deleteRecordWorker *streamingWorkerManager[*message.WriteDeleteRecord]
58+
5459
workersLock sync.RWMutex
5560
workersWaitGroup sync.WaitGroup
5661

@@ -128,9 +133,14 @@ func (w *StreamingBatchWriter) Flush(_ context.Context) error {
128133
w.migrateWorker.flush <- done
129134
<-done
130135
}
131-
if w.deleteWorker != nil {
136+
if w.deleteStaleWorker != nil {
137+
done := make(chan bool)
138+
w.deleteStaleWorker.flush <- done
139+
<-done
140+
}
141+
if w.deleteRecordWorker != nil {
132142
done := make(chan bool)
133-
w.deleteWorker.flush <- done
143+
w.deleteRecordWorker.flush <- done
134144
<-done
135145
}
136146
for _, worker := range w.insertWorkers {
@@ -151,14 +161,18 @@ func (w *StreamingBatchWriter) Close(context.Context) error {
151161
if w.migrateWorker != nil {
152162
close(w.migrateWorker.ch)
153163
}
154-
if w.deleteWorker != nil {
155-
close(w.deleteWorker.ch)
164+
if w.deleteStaleWorker != nil {
165+
close(w.deleteStaleWorker.ch)
166+
}
167+
if w.deleteRecordWorker != nil {
168+
close(w.deleteRecordWorker.ch)
156169
}
157170
w.workersWaitGroup.Wait()
158171

159172
w.insertWorkers = make(map[string]*streamingWorkerManager[*message.WriteInsert])
160173
w.migrateWorker = nil
161-
w.deleteWorker = nil
174+
w.deleteStaleWorker = nil
175+
w.deleteRecordWorker = nil
162176
w.lastMsgType = writers.MsgTypeUnset
163177

164178
return nil
@@ -232,13 +246,13 @@ func (w *StreamingBatchWriter) startWorker(ctx context.Context, errCh chan<- err
232246
case *message.WriteDeleteStale:
233247
w.workersLock.Lock()
234248
defer w.workersLock.Unlock()
235-
if w.deleteWorker != nil {
236-
w.deleteWorker.ch <- m
249+
if w.deleteStaleWorker != nil {
250+
w.deleteStaleWorker.ch <- m
237251
return nil
238252
}
239253
ch := make(chan *message.WriteDeleteStale)
240254
flush := make(chan chan bool)
241-
w.deleteWorker = &streamingWorkerManager[*message.WriteDeleteStale]{
255+
w.deleteStaleWorker = &streamingWorkerManager[*message.WriteDeleteStale]{
242256
ch: ch,
243257
writeFunc: w.client.DeleteStale,
244258

@@ -251,8 +265,8 @@ func (w *StreamingBatchWriter) startWorker(ctx context.Context, errCh chan<- err
251265
}
252266

253267
w.workersWaitGroup.Add(1)
254-
go w.deleteWorker.run(ctx, &w.workersWaitGroup, tableName)
255-
w.deleteWorker.ch <- m
268+
go w.deleteStaleWorker.run(ctx, &w.workersWaitGroup, tableName)
269+
w.deleteStaleWorker.ch <- m
256270
return nil
257271
case *message.WriteInsert:
258272
w.workersLock.RLock()
@@ -285,7 +299,32 @@ func (w *StreamingBatchWriter) startWorker(ctx context.Context, errCh chan<- err
285299
go wr.run(ctx, &w.workersWaitGroup, tableName)
286300
ch <- m
287301
return nil
302+
case *message.WriteDeleteRecord:
303+
w.workersLock.Lock()
304+
defer w.workersLock.Unlock()
305+
if w.deleteRecordWorker != nil {
306+
w.deleteRecordWorker.ch <- m
307+
return nil
308+
}
309+
ch := make(chan *message.WriteDeleteRecord)
310+
flush := make(chan chan bool)
311+
// TODO: flush all workers for nested tables as well (See https://github.com/cloudquery/plugin-sdk/issues/1296)
312+
w.deleteRecordWorker = &streamingWorkerManager[*message.WriteDeleteRecord]{
313+
ch: ch,
314+
writeFunc: w.client.DeleteRecords,
315+
316+
flush: flush,
317+
errCh: errCh,
288318

319+
batchSizeRows: w.batchSizeRows,
320+
batchTimeout: w.batchTimeout,
321+
tickerFn: w.tickerFn,
322+
}
323+
324+
w.workersWaitGroup.Add(1)
325+
go w.deleteRecordWorker.run(ctx, &w.workersWaitGroup, tableName)
326+
w.deleteRecordWorker.ch <- m
327+
return nil
289328
default:
290329
return fmt.Errorf("unhandled message type: %T", msg)
291330
}

writers/streamingbatchwriter/streamingbatchwriter_test.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ const (
1919
messageTypeMigrateTable messageType = iota
2020
messageTypeInsert
2121
messageTypeDeleteStale
22+
messageTypeDeleteRecord
2223
)
2324

2425
type testStreamingBatchClient struct {
@@ -79,6 +80,14 @@ func (c *testStreamingBatchClient) DeleteStale(ctx context.Context, msgs <-chan
7980
return c.handleTypeCommit(ctx, messageTypeDeleteStale, key)
8081
}
8182

83+
func (c *testStreamingBatchClient) DeleteRecords(ctx context.Context, msgs <-chan *message.WriteDeleteRecord) error {
84+
key := ""
85+
for m := range msgs {
86+
key = c.handleTypeMessage(ctx, messageTypeDeleteRecord, m, key)
87+
}
88+
return c.handleTypeCommit(ctx, messageTypeDeleteRecord, key)
89+
}
90+
8291
func (c *testStreamingBatchClient) handleTypeMessage(_ context.Context, t messageType, msg message.WriteMessage, key string) string {
8392
c.mutex.Lock()
8493
defer c.mutex.Unlock()

writers/streamingbatchwriter/unimplemented.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,3 +27,12 @@ func (UnimplementedDeleteStale) DeleteStale(_ context.Context, ch <-chan *messag
2727
}
2828
return fmt.Errorf("DeleteStale: %w", plugin.ErrNotImplemented)
2929
}
30+
31+
type UnimplementedDeleteRecords struct{}
32+
33+
func (UnimplementedDeleteRecords) DeleteRecords(_ context.Context, ch <-chan *message.WriteDeleteRecord) error {
34+
// nolint:revive
35+
for range ch {
36+
}
37+
return fmt.Errorf("DeleteRecords: %w", plugin.ErrNotImplemented)
38+
}

writers/streamingbatchwriter/unimplemented_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
type testDummyClient struct {
1111
streamingbatchwriter.IgnoreMigrateTable
1212
streamingbatchwriter.UnimplementedDeleteStale
13+
streamingbatchwriter.UnimplementedDeleteRecords
1314
}
1415

1516
func (testDummyClient) WriteTable(context.Context, <-chan *message.WriteInsert) error {

0 commit comments

Comments
 (0)