Skip to content

Commit 1f0a603

Browse files
authored
feat: Support Delete Record (#1282)
#### Summary Blocked by cloudquery/plugin-pb#18 (linting and tests fail because of protobuf isn't updated)
1 parent 3e063bc commit 1f0a603

File tree

11 files changed

+361
-0
lines changed

11 files changed

+361
-0
lines changed

internal/memdb/memdb.go

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -211,6 +211,8 @@ func (c *client) Write(ctx context.Context, msgs <-chan message.WriteMessage) er
211211
c.migrate(ctx, msg.Table)
212212
case *message.WriteDeleteStale:
213213
c.deleteStale(ctx, msg)
214+
case *message.WriteDeleteRecord:
215+
c.deleteRecord(ctx, msg)
214216
case *message.WriteInsert:
215217
sc := msg.Record.Schema()
216218
tableName, ok := sc.Metadata().GetValue(schema.MetadataTableName)
@@ -257,3 +259,53 @@ func (c *client) deleteStale(_ context.Context, msg *message.WriteDeleteStale) {
257259
}
258260
c.memoryDB[tableName] = filteredTable
259261
}
262+
263+
func (c *client) deleteRecord(_ context.Context, msg *message.WriteDeleteRecord) {
264+
var filteredTable []arrow.Record
265+
tableName := msg.TableName
266+
for i, row := range c.memoryDB[tableName] {
267+
isMatch := true
268+
// Groups are evaluated as AND
269+
for _, predGroup := range msg.WhereClause {
270+
for _, pred := range predGroup.Predicates {
271+
predResult := evaluatePredicate(pred, row)
272+
if predGroup.GroupingType == "AND" {
273+
isMatch = isMatch && predResult
274+
} else if predResult {
275+
isMatch = true
276+
break
277+
}
278+
}
279+
// If any single predicate group is false then we can break out of the loop
280+
if !isMatch {
281+
break
282+
}
283+
}
284+
285+
if !isMatch {
286+
filteredTable = append(filteredTable, c.memoryDB[tableName][i])
287+
}
288+
}
289+
c.memoryDB[tableName] = filteredTable
290+
}
291+
292+
func evaluatePredicate(pred message.Predicate, record arrow.Record) bool {
293+
sc := record.Schema()
294+
indices := sc.FieldIndices(pred.Column)
295+
if len(indices) == 0 {
296+
return false
297+
}
298+
syncColIndex := indices[0]
299+
300+
if record.Column(syncColIndex).DataType() != pred.Record.Column(0).DataType() {
301+
return false
302+
}
303+
// dataType := record.Column(syncColIndex).DataType()
304+
switch pred.Operator {
305+
case "eq":
306+
return record.Column(syncColIndex).String() == pred.Record.Column(0).String()
307+
// return record.Column(syncColIndex).(*array.String).Value(0) == pred.Record.Column(0).(*array.String).Value(0)
308+
default:
309+
return false
310+
}
311+
}

