Skip to content

Commit e290234

Browse files
authored
feat: Use named message slice types in writers (#1017)
1 parent 282ee45 commit e290234

File tree

5 files changed

+84
-93
lines changed

5 files changed

+84
-93
lines changed

message/write_message.go

Lines changed: 51 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -5,49 +5,20 @@ import (
55

66
"github.com/apache/arrow/go/v13/arrow"
77
"github.com/cloudquery/plugin-sdk/v4/schema"
8+
"golang.org/x/exp/slices"
89
)
910

10-
type writeBaseMessage struct {
11-
}
11+
type writeBaseMessage struct{}
1212

13-
func (*writeBaseMessage) IsWriteMessage() bool {
14-
return true
15-
}
13+
func (*writeBaseMessage) IsWriteMessage() bool { return true }
1614

1715
type WriteMessage interface {
1816
GetTable() *schema.Table
1917
IsWriteMessage() bool
2018
}
2119

22-
type WriteMigrateTable struct {
23-
writeBaseMessage
24-
Table *schema.Table
25-
MigrateForce bool
26-
}
27-
28-
func (m WriteMigrateTable) GetTable() *schema.Table {
29-
return m.Table
30-
}
31-
32-
type WriteInsert struct {
33-
writeBaseMessage
34-
Record arrow.Record
35-
}
36-
37-
func (m *WriteInsert) GetTable() *schema.Table {
38-
table, err := schema.NewTableFromArrowSchema(m.Record.Schema())
39-
if err != nil {
40-
panic(err)
41-
}
42-
return table
43-
}
44-
4520
type WriteMessages []WriteMessage
4621

47-
type WriteMigrateTables []*WriteMigrateTable
48-
49-
type WriteInserts []*WriteInsert
50-
5122
func (messages WriteMessages) InsertItems() int64 {
5223
items := int64(0)
5324
for _, msg := range messages {
@@ -59,57 +30,69 @@ func (messages WriteMessages) InsertItems() int64 {
5930
}
6031

6132
func (messages WriteMessages) GetInserts() WriteInserts {
62-
inserts := []*WriteInsert{}
33+
inserts := make(WriteInserts, 0, len(messages))
6334
for _, msg := range messages {
6435
if m, ok := msg.(*WriteInsert); ok {
6536
inserts = append(inserts, m)
6637
}
6738
}
68-
return inserts
39+
return slices.Clip(inserts)
6940
}
7041

42+
type WriteMigrateTable struct {
43+
writeBaseMessage
44+
Table *schema.Table
45+
MigrateForce bool
46+
}
47+
48+
func (m WriteMigrateTable) GetTable() *schema.Table { return m.Table }
49+
50+
type WriteMigrateTables []*WriteMigrateTable
51+
7152
func (m WriteMigrateTables) Exists(tableName string) bool {
72-
for _, table := range m {
73-
if table.Table.Name == tableName {
74-
return true
75-
}
53+
return slices.ContainsFunc(m, func(msg *WriteMigrateTable) bool {
54+
return msg.Table.Name == tableName
55+
})
56+
}
57+
58+
type WriteInsert struct {
59+
writeBaseMessage
60+
Record arrow.Record
61+
}
62+
63+
func (m *WriteInsert) GetTable() *schema.Table {
64+
table, err := schema.NewTableFromArrowSchema(m.Record.Schema())
65+
if err != nil {
66+
panic(err)
7667
}
77-
return false
68+
return table
7869
}
7970

71+
type WriteInserts []*WriteInsert
72+
8073
func (m WriteInserts) Exists(tableName string) bool {
81-
for _, insert := range m {
82-
md := insert.Record.Schema().Metadata()
83-
tableNameMeta, ok := md.GetValue(schema.MetadataTableName)
84-
if !ok {
85-
continue
86-
}
87-
if tableNameMeta == tableName {
88-
return true
89-
}
90-
}
91-
return false
74+
return slices.ContainsFunc(m, func(msg *WriteInsert) bool {
75+
tableNameMeta, ok := msg.Record.Schema().Metadata().GetValue(schema.MetadataTableName)
76+
return ok && tableNameMeta == tableName
77+
})
9278
}
9379

9480
func (m WriteInserts) GetRecordsForTable(table *schema.Table) []arrow.Record {
95-
res := []arrow.Record{}
81+
res := make([]arrow.Record, 0, len(m))
9682
for _, insert := range m {
97-
md := insert.Record.Schema().Metadata()
98-
tableNameMeta, ok := md.GetValue(schema.MetadataTableName)
99-
if !ok {
83+
tableNameMeta, ok := insert.Record.Schema().Metadata().GetValue(schema.MetadataTableName)
84+
if !ok || tableNameMeta != table.Name {
10085
continue
10186
}
102-
if tableNameMeta == table.Name {
103-
res = append(res, insert.Record)
104-
}
87+
res = append(res, insert.Record)
10588
}
106-
return res
89+
return slices.Clip(res)
10790
}
10891

109-
// DeleteStale is a pretty specific message which requires the destination to be aware of a CLI use-case
92+
// WriteDeleteStale is a pretty specific message which requires the destination to be aware of a CLI use-case
11093
// thus it might be deprecated in the future
11194
// in favour of MessageDelete or MessageRawQuery
112-
// The message indeciates that the destination needs to run something like "DELETE FROM table WHERE _cq_source_name=$1 and sync_time < $2"
95+
// The message indicates that the destination needs to run something like "DELETE FROM table WHERE _cq_source_name=$1 and sync_time < $2"
11396
type WriteDeleteStale struct {
11497
writeBaseMessage
11598
Table *schema.Table
@@ -120,3 +103,11 @@ type WriteDeleteStale struct {
120103
func (m WriteDeleteStale) GetTable() *schema.Table {
121104
return m.Table
122105
}
106+
107+
type WriteDeleteStales []*WriteDeleteStale
108+
109+
func (m WriteDeleteStales) Exists(tableName string) bool {
110+
return slices.ContainsFunc(m, func(msg *WriteDeleteStale) bool {
111+
return msg.Table.Name == tableName
112+
})
113+
}

writers/batchwriter/batchwriter.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,9 @@ import (
1414
)
1515

1616
type Client interface {
17-
MigrateTables(context.Context, []*message.WriteMigrateTable) error
18-
WriteTableBatch(ctx context.Context, name string, msgs []*message.WriteInsert) error
19-
DeleteStale(context.Context, []*message.WriteDeleteStale) error
17+
MigrateTables(context.Context, message.WriteMigrateTables) error
18+
WriteTableBatch(ctx context.Context, name string, messages message.WriteInserts) error
19+
DeleteStale(context.Context, message.WriteDeleteStales) error
2020
}
2121

2222
type BatchWriter struct {
@@ -26,9 +26,9 @@ type BatchWriter struct {
2626
workersWaitGroup sync.WaitGroup
2727

2828
migrateTableLock sync.Mutex
29-
migrateTableMessages []*message.WriteMigrateTable
29+
migrateTableMessages message.WriteMigrateTables
3030
deleteStaleLock sync.Mutex
31-
deleteStaleMessages []*message.WriteDeleteStale
31+
deleteStaleMessages message.WriteDeleteStales
3232

3333
logger zerolog.Logger
3434
batchTimeout time.Duration

writers/batchwriter/batchwriter_test.go

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,9 @@ import (
1515

1616
type testBatchClient struct {
1717
mutex sync.Mutex
18-
migrateTables []*message.WriteMigrateTable
19-
inserts []*message.WriteInsert
20-
deleteStales []*message.WriteDeleteStale
18+
migrateTables message.WriteMigrateTables
19+
inserts message.WriteInserts
20+
deleteStales message.WriteDeleteStales
2121
}
2222

2323
func (c *testBatchClient) MigrateTablesLen() int {
@@ -38,23 +38,23 @@ func (c *testBatchClient) DeleteStalesLen() int {
3838
return len(c.deleteStales)
3939
}
4040

41-
func (c *testBatchClient) MigrateTables(_ context.Context, msgs []*message.WriteMigrateTable) error {
41+
func (c *testBatchClient) MigrateTables(_ context.Context, messages message.WriteMigrateTables) error {
4242
c.mutex.Lock()
4343
defer c.mutex.Unlock()
44-
c.migrateTables = append(c.migrateTables, msgs...)
44+
c.migrateTables = append(c.migrateTables, messages...)
4545
return nil
4646
}
4747

48-
func (c *testBatchClient) WriteTableBatch(_ context.Context, _ string, msgs []*message.WriteInsert) error {
48+
func (c *testBatchClient) WriteTableBatch(_ context.Context, _ string, messages message.WriteInserts) error {
4949
c.mutex.Lock()
5050
defer c.mutex.Unlock()
51-
c.inserts = append(c.inserts, msgs...)
51+
c.inserts = append(c.inserts, messages...)
5252
return nil
5353
}
54-
func (c *testBatchClient) DeleteStale(_ context.Context, msgs []*message.WriteDeleteStale) error {
54+
func (c *testBatchClient) DeleteStale(_ context.Context, messages message.WriteDeleteStales) error {
5555
c.mutex.Lock()
5656
defer c.mutex.Unlock()
57-
c.deleteStales = append(c.deleteStales, msgs...)
57+
c.deleteStales = append(c.deleteStales, messages...)
5858
return nil
5959
}
6060

writers/mixedbatchwriter/mixedbatchwriter.go

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,9 @@ import (
1111

1212
// Client is a client that will receive batches of messages with a mixture of tables.
1313
type Client interface {
14-
MigrateTableBatch(ctx context.Context, messages []*message.WriteMigrateTable) error
15-
InsertBatch(ctx context.Context, messages []*message.WriteInsert) error
16-
DeleteStaleBatch(ctx context.Context, messages []*message.WriteDeleteStale) error
14+
MigrateTableBatch(ctx context.Context, messages message.WriteMigrateTables) error
15+
InsertBatch(ctx context.Context, messages message.WriteInserts) error
16+
DeleteStaleBatch(ctx context.Context, messages message.WriteDeleteStales) error
1717
}
1818

1919
type MixedBatchWriter struct {
@@ -61,7 +61,7 @@ func New(client Client, opts ...Option) (*MixedBatchWriter, error) {
6161

6262
// Write starts listening for messages on the msgChan channel and writes them to the client in batches.
6363
func (w *MixedBatchWriter) Write(ctx context.Context, msgChan <-chan message.WriteMessage) error {
64-
migrateTable := &batchManager[*message.WriteMigrateTable]{
64+
migrateTable := &batchManager[message.WriteMigrateTables, *message.WriteMigrateTable]{
6565
batch: make([]*message.WriteMigrateTable, 0, w.batchSize),
6666
writeFunc: w.client.MigrateTableBatch,
6767
}
@@ -70,7 +70,7 @@ func (w *MixedBatchWriter) Write(ctx context.Context, msgChan <-chan message.Wri
7070
writeFunc: w.client.InsertBatch,
7171
maxBatchSizeBytes: int64(w.batchSizeBytes),
7272
}
73-
deleteStale := &batchManager[*message.WriteDeleteStale]{
73+
deleteStale := &batchManager[message.WriteDeleteStales, *message.WriteDeleteStale]{
7474
batch: make([]*message.WriteDeleteStale, 0, w.batchSize),
7575
writeFunc: w.client.DeleteStaleBatch,
7676
}
@@ -117,12 +117,12 @@ func (w *MixedBatchWriter) Write(ctx context.Context, msgChan <-chan message.Wri
117117
}
118118

119119
// generic batch manager for most message types
120-
type batchManager[T message.WriteMessage] struct {
120+
type batchManager[A ~[]T, T message.WriteMessage] struct {
121121
batch []T
122-
writeFunc func(ctx context.Context, messages []T) error
122+
writeFunc func(ctx context.Context, messages A) error
123123
}
124124

125-
func (m *batchManager[T]) append(ctx context.Context, msg T) error {
125+
func (m *batchManager[A, T]) append(ctx context.Context, msg T) error {
126126
if len(m.batch) == cap(m.batch) {
127127
if err := m.flush(ctx); err != nil {
128128
return err
@@ -132,7 +132,7 @@ func (m *batchManager[T]) append(ctx context.Context, msg T) error {
132132
return nil
133133
}
134134

135-
func (m *batchManager[T]) flush(ctx context.Context) error {
135+
func (m *batchManager[A, T]) flush(ctx context.Context) error {
136136
if len(m.batch) == 0 {
137137
return nil
138138
}
@@ -148,7 +148,7 @@ func (m *batchManager[T]) flush(ctx context.Context) error {
148148
// special batch manager for insert messages that also keeps track of the total size of the batch
149149
type insertBatchManager struct {
150150
batch []*message.WriteInsert
151-
writeFunc func(ctx context.Context, messages []*message.WriteInsert) error
151+
writeFunc func(ctx context.Context, messages message.WriteInserts) error
152152
curBatchSizeBytes int64
153153
maxBatchSizeBytes int64
154154
}

writers/mixedbatchwriter/mixedbatchwriter_test.go

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -17,27 +17,27 @@ type testMixedBatchClient struct {
1717
receivedBatches [][]message.WriteMessage
1818
}
1919

20-
func (c *testMixedBatchClient) MigrateTableBatch(_ context.Context, msgs []*message.WriteMigrateTable) error {
21-
m := make([]message.WriteMessage, len(msgs))
22-
for i, msg := range msgs {
20+
func (c *testMixedBatchClient) MigrateTableBatch(_ context.Context, messages message.WriteMigrateTables) error {
21+
m := make([]message.WriteMessage, len(messages))
22+
for i, msg := range messages {
2323
m[i] = msg
2424
}
2525
c.receivedBatches = append(c.receivedBatches, m)
2626
return nil
2727
}
2828

29-
func (c *testMixedBatchClient) InsertBatch(_ context.Context, msgs []*message.WriteInsert) error {
30-
m := make([]message.WriteMessage, len(msgs))
31-
for i, msg := range msgs {
29+
func (c *testMixedBatchClient) InsertBatch(_ context.Context, messages message.WriteInserts) error {
30+
m := make([]message.WriteMessage, len(messages))
31+
for i, msg := range messages {
3232
m[i] = msg
3333
}
3434
c.receivedBatches = append(c.receivedBatches, m)
3535
return nil
3636
}
3737

38-
func (c *testMixedBatchClient) DeleteStaleBatch(_ context.Context, msgs []*message.WriteDeleteStale) error {
39-
m := make([]message.WriteMessage, len(msgs))
40-
for i, msg := range msgs {
38+
func (c *testMixedBatchClient) DeleteStaleBatch(_ context.Context, messages message.WriteDeleteStales) error {
39+
m := make([]message.WriteMessage, len(messages))
40+
for i, msg := range messages {
4141
m[i] = msg
4242
}
4343
c.receivedBatches = append(c.receivedBatches, m)

0 commit comments

Comments
 (0)