Skip to content

Commit 6e35a5f

Browse files
authored
feat: Split sync and write messages (#1009)
Follow-up cloudquery/plugin-pb-go#41 This also removes WriteOptions because we moved stuff into messages
1 parent c79cde4 commit 6e35a5f

28 files changed

+353
-297
lines changed

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ go 1.19
55
require (
66
github.com/apache/arrow/go/v13 v13.0.0-20230622042343-ec413b7763fe
77
github.com/bradleyjkemp/cupaloy/v2 v2.8.0
8-
github.com/cloudquery/plugin-pb-go v1.3.4
8+
github.com/cloudquery/plugin-pb-go v1.4.0
99
github.com/cloudquery/plugin-sdk/v2 v2.7.0
1010
github.com/getsentry/sentry-go v0.20.0
1111
github.com/goccy/go-json v0.10.0

go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,8 @@ github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMn
4242
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
4343
github.com/cloudquery/arrow/go/v13 v13.0.0-20230626001500-065602842c3a h1:O/FNq1+8YlWzHYNj2tokFQyja6GXsQBdkuvLMdpuaSw=
4444
github.com/cloudquery/arrow/go/v13 v13.0.0-20230626001500-065602842c3a/go.mod h1:W69eByFNO0ZR30q1/7Sr9d83zcVZmF2MiP3fFYAWJOc=
45-
github.com/cloudquery/plugin-pb-go v1.3.4 h1:GKNVo9gpmvctyD11QaPVnSLYzE9lw1g1kzt4TSmP+2s=
46-
github.com/cloudquery/plugin-pb-go v1.3.4/go.mod h1:NbWAtT2BzJQ9+XUWwh3IKBg3MOeV9ZEpHoHNAQ/YDV8=
45+
github.com/cloudquery/plugin-pb-go v1.4.0 h1:sfy0oWSFac2JCJQJuKoR+8flZGKkEoUVORwZDNM3aiI=
46+
github.com/cloudquery/plugin-pb-go v1.4.0/go.mod h1:NbWAtT2BzJQ9+XUWwh3IKBg3MOeV9ZEpHoHNAQ/YDV8=
4747
github.com/cloudquery/plugin-sdk/v2 v2.7.0 h1:hRXsdEiaOxJtsn/wZMFQC9/jPfU1MeMK3KF+gPGqm7U=
4848
github.com/cloudquery/plugin-sdk/v2 v2.7.0/go.mod h1:pAX6ojIW99b/Vg4CkhnsGkRIzNaVEceYMR+Bdit73ug=
4949
github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc=

internal/clients/state/v3/state.go

Lines changed: 2 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -62,16 +62,9 @@ func NewClient(ctx context.Context, pbClient pb.PluginClient, tableName string)
6262
if err != nil {
6363
return nil, err
6464
}
65-
if err := writeClient.Send(&pb.Write_Request{
66-
Message: &pb.Write_Request_Options{
67-
Options: &pb.WriteOptions{MigrateForce: false},
68-
},
69-
}); err != nil {
70-
return nil, err
71-
}
7265
if err := writeClient.Send(&pb.Write_Request{
7366
Message: &pb.Write_Request_MigrateTable{
74-
MigrateTable: &pb.MessageMigrateTable{
67+
MigrateTable: &pb.Write_MessageMigrateTable{
7568
Table: tableBytes,
7669
},
7770
},
@@ -97,8 +90,6 @@ func NewClient(ctx context.Context, pbClient pb.PluginClient, tableName string)
9790
}
9891
var insertMessage *pb.Sync_Response_Insert
9992
switch m := res.Message.(type) {
100-
case *pb.Sync_Response_Delete:
101-
continue
10293
case *pb.Sync_Response_MigrateTable:
10394
continue
10495
case *pb.Sync_Response_Insert:
@@ -153,14 +144,9 @@ func (c *Client) Flush(ctx context.Context) error {
153144
if err != nil {
154145
return err
155146
}
156-
if err := writeClient.Send(&pb.Write_Request{
157-
Message: &pb.Write_Request_Options{},
158-
}); err != nil {
159-
return err
160-
}
161147
if err := writeClient.Send(&pb.Write_Request{
162148
Message: &pb.Write_Request_Insert{
163-
Insert: &pb.MessageInsert{
149+
Insert: &pb.Write_MessageInsert{
164150
Record: recordBytes,
165151
},
166152
},

internal/memdb/memdb.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -106,15 +106,15 @@ func (c *client) Read(_ context.Context, table *schema.Table, res chan<- arrow.R
106106
return nil
107107
}
108108

109-
func (c *client) Sync(_ context.Context, options plugin.SyncOptions, res chan<- message.Message) error {
109+
func (c *client) Sync(_ context.Context, options plugin.SyncOptions, res chan<- message.SyncMessage) error {
110110
c.memoryDBLock.RLock()
111111

112112
for tableName := range c.memoryDB {
113113
if !plugin.MatchesTable(tableName, options.Tables, options.SkipTables) {
114114
continue
115115
}
116116
for _, row := range c.memoryDB[tableName] {
117-
res <- &message.Insert{
117+
res <- &message.SyncInsert{
118118
Record: row,
119119
}
120120
}
@@ -149,7 +149,7 @@ func (c *client) migrate(_ context.Context, table *schema.Table) {
149149
c.tables[tableName] = table
150150
}
151151

152-
func (c *client) Write(ctx context.Context, _ plugin.WriteOptions, msgs <-chan message.Message) error {
152+
func (c *client) Write(ctx context.Context, msgs <-chan message.WriteMessage) error {
153153
if c.errOnWrite {
154154
return fmt.Errorf("errOnWrite")
155155
}
@@ -165,11 +165,11 @@ func (c *client) Write(ctx context.Context, _ plugin.WriteOptions, msgs <-chan m
165165
c.memoryDBLock.Lock()
166166

167167
switch msg := msg.(type) {
168-
case *message.MigrateTable:
168+
case *message.WriteMigrateTable:
169169
c.migrate(ctx, msg.Table)
170-
case *message.DeleteStale:
170+
case *message.WriteDeleteStale:
171171
c.deleteStale(ctx, msg)
172-
case *message.Insert:
172+
case *message.WriteInsert:
173173
sc := msg.Record.Schema()
174174
tableName, ok := sc.Metadata().GetValue(schema.MetadataTableName)
175175
if !ok {
@@ -189,7 +189,7 @@ func (c *client) Close(context.Context) error {
189189
return nil
190190
}
191191

192-
func (c *client) deleteStale(_ context.Context, msg *message.DeleteStale) {
192+
func (c *client) deleteStale(_ context.Context, msg *message.WriteDeleteStale) {
193193
var filteredTable []arrow.Record
194194
tableName := msg.Table.Name
195195
for i, row := range c.memoryDB[tableName] {

internal/memdb/memdb_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ func TestOnWriteError(t *testing.T) {
3939
if err := p.Init(ctx, nil); err != nil {
4040
t.Fatal(err)
4141
}
42-
if err := p.WriteAll(ctx, plugin.WriteOptions{}, nil); err.Error() != "errOnWrite" {
42+
if err := p.WriteAll(ctx, nil); err.Error() != "errOnWrite" {
4343
t.Fatalf("expected errOnWrite, got %s", err)
4444
}
4545
}

internal/servers/destination/v0/destinations.go

Lines changed: 14 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -67,16 +67,15 @@ func (s *Server) Migrate(ctx context.Context, req *pb.Migrate_Request) (*pb.Migr
6767
tables := TablesV2ToV3(tablesV2).FlattenTables()
6868
SetDestinationManagedCqColumns(tables)
6969
s.setPKsForTables(tables)
70-
writeCh := make(chan message.Message)
70+
writeCh := make(chan message.WriteMessage)
7171
eg, ctx := errgroup.WithContext(ctx)
7272
eg.Go(func() error {
73-
return s.Plugin.Write(ctx, plugin.WriteOptions{
74-
MigrateForce: s.spec.MigrateMode == specs.MigrateModeForced,
75-
}, writeCh)
73+
return s.Plugin.Write(ctx, writeCh)
7674
})
7775
for _, table := range tables {
78-
writeCh <- &message.MigrateTable{
79-
Table: table,
76+
writeCh <- &message.WriteMigrateTable{
77+
Table: table,
78+
MigrateForce: s.spec.MigrateMode == specs.MigrateModeForced,
8079
}
8180
}
8281
close(writeCh)
@@ -93,7 +92,7 @@ func (*Server) Write(pb.Destination_WriteServer) error {
9392
// Note the order of operations in this method is important!
9493
// Trying to insert into the `resources` channel before starting the reader goroutine will cause a deadlock.
9594
func (s *Server) Write2(msg pb.Destination_Write2Server) error {
96-
msgs := make(chan message.Message)
95+
msgs := make(chan message.WriteMessage)
9796

9897
r, err := msg.Recv()
9998
if err != nil {
@@ -124,14 +123,13 @@ func (s *Server) Write2(msg pb.Destination_Write2Server) error {
124123
eg, ctx := errgroup.WithContext(msg.Context())
125124
// sourceName := r.Source
126125
eg.Go(func() error {
127-
return s.Plugin.Write(ctx, plugin.WriteOptions{
128-
MigrateForce: s.spec.MigrateMode == specs.MigrateModeForced,
129-
}, msgs)
126+
return s.Plugin.Write(ctx, msgs)
130127
})
131128

132129
for _, table := range tables {
133-
msgs <- &message.MigrateTable{
134-
Table: table,
130+
msgs <- &message.WriteMigrateTable{
131+
Table: table,
132+
MigrateForce: s.spec.MigrateMode == specs.MigrateModeForced,
135133
}
136134
}
137135

@@ -180,7 +178,7 @@ func (s *Server) Write2(msg pb.Destination_Write2Server) error {
180178
origResource.Data = append([]schemav2.CQType{sourceColumn, syncTimeColumn}, origResource.Data...)
181179
}
182180
convertedResource := CQTypesToRecord(memory.DefaultAllocator, []schemav2.CQTypes{origResource.Data}, table.ToArrowSchema())
183-
msg := &message.Insert{
181+
msg := &message.WriteInsert{
184182
Record: convertedResource,
185183
}
186184

@@ -232,19 +230,19 @@ func (s *Server) DeleteStale(ctx context.Context, req *pb.DeleteStale_Request) (
232230
tables := TablesV2ToV3(tablesV2).FlattenTables()
233231
SetDestinationManagedCqColumns(tables)
234232

235-
msgs := make(chan message.Message)
233+
msgs := make(chan message.WriteMessage)
236234
var writeErr error
237235
var wg sync.WaitGroup
238236
wg.Add(1)
239237
go func() {
240238
defer wg.Done()
241-
writeErr = s.Plugin.Write(ctx, plugin.WriteOptions{}, msgs)
239+
writeErr = s.Plugin.Write(ctx, msgs)
242240
}()
243241
for _, table := range tables {
244242
bldr := array.NewRecordBuilder(memory.DefaultAllocator, table.ToArrowSchema())
245243
bldr.Field(table.Columns.Index(schema.CqSourceNameColumn.Name)).(*array.StringBuilder).Append(req.Source)
246244
bldr.Field(table.Columns.Index(schema.CqSyncTimeColumn.Name)).(*array.TimestampBuilder).AppendTime(req.Timestamp.AsTime())
247-
msgs <- &message.DeleteStale{
245+
msgs <- &message.WriteDeleteStale{
248246
Table: table,
249247
SourceName: req.Source,
250248
SyncTime: req.Timestamp.AsTime(),

internal/servers/destination/v1/destinations.go

Lines changed: 14 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -65,16 +65,15 @@ func (s *Server) Migrate(ctx context.Context, req *pb.Migrate_Request) (*pb.Migr
6565
}
6666
s.setPKsForTables(tables)
6767

68-
writeCh := make(chan message.Message)
68+
writeCh := make(chan message.WriteMessage)
6969
eg, ctx := errgroup.WithContext(ctx)
7070
eg.Go(func() error {
71-
return s.Plugin.Write(ctx, plugin.WriteOptions{
72-
MigrateForce: s.migrateMode == plugin.MigrateModeForce,
73-
}, writeCh)
71+
return s.Plugin.Write(ctx, writeCh)
7472
})
7573
for _, table := range tables {
76-
writeCh <- &message.MigrateTable{
77-
Table: table,
74+
writeCh <- &message.WriteMigrateTable{
75+
Table: table,
76+
MigrateForce: s.migrateMode == plugin.MigrateModeForce,
7877
}
7978
}
8079
close(writeCh)
@@ -87,7 +86,7 @@ func (s *Server) Migrate(ctx context.Context, req *pb.Migrate_Request) (*pb.Migr
8786
// Note the order of operations in this method is important!
8887
// Trying to insert into the `resources` channel before starting the reader goroutine will cause a deadlock.
8988
func (s *Server) Write(msg pb.Destination_WriteServer) error {
90-
msgs := make(chan message.Message)
89+
msgs := make(chan message.WriteMessage)
9190

9291
r, err := msg.Recv()
9392
if err != nil {
@@ -120,14 +119,13 @@ func (s *Server) Write(msg pb.Destination_WriteServer) error {
120119
eg, ctx := errgroup.WithContext(msg.Context())
121120

122121
eg.Go(func() error {
123-
return s.Plugin.Write(ctx, plugin.WriteOptions{
124-
MigrateForce: s.spec.MigrateMode == specs.MigrateModeForced,
125-
}, msgs)
122+
return s.Plugin.Write(ctx, msgs)
126123
})
127124

128125
for _, table := range tables {
129-
msgs <- &message.MigrateTable{
130-
Table: table,
126+
msgs <- &message.WriteMigrateTable{
127+
Table: table,
128+
MigrateForce: s.spec.MigrateMode == specs.MigrateModeForced,
131129
}
132130
}
133131

@@ -158,7 +156,7 @@ func (s *Server) Write(msg pb.Destination_WriteServer) error {
158156
for rdr.Next() {
159157
rec := rdr.Record()
160158
rec.Retain()
161-
msg := &message.Insert{
159+
msg := &message.WriteInsert{
162160
Record: rec,
163161
}
164162
select {
@@ -200,19 +198,19 @@ func (s *Server) DeleteStale(ctx context.Context, req *pb.DeleteStale_Request) (
200198
return nil, status.Errorf(codes.InvalidArgument, "failed to create tables: %v", err)
201199
}
202200

203-
msgs := make(chan message.Message)
201+
msgs := make(chan message.WriteMessage)
204202
var writeErr error
205203
var wg sync.WaitGroup
206204
wg.Add(1)
207205
go func() {
208206
defer wg.Done()
209-
writeErr = s.Plugin.Write(ctx, plugin.WriteOptions{}, msgs)
207+
writeErr = s.Plugin.Write(ctx, msgs)
210208
}()
211209
for _, table := range tables {
212210
bldr := array.NewRecordBuilder(memory.DefaultAllocator, table.ToArrowSchema())
213211
bldr.Field(table.Columns.Index(schema.CqSourceNameColumn.Name)).(*array.StringBuilder).Append(req.Source)
214212
bldr.Field(table.Columns.Index(schema.CqSyncTimeColumn.Name)).(*array.TimestampBuilder).AppendTime(req.Timestamp.AsTime())
215-
msgs <- &message.DeleteStale{
213+
msgs <- &message.WriteDeleteStale{
216214
Table: table,
217215
SourceName: req.Source,
218216
SyncTime: req.Timestamp.AsTime(),

0 commit comments

Comments
 (0)