Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 28 additions & 11 deletions internal/sql-analyzer/oceanbase/connection_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"context"
"sort"
"sync"
"time"

"github.com/go-logr/logr"
"github.com/pkg/errors"
Expand All @@ -34,11 +35,17 @@ import (
"github.com/oceanbase/ob-operator/pkg/oceanbase-sdk/operation"
)

const (
observerListCacheExpireTime = 30 * time.Second
)

type ConnectionManager struct {
obcluster *v1alpha1.OBCluster
connectionMap map[string]*operation.OceanbaseOperationManager
ctx context.Context
mu sync.Mutex
obcluster *v1alpha1.OBCluster
connectionMap map[string]*operation.OceanbaseOperationManager
ctx context.Context
mu sync.Mutex
observerListCache *v1alpha1.OBServerList
lastCacheTime time.Time
}

// NewConnectionManager creates a new ConnectionManager.
Expand All @@ -64,19 +71,29 @@ func (cm *ConnectionManager) GetSysReadonlyConnection() (*operation.OceanbaseOpe
}

func (cm *ConnectionManager) GetSysReadonlyConnectionByIP(svrIP string) (*operation.OceanbaseOperationManager, error) {
observerList, err := clients.ListOBServersOfOBCluster(cm.ctx, cm.obcluster)
cm.mu.Lock()
if cm.observerListCache == nil || time.Since(cm.lastCacheTime) > observerListCacheExpireTime {
observerList, err := clients.ListOBServersOfOBCluster(cm.ctx, cm.obcluster)
if err != nil {
cm.mu.Unlock()
return nil, errors.Wrap(err, "Get observers")
}

if err != nil {
return nil, errors.Wrap(err, "Get observers")
// Sort once before caching
sort.Slice(observerList.Items, func(i, j int) bool {
return observerList.Items[i].Status.Status == observerstatus.Running && observerList.Items[j].Status.Status != observerstatus.Running
})

cm.observerListCache = observerList
cm.lastCacheTime = time.Now()
}
observerList := cm.observerListCache
cm.mu.Unlock()

if len(observerList.Items) == 0 {
return nil, errors.Errorf("No observer belongs to cluster %s", cm.obcluster.Name)
}

sort.Slice(observerList.Items, func(i, j int) bool {
return observerList.Items[i].Status.Status == observerstatus.Running && observerList.Items[j].Status.Status != observerstatus.Running
})

var s *connector.OceanBaseDataSource
password, err := cm.readPassword()
if err != nil {
Expand Down