Skip to content

Commit 5ceaad4

Browse files
feat: CloudQuery v2 (#4)
* feat: Remove connection spec * fix SourceSpec * working on destination plugin * added bunch of tests * remove limit from sdk * Added more tests * remove dead code * fix some linters * feat: Using json everywhere apart from yaml for the user We are taking a similar approach to k8s to use json internally for marshalling/unmarshalling and yaml only for using facing stuff. yaml parsers are much more complex and also have tons of vulnerabilities so we want to use json everywhere where there is a machine reading those configurations. * more work around configuration * tests working again * Implement code generation helpers * improvments to codegen * fix: dont override nil values * more fixes to base templates * more codegen improvments * feat: Support timestamp columns (#12) * Always generate path resolver (#14) * fix: NewTableFromStruct pointer to time.Time (#15) * feat: Add WithDescriptionEnabled to NewTableFromStruct I suggest not to use it as it is just duplicated bloat and it is super slow * Add table descriptions (#17) * detect duplicate columns on start * fixed golang-lint * run gci * more linting Co-authored-by: Herman Schaaf <[email protected]>
1 parent 0c82873 commit 5ceaad4

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

60 files changed

+2005
-1491
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,3 +6,4 @@ config.hcl
66

77
.vscode
88
vendor
9+
cover.out

Makefile

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,6 @@ test:
66
lint:
77
golangci-lint run
88

9-
.PHONY: generate-protobuf
10-
generate-protobuf:
9+
.PHONY: gen-proto
10+
gen-proto:
1111
protoc --proto_path=. --go_out . --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative internal/pb/base.proto internal/pb/source.proto internal/pb/destination.proto

clients/destination.go

Lines changed: 39 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,14 @@ package clients
22

33
import (
44
"context"
5+
"encoding/json"
56
"fmt"
67

78
"github.com/cloudquery/plugin-sdk/internal/pb"
89
"github.com/cloudquery/plugin-sdk/plugins"
910
"github.com/cloudquery/plugin-sdk/schema"
1011
"github.com/cloudquery/plugin-sdk/specs"
11-
"github.com/vmihailenco/msgpack/v5"
1212
"google.golang.org/grpc"
13-
"gopkg.in/yaml.v3"
1413
)
1514

1615
type DestinationClient struct {
@@ -31,23 +30,9 @@ func NewLocalDestinationClient(p plugins.DestinationPlugin) *DestinationClient {
3130
}
3231
}
3332

34-
func (c *DestinationClient) Configure(ctx context.Context, s specs.DestinationSpec) error {
35-
if c.localClient != nil {
36-
return c.localClient.Configure(ctx, s)
37-
}
38-
b, err := yaml.Marshal(s)
39-
if err != nil {
40-
return fmt.Errorf("failed to marshal spec: %w", err)
41-
}
42-
if _, err := c.pbClient.Configure(ctx, &pb.Configure_Request{Config: b}); err != nil {
43-
return err
44-
}
45-
return nil
46-
}
47-
4833
func (c *DestinationClient) GetExampleConfig(ctx context.Context) (string, error) {
4934
if c.localClient != nil {
50-
return c.localClient.GetExampleConfig(ctx), nil
35+
return c.localClient.ExampleConfig(), nil
5136
}
5237
res, err := c.pbClient.GetExampleConfig(ctx, &pb.GetExampleConfig_Request{})
5338
if err != nil {
@@ -56,42 +41,52 @@ func (c *DestinationClient) GetExampleConfig(ctx context.Context) (string, error
5641
return res.Config, nil
5742
}
5843

59-
func (c *DestinationClient) Save(ctx context.Context, msg *FetchResultMessage) error {
60-
var saveClient pb.Destination_SaveClient
61-
var err error
62-
if c.pbClient != nil {
63-
saveClient, err = c.pbClient.Save(ctx)
64-
if err != nil {
65-
return fmt.Errorf("failed to create save client: %w", err)
66-
}
67-
}
44+
func (c *DestinationClient) Initialize(ctx context.Context, spec specs.Destination) error {
6845
if c.localClient != nil {
69-
var resource schema.Resource
70-
if err := msgpack.Unmarshal(msg.Resource, &resource); err != nil {
71-
return fmt.Errorf("failed to unmarshal resources: %w", err)
72-
}
73-
if err := c.localClient.Save(ctx, []*schema.Resource{&resource}); err != nil {
74-
return fmt.Errorf("failed to save resources: %w", err)
75-
}
76-
} else {
77-
if err := saveClient.Send(&pb.Save_Request{Resources: msg.Resource}); err != nil {
78-
return err
79-
}
46+
return c.localClient.Initialize(ctx, spec)
47+
}
48+
b, err := json.Marshal(spec)
49+
if err != nil {
50+
return fmt.Errorf("destination configure: failed to marshal spec: %w", err)
51+
}
52+
_, err = c.pbClient.Configure(ctx, &pb.Configure_Request{
53+
Config: b,
54+
})
55+
if err != nil {
56+
return fmt.Errorf("destination configure: failed to configure: %w", err)
8057
}
81-
8258
return nil
8359
}
8460

85-
func (c *DestinationClient) CreateTables(ctx context.Context, tables []*schema.Table) error {
61+
func (c *DestinationClient) Migrate(ctx context.Context, tables []*schema.Table) error {
8662
if c.localClient != nil {
87-
return c.localClient.CreateTables(ctx, tables)
63+
return c.localClient.Migrate(ctx, tables)
64+
}
65+
b, err := json.Marshal(tables)
66+
if err != nil {
67+
return fmt.Errorf("destination migrate: failed to marshal plugin: %w", err)
8868
}
89-
b, err := yaml.Marshal(tables)
69+
_, err = c.pbClient.Migrate(ctx, &pb.Migrate_Request{Tables: b})
9070
if err != nil {
91-
return fmt.Errorf("failed to marshal tables: %w", err)
71+
return fmt.Errorf("destination migrate: failed to migrate: %w", err)
9272
}
93-
if _, err := c.pbClient.CreateTables(ctx, &pb.CreateTables_Request{Tables: b}); err != nil {
94-
return err
73+
return nil
74+
}
75+
76+
func (c *DestinationClient) Write(ctx context.Context, table string, data map[string]interface{}) error {
77+
// var saveClient pb.Destination_SaveClient
78+
// var err error
79+
// if c.pbClient != nil {
80+
// saveClient, err = c.pbClient.Write(ctx)
81+
// if err != nil {
82+
// return fmt.Errorf("failed to create save client: %w", err)
83+
// }
84+
// }
85+
if c.localClient != nil {
86+
if err := c.localClient.Write(ctx, table, data); err != nil {
87+
return fmt.Errorf("failed to save resources: %w", err)
88+
}
9589
}
90+
9691
return nil
9792
}

clients/source.go

Lines changed: 19 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -3,20 +3,15 @@
33
package clients
44

55
import (
6-
"bytes"
76
"context"
7+
"encoding/json"
88
"fmt"
99
"io"
10-
"text/template"
1110

1211
"github.com/cloudquery/plugin-sdk/internal/pb"
1312
"github.com/cloudquery/plugin-sdk/schema"
1413
"github.com/cloudquery/plugin-sdk/specs"
15-
"github.com/pkg/errors"
16-
"github.com/vmihailenco/msgpack/v5"
17-
"github.com/xeipuuv/gojsonschema"
1814
"google.golang.org/grpc"
19-
"gopkg.in/yaml.v3"
2015
)
2116

2217
type SourceClient struct {
@@ -27,14 +22,6 @@ type FetchResultMessage struct {
2722
Resource []byte
2823
}
2924

30-
const sourcePluginExampleConfigTemplate = `kind: source
31-
spec:
32-
name: {{.Name}}
33-
version: {{.Version}}
34-
configuration:
35-
{{.PluginExampleConfig | indent 4}}
36-
`
37-
3825
func NewSourceClient(cc grpc.ClientConnInterface) *SourceClient {
3926
return &SourceClient{
4027
pbClient: pb.NewSourceClient(cc),
@@ -47,52 +34,30 @@ func (c *SourceClient) GetTables(ctx context.Context) ([]*schema.Table, error) {
4734
return nil, err
4835
}
4936
var tables []*schema.Table
50-
if err := msgpack.Unmarshal(res.Tables, &tables); err != nil {
37+
if err := json.Unmarshal(res.Tables, &tables); err != nil {
5138
return nil, err
5239
}
5340
return tables, nil
5441
}
5542

56-
func (c *SourceClient) Configure(ctx context.Context, spec specs.SourceSpec) (*gojsonschema.Result, error) {
57-
b, err := yaml.Marshal(spec)
58-
if err != nil {
59-
return nil, errors.Wrap(err, "failed to marshal source spec")
60-
}
61-
res, err := c.pbClient.Configure(ctx, &pb.Configure_Request{Config: b})
62-
if err != nil {
63-
return nil, errors.Wrap(err, "failed to configure source")
64-
}
65-
var validationResult gojsonschema.Result
66-
if err := msgpack.Unmarshal(res.JsonschemaResult, &validationResult); err != nil {
67-
return nil, errors.Wrap(err, "failed to unmarshal validation result")
68-
}
69-
return &validationResult, nil
70-
}
71-
72-
func (c *SourceClient) GetExampleConfig(ctx context.Context) (string, error) {
43+
func (c *SourceClient) ExampleConfig(ctx context.Context) (string, error) {
7344
res, err := c.pbClient.GetExampleConfig(ctx, &pb.GetExampleConfig_Request{})
7445
if err != nil {
7546
return "", fmt.Errorf("failed to get example config: %w", err)
7647
}
77-
t, err := template.New("source_plugin").Funcs(templateFuncMap()).Parse(sourcePluginExampleConfigTemplate)
78-
if err != nil {
79-
return "", fmt.Errorf("failed to parse template: %w", err)
80-
}
81-
var tpl bytes.Buffer
82-
if err := t.Execute(&tpl, map[string]interface{}{
83-
"Name": res.Name,
84-
"Version": res.Version,
85-
"PluginExampleConfig": res.Config,
86-
}); err != nil {
87-
return "", fmt.Errorf("failed to generate example config: %w", err)
88-
}
89-
return tpl.String(), nil
48+
return res.Config, nil
9049
}
9150

92-
func (c *SourceClient) Fetch(ctx context.Context, spec specs.SourceSpec, res chan<- *FetchResultMessage) error {
93-
stream, err := c.pbClient.Fetch(ctx, &pb.Fetch_Request{})
51+
func (c *SourceClient) Sync(ctx context.Context, spec specs.Source, res chan<- *schema.Resource) error {
52+
b, err := json.Marshal(spec)
9453
if err != nil {
95-
return fmt.Errorf("failed to fetch resources: %w", err)
54+
return fmt.Errorf("failed to marshal source spec: %w", err)
55+
}
56+
stream, err := c.pbClient.Sync(ctx, &pb.Sync_Request{
57+
Spec: b,
58+
})
59+
if err != nil {
60+
return fmt.Errorf("failed to sync resources: %w", err)
9661
}
9762
for {
9863
r, err := stream.Recv()
@@ -102,8 +67,12 @@ func (c *SourceClient) Fetch(ctx context.Context, spec specs.SourceSpec, res cha
10267
}
10368
return fmt.Errorf("failed to fetch resources from stream: %w", err)
10469
}
105-
res <- &FetchResultMessage{
106-
Resource: r.Resource,
70+
var resource schema.Resource
71+
err = json.Unmarshal(r.Resource, &resource)
72+
if err != nil {
73+
return fmt.Errorf("failed to unmarshal resource: %w", err)
10774
}
75+
76+
res <- &resource
10877
}
10978
}

clients/template.go

Lines changed: 0 additions & 17 deletions
This file was deleted.

codegen/doc.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
//codgen helps autogenerate cloudquery plugins configured by definition
2+
package codegen

0 commit comments

Comments
 (0)