Skip to content

Commit e510ef5

Browse files
committed
feat: add TLS support for Keeper connections
1 parent 8ca4889 commit e510ef5

File tree

5 files changed

+72
-35
lines changed

5 files changed

+72
-35
lines changed

pkg/backup/create.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -794,7 +794,7 @@ func (b *Backuper) createBackupNamedCollections(ctx context.Context, backupPath
794794
}
795795

796796
k := keeper.Keeper{}
797-
if err = k.Connect(ctx, b.ch); err != nil {
797+
if err = k.Connect(ctx, b.ch, b.cfg); err != nil {
798798
return 0, err
799799
}
800800
defer k.Close()
@@ -844,7 +844,7 @@ func (b *Backuper) createBackupRBACReplicated(ctx context.Context, rbacBackup st
844844
rbacDataSize := uint64(0)
845845
if err = b.ch.SelectContext(ctx, &replicatedRBAC, "SELECT name FROM system.user_directories WHERE type='replicated'"); err == nil && len(replicatedRBAC) > 0 {
846846
k := keeper.Keeper{}
847-
if err = k.Connect(ctx, b.ch); err != nil {
847+
if err = k.Connect(ctx, b.ch, b.cfg); err != nil {
848848
return 0, err
849849
}
850850
defer k.Close()

pkg/backup/restore.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -500,7 +500,7 @@ func (b *Backuper) restoreRBAC(ctx context.Context, backupName string, disks []c
500500
replicatedUserDirectories := make([]clickhouse.UserDirectory, 0)
501501
if err = b.ch.SelectContext(ctx, &replicatedUserDirectories, "SELECT name FROM system.user_directories WHERE type='replicated'"); err == nil && len(replicatedUserDirectories) > 0 {
502502
k = &keeper.Keeper{}
503-
if connErr := k.Connect(ctx, b.ch); connErr != nil {
503+
if connErr := k.Connect(ctx, b.ch, b.cfg); connErr != nil {
504504
return errors.Wrap(connErr, "but can't connect to keeper")
505505
}
506506
defer k.Close()

pkg/clickhouse/clickhouse.go

Lines changed: 4 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,6 @@ package clickhouse
22

33
import (
44
"context"
5-
"crypto/tls"
6-
"crypto/x509"
75
"database/sql"
86
"fmt"
97
"os"
@@ -18,6 +16,7 @@ import (
1816
"github.com/Altinity/clickhouse-backup/v2/pkg/common"
1917
"github.com/Altinity/clickhouse-backup/v2/pkg/config"
2018
"github.com/Altinity/clickhouse-backup/v2/pkg/metadata"
19+
"github.com/Altinity/clickhouse-backup/v2/pkg/utils"
2120
"github.com/ClickHouse/clickhouse-go/v2"
2221
"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
2322
"github.com/antchfx/xmlquery"
@@ -87,31 +86,9 @@ func (ch *ClickHouse) Connect() error {
8786
}
8887

8988
if ch.Config.Secure {
90-
tlsConfig := &tls.Config{
91-
InsecureSkipVerify: ch.Config.SkipVerify,
92-
}
93-
if ch.Config.TLSKey != "" || ch.Config.TLSCert != "" || ch.Config.TLSCa != "" {
94-
if ch.Config.TLSCert != "" || ch.Config.TLSKey != "" {
95-
cert, err := tls.LoadX509KeyPair(ch.Config.TLSCert, ch.Config.TLSKey)
96-
if err != nil {
97-
log.Error().Msgf("tls.LoadX509KeyPair error: %v", err)
98-
return err
99-
}
100-
tlsConfig.Certificates = []tls.Certificate{cert}
101-
}
102-
if ch.Config.TLSCa != "" {
103-
caCert, err := os.ReadFile(ch.Config.TLSCa)
104-
if err != nil {
105-
log.Error().Msgf("read `tls_ca` file %s return error: %v ", ch.Config.TLSCa, err)
106-
return err
107-
}
108-
caCertPool := x509.NewCertPool()
109-
if caCertPool.AppendCertsFromPEM(caCert) != true {
110-
log.Error().Msgf("AppendCertsFromPEM %s return false", ch.Config.TLSCa)
111-
return fmt.Errorf("AppendCertsFromPEM %s return false", ch.Config.TLSCa)
112-
}
113-
tlsConfig.RootCAs = caCertPool
114-
}
89+
tlsConfig, err := utils.NewTLSConfig(ch.Config.TLSCa, ch.Config.TLSCert, ch.Config.TLSKey, ch.Config.SkipVerify)
90+
if err != nil {
91+
return err
11592
}
11693
opt.TLS = tlsConfig
11794
}

pkg/keeper/keeper.go

Lines changed: 35 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,11 @@ package keeper
33
import (
44
"bufio"
55
"context"
6+
"crypto/tls"
67
"encoding/json"
78
"fmt"
89
"io"
10+
"net"
911
"os"
1012
"path"
1113
"strconv"
@@ -18,6 +20,8 @@ import (
1820
"github.com/rs/zerolog/log"
1921

2022
"github.com/Altinity/clickhouse-backup/v2/pkg/clickhouse"
23+
"github.com/Altinity/clickhouse-backup/v2/pkg/config"
24+
"github.com/Altinity/clickhouse-backup/v2/pkg/utils"
2125
"github.com/go-zookeeper/zk"
2226
)
2327

@@ -59,7 +63,7 @@ type Keeper struct {
5963
}
6064

6165
// Connect - connect to any zookeeper server from /var/lib/clickhouse/preprocessed_configs/config.xml
62-
func (k *Keeper) Connect(ctx context.Context, ch *clickhouse.ClickHouse) error {
66+
func (k *Keeper) Connect(ctx context.Context, ch *clickhouse.ClickHouse, cfg *config.Config) error {
6367
configFile, doc, err := ch.ParseXML(ctx, "config.xml")
6468
if err != nil {
6569
return errors.Wrapf(err, "can't parse config.xml from %s, error", configFile)
@@ -83,6 +87,7 @@ func (k *Keeper) Connect(ctx context.Context, ch *clickhouse.ClickHouse) error {
8387
return errors.WithStack(fmt.Errorf("/zookeeper/node not exists in %s", configFile))
8488
}
8589
keeperHosts := make([]string, len(nodeList))
90+
isSecure := false
8691
for i, node := range nodeList {
8792
hostNode := node.SelectElement("host")
8893
if hostNode == nil {
@@ -93,11 +98,37 @@ func (k *Keeper) Connect(ctx context.Context, ch *clickhouse.ClickHouse) error {
9398
if portNode != nil {
9499
port = portNode.InnerText()
95100
}
101+
secureNode := node.SelectElement("secure")
102+
if secureNode != nil && (secureNode.InnerText() == "1" || secureNode.InnerText() == "true") {
103+
isSecure = true
104+
}
96105
keeperHosts[i] = fmt.Sprintf("%s:%s", hostNode.InnerText(), port)
97106
}
98-
conn, _, err := zk.Connect(keeperHosts, sessionTimeout, zk.WithLogger(newKeeperLogger()))
99-
if err != nil {
100-
return err
107+
var conn *zk.Conn
108+
if isSecure {
109+
log.Info().Msgf("isSecure=%v, keeperHosts=%v, caPath=%v, certPath=%v, keyPath=%v, skipVerify=%v, use TLS for keeper connection", isSecure, keeperHosts, cfg.ClickHouse.TLSCa, cfg.ClickHouse.TLSCert, cfg.ClickHouse.TLSKey, cfg.ClickHouse.SkipVerify)
110+
tlsConfig, err := utils.NewTLSConfig(cfg.ClickHouse.TLSCa, cfg.ClickHouse.TLSCert, cfg.ClickHouse.TLSKey, cfg.ClickHouse.SkipVerify)
111+
if err != nil {
112+
return errors.Wrap(err, "can't create TLS config")
113+
}
114+
conn, _, err = zk.Connect(keeperHosts, sessionTimeout, zk.WithLogger(newKeeperLogger()), zk.WithDialer(func(network, address string, timeout time.Duration) (net.Conn, error) {
115+
tlsConn, dialErr := tls.DialWithDialer(&net.Dialer{Timeout: timeout}, network, address, tlsConfig)
116+
if dialErr != nil {
117+
log.Error().Msgf("TLS dial to %s failed: %v", address, dialErr)
118+
return nil, dialErr
119+
}
120+
return tlsConn, nil
121+
}))
122+
if err != nil {
123+
log.Error().Msgf("zk.Connect with TLS failed: %v", err)
124+
return err
125+
}
126+
} else {
127+
log.Info().Msgf("isSecure=%v, keeperHosts=%v", isSecure, keeperHosts)
128+
conn, _, err = zk.Connect(keeperHosts, sessionTimeout, zk.WithLogger(newKeeperLogger()))
129+
if err != nil {
130+
return err
131+
}
101132
}
102133
if digestNode := zookeeperNode.SelectElement("digest"); digestNode != nil {
103134
if err = conn.AddAuth("digest", []byte(digestNode.InnerText())); err != nil {

pkg/utils/utils.go

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,16 @@ package utils
22

33
import (
44
"context"
5+
"crypto/tls"
6+
"crypto/x509"
57
"fmt"
6-
"github.com/rs/zerolog/log"
8+
"os"
79
"os/exec"
810
"regexp"
911
"strings"
1012
"time"
13+
14+
"github.com/rs/zerolog/log"
1115
)
1216

1317
const (
@@ -72,3 +76,28 @@ func ExecCmdOut(ctx context.Context, timeout time.Duration, cmd string, args ...
7276
cancel()
7377
return string(out), err
7478
}
79+
80+
func NewTLSConfig(caPath, certPath, keyPath string, skipVerify bool) (*tls.Config, error) {
81+
tlsConfig := &tls.Config{
82+
InsecureSkipVerify: skipVerify,
83+
}
84+
if certPath != "" || keyPath != "" {
85+
cert, err := tls.LoadX509KeyPair(certPath, keyPath)
86+
if err != nil {
87+
return nil, fmt.Errorf("tls.LoadX509KeyPair error: %v", err)
88+
}
89+
tlsConfig.Certificates = []tls.Certificate{cert}
90+
}
91+
if caPath != "" {
92+
caCert, err := os.ReadFile(caPath)
93+
if err != nil {
94+
return nil, fmt.Errorf("read ca file %s return error: %v", caPath, err)
95+
}
96+
caCertPool := x509.NewCertPool()
97+
if !caCertPool.AppendCertsFromPEM(caCert) {
98+
return nil, fmt.Errorf("AppendCertsFromPEM %s return false", caPath)
99+
}
100+
tlsConfig.RootCAs = caCertPool
101+
}
102+
return tlsConfig, nil
103+
}

0 commit comments

Comments
 (0)