Skip to content

Commit abe2557

Browse files
authored
fix: Update to plugin-pb v1.5.0 (#1026)
Updated to cloudquery/plugin-pb-go#46
1 parent 40f1c77 commit abe2557

File tree

10 files changed

+14
-24
lines changed

10 files changed

+14
-24
lines changed

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ go 1.19
55
require (
66
github.com/apache/arrow/go/v13 v13.0.0-20230622042343-ec413b7763fe
77
github.com/bradleyjkemp/cupaloy/v2 v2.8.0
8-
github.com/cloudquery/plugin-pb-go v1.4.0
8+
github.com/cloudquery/plugin-pb-go v1.5.0
99
github.com/cloudquery/plugin-sdk/v2 v2.7.0
1010
github.com/getsentry/sentry-go v0.20.0
1111
github.com/goccy/go-json v0.10.0

go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,8 @@ github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMn
4242
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
4343
github.com/cloudquery/arrow/go/v13 v13.0.0-20230626001500-065602842c3a h1:O/FNq1+8YlWzHYNj2tokFQyja6GXsQBdkuvLMdpuaSw=
4444
github.com/cloudquery/arrow/go/v13 v13.0.0-20230626001500-065602842c3a/go.mod h1:W69eByFNO0ZR30q1/7Sr9d83zcVZmF2MiP3fFYAWJOc=
45-
github.com/cloudquery/plugin-pb-go v1.4.0 h1:sfy0oWSFac2JCJQJuKoR+8flZGKkEoUVORwZDNM3aiI=
46-
github.com/cloudquery/plugin-pb-go v1.4.0/go.mod h1:NbWAtT2BzJQ9+XUWwh3IKBg3MOeV9ZEpHoHNAQ/YDV8=
45+
github.com/cloudquery/plugin-pb-go v1.5.0 h1:A/RE1U1l34W5T+JlXJzrHz0IMzfpdUK4VSg+J1Hw0gw=
46+
github.com/cloudquery/plugin-pb-go v1.5.0/go.mod h1:NbWAtT2BzJQ9+XUWwh3IKBg3MOeV9ZEpHoHNAQ/YDV8=
4747
github.com/cloudquery/plugin-sdk/v2 v2.7.0 h1:hRXsdEiaOxJtsn/wZMFQC9/jPfU1MeMK3KF+gPGqm7U=
4848
github.com/cloudquery/plugin-sdk/v2 v2.7.0/go.mod h1:pAX6ojIW99b/Vg4CkhnsGkRIzNaVEceYMR+Bdit73ug=
4949
github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc=

internal/memdb/memdb.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -191,7 +191,7 @@ func (c *client) Close(context.Context) error {
191191

192192
func (c *client) deleteStale(_ context.Context, msg *message.WriteDeleteStale) {
193193
var filteredTable []arrow.Record
194-
tableName := msg.Table.Name
194+
tableName := msg.TableName
195195
for i, row := range c.memoryDB[tableName] {
196196
sc := row.Schema()
197197
indices := sc.FieldIndices(schema.CqSourceNameColumn.Name)

internal/servers/destination/v0/destinations.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -243,7 +243,7 @@ func (s *Server) DeleteStale(ctx context.Context, req *pb.DeleteStale_Request) (
243243
bldr.Field(table.Columns.Index(schema.CqSourceNameColumn.Name)).(*array.StringBuilder).Append(req.Source)
244244
bldr.Field(table.Columns.Index(schema.CqSyncTimeColumn.Name)).(*array.TimestampBuilder).AppendTime(req.Timestamp.AsTime())
245245
msgs <- &message.WriteDeleteStale{
246-
Table: table,
246+
TableName: table.Name,
247247
SourceName: req.Source,
248248
SyncTime: req.Timestamp.AsTime(),
249249
}

internal/servers/destination/v1/destinations.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -226,7 +226,7 @@ func (s *Server) DeleteStale(ctx context.Context, req *pb.DeleteStale_Request) (
226226
bldr.Field(table.Columns.Index(schema.CqSourceNameColumn.Name)).(*array.StringBuilder).Append(req.Source)
227227
bldr.Field(table.Columns.Index(schema.CqSyncTimeColumn.Name)).(*array.TimestampBuilder).AppendTime(req.Timestamp.AsTime())
228228
msgs <- &message.WriteDeleteStale{
229-
Table: table,
229+
TableName: table.Name,
230230
SourceName: req.Source,
231231
SyncTime: req.Timestamp.AsTime(),
232232
}

internal/servers/plugin/v3/plugin.go

Lines changed: 1 addition & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -176,18 +176,8 @@ func (s *Server) Write(msg pb.Plugin_WriteServer) error {
176176
Record: record,
177177
}
178178
case *pb.Write_Request_Delete:
179-
sc, err := pb.NewSchemaFromBytes(pbMsg.Delete.Table)
180-
if err != nil {
181-
pbMsgConvertErr = status.Errorf(codes.InvalidArgument, "failed to create schema from bytes: %v", err)
182-
break
183-
}
184-
table, err := schema.NewTableFromArrowSchema(sc)
185-
if err != nil {
186-
pbMsgConvertErr = status.Errorf(codes.InvalidArgument, "failed to create table from schema: %v", err)
187-
break
188-
}
189179
pluginMessage = &message.WriteDeleteStale{
190-
Table: table,
180+
TableName: pbMsg.Delete.TableName,
191181
SourceName: pbMsg.Delete.SourceName,
192182
SyncTime: pbMsg.Delete.SyncTime.AsTime(),
193183
}

message/write_message.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -95,19 +95,19 @@ func (m WriteInserts) GetRecordsForTable(table *schema.Table) []arrow.Record {
9595
// The message indicates that the destination needs to run something like "DELETE FROM table WHERE _cq_source_name=$1 and sync_time < $2"
9696
type WriteDeleteStale struct {
9797
writeBaseMessage
98-
Table *schema.Table
98+
TableName string
9999
SourceName string
100100
SyncTime time.Time
101101
}
102102

103103
func (m WriteDeleteStale) GetTable() *schema.Table {
104-
return m.Table
104+
return &schema.Table{Name: m.TableName}
105105
}
106106

107107
type WriteDeleteStales []*WriteDeleteStale
108108

109109
func (m WriteDeleteStales) Exists(tableName string) bool {
110110
return slices.ContainsFunc(m, func(msg *WriteDeleteStale) bool {
111-
return msg.Table.Name == tableName
111+
return msg.TableName == tableName
112112
})
113113
}

plugin/testing_write_delete.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ func (s *WriterTestSuite) testDeleteStale(ctx context.Context) error {
5353
bldr.Field(1).(*array.TimestampBuilder).AppendTime(syncTime.Add(time.Second))
5454

5555
if err := s.plugin.writeOne(ctx, &message.WriteDeleteStale{
56-
Table: table,
56+
TableName: table.Name,
5757
SourceName: "test",
5858
SyncTime: syncTime,
5959
}); err != nil {

writers/batchwriter/batchwriter.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -251,7 +251,7 @@ func (w *BatchWriter) Write(ctx context.Context, msgs <-chan message.WriteMessag
251251
if err := w.flushMigrateTables(ctx); err != nil {
252252
return err
253253
}
254-
w.flushInsert(m.Table.Name)
254+
w.flushInsert(m.TableName)
255255
w.deleteStaleLock.Lock()
256256
w.deleteStaleMessages = append(w.deleteStaleMessages, m)
257257
l := len(w.deleteStaleMessages)

writers/mixedbatchwriter/mixedbatchwriter_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -95,12 +95,12 @@ func TestMixedBatchWriter(t *testing.T) {
9595

9696
// message to delete stale from table1
9797
msgDeleteStale1 := &message.WriteDeleteStale{
98-
Table: table1,
98+
TableName: table1.Name,
9999
SourceName: "my-source",
100100
SyncTime: time.Now(),
101101
}
102102
msgDeleteStale2 := &message.WriteDeleteStale{
103-
Table: table1,
103+
TableName: table1.Name,
104104
SourceName: "my-source",
105105
SyncTime: time.Now(),
106106
}

0 commit comments

Comments
 (0)