Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
258 changes: 258 additions & 0 deletions internal/cli/cluster/connect.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,258 @@
/*
Copyright (c) 2025 OceanBase
ob-operator is licensed under Mulan PSL v2.
You can use this software according to the terms and conditions of the Mulan PSL v2.
You may obtain a copy of Mulan PSL v2 at:

http://license.coscl.org.cn/MulanPSL2

THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
See the Mulan PSL v2 for more details.
*/
package cluster

import (
"encoding/base64"
"fmt"
"os/exec"
"strings"

"github.com/pkg/errors"
"github.com/spf13/cobra"
kubeerrors "k8s.io/apimachinery/pkg/api/errors"

"github.com/oceanbase/ob-operator/internal/cli/generic"
utils "github.com/oceanbase/ob-operator/internal/cli/utils"
"github.com/oceanbase/ob-operator/internal/clients"
)

type ConnectOptions struct {
generic.ResourceOption
ClusterId int64
ObserverIp string
TenantName string
Database string
User string
Password string
Port string
}

func NewConnectOptions() *ConnectOptions {
return &ConnectOptions{}
}

func (o *ConnectOptions) Validate() error {
if o.Namespace == "" {
return errors.New("namespace is not specified")
}
if o.User == "" {
return errors.New("user is not specified")
}
if o.TenantName == "" {
return errors.New("tenant name is not specified")
}
if o.Port == "" {
return errors.New("port is not specified")
}
return nil
}

func (o *ConnectOptions) Parse(cmd *cobra.Command, args []string) error {
// parse args
o.Name = args[0]
o.Cmd = cmd
// get obcluster
obcluster, err := clients.GetOBCluster(o.Cmd.Context(), o.Namespace, o.Name)
if err != nil {
if kubeerrors.IsNotFound(err) {
return fmt.Errorf("OBCluster %s not found", o.Name)
} else {

Check warning on line 71 in internal/cli/cluster/connect.go

View workflow job for this annotation

GitHub Actions / lint

indent-error-flow: if block ends with a return statement, so drop this else and outdent its block (revive)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please fix the lint problem showed in this page.

Copy link
Member

@chris-sun-star chris-sun-star Jul 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's only 2 problems, you don't need to handle the previously pushed code.

return err
}
}
if err := utils.CheckClusterStatus(obcluster); err != nil {
return err
}
o.ClusterId = obcluster.Spec.ClusterId
return nil
}

func (o *ConnectOptions) Complete() error {
// Try to get password from secret if not provided
if err := o.GetPasswordFromSecret(); err != nil {
return fmt.Errorf("failed to get password: %v", err)
}
return nil
}

func (o *ConnectOptions) Run() error {
if err := o.GetObserverIp(); err != nil {
return err
}

cmd := exec.Command("mysql",
"-h"+o.ObserverIp,
"-p"+o.Password,
"-u"+o.User+"@"+o.TenantName,
o.Database,
"-A",
"-c",
"-P"+o.Port)

return utils.RunCmd(cmd)
}

// getAvailableZones returns a list of available zones for the cluster
func (o *ConnectOptions) getAvailableZones() ([]string, error) {
// Run kubectl command to get cluster status
cmd := exec.Command("kubectl", "get", "obcluster", o.Name, "-n", o.Namespace, "-o", "yaml")
output, err := cmd.CombinedOutput()
if err != nil {
return nil, fmt.Errorf("failed to get cluster information: %v", err)
}

// Parse the output to find zones
lines := strings.Split(string(output), "\n")
zones := make([]string, 0)
inObzones := false

for _, line := range lines {
line = strings.TrimSpace(line)
if line == "" {
continue
}

if strings.HasPrefix(line, "obzones:") {
inObzones = true
continue
}

if inObzones && strings.HasPrefix(line, "- ") {
if !strings.Contains(line, "status:") && !strings.Contains(line, "zone:") {
inObzones = false
continue
}
}

// Parse zone information
if inObzones && strings.Contains(line, "zone:") {
fields := strings.Split(line, "zone:")
if len(fields) == 2 {
zone := strings.TrimSpace(fields[1])
zones = append(zones, zone)
}
}
}

if len(zones) == 0 {
return nil, fmt.Errorf("no available zones found for cluster %s", o.Name)
}

return zones, nil
}

