Skip to content

Commit a49abcb

Browse files
authored
feat: Support sources in SDK V3 (#864)
Few notes: - Most of the code here was removed in #854 to prevent accidental use before it was ready - Sources moved to use the new `schema.Table` which means use of `arrow.DataType` instead of our old `ValueType` - Protocol in sources and destinations are upgraded to use native arrow format (sources v2 and dest v1) - https://github.com/cloudquery/plugin-pb-go - Introduces a new `scalar` package which is mostly used by our "managed" sources and basically our old cqtypes just now using arrow types (which also support nesting). This will hopefully go upstream one day but I don't think we should block on that as the upstream package is not a fit right now and will take time for such refactor to make it upstream. Follow-up PRs with example sources and dest will follow soon. IMPORTANT: all destinations should be released first as we decided new sources will work only with new destination to avoid un-necessary backward compatibility maintenance. Example PRs: - PostgreSQL: cloudquery/cloudquery#10783 - GCP: cloudquery/cloudquery#10784 - CLI: cloudquery/cloudquery#10785
1 parent 56c0e84 commit a49abcb

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

76 files changed

+6667
-37
lines changed

go.mod

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ go 1.19
44

55
require (
66
github.com/apache/arrow/go/v13 v13.0.0-20230509040948-de6c3cd2b604
7+
github.com/bradleyjkemp/cupaloy/v2 v2.8.0
78
github.com/cloudquery/plugin-pb-go v1.0.8
89
github.com/cloudquery/plugin-sdk/v2 v2.7.0
910
github.com/getsentry/sentry-go v0.20.0
@@ -18,6 +19,7 @@ require (
1819
github.com/stretchr/testify v1.8.2
1920
github.com/thoas/go-funk v0.9.3
2021
golang.org/x/exp v0.0.0-20230425010034-47ecfdc1ba53
22+
golang.org/x/net v0.9.0
2123
golang.org/x/sync v0.1.0
2224
golang.org/x/text v0.9.0
2325
google.golang.org/grpc v1.54.0
@@ -42,11 +44,11 @@ require (
4244
github.com/mattn/go-isatty v0.0.18 // indirect
4345
github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 // indirect
4446
github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3 // indirect
47+
github.com/pierrec/lz4/v4 v4.1.15 // indirect
4548
github.com/pmezard/go-difflib v1.0.0 // indirect
4649
github.com/spf13/pflag v1.0.5 // indirect
4750
github.com/zeebo/xxh3 v1.0.2 // indirect
4851
golang.org/x/mod v0.8.0 // indirect
49-
golang.org/x/net v0.9.0 // indirect
5052
golang.org/x/sys v0.7.0 // indirect
5153
golang.org/x/tools v0.6.0 // indirect
5254
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect

go.sum

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,8 @@ github.com/andybalholm/brotli v1.0.5 h1:8uQZIdzKmjc/iuPu7O2ioW48L81FgatrcpfFmiq/
3838
github.com/andybalholm/brotli v1.0.5/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig=
3939
github.com/apache/thrift v0.16.0 h1:qEy6UW60iVOlUy+b9ZR0d5WzUWYGOo4HfopoyBaNmoY=
4040
github.com/apache/thrift v0.16.0/go.mod h1:PHK3hniurgQaNMZYaCLEqXKsYK8upmhPbmdP2FXSqgU=
41+
github.com/bradleyjkemp/cupaloy/v2 v2.8.0 h1:any4BmKE+jGIaMpnU8YgH/I2LPiLBufr6oMMlVBbn9M=
42+
github.com/bradleyjkemp/cupaloy/v2 v2.8.0/go.mod h1:bm7JXdkRd4BHJk9HpwqAI8BoAY1lps46Enkdqw6aRX0=
4143
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
4244
github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI=
4345
github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI=
@@ -174,6 +176,7 @@ github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3 h1:+n/aFZefKZp7spd8D
174176
github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3/go.mod h1:RagcQ7I8IeTMnF8JTXieKnO4Z6JCsikNEzj0DwauVzE=
175177
github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o=
176178
github.com/pierrec/lz4/v4 v4.1.15 h1:MO0/ucJhngq7299dKLwIMtgTfbkoSPF6AoMYDd8Q4q0=
179+
github.com/pierrec/lz4/v4 v4.1.15/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
177180
github.com/pingcap/errors v0.11.4 h1:lFuQV/oaUMGcD2tqt+01ROSmJs75VG1ToEOkZIZ4nE4=
178181
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
179182
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
@@ -196,10 +199,12 @@ github.com/spf13/cobra v1.6.1/go.mod h1:IOw/AERYS7UzyrGinqmz6HLUo219MORXGxhbaJUq
196199
github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA=
197200
github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg=
198201
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
202+
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
199203
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
200204
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
201205
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
202206
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
207+
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
203208
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
204209
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
205210
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
Lines changed: 191 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,191 @@
1+
package destination
2+
3+
import (
4+
"bytes"
5+
"context"
6+
"encoding/json"
7+
"fmt"
8+
"io"
9+
10+
"github.com/apache/arrow/go/v13/arrow"
11+
"github.com/apache/arrow/go/v13/arrow/ipc"
12+
pb "github.com/cloudquery/plugin-pb-go/pb/destination/v1"
13+
"github.com/cloudquery/plugin-pb-go/specs"
14+
"github.com/cloudquery/plugin-sdk/v3/plugins/destination"
15+
"github.com/cloudquery/plugin-sdk/v3/schema"
16+
"github.com/rs/zerolog"
17+
"golang.org/x/sync/errgroup"
18+
"google.golang.org/grpc/codes"
19+
"google.golang.org/grpc/status"
20+
)
21+
22+
type Server struct {
23+
pb.UnimplementedDestinationServer
24+
Plugin *destination.Plugin
25+
Logger zerolog.Logger
26+
spec specs.Destination
27+
}
28+
29+
func (s *Server) Configure(ctx context.Context, req *pb.Configure_Request) (*pb.Configure_Response, error) {
30+
var spec specs.Destination
31+
if err := json.Unmarshal(req.Config, &spec); err != nil {
32+
return nil, status.Errorf(codes.InvalidArgument, "failed to unmarshal spec: %v", err)
33+
}
34+
s.spec = spec
35+
return &pb.Configure_Response{}, s.Plugin.Init(ctx, s.Logger, spec)
36+
}
37+
38+
func (s *Server) GetName(context.Context, *pb.GetName_Request) (*pb.GetName_Response, error) {
39+
return &pb.GetName_Response{
40+
Name: s.Plugin.Name(),
41+
}, nil
42+
}
43+
44+
func (s *Server) GetVersion(context.Context, *pb.GetVersion_Request) (*pb.GetVersion_Response, error) {
45+
return &pb.GetVersion_Response{
46+
Version: s.Plugin.Version(),
47+
}, nil
48+
}
49+
50+
func (s *Server) Migrate(ctx context.Context, req *pb.Migrate_Request) (*pb.Migrate_Response, error) {
51+
schemas, err := schema.NewSchemasFromBytes(req.Tables)
52+
if err != nil {
53+
return nil, status.Errorf(codes.InvalidArgument, "failed to create schemas: %v", err)
54+
}
55+
tables, err := schema.NewTablesFromArrowSchemas(schemas)
56+
if err != nil {
57+
return nil, status.Errorf(codes.InvalidArgument, "failed to create tables: %v", err)
58+
}
59+
s.setPKsForTables(tables)
60+
61+
return &pb.Migrate_Response{}, s.Plugin.Migrate(ctx, tables)
62+
}
63+
64+
// Note the order of operations in this method is important!
65+
// Trying to insert into the `resources` channel before starting the reader goroutine will cause a deadlock.
66+
func (s *Server) Write(msg pb.Destination_WriteServer) error {
67+
resources := make(chan arrow.Record)
68+
69+
r, err := msg.Recv()
70+
if err != nil {
71+
if err == io.EOF {
72+
return msg.SendAndClose(&pb.Write_Response{})
73+
}
74+
return status.Errorf(codes.Internal, "failed to receive msg: %v", err)
75+
}
76+
77+
schemas, err := schema.NewSchemasFromBytes(r.Tables)
78+
if err != nil {
79+
return status.Errorf(codes.InvalidArgument, "failed to create schemas: %v", err)
80+
}
81+
tables, err := schema.NewTablesFromArrowSchemas(schemas)
82+
if err != nil {
83+
return status.Errorf(codes.InvalidArgument, "failed to create tables: %v", err)
84+
}
85+
var sourceSpec specs.Source
86+
if r.SourceSpec == nil {
87+
// this is for backward compatibility
88+
sourceSpec = specs.Source{
89+
Name: r.Source,
90+
}
91+
} else {
92+
if err := json.Unmarshal(r.SourceSpec, &sourceSpec); err != nil {
93+
return status.Errorf(codes.InvalidArgument, "failed to unmarshal source spec: %v", err)
94+
}
95+
}
96+
syncTime := r.Timestamp.AsTime()
97+
s.setPKsForTables(tables)
98+
eg, ctx := errgroup.WithContext(msg.Context())
99+
eg.Go(func() error {
100+
return s.Plugin.Write(ctx, sourceSpec, tables, syncTime, resources)
101+
})
102+
103+
for {
104+
r, err := msg.Recv()
105+
if err == io.EOF {
106+
close(resources)
107+
if err := eg.Wait(); err != nil {
108+
return status.Errorf(codes.Internal, "write failed: %v", err)
109+
}
110+
return msg.SendAndClose(&pb.Write_Response{})
111+
}
112+
if err != nil {
113+
close(resources)
114+
if wgErr := eg.Wait(); wgErr != nil {
115+
return status.Errorf(codes.Internal, "failed to receive msg: %v and write failed: %v", err, wgErr)
116+
}
117+
return status.Errorf(codes.Internal, "failed to receive msg: %v", err)
118+
}
119+
rdr, err := ipc.NewReader(bytes.NewReader(r.Resource))
120+
if err != nil {
121+
close(resources)
122+
if wgErr := eg.Wait(); wgErr != nil {
123+
return status.Errorf(codes.InvalidArgument, "failed to create reader: %v and write failed: %v", err, wgErr)
124+
}
125+
return status.Errorf(codes.InvalidArgument, "failed to create reader: %v", err)
126+
}
127+
for rdr.Next() {
128+
rec := rdr.Record()
129+
rec.Retain()
130+
select {
131+
case resources <- rec:
132+
case <-ctx.Done():
133+
close(resources)
134+
if err := eg.Wait(); err != nil {
135+
return status.Errorf(codes.Internal, "Context done: %v and failed to wait for plugin: %v", ctx.Err(), err)
136+
}
137+
return status.Errorf(codes.Internal, "Context done: %v", ctx.Err())
138+
}
139+
}
140+
if err := rdr.Err(); err != nil {
141+
return status.Errorf(codes.InvalidArgument, "failed to read resource: %v", err)
142+
}
143+
}
144+
}
145+
146+
func setCQIDAsPrimaryKeysForTables(tables schema.Tables) {
147+
for _, table := range tables {
148+
for i, col := range table.Columns {
149+
table.Columns[i].PrimaryKey = col.Name == schema.CqIDColumn.Name
150+
}
151+
setCQIDAsPrimaryKeysForTables(table.Relations)
152+
}
153+
}
154+
155+
func (s *Server) GetMetrics(context.Context, *pb.GetDestinationMetrics_Request) (*pb.GetDestinationMetrics_Response, error) {
156+
stats := s.Plugin.Metrics()
157+
b, err := json.Marshal(stats)
158+
if err != nil {
159+
return nil, fmt.Errorf("failed to marshal stats: %w", err)
160+
}
161+
return &pb.GetDestinationMetrics_Response{
162+
Metrics: b,
163+
}, nil
164+
}
165+
166+
func (s *Server) DeleteStale(ctx context.Context, req *pb.DeleteStale_Request) (*pb.DeleteStale_Response, error) {
167+
schemas, err := schema.NewSchemasFromBytes(req.Tables)
168+
if err != nil {
169+
return nil, status.Errorf(codes.InvalidArgument, "failed to create schemas: %v", err)
170+
}
171+
tables, err := schema.NewTablesFromArrowSchemas(schemas)
172+
if err != nil {
173+
return nil, status.Errorf(codes.InvalidArgument, "failed to create tables: %v", err)
174+
}
175+
176+
if err := s.Plugin.DeleteStale(ctx, tables, req.Source, req.Timestamp.AsTime()); err != nil {
177+
return nil, err
178+
}
179+
180+
return &pb.DeleteStale_Response{}, nil
181+
}
182+
183+
func (s *Server) setPKsForTables(tables schema.Tables) {
184+
if s.spec.PKMode == specs.PKModeCQID {
185+
setCQIDAsPrimaryKeysForTables(tables)
186+
}
187+
}
188+
189+
func (s *Server) Close(ctx context.Context, _ *pb.Close_Request) (*pb.Close_Response, error) {
190+
return &pb.Close_Response{}, s.Plugin.Close(ctx)
191+
}

0 commit comments

Comments
 (0)