Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 3 additions & 25 deletions internal/auth/auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ import (
"context"
"errors"
"fmt"
"plugin"

"ydbcp/internal/config"
"ydbcp/internal/plugin"
"ydbcp/internal/util/xlog"
"ydbcp/pkg/plugins/auth"

Expand All @@ -27,30 +27,8 @@ var (
ErrGetAuthToken = errors.New("can't get auth token")
)

func NewAuthProvider(ctx context.Context, cfg config.AuthConfig) (auth.AuthProvider, error) {
xlog.Info(ctx, "Loading auth provider plugin", zap.String("PluginPath", cfg.PluginPath))

plug, err := plugin.Open(cfg.PluginPath)
if err != nil {
return nil, fmt.Errorf("can't load auth provider plugin, path %s: %w", cfg.PluginPath, err)
}
symbol, err := plug.Lookup("AuthProvider")
if err != nil {
return nil, fmt.Errorf("can't lookup AuthProvider symbol, plugin path %s: %w", cfg.PluginPath, err)
}
var instance auth.AuthProvider
instance, ok := symbol.(auth.AuthProvider)
if !ok {
return nil, fmt.Errorf("can't cast AuthProvider symbol, plugin path %s", cfg.PluginPath)
}
pluginConfig, err := cfg.ConfigurationString()
if err != nil {
return nil, fmt.Errorf("can't get auth provider configuration: %w", err)
}
if err = instance.Init(ctx, pluginConfig); err != nil {
return nil, fmt.Errorf("can't initialize auth provider plugin: %w", err)
}
return instance, nil
func NewAuthProvider(ctx context.Context, cfg config.PluginConfig) (auth.AuthProvider, error) {
return plugin.Load[auth.AuthProvider](ctx, cfg, "AuthProvider", "auth")
}

func Authenticate(ctx context.Context, provider auth.AuthProvider) (string, error) {
Expand Down
6 changes: 3 additions & 3 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ type ClientConnectionConfig struct {
AllowInsecureEndpoint bool `yaml:"allow_insecure_endpoint" default:"false"`
}

type AuthConfig struct {
type PluginConfig struct {
PluginPath string `yaml:"plugin_path"`
Configuration interface{} `yaml:"configuration"`
}
Expand Down Expand Up @@ -95,7 +95,7 @@ type Config struct {
DBConnection YDBConnectionConfig `yaml:"db_connection"`
ClientConnection ClientConnectionConfig `yaml:"client_connection"`
S3 S3Config `yaml:"s3"`
Auth AuthConfig `yaml:"auth"`
Auth PluginConfig `yaml:"auth"`
GRPCServer GRPCServerConfig `yaml:"grpc_server"`
MetricsServer MetricsServerConfig `yaml:"metrics_server"`
OperationProcessor OperationProcessorConfig `yaml:"operation_processor"`
Expand Down Expand Up @@ -223,7 +223,7 @@ func (c *S3Config) SecretKey() (string, error) {

}

func (c *AuthConfig) ConfigurationString() (string, error) {
func (c *PluginConfig) ConfigurationString() (string, error) {
txt, err := yaml.Marshal(c.Configuration)
if err != nil {
return "", fmt.Errorf("can't marshal Auth.Configuration to YAML: %w", err)
Expand Down
13 changes: 13 additions & 0 deletions internal/kms/kms.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package kms

import (
"context"

"ydbcp/internal/config"
"ydbcp/internal/plugin"
"ydbcp/pkg/plugins/kms"
)

func NewKmsProvider(ctx context.Context, cfg config.PluginConfig) (kms.KmsProvider, error) {
return plugin.Load[kms.KmsProvider](ctx, cfg, "KmsProvider", "kms")
}
84 changes: 84 additions & 0 deletions internal/kms/mock.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package kms

import (
"context"
"fmt"

"ydbcp/pkg/plugins/kms"
)

type MockKmsProvider struct {
keys map[string][]byte
}

func NewMockKmsProvider(keys map[string][]byte) *MockKmsProvider {
return &MockKmsProvider{
keys: keys,
}
}

func (p *MockKmsProvider) Init(_ context.Context, _ string) error {
if p.keys == nil {
p.keys = make(map[string][]byte)
}
return nil
}

func (p *MockKmsProvider) Close(_ context.Context) error {
p.keys = nil
return nil
}

func xor(data []byte, key []byte) []byte {
out := make([]byte, len(data))
for i := range data {
out[i] = data[i] ^ key[i%len(key)]
}
return out
}

func (p *MockKmsProvider) Encrypt(
_ context.Context,
req *kms.EncryptRequest,
) (*kms.EncryptResponse, error) {
if req == nil {
return nil, fmt.Errorf("mock kms: encrypt request is nil")
}
if len(req.Plaintext) == 0 {
return &kms.EncryptResponse{KeyID: req.KeyID}, nil
}

key, ok := p.keys[req.KeyID]
if !ok {
return nil, fmt.Errorf("mock kms: key not found")
}

ciphertext := xor(req.Plaintext, key)
return &kms.EncryptResponse{
KeyID: req.KeyID,
Ciphertext: ciphertext,
}, nil
}

func (p *MockKmsProvider) Decrypt(
_ context.Context,
req *kms.DecryptRequest,
) (*kms.DecryptResponse, error) {
if req == nil {
return nil, fmt.Errorf("mock kms: decrypt request is nil")
}
if len(req.Ciphertext) == 0 {
return &kms.DecryptResponse{KeyID: req.KeyID}, nil
}

key, ok := p.keys[req.KeyID]
if !ok {
return nil, fmt.Errorf("mock kms: key not found")
}

plaintext := xor(req.Ciphertext, key)
return &kms.DecryptResponse{
KeyID: req.KeyID,
Plaintext: plaintext,
}, nil
}
55 changes: 55 additions & 0 deletions internal/plugin/plugin.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package plugin

import (
"context"
"fmt"
"plugin"

"ydbcp/internal/config"
"ydbcp/internal/util/xlog"

"go.uber.org/zap"
)

type Plugin interface {
Init(ctx context.Context, config string) error
}

// Load loads a Go plugin, looks up the provided symbol name and initializes it with the
// plugin configuration. The type parameter must be an interface that embeds the Init method.
func Load[T Plugin](
ctx context.Context,
cfg config.PluginConfig,
symbolName string,
pluginKind string,
) (T, error) {
var zero T

xlog.Info(ctx, fmt.Sprintf("Loading %s plugin", pluginKind), zap.String("plugin_path", cfg.PluginPath))

plug, err := plugin.Open(cfg.PluginPath)
if err != nil {
return zero, fmt.Errorf("can't load %s plugin, path %s: %w", pluginKind, cfg.PluginPath, err)
}

symbol, err := plug.Lookup(symbolName)
if err != nil {
return zero, fmt.Errorf("can't lookup %s symbol, plugin path %s: %w", symbolName, cfg.PluginPath, err)
}

instance, ok := symbol.(T)
if !ok {
return zero, fmt.Errorf("can't cast %s symbol, plugin path %s", symbolName, cfg.PluginPath)
}

pluginConfig, err := cfg.ConfigurationString()
if err != nil {
return zero, fmt.Errorf("can't get %s plugin configuration: %w", pluginKind, err)
}

if err = instance.Init(ctx, pluginConfig); err != nil {
return zero, fmt.Errorf("can't initialize %s plugin: %w", pluginKind, err)
}

return instance, nil
}
48 changes: 42 additions & 6 deletions internal/util/tls_setup/tls_setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,53 @@ import (
"crypto/x509"
"errors"
"fmt"
"os"

"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"os"
)

func LoadTLSCredentials(RootCAPath *string, withInsecure bool) (grpc.DialOption, error) {
func LoadTLSCredentials(rootCAPath *string, withInsecure bool) (grpc.DialOption, error) {
tlsConfig, err := buildTLSConfig(rootCAPath, withInsecure)
if err != nil {
return nil, err
}
return grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig)), nil
}

func LoadMTLSCredentials(
rootCAPath *string,
clientCertPath *string,
clientKeyPath *string,
withInsecure bool,
) (grpc.DialOption, error) {
if clientCertPath == nil || len(*clientCertPath) == 0 {
return nil, fmt.Errorf("client certificate path is required for mTLS")
}
if clientKeyPath == nil || len(*clientKeyPath) == 0 {
return nil, fmt.Errorf("client key path is required for mTLS")
}

tlsConfig, err := buildTLSConfig(rootCAPath, withInsecure)
if err != nil {
return nil, err
}

cert, err := tls.LoadX509KeyPair(*clientCertPath, *clientKeyPath)
if err != nil {
return nil, fmt.Errorf("failed to load client certificate/key: %w", err)
}
tlsConfig.Certificates = []tls.Certificate{cert}

return grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig)), nil
}

func buildTLSConfig(rootCAPath *string, withInsecure bool) (*tls.Config, error) {
var certPool *x509.CertPool
if RootCAPath != nil && len(*RootCAPath) > 0 {
caBundle, err := os.ReadFile(*RootCAPath)
if rootCAPath != nil && len(*rootCAPath) > 0 {
caBundle, err := os.ReadFile(*rootCAPath)
if err != nil {
return nil, fmt.Errorf("unable to read root ca bundle from file %s: %w", *RootCAPath, err)
return nil, fmt.Errorf("unable to read root ca bundle from file %s: %w", *rootCAPath, err)
}
certPool = x509.NewCertPool()
if ok := certPool.AppendCertsFromPEM(caBundle); !ok {
Expand All @@ -36,5 +72,5 @@ func LoadTLSCredentials(RootCAPath *string, withInsecure bool) (grpc.DialOption,
if withInsecure {
tlsConfig.InsecureSkipVerify = true
}
return grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig)), nil
return tlsConfig, nil
}
32 changes: 32 additions & 0 deletions pkg/plugins/kms/kms.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package kms

import "context"

type EncryptRequest struct {
KeyID string
Plaintext []byte
}

type EncryptResponse struct {
KeyID string
Ciphertext []byte
}

type DecryptRequest struct {
KeyID string
Ciphertext []byte
}

type DecryptResponse struct {
KeyID string
Plaintext []byte
}

// KmsProvider is an interface that KMS plugins must implement.
type KmsProvider interface {
Init(ctx context.Context, config string) error
Close(ctx context.Context) error

Encrypt(ctx context.Context, req *EncryptRequest) (*EncryptResponse, error)
Decrypt(ctx context.Context, req *DecryptRequest) (*DecryptResponse, error)
}
Loading
Loading