internal/servers/plugin/v3/plugin.go

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,41 @@ func (s *Server) Sync(req *pb.Sync_Request, stream pb.Plugin_SyncServer) error {
156156
Record: recordBytes,
157157
},
158158
}
159+
case *message.SyncDeleteRecord:
160+
whereClause := make([]*pb.PredicatesGroup, len(m.WhereClause))
161+
for j, predicateGroup := range m.WhereClause {
162+
whereClause[j] = &pb.PredicatesGroup{
163+
GroupingType: pb.PredicatesGroup_GroupingType(pb.PredicatesGroup_GroupingType_value[predicateGroup.GroupingType]),
164+
Predicates: make([]*pb.Predicate, len(predicateGroup.Predicates)),
165+
}
166+
for i, predicate := range predicateGroup.Predicates {
167+
record, err := pb.RecordToBytes(predicate.Record)
168+
if err != nil {
169+
return status.Errorf(codes.Internal, "failed to encode record: %v", err)
170+
}
171+
172+
whereClause[j].Predicates[i] = &pb.Predicate{
173+
Record: record,
174+
Column: predicate.Column,
175+
Operator: pb.Predicate_Operator(pb.Predicate_Operator_value[predicate.Operator]),
176+
}
177+
}
178+
}
179+
180+
tableRelations := make([]*pb.TableRelation, len(m.TableRelations))
181+
for i, tr := range m.TableRelations {
182+
tableRelations[i] = &pb.TableRelation{
183+
TableName: tr.TableName,
184+
ParentTable: tr.ParentTable,
185+
}
186+
}
187+
pbMsg.Message = &pb.Sync_Response_DeleteRecord{
188+
DeleteRecord: &pb.Sync_MessageDeleteRecord{
189+
TableName: m.TableName,
190+
TableRelations: tableRelations,
191+
WhereClause: whereClause,
192+
},
193+
}
159194
default:
160195
return status.Errorf(codes.Internal, "unknown message type: %T", msg)
161196
}
@@ -230,6 +265,40 @@ func (s *Server) Write(stream pb.Plugin_WriteServer) error {
230265
SourceName: pbMsg.Delete.SourceName,
231266
SyncTime: pbMsg.Delete.SyncTime.AsTime(),
232267
}
268+
269+
case *pb.Write_Request_DeleteRecord:
270+
whereClause := make(message.PredicateGroups, len(pbMsg.DeleteRecord.WhereClause))
271+
272+
for j, predicateGroup := range pbMsg.DeleteRecord.WhereClause {
273+
whereClause[j].Predicates = make(message.Predicates, len(predicateGroup.Predicates))
274+
for i, predicate := range predicateGroup.Predicates {
275+
record, err := pb.NewRecordFromBytes(predicate.Record)
276+
if err != nil {
277+
pbMsgConvertErr = status.Errorf(codes.InvalidArgument, "failed to create record: %v", err)
278+
break
279+
}
280+
whereClause[j].Predicates[i] = message.Predicate{
281+
Record: record,
282+
Column: predicate.Column,
283+
Operator: predicate.Operator.String(),
284+
}
285+
}
286+
}
287+
288+
tableRelations := make([]message.TableRelation, len(pbMsg.DeleteRecord.TableRelations))
289+
for i, tr := range pbMsg.DeleteRecord.TableRelations {
290+
tableRelations[i] = message.TableRelation{
291+
TableName: tr.TableName,
292+
ParentTable: tr.ParentTable,
293+
}
294+
}
295+
pluginMessage = &message.WriteDeleteRecord{
296+
DeleteRecord: message.DeleteRecord{
297+
TableName: pbMsg.DeleteRecord.TableName,
298+
TableRelations: tableRelations,
299+
WhereClause: whereClause,
300+
},
301+
}
233302
}
234303

235304
if pbMsgConvertErr != nil {

message/sync_message.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,3 +111,13 @@ func (m SyncInserts) GetRecordsForTable(table *schema.Table) []arrow.Record {
111111
}
112112
return slices.Clip(res)
113113
}
114+
115+
type SyncDeleteRecord struct {
116+
syncBaseMessage
117+
// TODO: Instead of using this struct we should derive the DeletionKeys and parent/child relation from the schema.Table itself
118+
DeleteRecord
119+
}
120+
121+
func (m SyncDeleteRecord) GetTable() *schema.Table {
122+
return &schema.Table{Name: m.TableName}
123+
}

message/write_message.go

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,3 +128,44 @@ func (m WriteDeleteStales) Exists(tableName string) bool {
128128
return msg.TableName == tableName
129129
})
130130
}
131+
132+
type TableRelation struct {
133+
TableName string
134+
ParentTable string
135+
}
136+
137+
type TableRelations []TableRelation
138+
139+
type Predicate struct {
140+
Operator string
141+
Column string
142+
Record arrow.Record
143+
}
144+
145+
type Predicates []Predicate
146+
147+
type PredicateGroup struct {
148+
// This will be AND or OR
149+
GroupingType string
150+
Predicates Predicates
151+
}
152+
153+
type PredicateGroups []PredicateGroup
154+
155+
type DeleteRecord struct {
156+
TableName string
157+
TableRelations TableRelations
158+
WhereClause PredicateGroups
159+
SyncTime time.Time
160+
}
161+
162+
type WriteDeleteRecord struct {
163+
writeBaseMessage
164+
DeleteRecord
165+
}
166+
167+
func (m WriteDeleteRecord) GetTable() *schema.Table {
168+
return &schema.Table{Name: m.TableName}
169+
}
170+
171+
type WriteDeleteRecords []*WriteDeleteRecord

plugin/testing_write.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,15 @@ func TestWriterSuiteRunner(t *testing.T, p *Plugin, tests WriterTestSuiteTests,
148148
})
149149
})
150150

