Skip to content

Commit 3e1492b

Browse files
authored
feat: Support application level protocol message. (#294)
This will let us make breaking changes to the protocol while ensuring users get the right message on what todo. e.g update plugin or cli (client). This is needed in general and also is a pre-requisite for the performance issue in destination plugins that will require breaking change in the destination protocol.
1 parent 48f953a commit 3e1492b

File tree

14 files changed

+555
-171
lines changed

14 files changed

+555
-171
lines changed

clients/destination.go

Lines changed: 43 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,14 @@ import (
1515
"time"
1616

1717
"github.com/cloudquery/plugin-sdk/internal/pb"
18+
"github.com/cloudquery/plugin-sdk/internal/versions"
1819
"github.com/cloudquery/plugin-sdk/schema"
1920
"github.com/cloudquery/plugin-sdk/specs"
2021
"github.com/rs/zerolog"
2122
"google.golang.org/grpc"
23+
"google.golang.org/grpc/codes"
2224
"google.golang.org/grpc/credentials/insecure"
25+
"google.golang.org/grpc/status"
2326
"google.golang.org/protobuf/types/known/timestamppb"
2427
)
2528

@@ -78,7 +81,9 @@ func NewDestinationClient(ctx context.Context, registry specs.Registry, path str
7881
}
7982
return c, nil
8083
case specs.RegistryLocal:
81-
return c.newManagedClient(ctx, path)
84+
if err := c.newManagedClient(ctx, path); err != nil {
85+
return nil, err
86+
}
8287
case specs.RegistryGithub:
8388
pathSplit := strings.Split(path, "/")
8489
if len(pathSplit) != 2 {
@@ -90,26 +95,40 @@ func NewDestinationClient(ctx context.Context, registry specs.Registry, path str
9095
if err := DownloadPluginFromGithub(ctx, localPath, org, name, version, PluginTypeDestination); err != nil {
9196
return nil, err
9297
}
93-
return c.newManagedClient(ctx, localPath)
98+
if err := c.newManagedClient(ctx, localPath); err != nil {
99+
return nil, err
100+
}
94101
default:
95102
return nil, fmt.Errorf("unsupported registry %s", registry)
96103
}
104+
protocolVersion, err := c.GetProtocolVersion(ctx)
105+
if err != nil {
106+
return nil, err
107+
}
108+
109+
if protocolVersion < versions.DestinationProtocolVersion {
110+
return nil, fmt.Errorf("destination plugin protocol version %d is lower than client version %d. Try updating client", protocolVersion, versions.DestinationProtocolVersion)
111+
} else if protocolVersion > versions.DestinationProtocolVersion {
112+
return nil, fmt.Errorf("destination plugin protocol version %d is higher than client version %d. Try updating destination plugin", protocolVersion, versions.DestinationProtocolVersion)
113+
}
114+
115+
return c, nil
97116
}
98117

99118
// newManagedClient starts a new destination plugin process from local file, connects to it via gRPC server
100119
// and returns a new DestinationClient
101-
func (c *DestinationClient) newManagedClient(ctx context.Context, path string) (*DestinationClient, error) {
120+
func (c *DestinationClient) newManagedClient(ctx context.Context, path string) error {
102121
c.grpcSocketName = generateRandomUnixSocketName()
103122
// spawn the plugin first and then connect
104123
cmd := exec.CommandContext(ctx, path, "serve", "--network", "unix", "--address", c.grpcSocketName,
105124
"--log-level", c.logger.GetLevel().String(), "--log-format", "json")
106125
reader, err := cmd.StdoutPipe()
107126
if err != nil {
108-
return nil, fmt.Errorf("failed to get stdout pipe: %w", err)
127+
return fmt.Errorf("failed to get stdout pipe: %w", err)
109128
}
110129
cmd.Stderr = os.Stderr
111130
if err := cmd.Start(); err != nil {
112-
return nil, fmt.Errorf("failed to start plugin %s: %w", path, err)
131+
return fmt.Errorf("failed to start plugin %s: %w", path, err)
113132
}
114133

115134
c.wg.Add(1)
@@ -157,10 +176,26 @@ func (c *DestinationClient) newManagedClient(ctx context.Context, path string) (
157176
if err := cmd.Process.Kill(); err != nil {
158177
c.logger.Error().Err(err).Msg("failed to kill plugin process")
159178
}
160-
return c, err
179+
return err
161180
}
162181
c.pbClient = pb.NewDestinationClient(c.conn)
163-
return c, nil
182+
return nil
183+
}
184+
185+
func (c *DestinationClient) GetProtocolVersion(ctx context.Context) (uint64, error) {
186+
res, err := c.pbClient.GetProtocolVersion(ctx, &pb.GetProtocolVersion_Request{})
187+
if err != nil {
188+
s, ok := status.FromError(err)
189+
if !ok {
190+
return 0, fmt.Errorf("failed to cal GetProtocolVersion: %w", err)
191+
}
192+
if s.Code() != codes.Unimplemented {
193+
return 0, err
194+
}
195+
c.logger.Warn().Err(err).Msg("plugin does not support protocol version. assuming protocol version 1")
196+
return 1, nil
197+
}
198+
return res.Version, nil
164199
}
165200

166201
func (c *DestinationClient) Name(ctx context.Context) (string, error) {
@@ -207,7 +242,7 @@ func (c *DestinationClient) Migrate(ctx context.Context, tables []*schema.Table)
207242

208243
// Write writes rows as they are received from the channel to the destination plugin.
209244
// resources is marshaled schema.Resource. We are not marshalling this inside the function
210-
// because usually it is alreadun marshalled from the source plugin.
245+
// because usually it is alreadun marshalled from the destination plugin.
211246
func (c *DestinationClient) Write(ctx context.Context, source string, syncTime time.Time, resources <-chan []byte) (uint64, error) {
212247
saveClient, err := c.pbClient.Write(ctx)
213248
if err != nil {

clients/source.go

Lines changed: 43 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,14 @@ import (
1414
"sync"
1515

1616
"github.com/cloudquery/plugin-sdk/internal/pb"
17+
"github.com/cloudquery/plugin-sdk/internal/versions"
1718
"github.com/cloudquery/plugin-sdk/schema"
1819
"github.com/cloudquery/plugin-sdk/specs"
1920
"github.com/rs/zerolog"
2021
"google.golang.org/grpc"
22+
"google.golang.org/grpc/codes"
2123
"google.golang.org/grpc/credentials/insecure"
24+
"google.golang.org/grpc/status"
2225
)
2326

2427
// SourceClient
@@ -82,7 +85,9 @@ func NewSourceClient(ctx context.Context, registry specs.Registry, path string,
8285
}
8386
return c, nil
8487
case specs.RegistryLocal:
85-
return c.newManagedClient(ctx, path)
88+
if err := c.newManagedClient(ctx, path); err != nil {
89+
return nil, err
90+
}
8691
case specs.RegistryGithub:
8792
pathSplit := strings.Split(path, "/")
8893
if len(pathSplit) != 2 {
@@ -94,26 +99,40 @@ func NewSourceClient(ctx context.Context, registry specs.Registry, path string,
9499
if err := DownloadPluginFromGithub(ctx, localPath, org, name, version, PluginTypeSource); err != nil {
95100
return nil, err
96101
}
97-
return c.newManagedClient(ctx, localPath)
102+
if err := c.newManagedClient(ctx, localPath); err != nil {
103+
return nil, err
104+
}
98105
default:
99106
return nil, fmt.Errorf("unsupported registry %s", registry)
100107
}
108+
109+
protocolVersion, err := c.GetProtocolVersion(ctx)
110+
if err != nil {
111+
return nil, err
112+
}
113+
if protocolVersion < versions.SourceProtocolVersion {
114+
return nil, fmt.Errorf("source plugin protocol version %d is lower than client version %d. Try updating client", protocolVersion, versions.SourceProtocolVersion)
115+
} else if protocolVersion > versions.SourceProtocolVersion {
116+
return nil, fmt.Errorf("source plugin protocol version %d is higher than client version %d. Try updating destination plugin", protocolVersion, versions.SourceProtocolVersion)
117+
}
118+
119+
return c, nil
101120
}
102121

103122
// newManagedClient starts a new source plugin process from local path, connects to it via gRPC server
104123
// and returns a new SourceClient
105-
func (c *SourceClient) newManagedClient(ctx context.Context, path string) (*SourceClient, error) {
124+
func (c *SourceClient) newManagedClient(ctx context.Context, path string) error {
106125
c.grpcSocketName = generateRandomUnixSocketName()
107126
// spawn the plugin first and then connect
108127
cmd := exec.CommandContext(ctx, path, "serve", "--network", "unix", "--address", c.grpcSocketName,
109128
"--log-level", c.logger.GetLevel().String(), "--log-format", "json")
110129
reader, err := cmd.StdoutPipe()
111130
if err != nil {
112-
return nil, fmt.Errorf("failed to get stdout pipe: %w", err)
131+
return fmt.Errorf("failed to get stdout pipe: %w", err)
113132
}
114133
cmd.Stderr = os.Stderr
115134
if err := cmd.Start(); err != nil {
116-
return nil, fmt.Errorf("failed to start plugin %s: %w", path, err)
135+
return fmt.Errorf("failed to start plugin %s: %w", path, err)
117136
}
118137

119138
c.wg.Add(1)
@@ -161,10 +180,26 @@ func (c *SourceClient) newManagedClient(ctx context.Context, path string) (*Sour
161180
if err := cmd.Process.Kill(); err != nil {
162181
c.logger.Error().Err(err).Msg("failed to kill plugin process")
163182
}
164-
return c, err
183+
return err
165184
}
166185
c.pbClient = pb.NewSourceClient(c.conn)
167-
return c, nil
186+
return nil
187+
}
188+
189+
func (c *SourceClient) GetProtocolVersion(ctx context.Context) (uint64, error) {
190+
res, err := c.pbClient.GetProtocolVersion(ctx, &pb.GetProtocolVersion_Request{})
191+
if err != nil {
192+
s, ok := status.FromError(err)
193+
if !ok {
194+
return 0, fmt.Errorf("failed to cal GetProtocolVersion: %w", err)
195+
}
196+
if s.Code() != codes.Unimplemented {
197+
return 0, err
198+
}
199+
c.logger.Warn().Err(err).Msg("plugin does not support protocol version. assuming protocol version 1")
200+
return 1, nil
201+
}
202+
return res.Version, nil
168203
}
169204

170205
func (c *SourceClient) Name(ctx context.Context) (string, error) {
@@ -197,7 +232,7 @@ func (c *SourceClient) GetTables(ctx context.Context) ([]*schema.Table, error) {
197232

198233
// Sync start syncing for the source client per the given spec and returning the results
199234
// in the given channel. res is marshaled schema.Resource. We are not unmarshalling this for performance reasons
200-
// as usually this is sent over-the-wire anyway to a destination plugin
235+
// as usually this is sent over-the-wire anyway to a source plugin
201236
func (c *SourceClient) Sync(ctx context.Context, spec specs.Source, res chan<- []byte) error {
202237
b, err := json.Marshal(spec)
203238
if err != nil {

0 commit comments

Comments
 (0)