Skip to content

Commit 51039d9

Browse files
committed
feat(ydbcp): implement kms plugin
1 parent 9329af5 commit 51039d9

File tree

17 files changed

+1815
-34
lines changed

17 files changed

+1815
-34
lines changed

internal/auth/auth.go

Lines changed: 3 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,9 @@ import (
44
"context"
55
"errors"
66
"fmt"
7-
"plugin"
87

98
"ydbcp/internal/config"
9+
"ydbcp/internal/plugin"
1010
"ydbcp/internal/util/xlog"
1111
"ydbcp/pkg/plugins/auth"
1212

@@ -27,30 +27,8 @@ var (
2727
ErrGetAuthToken = errors.New("can't get auth token")
2828
)
2929

30-
func NewAuthProvider(ctx context.Context, cfg config.AuthConfig) (auth.AuthProvider, error) {
31-
xlog.Info(ctx, "Loading auth provider plugin", zap.String("PluginPath", cfg.PluginPath))
32-
33-
plug, err := plugin.Open(cfg.PluginPath)
34-
if err != nil {
35-
return nil, fmt.Errorf("can't load auth provider plugin, path %s: %w", cfg.PluginPath, err)
36-
}
37-
symbol, err := plug.Lookup("AuthProvider")
38-
if err != nil {
39-
return nil, fmt.Errorf("can't lookup AuthProvider symbol, plugin path %s: %w", cfg.PluginPath, err)
40-
}
41-
var instance auth.AuthProvider
42-
instance, ok := symbol.(auth.AuthProvider)
43-
if !ok {
44-
return nil, fmt.Errorf("can't cast AuthProvider symbol, plugin path %s", cfg.PluginPath)
45-
}
46-
pluginConfig, err := cfg.ConfigurationString()
47-
if err != nil {
48-
return nil, fmt.Errorf("can't get auth provider configuration: %w", err)
49-
}
50-
if err = instance.Init(ctx, pluginConfig); err != nil {
51-
return nil, fmt.Errorf("can't initialize auth provider plugin: %w", err)
52-
}
53-
return instance, nil
30+
func NewAuthProvider(ctx context.Context, cfg config.PluginConfig) (auth.AuthProvider, error) {
31+
return plugin.Load[auth.AuthProvider](ctx, cfg, "AuthProvider", "auth")
5432
}
5533

5634
func Authenticate(ctx context.Context, provider auth.AuthProvider) (string, error) {

internal/config/config.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ type ClientConnectionConfig struct {
4545
AllowInsecureEndpoint bool `yaml:"allow_insecure_endpoint" default:"false"`
4646
}
4747

48-
type AuthConfig struct {
48+
type PluginConfig struct {
4949
PluginPath string `yaml:"plugin_path"`
5050
Configuration interface{} `yaml:"configuration"`
5151
}
@@ -95,7 +95,7 @@ type Config struct {
9595
DBConnection YDBConnectionConfig `yaml:"db_connection"`
9696
ClientConnection ClientConnectionConfig `yaml:"client_connection"`
9797
S3 S3Config `yaml:"s3"`
98-
Auth AuthConfig `yaml:"auth"`
98+
Auth PluginConfig `yaml:"auth"`
9999
GRPCServer GRPCServerConfig `yaml:"grpc_server"`
100100
MetricsServer MetricsServerConfig `yaml:"metrics_server"`
101101
OperationProcessor OperationProcessorConfig `yaml:"operation_processor"`
@@ -223,7 +223,7 @@ func (c *S3Config) SecretKey() (string, error) {
223223

224224
}
225225

226-
func (c *AuthConfig) ConfigurationString() (string, error) {
226+
func (c *PluginConfig) ConfigurationString() (string, error) {
227227
txt, err := yaml.Marshal(c.Configuration)
228228
if err != nil {
229229
return "", fmt.Errorf("can't marshal Auth.Configuration to YAML: %w", err)

internal/kms/kms.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
package kms
2+
3+
import (
4+
"context"
5+
6+
"ydbcp/internal/config"
7+
"ydbcp/internal/plugin"
8+
"ydbcp/pkg/plugins/kms"
9+
)
10+
11+
func NewKmsProvider(ctx context.Context, cfg config.PluginConfig) (kms.KmsProvider, error) {
12+
return plugin.Load[kms.KmsProvider](ctx, cfg, "KmsProvider", "kms")
13+
}

internal/kms/mock.go

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
package kms
2+
3+
import (
4+
"context"
5+
"fmt"
6+
7+
"ydbcp/pkg/plugins/kms"
8+
)
9+
10+
type MockKmsProvider struct {
11+
keys map[string][]byte
12+
}
13+
14+
func NewMockKmsProvider(keys map[string][]byte) *MockKmsProvider {
15+
return &MockKmsProvider{
16+
keys: keys,
17+
}
18+
}
19+
20+
func (p *MockKmsProvider) Init(_ context.Context, _ string) error {
21+
if p.keys == nil {
22+
p.keys = make(map[string][]byte)
23+
}
24+
return nil
25+
}
26+
27+
func (p *MockKmsProvider) Close(_ context.Context) error {
28+
p.keys = nil
29+
return nil
30+
}
31+
32+
func xor(data []byte, key []byte) []byte {
33+
out := make([]byte, len(data))
34+
for i := range data {
35+
out[i] = data[i] ^ key[i%len(key)]
36+
}
37+
return out
38+
}
39+
40+
func (p *MockKmsProvider) Encrypt(
41+
_ context.Context,
42+
req *kms.EncryptRequest,
43+
) (*kms.EncryptResponse, error) {
44+
if req == nil {
45+
return nil, fmt.Errorf("mock kms: encrypt request is nil")
46+
}
47+
if len(req.Plaintext) == 0 {
48+
return &kms.EncryptResponse{KeyID: req.KeyID}, nil
49+
}
50+
51+
key, ok := p.keys[req.KeyID]
52+
if !ok {
53+
return nil, fmt.Errorf("mock kms: key not found")
54+
}
55+
56+
ciphertext := xor(req.Plaintext, key)
57+
return &kms.EncryptResponse{
58+
KeyID: req.KeyID,
59+
Ciphertext: ciphertext,
60+
}, nil
61+
}
62+
63+
func (p *MockKmsProvider) Decrypt(
64+
_ context.Context,
65+
req *kms.DecryptRequest,
66+
) (*kms.DecryptResponse, error) {
67+
if req == nil {
68+
return nil, fmt.Errorf("mock kms: decrypt request is nil")
69+
}
70+
if len(req.Ciphertext) == 0 {
71+
return &kms.DecryptResponse{KeyID: req.KeyID}, nil
72+
}
73+
74+
key, ok := p.keys[req.KeyID]
75+
if !ok {
76+
return nil, fmt.Errorf("mock kms: key not found")
77+
}
78+
79+
plaintext := xor(req.Ciphertext, key)
80+
return &kms.DecryptResponse{
81+
KeyID: req.KeyID,
82+
Plaintext: plaintext,
83+
}, nil
84+
}

internal/plugin/plugin.go

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
package plugin
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"plugin"
7+
8+
"ydbcp/internal/config"
9+
"ydbcp/internal/util/xlog"
10+
11+
"go.uber.org/zap"
12+
)
13+
14+
type Plugin interface {
15+
Init(ctx context.Context, config string) error
16+
}
17+
18+
// Load loads a Go plugin, looks up the provided symbol name and initializes it with the
19+
// plugin configuration. The type parameter must be an interface that embeds the Init method.
20+
func Load[T Plugin](
21+
ctx context.Context,
22+
cfg config.PluginConfig,
23+
symbolName string,
24+
pluginKind string,
25+
) (T, error) {
26+
var zero T
27+
28+
xlog.Info(ctx, fmt.Sprintf("Loading %s plugin", pluginKind), zap.String("plugin_path", cfg.PluginPath))
29+
30+
plug, err := plugin.Open(cfg.PluginPath)
31+
if err != nil {
32+
return zero, fmt.Errorf("can't load %s plugin, path %s: %w", pluginKind, cfg.PluginPath, err)
33+
}
34+
35+
symbol, err := plug.Lookup(symbolName)
36+
if err != nil {
37+
return zero, fmt.Errorf("can't lookup %s symbol, plugin path %s: %w", symbolName, cfg.PluginPath, err)
38+
}
39+
40+
instance, ok := symbol.(T)
41+
if !ok {
42+
return zero, fmt.Errorf("can't cast %s symbol, plugin path %s", symbolName, cfg.PluginPath)
43+
}
44+
45+
pluginConfig, err := cfg.ConfigurationString()
46+
if err != nil {
47+
return zero, fmt.Errorf("can't get %s plugin configuration: %w", pluginKind, err)
48+
}
49+
50+
if err = instance.Init(ctx, pluginConfig); err != nil {
51+
return zero, fmt.Errorf("can't initialize %s plugin: %w", pluginKind, err)
52+
}
53+
54+
return instance, nil
55+
}

internal/util/tls_setup/tls_setup.go

Lines changed: 42 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,17 +5,53 @@ import (
55
"crypto/x509"
66
"errors"
77
"fmt"
8+
"os"
9+
810
"google.golang.org/grpc"
911
"google.golang.org/grpc/credentials"
10-
"os"
1112
)
1213

13-
func LoadTLSCredentials(RootCAPath *string, withInsecure bool) (grpc.DialOption, error) {
14+
func LoadTLSCredentials(rootCAPath *string, withInsecure bool) (grpc.DialOption, error) {
15+
tlsConfig, err := buildTLSConfig(rootCAPath, withInsecure)
16+
if err != nil {
17+
return nil, err
18+
}
19+
return grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig)), nil
20+
}
21+
22+
func LoadMTLSCredentials(
23+
rootCAPath *string,
24+
clientCertPath *string,
25+
clientKeyPath *string,
26+
withInsecure bool,
27+
) (grpc.DialOption, error) {
28+
if clientCertPath == nil || len(*clientCertPath) == 0 {
29+
return nil, fmt.Errorf("client certificate path is required for mTLS")
30+
}
31+
if clientKeyPath == nil || len(*clientKeyPath) == 0 {
32+
return nil, fmt.Errorf("client key path is required for mTLS")
33+
}
34+
35+
tlsConfig, err := buildTLSConfig(rootCAPath, withInsecure)
36+
if err != nil {
37+
return nil, err
38+
}
39+
40+
cert, err := tls.LoadX509KeyPair(*clientCertPath, *clientKeyPath)
41+
if err != nil {
42+
return nil, fmt.Errorf("failed to load client certificate/key: %w", err)
43+
}
44+
tlsConfig.Certificates = []tls.Certificate{cert}
45+
46+
return grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig)), nil
47+
}
48+
49+
func buildTLSConfig(rootCAPath *string, withInsecure bool) (*tls.Config, error) {
1450
var certPool *x509.CertPool
15-
if RootCAPath != nil && len(*RootCAPath) > 0 {
16-
caBundle, err := os.ReadFile(*RootCAPath)
51+
if rootCAPath != nil && len(*rootCAPath) > 0 {
52+
caBundle, err := os.ReadFile(*rootCAPath)
1753
if err != nil {
18-
return nil, fmt.Errorf("unable to read root ca bundle from file %s: %w", *RootCAPath, err)
54+
return nil, fmt.Errorf("unable to read root ca bundle from file %s: %w", *rootCAPath, err)
1955
}
2056
certPool = x509.NewCertPool()
2157
if ok := certPool.AppendCertsFromPEM(caBundle); !ok {
@@ -36,5 +72,5 @@ func LoadTLSCredentials(RootCAPath *string, withInsecure bool) (grpc.DialOption,
3672
if withInsecure {
3773
tlsConfig.InsecureSkipVerify = true
3874
}
39-
return grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig)), nil
75+
return tlsConfig, nil
4076
}

pkg/plugins/kms/kms.go

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
package kms
2+
3+
import "context"
4+
5+
type EncryptRequest struct {
6+
KeyID string
7+
Plaintext []byte
8+
}
9+
10+
type EncryptResponse struct {
11+
KeyID string
12+
Ciphertext []byte
13+
}
14+
15+
type DecryptRequest struct {
16+
KeyID string
17+
Ciphertext []byte
18+
}
19+
20+
type DecryptResponse struct {
21+
KeyID string
22+
Plaintext []byte
23+
}
24+
25+
// KmsProvider is an interface that KMS plugins must implement.
26+
type KmsProvider interface {
27+
Init(ctx context.Context, config string) error
28+
Close(ctx context.Context) error
29+
30+
Encrypt(ctx context.Context, req *EncryptRequest) (*EncryptResponse, error)
31+
Decrypt(ctx context.Context, req *DecryptRequest) (*DecryptResponse, error)
32+
}

0 commit comments

Comments
 (0)