Skip to content

Commit 18d1c61

Browse files
optimize(sql-analyzer): cache observer list to reduce k8s api calls (#915)
1 parent 66688f0 commit 18d1c61

File tree

1 file changed

+28
-11
lines changed

1 file changed

+28
-11
lines changed

internal/sql-analyzer/oceanbase/connection_manager.go

Lines changed: 28 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import (
1616
"context"
1717
"sort"
1818
"sync"
19+
"time"
1920

2021
"github.com/go-logr/logr"
2122
"github.com/pkg/errors"
@@ -34,11 +35,17 @@ import (
3435
"github.com/oceanbase/ob-operator/pkg/oceanbase-sdk/operation"
3536
)
3637

38+
const (
39+
observerListCacheExpireTime = 30 * time.Second
40+
)
41+
3742
type ConnectionManager struct {
38-
obcluster *v1alpha1.OBCluster
39-
connectionMap map[string]*operation.OceanbaseOperationManager
40-
ctx context.Context
41-
mu sync.Mutex
43+
obcluster *v1alpha1.OBCluster
44+
connectionMap map[string]*operation.OceanbaseOperationManager
45+
ctx context.Context
46+
mu sync.Mutex
47+
observerListCache *v1alpha1.OBServerList
48+
lastCacheTime time.Time
4249
}
4350

4451
// NewConnectionManager creates a new ConnectionManager.
@@ -64,19 +71,29 @@ func (cm *ConnectionManager) GetSysReadonlyConnection() (*operation.OceanbaseOpe
6471
}
6572

6673
func (cm *ConnectionManager) GetSysReadonlyConnectionByIP(svrIP string) (*operation.OceanbaseOperationManager, error) {
67-
observerList, err := clients.ListOBServersOfOBCluster(cm.ctx, cm.obcluster)
74+
cm.mu.Lock()
75+
if cm.observerListCache == nil || time.Since(cm.lastCacheTime) > observerListCacheExpireTime {
76+
observerList, err := clients.ListOBServersOfOBCluster(cm.ctx, cm.obcluster)
77+
if err != nil {
78+
cm.mu.Unlock()
79+
return nil, errors.Wrap(err, "Get observers")
80+
}
6881

69-
if err != nil {
70-
return nil, errors.Wrap(err, "Get observers")
82+
// Sort once before caching
83+
sort.Slice(observerList.Items, func(i, j int) bool {
84+
return observerList.Items[i].Status.Status == observerstatus.Running && observerList.Items[j].Status.Status != observerstatus.Running
85+
})
86+
87+
cm.observerListCache = observerList
88+
cm.lastCacheTime = time.Now()
7189
}
90+
observerList := cm.observerListCache
91+
cm.mu.Unlock()
92+
7293
if len(observerList.Items) == 0 {
7394
return nil, errors.Errorf("No observer belongs to cluster %s", cm.obcluster.Name)
7495
}
7596

76-
sort.Slice(observerList.Items, func(i, j int) bool {
77-
return observerList.Items[i].Status.Status == observerstatus.Running && observerList.Items[j].Status.Status != observerstatus.Running
78-
})
79-
8097
var s *connector.OceanBaseDataSource
8198
password, err := cm.readPassword()
8299
if err != nil {

0 commit comments

Comments
 (0)