Skip to content

Commit d9004a3

Browse files
authored
feat!: Create cq-plugin-sdk (cloudquery v2 )
* removed unused field and calls * feat!: Create cq-plugin-sdk (cloudquery v2)
1 parent 76ac8db commit d9004a3

File tree

108 files changed

+3931
-11424
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

108 files changed

+3931
-11424
lines changed

Makefile

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,4 +4,8 @@ test:
44

55
.PHONY: lint
66
lint:
7-
golangci-lint run
7+
golangci-lint run
8+
9+
.PHONY: generate-protobuf
10+
generate-protobuf:
11+
protoc --proto_path=. --go_out . --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative internal/pb/base.proto internal/pb/source.proto internal/pb/destination.proto

clients/destination.go

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
package clients
2+
3+
import (
4+
"context"
5+
"fmt"
6+
7+
"github.com/cloudquery/cq-provider-sdk/internal/pb"
8+
"github.com/cloudquery/cq-provider-sdk/plugins"
9+
"github.com/cloudquery/cq-provider-sdk/schema"
10+
"github.com/cloudquery/cq-provider-sdk/spec"
11+
"github.com/vmihailenco/msgpack/v5"
12+
"google.golang.org/grpc"
13+
"gopkg.in/yaml.v3"
14+
)
15+
16+
type DestinationClient struct {
17+
pbClient pb.DestinationClient
18+
// this can be used if we have a plugin which is compiled in so we dont need to do any grpc requests
19+
localClient plugins.DestinationPlugin
20+
}
21+
22+
func NewDestinationClient(cc grpc.ClientConnInterface) *DestinationClient {
23+
return &DestinationClient{
24+
pbClient: pb.NewDestinationClient(cc),
25+
}
26+
}
27+
28+
func NewLocalDestinationClient(p plugins.DestinationPlugin) *DestinationClient {
29+
return &DestinationClient{
30+
localClient: p,
31+
}
32+
}
33+
34+
func (c *DestinationClient) Configure(ctx context.Context, spec spec.DestinationSpec) error {
35+
if c.localClient != nil {
36+
return c.localClient.Configure(ctx, spec)
37+
}
38+
b, err := yaml.Marshal(spec)
39+
if err != nil {
40+
return fmt.Errorf("failed to marshal spec: %w", err)
41+
}
42+
if _, err := c.pbClient.Configure(ctx, &pb.Configure_Request{Config: b}); err != nil {
43+
return err
44+
}
45+
return nil
46+
}
47+
48+
func (c *DestinationClient) GetExampleConfig(ctx context.Context) (string, error) {
49+
if c.localClient != nil {
50+
return c.localClient.GetExampleConfig(ctx), nil
51+
}
52+
res, err := c.pbClient.GetExampleConfig(ctx, &pb.GetExampleConfig_Request{})
53+
if err != nil {
54+
return "", err
55+
}
56+
return string(res.Config), nil
57+
}
58+
59+
func (c *DestinationClient) Save(ctx context.Context, msg *FetchResultMessage) error {
60+
var saveClient pb.Destination_SaveClient
61+
var err error
62+
if c.pbClient != nil {
63+
saveClient, err = c.pbClient.Save(ctx)
64+
if err != nil {
65+
return fmt.Errorf("failed to create save client: %w", err)
66+
}
67+
}
68+
if c.localClient != nil {
69+
var resource schema.Resource
70+
if err := msgpack.Unmarshal(msg.Resource, &resource); err != nil {
71+
return fmt.Errorf("failed to unmarshal resources: %w", err)
72+
}
73+
if err := c.localClient.Save(ctx, []*schema.Resource{&resource}); err != nil {
74+
return fmt.Errorf("failed to save resources: %w", err)
75+
}
76+
} else {
77+
if err := saveClient.Send(&pb.Save_Request{Resources: msg.Resource}); err != nil {
78+
return err
79+
}
80+
}
81+
82+
return nil
83+
}
84+
85+
func (c *DestinationClient) CreateTables(ctx context.Context, tables []*schema.Table) error {
86+
if c.localClient != nil {
87+
return c.localClient.CreateTables(ctx, tables)
88+
}
89+
b, err := yaml.Marshal(tables)
90+
if err != nil {
91+
return fmt.Errorf("failed to marshal tables: %w", err)
92+
}
93+
if _, err := c.pbClient.CreateTables(ctx, &pb.CreateTables_Request{Tables: b}); err != nil {
94+
return err
95+
}
96+
return nil
97+
}