func (o *ConnectOptions) GetObserverIp() error {
// First get available zones
zones, err := o.getAvailableZones()
if err != nil {
return err
}

// Try to get observer IP from each zone until we find one
for _, zone := range zones {
cmd := exec.Command("kubectl", "get", "pods", "-o", "wide", "-n", o.Namespace)
output, err := cmd.CombinedOutput()
if err != nil {
continue
}

lines := strings.Split(string(output), "\n")
if len(lines) < 2 {
continue
}
for _, line := range lines[1:] { // Skip header line
// Use alternative space to split fields
fields := strings.FieldsFunc(line, func(r rune) bool {
return r == ' ' || r == '\t'
})

if len(fields) < 7 {
continue
}

podName := fields[0]
var podIP string
// check every field to find ip, because the ip maybe analyzed as other fields
for i := 5; i < len(fields); i++ {
if strings.Contains(fields[i], ".") {
podIP = fields[i]
break
}
}

if podIP == "" {
continue
}

// check if this is an observer pod for the current zone
if strings.Contains(podName, zone) {
parts := strings.Split(podIP, ".")
if len(parts) != 4 {
continue
}

o.ObserverIp = podIP
return nil
}
}
}

return fmt.Errorf("no observer pod found in any available zone in namespace %s", o.Namespace)
}

// GetPasswordFromSecret gets password from secret, it can get sys tenant root password and other tenant's password
func (o *ConnectOptions) GetPasswordFromSecret() error {
cmd := exec.Command("kubectl", "get", "secrets", "-n", o.Namespace, "-o", "jsonpath={.items[*].metadata.name}")
output, err := cmd.CombinedOutput()
if err == nil && len(output) > 0 {
// find the secret and get password
secrets := strings.Split(string(output), " ")
// Use the exact pattern with cluster name and ID
expectedSecretPrefix := fmt.Sprintf("%s-%d-root-", o.Name, o.ClusterId)

for _, secretName := range secrets {
if strings.HasPrefix(secretName, expectedSecretPrefix) {
cmd = exec.Command("kubectl", "get", "secret", secretName, "-n", o.Namespace, "-o", "jsonpath={.data.password}")
output, err = cmd.CombinedOutput()
if err == nil && len(output) > 0 {
// decode base64 password
decodedBytes, err := base64.StdEncoding.DecodeString(string(output))
if err == nil {
o.Password = string(decodedBytes)
break
}
}
}
}
}

if o.Password == "" {
return fmt.Errorf("password is not specified and not found in secrets")
}

if o.TenantName != DEFAULT_TENANT_NAME && o.User != DEFAULT_USER {

Check warning on line 245 in internal/cli/cluster/connect.go

View workflow job for this annotation

GitHub Actions / lint

empty-block: this block is empty, you can remove it (revive)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is here something missing


}

return nil
}

