Skip to content

Commit 826e816

Browse files
authored
fix(writers): Move to sub packages (#1011)
For convenient `Option`s etc. Also adds default batch size values to the streamingbatchwriter and some compile-time interface checks.
1 parent b1e2bd4 commit 826e816

File tree

8 files changed

+118
-78
lines changed

8 files changed

+118
-78
lines changed

writers/batch.go renamed to writers/batchwriter/batchwriter.go

Lines changed: 11 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package writers
1+
package batchwriter
22

33
import (
44
"context"
@@ -9,27 +9,18 @@ import (
99
"github.com/apache/arrow/go/v13/arrow/util"
1010
"github.com/cloudquery/plugin-sdk/v4/message"
1111
"github.com/cloudquery/plugin-sdk/v4/schema"
12+
"github.com/cloudquery/plugin-sdk/v4/writers"
1213
"github.com/rs/zerolog"
1314
)
1415

15-
type Writer interface {
16-
Write(ctx context.Context, res <-chan message.WriteMessage) error
17-
}
18-
19-
const (
20-
DefaultBatchTimeoutSeconds = 20
21-
DefaultBatchSize = 10000
22-
DefaultBatchSizeBytes = 5 * 1024 * 1024 // 5 MiB
23-
)
24-
25-
type BatchWriterClient interface {
16+
type Client interface {
2617
MigrateTables(context.Context, []*message.WriteMigrateTable) error
2718
WriteTableBatch(ctx context.Context, name string, msgs []*message.WriteInsert) error
2819
DeleteStale(context.Context, []*message.WriteDeleteStale) error
2920
}
3021

3122
type BatchWriter struct {
32-
client BatchWriterClient
23+
client Client
3324
workers map[string]*worker
3425
workersLock sync.RWMutex
3526
workersWaitGroup sync.WaitGroup
@@ -45,6 +36,9 @@ type BatchWriter struct {
4536
batchSizeBytes int
4637
}
4738

39+
// Assert at compile-time that BatchWriter implements the Writer interface
40+
var _ writers.Writer = (*BatchWriter)(nil)
41+
4842
type Option func(*BatchWriter)
4943

5044
func WithLogger(logger zerolog.Logger) Option {
@@ -77,14 +71,14 @@ type worker struct {
7771
flush chan chan bool
7872
}
7973

80-
func NewBatchWriter(client BatchWriterClient, opts ...Option) (*BatchWriter, error) {
74+
func New(client Client, opts ...Option) (*BatchWriter, error) {
8175
c := &BatchWriter{
8276
client: client,
8377
workers: make(map[string]*worker),
8478
logger: zerolog.Nop(),
85-
batchTimeout: DefaultBatchTimeoutSeconds * time.Second,
86-
batchSize: DefaultBatchSize,
87-
batchSizeBytes: DefaultBatchSizeBytes,
79+
batchTimeout: writers.DefaultBatchTimeoutSeconds * time.Second,
80+
batchSize: writers.DefaultBatchSize,
81+
batchSizeBytes: writers.DefaultBatchSizeBytes,
8882
}
8983
for _, opt := range opts {
9084
opt(c)

writers/batch_test.go renamed to writers/batchwriter/batchwriter_test.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package writers
1+
package batchwriter
22

33
import (
44
"context"
@@ -76,7 +76,7 @@ func TestBatchFlushDifferentMessages(t *testing.T) {
7676
ctx := context.Background()
7777

7878
testClient := &testBatchClient{}
79-
wr, err := NewBatchWriter(testClient)
79+
wr, err := New(testClient)
8080
if err != nil {
8181
t.Fatal(err)
8282
}
@@ -117,7 +117,7 @@ func TestBatchSize(t *testing.T) {
117117
ctx := context.Background()
118118

119119
testClient := &testBatchClient{}
120-
wr, err := NewBatchWriter(testClient, WithBatchSize(2))
120+
wr, err := New(testClient, WithBatchSize(2))
121121
if err != nil {
122122
t.Fatal(err)
123123
}
@@ -155,7 +155,7 @@ func TestBatchTimeout(t *testing.T) {
155155
ctx := context.Background()
156156

157157
testClient := &testBatchClient{}
158-
wr, err := NewBatchWriter(testClient, WithBatchTimeout(time.Second))
158+
wr, err := New(testClient, WithBatchTimeout(time.Second))
159159
if err != nil {
160160
t.Fatal(err)
161161
}
@@ -190,7 +190,7 @@ func TestBatchUpserts(t *testing.T) {
190190
ctx := context.Background()
191191

192192
testClient := &testBatchClient{}
193-
wr, err := NewBatchWriter(testClient, WithBatchSize(2), WithBatchTimeout(time.Second))
193+
wr, err := New(testClient, WithBatchSize(2), WithBatchTimeout(time.Second))
194194
if err != nil {
195195
t.Fatal(err)
196196
}

writers/mixed_batch.go renamed to writers/mixedbatchwriter/mixedbatchwriter.go

Lines changed: 23 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,65 +1,66 @@
1-
package writers
1+
package mixedbatchwriter
22

33
import (
44
"context"
55
"time"
66

77
"github.com/apache/arrow/go/v13/arrow/util"
88
"github.com/cloudquery/plugin-sdk/v4/message"
9+
"github.com/cloudquery/plugin-sdk/v4/writers"
910
"github.com/rs/zerolog"
1011
)
1112

12-
// MixedBatchClient is a client that will receive batches of messages with a mixture of tables.
13-
type MixedBatchClient interface {
13+
// Client is a client that will receive batches of messages with a mixture of tables.
14+
type Client interface {
1415
MigrateTableBatch(ctx context.Context, messages []*message.WriteMigrateTable) error
1516
InsertBatch(ctx context.Context, messages []*message.WriteInsert) error
1617
DeleteStaleBatch(ctx context.Context, messages []*message.WriteDeleteStale) error
1718
}
1819

1920
type MixedBatchWriter struct {
20-
client MixedBatchClient
21+
client Client
2122
logger zerolog.Logger
2223
batchTimeout time.Duration
2324
batchSize int
2425
batchSizeBytes int
2526
}
2627

2728
// Assert at compile-time that MixedBatchWriter implements the Writer interface
28-
var _ Writer = (*MixedBatchWriter)(nil)
29+
var _ writers.Writer = (*MixedBatchWriter)(nil)
2930

30-
type MixedBatchWriterOption func(writer *MixedBatchWriter)
31+
type Option func(writer *MixedBatchWriter)
3132

32-
func WithMixedBatchWriterLogger(logger zerolog.Logger) MixedBatchWriterOption {
33+
func WithLogger(logger zerolog.Logger) Option {
3334
return func(p *MixedBatchWriter) {
3435
p.logger = logger
3536
}
3637
}
3738

38-
func WithMixedBatchWriterBatchTimeout(timeout time.Duration) MixedBatchWriterOption {
39+
func WithBatchTimeout(timeout time.Duration) Option {
3940
return func(p *MixedBatchWriter) {
4041
p.batchTimeout = timeout
4142
}
4243
}
4344

44-
func WithMixedBatchWriterBatchSize(size int) MixedBatchWriterOption {
45+
func WithBatchSize(size int) Option {
4546
return func(p *MixedBatchWriter) {
4647
p.batchSize = size
4748
}
4849
}
4950

50-
func WithMixedBatchWriterBatchSizeBytes(size int) MixedBatchWriterOption {
51+
func WithBatchSizeBytes(size int) Option {
5152
return func(p *MixedBatchWriter) {
5253
p.batchSizeBytes = size
5354
}
5455
}
5556

56-
func NewMixedBatchWriter(client MixedBatchClient, opts ...MixedBatchWriterOption) (*MixedBatchWriter, error) {
57+
func New(client Client, opts ...Option) (*MixedBatchWriter, error) {
5758
c := &MixedBatchWriter{
5859
client: client,
5960
logger: zerolog.Nop(),
60-
batchTimeout: DefaultBatchTimeoutSeconds * time.Second,
61-
batchSize: DefaultBatchSize,
62-
batchSizeBytes: DefaultBatchSizeBytes,
61+
batchTimeout: writers.DefaultBatchTimeoutSeconds * time.Second,
62+
batchSize: writers.DefaultBatchSize,
63+
batchSizeBytes: writers.DefaultBatchSizeBytes,
6364
}
6465
for _, opt := range opts {
6566
opt(c)
@@ -82,23 +83,23 @@ func (w *MixedBatchWriter) Write(ctx context.Context, msgChan <-chan message.Wri
8283
batch: make([]*message.WriteDeleteStale, 0, w.batchSize),
8384
writeFunc: w.client.DeleteStaleBatch,
8485
}
85-
flush := func(msgType msgType) error {
86+
flush := func(msgType writers.MsgType) error {
8687
switch msgType {
87-
case msgTypeMigrateTable:
88+
case writers.MsgTypeMigrateTable:
8889
return migrateTable.flush(ctx)
89-
case msgTypeInsert:
90+
case writers.MsgTypeInsert:
9091
return insert.flush(ctx)
91-
case msgTypeDeleteStale:
92+
case writers.MsgTypeDeleteStale:
9293
return deleteStale.flush(ctx)
9394
default:
9495
panic("unknown message type")
9596
}
9697
}
97-
prevMsgType := msgTypeUnset
98+
prevMsgType := writers.MsgTypeUnset
9899
var err error
99100
for msg := range msgChan {
100-
msgType := msgID(msg)
101-
if prevMsgType != msgTypeUnset && prevMsgType != msgType {
101+
msgType := writers.MsgID(msg)
102+
if prevMsgType != writers.MsgTypeUnset && prevMsgType != msgType {
102103
if err := flush(prevMsgType); err != nil {
103104
return err
104105
}
@@ -118,7 +119,7 @@ func (w *MixedBatchWriter) Write(ctx context.Context, msgChan <-chan message.Wri
118119
return err
119120
}
120121
}
121-
if prevMsgType == msgTypeUnset {
122+
if prevMsgType == writers.MsgTypeUnset {
122123
return nil
123124
}
124125
return flush(prevMsgType)

writers/mixed_batch_test.go renamed to writers/mixedbatchwriter/mixedbatchwriter_test.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package writers
1+
package mixedbatchwriter_test
22

33
import (
44
"context"
@@ -10,6 +10,7 @@ import (
1010
"github.com/apache/arrow/go/v13/arrow/memory"
1111
"github.com/cloudquery/plugin-sdk/v4/message"
1212
"github.com/cloudquery/plugin-sdk/v4/schema"
13+
"github.com/cloudquery/plugin-sdk/v4/writers/mixedbatchwriter"
1314
)
1415

1516
type testMixedBatchClient struct {
@@ -43,7 +44,7 @@ func (c *testMixedBatchClient) DeleteStaleBatch(_ context.Context, msgs []*messa
4344
return nil
4445
}
4546

46-
var _ MixedBatchClient = (*testMixedBatchClient)(nil)
47+
var _ mixedbatchwriter.Client = (*testMixedBatchClient)(nil)
4748

4849
func TestMixedBatchWriter(t *testing.T) {
4950
ctx := context.Background()
@@ -169,7 +170,7 @@ func TestMixedBatchWriter(t *testing.T) {
169170
client := &testMixedBatchClient{
170171
receivedBatches: make([][]message.WriteMessage, 0),
171172
}
172-
wr, err := NewMixedBatchWriter(client)
173+
wr, err := mixedbatchwriter.New(client)
173174
if err != nil {
174175
t.Fatal(err)
175176
}

writers/msgtype.go

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -6,23 +6,23 @@ import (
66
"github.com/cloudquery/plugin-sdk/v4/message"
77
)
88

9-
type msgType int
9+
type MsgType int
1010

1111
const (
12-
msgTypeUnset msgType = iota
13-
msgTypeMigrateTable
14-
msgTypeInsert
15-
msgTypeDeleteStale
12+
MsgTypeUnset MsgType = iota
13+
MsgTypeMigrateTable
14+
MsgTypeInsert
15+
MsgTypeDeleteStale
1616
)
1717

18-
func msgID(msg message.WriteMessage) msgType {
18+
func MsgID(msg message.WriteMessage) MsgType {
1919
switch msg.(type) {
2020
case *message.WriteMigrateTable:
21-
return msgTypeMigrateTable
21+
return MsgTypeMigrateTable
2222
case *message.WriteInsert:
23-
return msgTypeInsert
23+
return MsgTypeInsert
2424
case *message.WriteDeleteStale:
25-
return msgTypeDeleteStale
25+
return MsgTypeDeleteStale
2626
}
2727
panic("unknown message type: " + reflect.TypeOf(msg).Name())
2828
}

0 commit comments

Comments
 (0)