Skip to content

Commit 5ca5b25

Browse files
author
smiletan
authored
Merge pull request #434 from intelligentfu8/cg-rename
[Feature]operator support drop node use tls
2 parents df92385 + ef0e4fa commit 5ca5b25

File tree

8 files changed

+150
-45
lines changed

8 files changed

+150
-45
lines changed

pkg/common/utils/mysql/mysql.go

Lines changed: 49 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,18 @@
1818
package mysql
1919

2020
import (
21-
"database/sql"
22-
"encoding/json"
23-
"fmt"
24-
"os"
25-
26-
_ "github.com/go-sql-driver/mysql"
27-
"github.com/jmoiron/sqlx"
28-
"k8s.io/klog/v2"
21+
"crypto/tls"
22+
"crypto/x509"
23+
"database/sql"
24+
"encoding/json"
25+
"errors"
26+
"fmt"
27+
28+
"github.com/go-sql-driver/mysql"
29+
_ "github.com/go-sql-driver/mysql"
30+
"github.com/jmoiron/sqlx"
31+
corev1 "k8s.io/api/core/v1"
32+
"k8s.io/klog/v2"
2933
)
3034

3135
const (
@@ -40,6 +44,12 @@ type DBConfig struct {
4044
Database string
4145
}
4246

47+
type TLSConfig struct {
48+
CAFileName string
49+
ClientCertFileName string
50+
ClientKeyFileName string
51+
}
52+
4353
func NewDBConfig() DBConfig {
4454
return DBConfig{
4555
Database: "mysql",
@@ -50,11 +60,35 @@ type DB struct {
5060
*sqlx.DB
5161
}
5262

53-
func NewDorisSqlDB(cfg DBConfig) (*DB, error) {
54-
if os.Getenv("DEBUG") == "true" {
55-
cfg.Host = "10.152.183.86"
56-
}
63+
func NewDorisSqlDB(cfg DBConfig, tlsConfig *TLSConfig, secret *corev1.Secret) (*DB, error) {
5764
dsn := fmt.Sprintf("%s:%s@tcp(%s:%s)/%s", cfg.User, cfg.Password, cfg.Host, cfg.Port, cfg.Database)
65+
rootCertPool := x509.NewCertPool()
66+
67+
if tlsConfig != nil && secret != nil {
68+
ca := secret.Data[tlsConfig.CAFileName]
69+
clientCert := secret.Data[tlsConfig.ClientCertFileName]
70+
clientKey := secret.Data[tlsConfig.ClientKeyFileName]
71+
if ok := rootCertPool.AppendCertsFromPEM(ca); !ok {
72+
klog.Errorf("NewDorisSqlDB append cert from pem failed")
73+
return nil, errors.New("NewDorisSqlDB append cert from pem failed")
74+
}
75+
clientCerts := make([]tls.Certificate, 0, 1)
76+
cCert, err := tls.X509KeyPair(clientCert, clientKey)
77+
if err != nil {
78+
return nil, errors.New("NewDorisSqlDB load x509 key pair failed," + err.Error())
79+
}
80+
81+
clientCerts = append(clientCerts, cCert)
82+
registerKey := secret.Namespace + "-" + secret.Name
83+
if err = mysql.RegisterTLSConfig(registerKey, &tls.Config{
84+
RootCAs: rootCertPool,
85+
Certificates: clientCerts,
86+
}); err != nil {
87+
return nil, errors.New("NewDorisSqlDB register tls config failed," + err.Error())
88+
}
89+
dsn = dsn + "?tls=" + registerKey
90+
}
91+
5892
db, err := sqlx.Open("mysql", dsn)
5993
if err != nil {
6094
klog.Errorf("NewDorisSqlDB sqlx.Open failed open doris sql client connection, err: %s \n", err.Error())
@@ -68,8 +102,8 @@ func NewDorisSqlDB(cfg DBConfig) (*DB, error) {
68102
return &DB{db}, nil
69103
}
70104

71-
func NewDorisMasterSqlDB(dbConf DBConfig) (*DB, error) {
72-
loadBalanceDBClient, err := NewDorisSqlDB(dbConf)
105+
func NewDorisMasterSqlDB(dbConf DBConfig, tlsConfig *TLSConfig, secret *corev1.Secret) (*DB, error) {
106+
loadBalanceDBClient, err := NewDorisSqlDB(dbConf, tlsConfig, secret)
73107
if err != nil {
74108
klog.Errorf("NewDorisMasterSqlDB failed, get fe node connection err:%s", err.Error())
75109
return nil, err
@@ -92,7 +126,7 @@ func NewDorisMasterSqlDB(dbConf DBConfig) (*DB, error) {
92126
Host: master.Host,
93127
Port: dbConf.Port,
94128
Database: "mysql",
95-
})
129+
}, tlsConfig, secret)
96130
if err != nil {
97131
klog.Errorf("NewDorisMasterSqlDB failed, get fe master connection err:%s", err.Error())
98132
return nil, err

pkg/common/utils/resource/configmap.go

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,12 @@ package resource
2020
import (
2121
"bytes"
2222
"errors"
23+
"os"
24+
2325
dorisv1 "github.com/apache/doris-operator/api/doris/v1"
2426
"github.com/spf13/viper"
2527
corev1 "k8s.io/api/core/v1"
2628
"k8s.io/klog/v2"
27-
"os"
2829
)
2930

3031
// the fe ports key
@@ -46,13 +47,17 @@ const (
4647

4748
// the default ResolveKey
4849
const (
49-
FE_RESOLVEKEY = "fe.conf"
50-
BE_RESOLVEKEY = "be.conf"
51-
CN_RESOLVEKEY = "be.conf"
52-
BROKER_RESOLVEKEY = "apache_hdfs_broker.conf"
53-
MS_RESOLVEKEY = "doris_cloud.conf"
54-
DefaultMsToken = "greedisgood9999"
55-
DefaultMsTokenKey = "http_token"
50+
FE_RESOLVEKEY = "fe.conf"
51+
BE_RESOLVEKEY = "be.conf"
52+
CN_RESOLVEKEY = "be.conf"
53+
BROKER_RESOLVEKEY = "apache_hdfs_broker.conf"
54+
MS_RESOLVEKEY = "doris_cloud.conf"
55+
DefaultMsToken = "greedisgood9999"
56+
DefaultMsTokenKey = "http_token"
57+
ENABLE_TLS_KEY = "enable_tls"
58+
TLS_CERTIFICATE_PATH_KEY = "tls_certificate_path"
59+
TLS_PRIVATE_KEY_PATH_KEY = "tls_private_key_path"
60+
TLS_CA_CERTIFICATE_PATH_KEY = "tls_ca_certificate_path"
5661
)
5762

5863
const ARROW_FLIGHT_SQL_PORT = "arrow_flight_sql_port"

pkg/common/utils/resource/convert.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,3 +41,11 @@ func GetTerminationGracePeriodSeconds(config map[string]interface{}) int64 {
4141

4242
return 0
4343
}
44+
45+
func GetString(config map[string]interface{}, key string) string {
46+
if v, ok := config[key]; ok {
47+
return v.(string)
48+
}
49+
50+
return ""
51+
}

pkg/controller/sub_controller/disaggregated_cluster/computegroups/controller.go

Lines changed: 25 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -167,14 +167,28 @@ func (dcgs *DisaggregatedComputeGroupsController) computeGroupSync(ctx context.C
167167

168168
// reconcileStatefulset return bool means reconcile print error message.
169169
func (dcgs *DisaggregatedComputeGroupsController) reconcileStatefulset(ctx context.Context, st *appv1.StatefulSet, cluster *dv1.DorisDisaggregatedCluster, cg *dv1.ComputeGroup) (*sc.Event, error) {
170-
var est appv1.StatefulSet
171-
if err := dcgs.K8sclient.Get(ctx, types.NamespacedName{Namespace: st.Namespace, Name: st.Name}, &est); apierrors.IsNotFound(err) {
170+
//use new default value before apply new statefulset, when creating and apply spec change.
171+
ndf := func(st *appv1.StatefulSet, est *appv1.StatefulSet) {
172+
dcgs.useNewDefaultValuesInStatefulset(st)
173+
}
174+
175+
var est appv1.StatefulSet
176+
if err := dcgs.K8sclient.Get(ctx, types.NamespacedName{Namespace: st.Namespace, Name: st.Name}, &est); apierrors.IsNotFound(err) {
172177
// add downlaodAPI volume Mounts
173178
dcgs.DisaggregatedSubDefaultController.AddDownwardAPI(st)
174-
if err = k8s.CreateClientObject(ctx, dcgs.K8sclient, st); err != nil {
175-
klog.Errorf("disaggregatedComputeGroupsController reconcileStatefulset create statefulset namespace=%s name=%s failed, err=%s", st.Namespace, st.Name, err.Error())
176-
return &sc.Event{Type: sc.EventWarning, Reason: sc.CGCreateResourceFailed, Message: err.Error()}, err
177-
}
179+
//if err = k8s.CreateClientObject(ctx, dcgs.K8sclient, st); err != nil {
180+
// klog.Errorf("disaggregatedComputeGroupsController reconcileStatefulset create statefulset namespace=%s name=%s failed, err=%s", st.Namespace, st.Name, err.Error())
181+
// return &sc.Event{Type: sc.EventWarning, Reason: sc.CGCreateResourceFailed, Message: err.Error()}, err
182+
//}
183+
184+
//use apply replace create, if use create the default image not replace with be image and annotation for equal not assign.
185+
if err = k8s.ApplyStatefulSet(ctx, dcgs.K8sclient, st, func(st, est *appv1.StatefulSet) bool {
186+
//creating use the function to assign equal annotation.
187+
return resource.StatefulsetDeepEqualWithKey(st ,est, dv1.DisaggregatedSpecHashValueAnnotation, false)
188+
}, ndf); err != nil {
189+
klog.Errorf("disaggregatedComputeGroupsController reconcileStatefulset create statefulset namespace=%s name=%s failed, err=%s", st.Namespace, st.Name, err.Error())
190+
return &sc.Event{Type: sc.EventWarning, Reason: sc.CGCreateResourceFailed, Message: err.Error()}, err
191+
}
178192

179193
return nil, nil
180194
} else if err != nil {
@@ -191,10 +205,7 @@ func (dcgs *DisaggregatedComputeGroupsController) reconcileStatefulset(ctx conte
191205
return nil, nil
192206
}
193207

194-
//use new default value before apply new statefulset
195-
ndf := func(st *appv1.StatefulSet, est *appv1.StatefulSet) {
196-
dcgs.useNewDefaultValuesInStatefulset(st)
197-
}
208+
198209
if err := k8s.ApplyStatefulSet(ctx, dcgs.K8sclient, st, func(st, est *appv1.StatefulSet) bool {
199210
//store annotations "doris.disaggregated.cluster/generation={generation}" on statefulset
200211
//store annotations "doris.disaggregated.cluster/update-{uniqueid}=true/false" on DorisDisaggregatedCluster
@@ -605,7 +616,10 @@ func(dcgs *DisaggregatedComputeGroupsController) recordComputeGroupIds(ddc *dv1.
605616
cfg.Host = host
606617
cfg.Port = strconv.FormatInt(int64(queryPort), 10)
607618

608-
db,err := mysql.NewDorisSqlDB(cfg)
619+
tlsConfig, secretName := dcgs.DisaggregatedSubDefaultController.FindSecretTLSConfig(confMap, ddc)
620+
secret, _ := k8s.GetSecret(context.Background(), dcgs.K8sclient, ddc.Namespace, secretName)
621+
622+
db, err := mysql.NewDorisSqlDB(cfg, tlsConfig, secret)
609623
if err != nil {
610624
klog.Errorf("DisaggregatedComputeGroupsController recordComputeGroupIds new doris client failed,err=%s", err.Error())
611625
return err

pkg/controller/sub_controller/disaggregated_cluster/computegroups/prepare_modify.go

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,15 @@ package computegroups
1919

2020
import (
2121
"context"
22+
"strconv"
23+
"strings"
24+
2225
dv1 "github.com/apache/doris-operator/api/disaggregated/v1"
26+
"github.com/apache/doris-operator/pkg/common/utils/k8s"
2327
"github.com/apache/doris-operator/pkg/common/utils/mysql"
2428
"github.com/apache/doris-operator/pkg/common/utils/resource"
2529
appv1 "k8s.io/api/apps/v1"
2630
"k8s.io/klog/v2"
27-
"strconv"
28-
"strings"
2931
)
3032

3133
func (dcgs *DisaggregatedComputeGroupsController) preApplyStatefulSet(ctx context.Context, st, est *appv1.StatefulSet, cluster *dv1.DorisDisaggregatedCluster, cg *dv1.ComputeGroup) error {
@@ -178,8 +180,11 @@ func (dcgs *DisaggregatedComputeGroupsController) getMasterSqlClient(ctx context
178180
Port: strconv.FormatInt(int64(queryPort), 10),
179181
Database: "mysql",
180182
}
183+
tlsConfig, secretName := dcgs.DisaggregatedSubDefaultController.FindSecretTLSConfig(confMap, cluster)
184+
secret, _ := k8s.GetSecret(context.Background(), dcgs.K8sclient, cluster.Namespace, secretName)
185+
181186
// Connect to the master and run the SQL statement of system admin, because it is not excluded that the user can shrink be and fe at the same time
182-
masterDBClient, err := mysql.NewDorisMasterSqlDB(dbConf)
187+
masterDBClient, err := mysql.NewDorisMasterSqlDB(dbConf, tlsConfig, secret)
183188
if err != nil {
184189
klog.Errorf("getMasterSqlClient NewDorisMasterSqlDB failed for ddc %s namespace %s, get fe node connection err:%s", cluster.Namespace, cluster.Name, err.Error())
185190
return nil, err
@@ -226,7 +231,7 @@ func getScaledOutBENode(
226231
return dropNodes, nil
227232
}
228233

229-
//if in decommission, skip apply statefulset.
234+
// if in decommission, skip apply statefulset.
230235
func skipApplyStatefulset(ddc *dv1.DorisDisaggregatedCluster, cg *dv1.ComputeGroup) bool {
231236
var cgStatus *dv1.ComputeGroupStatus
232237
uniqueId := cg.UniqueId

pkg/controller/sub_controller/disaggregated_cluster/disaggregated_fe/controller.go

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,9 @@ import (
2121
"context"
2222
"errors"
2323
"fmt"
24+
"strconv"
25+
"strings"
26+
2427
"github.com/apache/doris-operator/api/disaggregated/v1"
2528
"github.com/apache/doris-operator/pkg/common/utils/k8s"
2629
"github.com/apache/doris-operator/pkg/common/utils/mysql"
@@ -33,8 +36,6 @@ import (
3336
"k8s.io/klog/v2"
3437
ctrl "sigs.k8s.io/controller-runtime"
3538
"sigs.k8s.io/controller-runtime/pkg/client"
36-
"strconv"
37-
"strings"
3839
)
3940

4041
var _ sc.DisaggregatedSubController = &DisaggregatedFEController{}
@@ -350,6 +351,8 @@ func (dfc *DisaggregatedFEController) dropFEBySQLClient(ctx context.Context, k8s
350351
host := cluster.GetFEVIPAddresss()
351352
confMap := dfc.GetConfigValuesFromConfigMaps(cluster.Namespace, resource.FE_RESOLVEKEY, cluster.Spec.FeSpec.ConfigMaps)
352353
queryPort := resource.GetPort(confMap, resource.QUERY_PORT)
354+
tlsConfig, secretName := dfc.DisaggregatedSubDefaultController.FindSecretTLSConfig(confMap, cluster)
355+
secret, _ := k8s.GetSecret(context.Background(), dfc.K8sclient, cluster.Namespace, secretName)
353356

354357
// connect to doris sql to get master node
355358
// It may not be the master, or even the node that needs to be deleted, causing the deletion SQL to fail.
@@ -360,7 +363,7 @@ func (dfc *DisaggregatedFEController) dropFEBySQLClient(ctx context.Context, k8s
360363
Port: strconv.FormatInt(int64(queryPort), 10),
361364
Database: "mysql",
362365
}
363-
masterDBClient, err := mysql.NewDorisMasterSqlDB(dbConf)
366+
masterDBClient, err := mysql.NewDorisMasterSqlDB(dbConf, tlsConfig, secret)
364367
if err != nil {
365368
klog.Errorf("NewDorisMasterSqlDB failed, get fe node connection err:%s", err.Error())
366369
return err

pkg/controller/sub_controller/disaggregated_subcontroller.go

Lines changed: 38 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,16 @@ import (
2222
"context"
2323
"encoding/json"
2424
"fmt"
25+
"os"
26+
"path"
27+
"path/filepath"
28+
"strconv"
29+
"strings"
30+
2531
"github.com/apache/doris-operator/api/disaggregated/v1"
2632
"github.com/apache/doris-operator/pkg/common/utils/k8s"
2733
"github.com/apache/doris-operator/pkg/common/utils/metadata"
34+
"github.com/apache/doris-operator/pkg/common/utils/mysql"
2835
"github.com/apache/doris-operator/pkg/common/utils/resource"
2936
"github.com/apache/doris-operator/pkg/common/utils/set"
3037
"github.com/spf13/viper"
@@ -34,10 +41,7 @@ import (
3441
"k8s.io/apimachinery/pkg/types"
3542
"k8s.io/client-go/tools/record"
3643
"k8s.io/klog/v2"
37-
"os"
3844
"sigs.k8s.io/controller-runtime/pkg/client"
39-
"strconv"
40-
"strings"
4145
)
4246

4347
const (
@@ -768,3 +772,34 @@ func (d *DisaggregatedSubDefaultController) getFEMetaPath(confMap map[string]int
768772
}
769773
return v.(string)
770774
}
775+
776+
func (d *DisaggregatedSubDefaultController) FindSecretTLSConfig(feConfMap map[string]interface{}, ddc *v1.DorisDisaggregatedCluster) (*mysql.TLSConfig, string /*secret name*/) {
777+
enableTLS := resource.GetString(feConfMap, resource.ENABLE_TLS_KEY)
778+
if enableTLS == "" {
779+
return nil, ""
780+
}
781+
782+
caCertFile := resource.GetString(feConfMap, resource.TLS_CA_CERTIFICATE_PATH_KEY)
783+
clientCertFile := resource.GetString(feConfMap, resource.TLS_CERTIFICATE_PATH_KEY)
784+
clientKeyFile := resource.GetString(feConfMap, resource.TLS_PRIVATE_KEY_PATH_KEY)
785+
caFileName := path.Base(caCertFile)
786+
clientCertFileName := path.Base(clientCertFile)
787+
clientKeyFileName := path.Base(clientKeyFile)
788+
789+
caCertDir := filepath.Dir(caCertFile)
790+
secretName := ""
791+
for _, sn := range ddc.Spec.FeSpec.Secrets {
792+
if sn.MountPath == caCertDir {
793+
secretName = sn.SecretName
794+
break
795+
}
796+
}
797+
798+
tlsConfig := &mysql.TLSConfig{
799+
CAFileName: caFileName,
800+
ClientCertFileName: clientCertFileName,
801+
ClientKeyFileName: clientKeyFileName,
802+
}
803+
804+
return tlsConfig, secretName
805+
}

pkg/controller/sub_controller/fe/prepare_modify.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,7 @@ func (fc *Controller) safeScaleDown(cluster *v1.DorisCluster, ost *appv1.Statefu
9696

9797
return
9898
}
99+
99100
// dropObserverBySqlClient handles doris'SQL(drop frontend) through the MySQL client when dealing with scale in observer
100101
// targetDCR is new dcr
101102
func (fc *Controller) dropObserverBySqlClient(ctx context.Context, k8sclient client.Client, targetDCR *v1.DorisCluster) error {
@@ -118,7 +119,7 @@ func (fc *Controller) dropObserverBySqlClient(ctx context.Context, k8sclient cli
118119
Port: strconv.FormatInt(int64(queryPort), 10),
119120
Database: "mysql",
120121
}
121-
masterDBClient, err := mysql.NewDorisMasterSqlDB(dbConf)
122+
masterDBClient, err := mysql.NewDorisMasterSqlDB(dbConf, nil, nil)
122123
if err != nil {
123124
klog.Errorf("NewDorisMasterSqlDB failed, get fe node connection err:%s", err.Error())
124125
return err

0 commit comments

Comments
 (0)