func (o *ConnectOptions) AddFlags(cmd *cobra.Command) {
cmd.Flags().StringVarP(&o.Namespace, FLAG_NAMESPACE, SHORTHAND_NAMESPACE, DEFAULT_NAMESPACE, "The namespace of the cluster")
cmd.Flags().StringVarP(&o.Port, FLAG_PORT, SHORTHAND_PORT, DEFAULT_PORT, "The port for connecting to the cluster")
cmd.Flags().StringVarP(&o.Database, FLAG_DATABASE, SHORTHAND_DATABASE, DEFAULT_DATABASE, "The database name of the tenant")
cmd.Flags().StringVarP(&o.User, FLAG_USER, SHORTHAND_USER, DEFAULT_USER, "The user name of the tenant")
cmd.Flags().StringVarP(&o.TenantName, FLAG_TENANT_NAME, SHORTHAND_TENANT_NAME, DEFAULT_TENANT_NAME, "The tenant name of the cluster")
}
18 changes: 15 additions & 3 deletions internal/cli/cluster/enter.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,15 @@ const (

// Flags for all the commands in cluster management
FLAG_CLUSTER_NAME = "cluster-name"
FLAG_TENANT_NAME = "tenant-name"
FLAG_NAMESPACE = "namespace"
FLAG_CLUSTER_ID = "id"
FLAG_ROOT_PASSWORD = "root-password"
FLAG_PORT = "port"
FLAG_MODE = "mode"
FLAG_NAME = "name"
FLAG_DATABASE = "database"
FLAG_USER = "user"

// Flags for zone-related options
FLAG_ZONES = "zones"
Expand Down Expand Up @@ -82,6 +86,10 @@ const (
DEFAULT_MONITOR_MEMORY = 1
DEFAULT_NAME = "test"
DEFAULT_MODE = "service"
DEFAULT_PORT = "2881"
DEFAULT_DATABASE = "oceanbase"
DEFAULT_TENANT_NAME = "sys"
DEFAULT_USER = "root"

// Default values for Parameter flag
DEFAULT_MIN_FULL_RESOURCE_POOL_MEMORY = "2147483648"
Expand All @@ -107,9 +115,13 @@ const (

// Shorthand for cluster management
const (
SHORTHAND_ZONES = "z"
SHORTHAND_NAMESPACE = "n"
SHORTHAND_PASSWD = "p"
SHORTHAND_ZONES = "z"
SHORTHAND_NAMESPACE = "n"
SHORTHAND_PASSWD = "p"
SHORTHAND_PORT = "P"
SHORTHAND_DATABASE = "D"
SHORTHAND_USER = "u"
SHORTHAND_TENANT_NAME = "t"

// Shorthand for demo cluster creation
SHORTHAND_WAIT = "w"
Expand Down
1 change: 1 addition & 0 deletions internal/cli/cmd/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,5 +31,6 @@ func NewCmd() *cobra.Command {
cmd.AddCommand(NewListCmd())
cmd.AddCommand(NewScaleCmd())
cmd.AddCommand(NewShowCmd())
cmd.AddCommand(NewConnectCmd())
return cmd
}
46 changes: 46 additions & 0 deletions internal/cli/cmd/cluster/connect.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
Copyright (c) 2025 OceanBase
ob-operator is licensed under Mulan PSL v2.
You can use this software according to the terms and conditions of the Mulan PSL v2.
You may obtain a copy of Mulan PSL v2 at:

http://license.coscl.org.cn/MulanPSL2

THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
See the Mulan PSL v2 for more details.
*/
package cluster

import (
"github.com/spf13/cobra"

cluster "github.com/oceanbase/ob-operator/internal/cli/cluster"
"github.com/oceanbase/ob-operator/internal/cli/utils"
)

// NewConnectCmd connect to an ob cluster by sys tenant
func NewConnectCmd() *cobra.Command {
o := cluster.NewConnectOptions()
logger := utils.GetDefaultLoggerInstance()
cmd := &cobra.Command{
Use: "connect <cluster_name>",
Short: "Connect to an ob cluster by sys tenant",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not only sys tenant

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So actually, we just need one single command for tenant module?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just mean the description, you've provided an option to pass the tenant name

Args: cobra.ExactArgs(1),
PreRunE: o.Parse,
Run: func(cmd *cobra.Command, args []string) {
if err := o.Complete(); err != nil {
logger.Fatalln(err)
}
if err := o.Validate(); err != nil {
logger.Fatalln(err)
}
if err := o.Run(); err != nil {
logger.Fatalln(err)
}
},
}
o.AddFlags(cmd)
return cmd
}
1 change: 1 addition & 0 deletions internal/cli/utils/cmd_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ func UpdateHelmRepo() error {

// RunCmd runs the command and prints the output to stdout
func RunCmd(cmd *exec.Cmd) error {
cmd.Stdin = os.Stdin
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
if err := cmd.Run(); err != nil {
Expand Down
Loading