151+
t.Run("TestDeleteRecord", func(t *testing.T) {
152+
t.Run("Basic", func(t *testing.T) {
153+
suite.testDeleteRecordBasic(ctx)
154+
})
155+
t.Run("DeleteAll", func(t *testing.T) {
156+
suite.testDeleteAllRecords(ctx)
157+
})
158+
})
159+
151160
t.Run("TestMigrate", func(t *testing.T) {
152161
if suite.tests.SkipMigrate {
153162
t.Skip("skipping " + t.Name())

plugin/testing_write_delete.go

Lines changed: 150 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -142,3 +142,153 @@ func (s *WriterTestSuite) testDeleteStaleAll(ctx context.Context) {
142142
require.EqualValuesf(s.t, rowsPerRecord, TotalRows(readRecords), "unexpected amount of items after second delete stale")
143143
require.Emptyf(s.t, RecordsDiff(table.ToArrowSchema(), readRecords, []arrow.Record{nullRecord}), "record differs")
144144
}
145+
146+
func (s *WriterTestSuite) testDeleteRecordBasic(ctx context.Context) {
147+
tableName := s.tableNameForTest("delete_all_rows")
148+
syncTime := time.Now().UTC().Truncate(s.genDatOptions.TimePrecision).
149+
Truncate(time.Microsecond) // https://github.com/golang/go/issues/41087
150+
table := &schema.Table{
151+
Name: tableName,
152+
Columns: schema.ColumnList{
153+
schema.Column{Name: "id", Type: arrow.PrimitiveTypes.Int64, PrimaryKey: true, NotNull: true},
154+
schema.CqSourceNameColumn,
155+
schema.CqSyncTimeColumn,
156+
},
157+
}
158+
require.NoErrorf(s.t, s.plugin.writeOne(ctx, &message.WriteMigrateTable{Table: table}), "failed to create table")
159+
const sourceName = "source-test"
160+
161+
bldr := array.NewRecordBuilder(memory.DefaultAllocator, table.ToArrowSchema())
162+
bldr.Field(0).(*array.Int64Builder).Append(0)
163+
bldr.Field(1).(*array.StringBuilder).Append(sourceName)
164+
bldr.Field(2).(*array.TimestampBuilder).AppendTime(syncTime)
165+
record1 := bldr.NewRecord()
166+
167+
require.NoErrorf(s.t, s.plugin.writeOne(ctx, &message.WriteInsert{Record: record1}), "failed to insert record")
168+
record1 = s.handleNulls(record1) // we process nulls after writing
169+
170+
records, err := s.plugin.readAll(ctx, table)
171+
require.NoErrorf(s.t, err, "failed to read")
172+
require.EqualValuesf(s.t, 1, TotalRows(records), "unexpected amount of items")
173+
174+
// create value for delete statement but nothing will be deleted because ID value isn't present
175+
bldrDeleteNoMatch := array.NewRecordBuilder(memory.DefaultAllocator, (&schema.Table{
176+
Name: tableName,
177+
Columns: schema.ColumnList{
178+
schema.Column{Name: "id", Type: arrow.PrimitiveTypes.Int64},
179+
},
180+
}).ToArrowSchema())
181+
bldrDeleteNoMatch.Field(0).(*array.Int64Builder).Append(1)
182+
deleteValue := bldrDeleteNoMatch.NewRecord()
183+
184+
require.NoErrorf(s.t, s.plugin.writeOne(ctx, &message.WriteDeleteRecord{
185+
DeleteRecord: message.DeleteRecord{
186+
TableName: table.Name,
187+
WhereClause: message.PredicateGroups{
188+
{
189+
GroupingType: "AND",
190+
Predicates: []message.Predicate{
191+
{
192+
Operator: "eq",
193+
Column: "id",
194+
Record: deleteValue,
195+
},
196+
},
197+
},
198+
},
199+
},
200+
}), "failed to delete record no match")
201+
202+
records, err = s.plugin.readAll(ctx, table)
203+
require.NoErrorf(s.t, err, "failed to read after delete with no match")
204+
require.EqualValuesf(s.t, 1, TotalRows(records), "unexpected amount of items after delete with no match")
205+
require.Emptyf(s.t, RecordsDiff(table.ToArrowSchema(), records, []arrow.Record{record1}), "record differs after delete with no match")
206+
207+
// create value for delete statement will be delete One record
208+
bldrDeleteMatch := array.NewRecordBuilder(memory.DefaultAllocator, (&schema.Table{
209+
Name: tableName,
210+
Columns: schema.ColumnList{
211+
schema.Column{Name: "id", Type: arrow.PrimitiveTypes.Int64},
212+
},
213+
}).ToArrowSchema())
214+
bldrDeleteMatch.Field(0).(*array.Int64Builder).Append(0)
215+
deleteValue = bldrDeleteMatch.NewRecord()
216+
217+
require.NoErrorf(s.t, s.plugin.writeOne(ctx, &message.WriteDeleteRecord{
218+
DeleteRecord: message.DeleteRecord{
219+
TableName: table.Name,
220+
WhereClause: message.PredicateGroups{
221+
{
222+
GroupingType: "AND",
223+
Predicates: []message.Predicate{
224+
{
225+
Operator: "eq",
226+
Column: "id",
227+
Record: deleteValue,
228+
},
229+
},
230+
},
231+
},
232+
},
233+
}), "failed to delete record no match")
234+
235+
records, err = s.plugin.readAll(ctx, table)
236+
require.NoErrorf(s.t, err, "failed to read after delete with match")
237+
require.EqualValuesf(s.t, 0, TotalRows(records), "unexpected amount of items after delete with match")
238+
}
239+
240+
func (s *WriterTestSuite) testDeleteAllRecords(ctx context.Context) {
241+
tableName := s.tableNameForTest("delete_all_records")
242+
syncTime := time.Now().UTC().Truncate(s.genDatOptions.TimePrecision).
243+
Truncate(time.Microsecond) // https://github.com/golang/go/issues/41087
244+
table := &schema.Table{
245+
Name: tableName,
246+
Columns: schema.ColumnList{
247+
schema.Column{Name: "id", Type: arrow.PrimitiveTypes.Int64, PrimaryKey: true, NotNull: true},
248+
schema.CqSourceNameColumn,
249+
schema.CqSyncTimeColumn,
250+
},
251+
}
252+
require.NoErrorf(s.t, s.plugin.writeOne(ctx, &message.WriteMigrateTable{Table: table}), "failed to create table")
253+
const sourceName = "source-test"
254+
255+
bldr := array.NewRecordBuilder(memory.DefaultAllocator, table.ToArrowSchema())
256+
bldr.Field(0).(*array.Int64Builder).Append(0)
257+
bldr.Field(1).(*array.StringBuilder).Append(sourceName)
258+
bldr.Field(2).(*array.TimestampBuilder).AppendTime(syncTime)
259+
record1 := bldr.NewRecord()
260+
261+
require.NoErrorf(s.t, s.plugin.writeOne(ctx, &message.WriteInsert{Record: record1}), "failed to insert record")
262+
263+
records, err := s.plugin.readAll(ctx, table)
264+
require.NoErrorf(s.t, err, "failed to read")
265+
require.EqualValuesf(s.t, 1, TotalRows(records), "unexpected amount of items")
266+
267+
require.NoErrorf(s.t, s.plugin.writeOne(ctx, &message.WriteDeleteRecord{
268+
DeleteRecord: message.DeleteRecord{
269+
TableName: table.Name,
270+
},
271+
}), "failed to delete records")
272+
273+
records, err = s.plugin.readAll(ctx, table)
274+
require.NoErrorf(s.t, err, "failed to read after delete all records")
275+
require.EqualValuesf(s.t, 0, TotalRows(records), "unexpected amount of items after delete stale")
276+
277+
bldr.Field(0).(*array.Int64Builder).Append(1)
278+
bldr.Field(1).(*array.StringBuilder).Append(sourceName)
279+
bldr.Field(2).(*array.TimestampBuilder).AppendTime(syncTime.Add(time.Second))
280+
record2 := bldr.NewRecord()
281+
282+
require.NoErrorf(s.t, s.plugin.writeOne(ctx, &message.WriteInsert{Record: record2}), "failed to insert second record")
283+
284+
require.NoErrorf(s.t, s.plugin.writeOne(ctx, &message.WriteDeleteRecord{
285+
DeleteRecord: message.DeleteRecord{
286+
TableName: table.Name,
287+
},
288+
}), "failed to delete records second time")
289+
290+
records, err = s.plugin.readAll(ctx, table)
291+
require.NoErrorf(s.t, err, "failed to read second time")
292+
sortRecords(table, records, "id")
293+
require.EqualValuesf(s.t, 0, TotalRows(records), "unexpected amount of items second time")
294+
}

0 commit comments

Comments
 (0)