Skip to content

Commit 09fb4ce

Browse files
authored
feat: Implement plugin Read (#1027)
#1026 should be merged first
1 parent abe2557 commit 09fb4ce

File tree

4 files changed

+58
-12
lines changed

4 files changed

+58
-12
lines changed

internal/clients/state/v3/state.go

Lines changed: 4 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -72,30 +72,23 @@ func NewClient(ctx context.Context, pbClient pb.PluginClient, tableName string)
7272
return nil, err
7373
}
7474

75-
syncClient, err := c.client.Sync(ctx, &pb.Sync_Request{
76-
Tables: []string{tableName},
75+
readClient, err := c.client.Read(ctx, &pb.Read_Request{
76+
Table: tableBytes,
7777
})
7878
if err != nil {
7979
return nil, err
8080
}
8181
c.mutex.Lock()
8282
defer c.mutex.Unlock()
8383
for {
84-
res, err := syncClient.Recv()
84+
res, err := readClient.Recv()
8585
if err != nil {
8686
if err == io.EOF {
8787
break
8888
}
8989
return nil, err
9090
}
91-
var insertMessage *pb.Sync_Response_Insert
92-
switch m := res.Message.(type) {
93-
case *pb.Sync_Response_MigrateTable:
94-
continue
95-
case *pb.Sync_Response_Insert:
96-
insertMessage = m
97-
}
98-
rdr, err := ipc.NewReader(bytes.NewReader(insertMessage.Insert.Record))
91+
rdr, err := ipc.NewReader(bytes.NewReader(res.Record))
9992
if err != nil {
10093
return nil, err
10194
}

internal/servers/plugin/v3/plugin.go

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"fmt"
66
"io"
77

8+
"github.com/apache/arrow/go/v13/arrow"
89
pb "github.com/cloudquery/plugin-pb-go/pb/plugin/v3"
910
"github.com/cloudquery/plugin-sdk/v4/message"
1011
"github.com/cloudquery/plugin-sdk/v4/plugin"
@@ -63,6 +64,43 @@ func (s *Server) Init(ctx context.Context, req *pb.Init_Request) (*pb.Init_Respo
6364
return &pb.Init_Response{}, nil
6465
}
6566

67+
func (s *Server) Read(req *pb.Read_Request, stream pb.Plugin_ReadServer) error {
68+
records := make(chan arrow.Record)
69+
var syncErr error
70+
ctx := stream.Context()
71+
72+
sc, err := pb.NewSchemaFromBytes(req.Table)
73+
if err != nil {
74+
return status.Errorf(codes.InvalidArgument, "failed to create schema from bytes: %v", err)
75+
}
76+
table, err := schema.NewTableFromArrowSchema(sc)
77+
if err != nil {
78+
return status.Errorf(codes.InvalidArgument, "failed to create table from schema: %v", err)
79+
}
80+
go func() {
81+
defer close(records)
82+
err := s.Plugin.Read(ctx, table, records)
83+
if err != nil {
84+
syncErr = fmt.Errorf("failed to sync records: %w", err)
85+
}
86+
}()
87+
88+
for rec := range records {
89+
recBytes, err := pb.RecordToBytes(rec)
90+
if err != nil {
91+
return status.Errorf(codes.Internal, "failed to convert record to bytes: %v", err)
92+
}
93+
resp := &pb.Read_Response{
94+
Record: recBytes,
95+
}
96+
if err := stream.Send(resp); err != nil {
97+
return status.Errorf(codes.Internal, "failed to send read response: %v", err)
98+
}
99+
}
100+
101+
return syncErr
102+
}
103+
66104
func (s *Server) Sync(req *pb.Sync_Request, stream pb.Plugin_SyncServer) error {
67105
msgs := make(chan message.SyncMessage)
68106
var syncErr error

plugin/plugin.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ func (UnimplementedDestination) Write(context.Context, <-chan message.WriteMessa
2727
}
2828

2929
func (UnimplementedDestination) Read(context.Context, *schema.Table, chan<- arrow.Record) error {
30-
return fmt.Errorf("not implemented")
30+
return ErrNotImplemented
3131
}
3232

3333
type UnimplementedSource struct{}

plugin/plugin_destination.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,3 +37,18 @@ func (p *Plugin) Write(ctx context.Context, res <-chan message.WriteMessage) err
3737
}
3838
return p.client.Write(ctx, res)
3939
}
40+
41+
// Read is read data from the requested table to the given channel, returned in the same format as the table
42+
func (p *Plugin) Read(ctx context.Context, table *schema.Table, res chan<- arrow.Record) error {
43+
if !p.mu.TryLock() {
44+
return fmt.Errorf("plugin already in use")
45+
}
46+
defer p.mu.Unlock()
47+
if p.client == nil {
48+
return fmt.Errorf("plugin not initialized. call Init() first")
49+
}
50+
if err := p.client.Read(ctx, table, res); err != nil {
51+
return fmt.Errorf("failed to read: %w", err)
52+
}
53+
return nil
54+
}

0 commit comments

Comments
 (0)