clients/source.go

Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
// package clients is a wrapper around grpc clients so clients can work
2+
// with non protobuf structs and handle unmarshaling
3+
package clients
4+
5+
import (
6+
"bytes"
7+
"context"
8+
"fmt"
9+
"io"
10+
"text/template"
11+
12+
"github.com/cloudquery/cq-provider-sdk/internal/pb"
13+
"github.com/cloudquery/cq-provider-sdk/schema"
14+
"github.com/cloudquery/cq-provider-sdk/spec"
15+
"github.com/pkg/errors"
16+
"github.com/vmihailenco/msgpack/v5"
17+
"github.com/xeipuuv/gojsonschema"
18+
"google.golang.org/grpc"
19+
"gopkg.in/yaml.v3"
20+
)
21+
22+
type SourceClient struct {
23+
pbClient pb.SourceClient
24+
}
25+
26+
type FetchResultMessage struct {
27+
Resource []byte
28+
}
29+
30+
const sourcePluginExampleConfigTemplate = `kind: source
31+
spec:
32+
name: {{.Name}}
33+
version: {{.Version}}
34+
configuration:
35+
{{.PluginExampleConfig | indent 4}}
36+
`
37+
38+
func NewSourceClient(cc grpc.ClientConnInterface) *SourceClient {
39+
return &SourceClient{
40+
pbClient: pb.NewSourceClient(cc),
41+
}
42+
}
43+
44+
func (c *SourceClient) GetTables(ctx context.Context) ([]*schema.Table, error) {
45+
res, err := c.pbClient.GetTables(ctx, &pb.GetTables_Request{})
46+
if err != nil {
47+
return nil, err
48+
}
49+
var tables []*schema.Table
50+
if err := msgpack.Unmarshal(res.Tables, &tables); err != nil {
51+
return nil, err
52+
}
53+
return tables, nil
54+
}
55+
56+
func (c *SourceClient) Configure(ctx context.Context, s spec.SourceSpec) (*gojsonschema.Result, error) {
57+
b, err := yaml.Marshal(s)
58+
if err != nil {
59+
return nil, errors.Wrap(err, "failed to marshal source spec")
60+
}
61+
res, err := c.pbClient.Configure(ctx, &pb.Configure_Request{Config: b})
62+
if err != nil {
63+
return nil, errors.Wrap(err, "failed to configure source")
64+
}
65+
var validationResult gojsonschema.Result
66+
if err := msgpack.Unmarshal(res.JsonschemaResult, &validationResult); err != nil {
67+
return nil, errors.Wrap(err, "failed to unmarshal validation result")
68+
}
69+
return &validationResult, nil
70+
}
71+
72+
func (c *SourceClient) GetExampleConfig(ctx context.Context) (string, error) {
73+
res, err := c.pbClient.GetExampleConfig(ctx, &pb.GetExampleConfig_Request{})
74+
if err != nil {
75+
return "", fmt.Errorf("failed to get example config: %w", err)
76+
}
77+
t, err := template.New("source_plugin").Funcs(templateFuncMap()).Parse(sourcePluginExampleConfigTemplate)
78+
if err != nil {
79+
return "", fmt.Errorf("failed to parse template: %w", err)
80+
}
81+
var tpl bytes.Buffer
82+
if err := t.Execute(&tpl, map[string]interface{}{
83+
"Name": res.Name,
84+
"Version": res.Version,
85+
"PluginExampleConfig": string(res.Config),
86+
}); err != nil {
87+
return "", fmt.Errorf("failed to generate example config: %w", err)
88+
}
89+
return tpl.String(), nil
90+
}
91+
92+
func (c *SourceClient) Fetch(ctx context.Context, spec spec.SourceSpec, res chan<- *FetchResultMessage) error {
93+
stream, err := c.pbClient.Fetch(ctx, &pb.Fetch_Request{})
94+
if err != nil {
95+
return fmt.Errorf("failed to fetch resources: %w", err)
96+
}
97+
for {
98+
r, err := stream.Recv()
99+
if err != nil {
100+
if err == io.EOF {
101+
return nil
102+
}
103+
return fmt.Errorf("failed to fetch resources from stream: %w", err)
104+
}
105+
res <- &FetchResultMessage{
106+
Resource: r.Resource,
107+
}
108+
}
109+
}

clients/template.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
package clients
2+
3+
import (
4+
"strings"
5+
"text/template"
6+
)
7+
8+
func templateFuncMap() template.FuncMap {
9+
return template.FuncMap{
10+
"indent": indent,
11+
}
12+
}
13+
14+
func indent(spaces int, v string) string {
15+
pad := strings.Repeat(" ", spaces)
16+
return pad + strings.Replace(v, "\n", "\n"+pad, -1)
17+
}

0 commit comments

Comments
 (0)