Skip to content

Commit 44edb93

Browse files
authored
feat: implement primary transfer functionality for Resource Manager (#6661)
1 parent 30c7dd8 commit 44edb93

File tree

21 files changed

+889
-15
lines changed

21 files changed

+889
-15
lines changed

cmd/tidb-operator/main.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@ import (
7878
"github.com/pingcap/tidb-operator/v2/pkg/metrics"
7979
"github.com/pingcap/tidb-operator/v2/pkg/scheme"
8080
pdm "github.com/pingcap/tidb-operator/v2/pkg/timanager/pd"
81+
rmm "github.com/pingcap/tidb-operator/v2/pkg/timanager/resourcemanager"
8182
fm "github.com/pingcap/tidb-operator/v2/pkg/timanager/tiflash"
8283
tsom "github.com/pingcap/tidb-operator/v2/pkg/timanager/tso"
8384
"github.com/pingcap/tidb-operator/v2/pkg/utils/informertest"
@@ -245,6 +246,7 @@ func setup(ctx context.Context, mgr ctrl.Manager) error {
245246
logger.Info("setup client manager")
246247
pdcm := pdm.NewPDClientManager(mgr.GetLogger(), c)
247248
tsocm := tsom.NewTSOClientManager(mgr.GetLogger(), c)
249+
rmcm := rmm.NewResourceManagerClientManager(mgr.GetLogger(), c)
248250
fcm := fm.NewTiFlashClientManager(mgr.GetLogger(), c)
249251

250252
logger.Info("setup volume modifier")
@@ -253,7 +255,7 @@ func setup(ctx context.Context, mgr ctrl.Manager) error {
253255
am := adoption.New(ctrl.Log.WithName("adoption"))
254256
tf := tracker.New()
255257
setupLog.Info("setup controllers")
256-
if err := setupControllers(mgr, c, pdcm, tsocm, fcm, vm, tf, am); err != nil {
258+
if err := setupControllers(mgr, c, pdcm, tsocm, rmcm, fcm, vm, tf, am); err != nil {
257259
setupLog.Error(err, "unable to setup controllers")
258260
os.Exit(1)
259261
}
@@ -265,6 +267,7 @@ func setup(ctx context.Context, mgr ctrl.Manager) error {
265267
logger.Info("start client manager")
266268
pdcm.Start(ctx)
267269
tsocm.Start(ctx)
270+
rmcm.Start(ctx)
268271

269272
if err := mgr.AddHealthzCheck("healthz", healthz.Ping); err != nil {
270273
return fmt.Errorf("unable to set up health check: %w", err)
@@ -383,6 +386,7 @@ func setupControllers(
383386
c client.Client,
384387
pdcm pdm.PDClientManager,
385388
tsocm tsom.TSOClientManager,
389+
rmcm rmm.ResourceManagerClientManager,
386390
fcm fm.TiFlashClientManager,
387391
vm volumes.ModifierFactory,
388392
tf tracker.Factory,
@@ -482,7 +486,7 @@ func setupControllers(
482486
{
483487
name: "ResourceManagerGroup",
484488
setupFunc: func() error {
485-
return resourcemanagergroup.Setup(mgr, c, tf.AllocateFactory("resourcemanager"))
489+
return resourcemanagergroup.Setup(mgr, c, rmcm, tf.AllocateFactory("resourcemanager"))
486490
},
487491
},
488492
{

pkg/controllers/resourcemanager/builder.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,6 @@ func (r *Reconciler) NewRunner(state *tasks.ReconcileContext, reporter task.Task
3434
common.TaskInstanceConditionRunning[scope.ResourceManager](state),
3535
common.TaskStatusPersister[scope.ResourceManager](state, r.Client),
3636
),
37-
3837
common.TaskContextCluster[scope.ResourceManager](state, r.Client),
3938
task.IfBreak(common.CondClusterIsPaused(state)),
4039
task.IfBreak(common.CondClusterIsDeleting(state),
@@ -44,6 +43,7 @@ func (r *Reconciler) NewRunner(state *tasks.ReconcileContext, reporter task.Task
4443
common.TaskFinalizerAdd[scope.ResourceManager](state, r.Client),
4544

4645
common.TaskContextPod[scope.ResourceManager](state, r.Client),
46+
tasks.TaskContextClient(state, r.PDClientManager),
4747

4848
task.IfBreak(
4949
common.CondClusterIsSuspending(state),

pkg/controllers/resourcemanager/tasks/ctx.go

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,30 @@
1414

1515
package tasks
1616

17+
import (
18+
"context"
19+
20+
"github.com/pingcap/tidb-operator/v2/pkg/timanager"
21+
pdm "github.com/pingcap/tidb-operator/v2/pkg/timanager/pd"
22+
"github.com/pingcap/tidb-operator/v2/pkg/utils/task/v3"
23+
)
24+
1725
type ReconcileContext struct {
1826
// TODO: replace all fields in ReconcileContext by State
1927
State
28+
29+
PDClient pdm.PDClient
30+
}
31+
32+
func TaskContextClient(state *ReconcileContext, cm pdm.PDClientManager) task.Task {
33+
return task.NameTaskFunc("ContextClient", func(_ context.Context) task.Result {
34+
ck := state.Cluster()
35+
key := timanager.PrimaryKey(ck.Namespace, ck.Name)
36+
pc, ok := cm.Get(key)
37+
if !ok {
38+
return task.Wait().With("pd client has not been registered yet")
39+
}
40+
state.PDClient = pc
41+
return task.Complete().With("pd client is ready")
42+
})
2043
}

pkg/controllers/resourcemanager/tasks/pod.go

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ import (
3434
"github.com/pingcap/tidb-operator/v2/pkg/utils/k8s"
3535
maputil "github.com/pingcap/tidb-operator/v2/pkg/utils/map"
3636
"github.com/pingcap/tidb-operator/v2/pkg/utils/task/v3"
37+
"github.com/pingcap/tidb-operator/v2/third_party/kubernetes/pkg/controller/statefulset"
3738
)
3839

3940
const (
@@ -56,6 +57,19 @@ func TaskPod(state *ReconcileContext, c client.Client) task.Task {
5657
}
5758

5859
if !reloadable.CheckResourceManagerPod(obj, pod) {
60+
if statefulset.IsPodReady(pod) {
61+
if state.PDClient == nil {
62+
return task.Wait().With("wait for pd client being ready")
63+
}
64+
wait, err := transferPrimaryIfNeeded(ctx, logger, c, ck, obj, state.PDClient)
65+
if err != nil {
66+
return task.Fail().With("pre delete pod of resource manager failed: %v", err)
67+
}
68+
if wait {
69+
return task.Wait().With("wait for resource manager primary being transferred")
70+
}
71+
}
72+
5973
logger.Info("will delete the pod to recreate", "name", pod.Name, "namespace", pod.Namespace, "UID", pod.UID)
6074

6175
if err := c.Delete(ctx, pod); err != nil {
@@ -180,8 +194,7 @@ func buildReadinessProbe(cluster *v1alpha1.Cluster, port int32) *corev1.Probe {
180194
scheme = "https"
181195
}
182196

183-
// TODO: update to /health after RM supports it
184-
readinessURL := fmt.Sprintf("%s://127.0.0.1:%d/status", scheme, port)
197+
readinessURL := fmt.Sprintf("%s://127.0.0.1:%d/health", scheme, port)
185198
command := []string{
186199
"curl",
187200
readinessURL,
Lines changed: 155 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,155 @@
1+
// Copyright 2024 PingCAP, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package tasks
16+
17+
import (
18+
"context"
19+
"crypto/tls"
20+
"strings"
21+
"time"
22+
23+
"github.com/go-logr/logr"
24+
25+
"github.com/pingcap/tidb-operator/api/v2/core/v1alpha1"
26+
"github.com/pingcap/tidb-operator/v2/pkg/apicall"
27+
coreutil "github.com/pingcap/tidb-operator/v2/pkg/apiutil/core/v1alpha1"
28+
"github.com/pingcap/tidb-operator/v2/pkg/client"
29+
"github.com/pingcap/tidb-operator/v2/pkg/resourcemanagerapi"
30+
"github.com/pingcap/tidb-operator/v2/pkg/runtime/scope"
31+
pdm "github.com/pingcap/tidb-operator/v2/pkg/timanager/pd"
32+
33+
ctrlclient "sigs.k8s.io/controller-runtime/pkg/client"
34+
)
35+
36+
const (
37+
resourceManagerServiceName = "resource_manager"
38+
39+
defaultAPITimeout = 5 * time.Second
40+
)
41+
42+
var (
43+
newRMClient = resourcemanagerapi.NewClient
44+
)
45+
46+
func transferPrimaryIfNeeded(
47+
ctx context.Context,
48+
logger logr.Logger,
49+
c client.Client,
50+
cluster *v1alpha1.Cluster,
51+
rm *v1alpha1.ResourceManager,
52+
pdClient pdm.PDClient,
53+
) (bool, error) {
54+
groupName := rm.Labels[v1alpha1.LabelKeyGroup]
55+
if groupName == "" {
56+
// Not managed by ResourceManagerGroup, skip.
57+
return false, nil
58+
}
59+
60+
if cluster == nil || strings.TrimSpace(cluster.Status.PD) == "" {
61+
return false, nil
62+
}
63+
if pdClient == nil || pdClient.Underlay() == nil {
64+
return false, nil
65+
}
66+
67+
peers, err := listGroupResourceManagers(ctx, c, cluster, rm, groupName)
68+
if err != nil {
69+
return false, err
70+
}
71+
if len(peers) <= 1 {
72+
return false, nil
73+
}
74+
75+
var tlsConfig *tls.Config
76+
if coreutil.IsTLSClusterEnabled(cluster) {
77+
cfg, err := apicall.GetClientTLSConfig(ctx, c, cluster)
78+
if err != nil {
79+
return false, err
80+
}
81+
tlsConfig = cfg
82+
}
83+
84+
primaryAddr, err := pdClient.Underlay().GetMicroServicePrimary(ctx, resourceManagerServiceName)
85+
if err != nil {
86+
return false, err
87+
}
88+
if primaryAddr == "" {
89+
return false, nil
90+
}
91+
92+
primaryAddr = strings.TrimRight(strings.TrimSpace(primaryAddr), "/")
93+
myAddr := coreutil.InstanceAdvertiseURL[scope.ResourceManager](cluster, rm, coreutil.ResourceManagerClientPort(rm))
94+
if myAddr == "" || normalizeMicroserviceAddr(primaryAddr) != normalizeMicroserviceAddr(myAddr) {
95+
return false, nil
96+
}
97+
98+
transferee := coreutil.LongestReadyPeer[scope.ResourceManager](rm, peers)
99+
if transferee == nil {
100+
logger.Info("no healthy transferee available for resource manager primary transfer", "name", rm.Name)
101+
return false, nil
102+
}
103+
104+
logger.Info("try to transfer resource manager primary", "from", rm.Name, "to", transferee.Name)
105+
106+
rmClient := newRMClient(ensureAddrScheme(primaryAddr, coreutil.IsTLSClusterEnabled(cluster)), defaultAPITimeout, tlsConfig)
107+
if err := rmClient.TransferPrimary(ctx, transferee.Name); err != nil {
108+
return false, err
109+
}
110+
111+
return true, nil
112+
}
113+
114+
func listGroupResourceManagers(
115+
ctx context.Context,
116+
c client.Client,
117+
cluster *v1alpha1.Cluster,
118+
rm *v1alpha1.ResourceManager,
119+
groupName string,
120+
) ([]*v1alpha1.ResourceManager, error) {
121+
var list v1alpha1.ResourceManagerList
122+
selector := map[string]string{
123+
v1alpha1.LabelKeyComponent: v1alpha1.LabelValComponentResourceManager,
124+
v1alpha1.LabelKeyCluster: cluster.Name,
125+
v1alpha1.LabelKeyGroup: groupName,
126+
}
127+
if err := c.List(ctx, &list, ctrlclient.InNamespace(rm.Namespace), ctrlclient.MatchingLabels(selector)); err != nil {
128+
return nil, err
129+
}
130+
peers := make([]*v1alpha1.ResourceManager, 0, len(list.Items))
131+
for i := range list.Items {
132+
peer := &list.Items[i]
133+
peers = append(peers, peer)
134+
}
135+
return peers, nil
136+
}
137+
138+
func normalizeMicroserviceAddr(addr string) string {
139+
addr = strings.TrimRight(strings.TrimSpace(addr), "/")
140+
addr = strings.TrimPrefix(addr, "http://")
141+
addr = strings.TrimPrefix(addr, "https://")
142+
return addr
143+
}
144+
145+
func ensureAddrScheme(addr string, tlsEnabled bool) string {
146+
addr = strings.TrimRight(strings.TrimSpace(addr), "/")
147+
if strings.HasPrefix(addr, "http://") || strings.HasPrefix(addr, "https://") {
148+
return addr
149+
}
150+
scheme := "http"
151+
if tlsEnabled {
152+
scheme = "https"
153+
}
154+
return scheme + "://" + addr
155+
}

0 commit comments

Comments
 (0)