Skip to content

Commit b25d1ed

Browse files
committed
feat(ydbcp): implement kms plugin
1 parent 9329af5 commit b25d1ed

File tree

16 files changed

+1703
-10
lines changed

16 files changed

+1703
-10
lines changed

internal/auth/auth.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ var (
2727
ErrGetAuthToken = errors.New("can't get auth token")
2828
)
2929

30-
func NewAuthProvider(ctx context.Context, cfg config.AuthConfig) (auth.AuthProvider, error) {
30+
func NewAuthProvider(ctx context.Context, cfg config.PluginConfig) (auth.AuthProvider, error) {
3131
xlog.Info(ctx, "Loading auth provider plugin", zap.String("PluginPath", cfg.PluginPath))
3232

3333
plug, err := plugin.Open(cfg.PluginPath)

internal/config/config.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ type ClientConnectionConfig struct {
4545
AllowInsecureEndpoint bool `yaml:"allow_insecure_endpoint" default:"false"`
4646
}
4747

48-
type AuthConfig struct {
48+
type PluginConfig struct {
4949
PluginPath string `yaml:"plugin_path"`
5050
Configuration interface{} `yaml:"configuration"`
5151
}
@@ -95,7 +95,7 @@ type Config struct {
9595
DBConnection YDBConnectionConfig `yaml:"db_connection"`
9696
ClientConnection ClientConnectionConfig `yaml:"client_connection"`
9797
S3 S3Config `yaml:"s3"`
98-
Auth AuthConfig `yaml:"auth"`
98+
Auth PluginConfig `yaml:"auth"`
9999
GRPCServer GRPCServerConfig `yaml:"grpc_server"`
100100
MetricsServer MetricsServerConfig `yaml:"metrics_server"`
101101
OperationProcessor OperationProcessorConfig `yaml:"operation_processor"`
@@ -223,7 +223,7 @@ func (c *S3Config) SecretKey() (string, error) {
223223

224224
}
225225

226-
func (c *AuthConfig) ConfigurationString() (string, error) {
226+
func (c *PluginConfig) ConfigurationString() (string, error) {
227227
txt, err := yaml.Marshal(c.Configuration)
228228
if err != nil {
229229
return "", fmt.Errorf("can't marshal Auth.Configuration to YAML: %w", err)

internal/kms/kms.go

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
package kms
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"plugin"
7+
"ydbcp/internal/config"
8+
"ydbcp/internal/util/xlog"
9+
"ydbcp/pkg/plugins/kms"
10+
11+
"go.uber.org/zap"
12+
)
13+
14+
func NewKmsProvider(ctx context.Context, cfg config.PluginConfig) (kms.KmsProvider, error) {
15+
xlog.Info(ctx, "Loading kms provider plugin", zap.String("PluginPath", cfg.PluginPath))
16+
17+
pluginInstance, err := plugin.Open(cfg.PluginPath)
18+
if err != nil {
19+
return nil, fmt.Errorf("can't load kms provider plugin, path %s: %w", cfg.PluginPath, err)
20+
}
21+
symbol, err := pluginInstance.Lookup("KmsProvider")
22+
if err != nil {
23+
return nil, fmt.Errorf("can't lookup KmsProvider symbol, plugin path %s: %w", cfg.PluginPath, err)
24+
}
25+
26+
var instance kms.KmsProvider
27+
instance, ok := symbol.(kms.KmsProvider)
28+
if !ok {
29+
return nil, fmt.Errorf("can't cast KmsProvider symbol, plugin path %s", cfg.PluginPath)
30+
}
31+
pluginConfig, err := cfg.ConfigurationString()
32+
if err != nil {
33+
return nil, fmt.Errorf("can't get kms provider configuration: %w", err)
34+
}
35+
if err = instance.Init(ctx, pluginConfig); err != nil {
36+
return nil, fmt.Errorf("can't initialize kms provider plugin: %w", err)
37+
}
38+
return instance, nil
39+
}

internal/kms/mock.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
2+
3+
// TODO: Implement

internal/util/tls_setup/tls_setup.go

Lines changed: 42 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,17 +5,53 @@ import (
55
"crypto/x509"
66
"errors"
77
"fmt"
8+
"os"
9+
810
"google.golang.org/grpc"
911
"google.golang.org/grpc/credentials"
10-
"os"
1112
)
1213

13-
func LoadTLSCredentials(RootCAPath *string, withInsecure bool) (grpc.DialOption, error) {
14+
func LoadTLSCredentials(rootCAPath *string, withInsecure bool) (grpc.DialOption, error) {
15+
tlsConfig, err := buildTLSConfig(rootCAPath, withInsecure)
16+
if err != nil {
17+
return nil, err
18+
}
19+
return grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig)), nil
20+
}
21+
22+
func LoadMTLSCredentials(
23+
rootCAPath *string,
24+
clientCertPath *string,
25+
clientKeyPath *string,
26+
withInsecure bool,
27+
) (grpc.DialOption, error) {
28+
if clientCertPath == nil || len(*clientCertPath) == 0 {
29+
return nil, fmt.Errorf("client certificate path is required for mTLS")
30+
}
31+
if clientKeyPath == nil || len(*clientKeyPath) == 0 {
32+
return nil, fmt.Errorf("client key path is required for mTLS")
33+
}
34+
35+
tlsConfig, err := buildTLSConfig(rootCAPath, withInsecure)
36+
if err != nil {
37+
return nil, err
38+
}
39+
40+
cert, err := tls.LoadX509KeyPair(*clientCertPath, *clientKeyPath)
41+
if err != nil {
42+
return nil, fmt.Errorf("failed to load client certificate/key: %w", err)
43+
}
44+
tlsConfig.Certificates = []tls.Certificate{cert}
45+
46+
return grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig)), nil
47+
}
48+
49+
func buildTLSConfig(rootCAPath *string, withInsecure bool) (*tls.Config, error) {
1450
var certPool *x509.CertPool
15-
if RootCAPath != nil && len(*RootCAPath) > 0 {
16-
caBundle, err := os.ReadFile(*RootCAPath)
51+
if rootCAPath != nil && len(*rootCAPath) > 0 {
52+
caBundle, err := os.ReadFile(*rootCAPath)
1753
if err != nil {
18-
return nil, fmt.Errorf("unable to read root ca bundle from file %s: %w", *RootCAPath, err)
54+
return nil, fmt.Errorf("unable to read root ca bundle from file %s: %w", *rootCAPath, err)
1955
}
2056
certPool = x509.NewCertPool()
2157
if ok := certPool.AppendCertsFromPEM(caBundle); !ok {
@@ -36,5 +72,5 @@ func LoadTLSCredentials(RootCAPath *string, withInsecure bool) (grpc.DialOption,
3672
if withInsecure {
3773
tlsConfig.InsecureSkipVerify = true
3874
}
39-
return grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig)), nil
75+
return tlsConfig, nil
4076
}

pkg/plugins/kms/kms.go

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
package kms
2+
3+
import "context"
4+
5+
type EncryptRequest struct {
6+
KeyID string
7+
Plaintext []byte
8+
}
9+
10+
type EncryptResponse struct {
11+
KeyID string
12+
Ciphertext []byte
13+
}
14+
15+
type DecryptRequest struct {
16+
KeyID string
17+
Ciphertext []byte
18+
}
19+
20+
type DecryptResponse struct {
21+
KeyID string
22+
Plaintext []byte
23+
}
24+
25+
// KmsProvider is an interface that KMS plugins must implement.
26+
type KmsProvider interface {
27+
Init(ctx context.Context, config string) error
28+
Close(ctx context.Context) error
29+
30+
Encrypt(ctx context.Context, req *EncryptRequest) (*EncryptResponse, error)
31+
Decrypt(ctx context.Context, req *DecryptRequest) (*DecryptResponse, error)
32+
}

plugins/kms_nebius/kms_nebius.go

Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"fmt"
6+
7+
"ydbcp/internal/util/tls_setup"
8+
"ydbcp/internal/util/xlog"
9+
"ydbcp/pkg/plugins/kms"
10+
pb "ydbcp/plugins/kms_nebius/proto"
11+
12+
"go.uber.org/zap"
13+
"google.golang.org/grpc"
14+
"gopkg.in/yaml.v3"
15+
)
16+
17+
type kmsProviderNebius struct {
18+
config pluginConfig
19+
}
20+
21+
type pluginConfig struct {
22+
CryptoServiceEndpoint string `yaml:"crypto_service_endpoint"`
23+
Insecure bool `yaml:"insecure" default:"false"`
24+
RootCAPath string `yaml:"root_ca_path"`
25+
ClientKeyPath string `yaml:"client_key_path"`
26+
ClientCertificatePath string `yaml:"client_certificate_path"`
27+
}
28+
29+
func (p *kmsProviderNebius) Init(ctx context.Context, rawConfig string) error {
30+
xlog.Info(ctx, "KmsNebiusProvider initialization started", zap.String("config", rawConfig))
31+
if err := yaml.Unmarshal([]byte(rawConfig), &p.config); err != nil {
32+
xlog.Error(ctx, "Unable to parse configuration", zap.Error(err))
33+
return fmt.Errorf("kms: unable to parse configuration: %w", err)
34+
}
35+
if len(p.config.CryptoServiceEndpoint) == 0 {
36+
return fmt.Errorf("kms: crypto service endpoint is required in configuration")
37+
}
38+
if len(p.config.RootCAPath) == 0 {
39+
return fmt.Errorf("kms: root ca path is required in configuration")
40+
}
41+
if len(p.config.ClientKeyPath) == 0 {
42+
return fmt.Errorf("kms: client key path is required in configuration")
43+
}
44+
if len(p.config.ClientCertificatePath) == 0 {
45+
return fmt.Errorf("kms: client certificate path is required in configuration")
46+
}
47+
48+
xlog.Info(ctx, "KmsNebiusProvider was initialized successfully")
49+
return nil
50+
}
51+
52+
func (p *kmsProviderNebius) Close(ctx context.Context) error {
53+
xlog.Info(ctx, "KmsNebiusProvider was closed")
54+
return nil
55+
}
56+
57+
func (p *kmsProviderNebius) Encrypt(ctx context.Context, req *kms.EncryptRequest) (*kms.EncryptResponse, error) {
58+
if req == nil {
59+
return nil, fmt.Errorf("kms: encryption request is nil")
60+
}
61+
if len(req.KeyID) == 0 {
62+
return nil, fmt.Errorf("kms: key id is required in encryption request")
63+
}
64+
65+
tlsOption, err := tls_setup.LoadMTLSCredentials(&p.config.RootCAPath, &p.config.ClientCertificatePath, &p.config.ClientKeyPath, p.config.Insecure)
66+
if err != nil {
67+
return nil, err
68+
}
69+
70+
grpcClient, err := grpc.NewClient("dns:"+p.config.CryptoServiceEndpoint, tlsOption) // TODO: do we need dns prefix?
71+
if err != nil {
72+
return nil, err
73+
}
74+
defer grpcClient.Close()
75+
76+
client := pb.NewSymmetricCryptoServiceClient(grpcClient)
77+
resp, err := client.Encrypt(ctx, &pb.SymmetricEncryptRequest{
78+
KeyId: req.KeyID,
79+
Plaintext: req.Plaintext,
80+
})
81+
if err != nil {
82+
return nil, fmt.Errorf("kms: encryption was failed: %w", err)
83+
}
84+
85+
return &kms.EncryptResponse{
86+
KeyID: resp.GetKeyId(),
87+
Ciphertext: resp.GetCiphertext(),
88+
}, nil
89+
}
90+
91+
func (p *kmsProviderNebius) Decrypt(ctx context.Context, req *kms.DecryptRequest) (*kms.DecryptResponse, error) {
92+
if req == nil {
93+
return nil, fmt.Errorf("kms: decryption request is nil")
94+
}
95+
if len(req.KeyID) == 0 {
96+
return nil, fmt.Errorf("kms: key id is required in decryption request")
97+
}
98+
99+
tlsOption, err := tls_setup.LoadTLSCredentials(&p.config.RootCAPath, p.config.Insecure)
100+
if err != nil {
101+
return nil, err
102+
}
103+
104+
grpcClient, err := grpc.NewClient("dns:"+p.config.CryptoServiceEndpoint, tlsOption) // TODO: do we need dns prefix?
105+
if err != nil {
106+
return nil, err
107+
}
108+
defer grpcClient.Close()
109+
110+
client := pb.NewSymmetricCryptoServiceClient(grpcClient)
111+
resp, err := client.Decrypt(ctx, &pb.SymmetricDecryptRequest{
112+
KeyId: req.KeyID,
113+
Ciphertext: req.Ciphertext,
114+
})
115+
if err != nil {
116+
return nil, fmt.Errorf("kms: decryption was failed: %w", err)
117+
}
118+
119+
return &kms.DecryptResponse{
120+
KeyID: resp.GetKeyId(),
121+
Plaintext: resp.GetPlaintext(),
122+
}, nil
123+
}
124+
125+
func main() {}
126+
127+
var KmsProvider kmsProviderNebius

0 commit comments

Comments
 (0)