Commit 5dda440

feat: create kubectl-fleet plugin
Signed-off-by: Wantong Jiang <[email protected]>
1 parent 0e90c29 commit 5dda440

File tree: 9 files changed (+1122 / -16 lines)


test/e2e/drain_tool_test.go

Lines changed: 2 additions & 2 deletions

@@ -359,7 +359,7 @@ var _ = Describe("Drain is allowed on one cluster, blocked on others - ClusterRe

 func runDrainClusterBinary(hubClusterName, memberClusterName string) {
 	By(fmt.Sprintf("draining cluster %s", memberClusterName))
-	cmd := exec.Command(drainBinaryPath,
+	cmd := exec.Command(fleetBinaryPath, "drain",
 		"--hubClusterContext", hubClusterName,
 		"--clusterName", memberClusterName)
 	_, err := cmd.CombinedOutput()

@@ -368,7 +368,7 @@ func runDrainClusterBinary(hubClusterName, memberClusterName string) {

 func runUncordonClusterBinary(hubClusterName, memberClusterName string) {
 	By(fmt.Sprintf("uncordoning cluster %s", memberClusterName))
-	cmd := exec.Command(uncordonBinaryPath,
+	cmd := exec.Command(fleetBinaryPath, "uncordon",
 		"--hubClusterContext", hubClusterName,
 		"--clusterName", memberClusterName)
 	_, err := cmd.CombinedOutput()

test/e2e/setup.sh

Lines changed: 3 additions & 7 deletions

@@ -215,10 +215,6 @@ done
 # Create tools directory if it doesn't exist
 mkdir -p ../../hack/tools/bin

-# Build drain binary
-echo "Building drain binary..."
-go build -o ../../hack/tools/bin/kubectl-draincluster ../../tools/draincluster
-
-# Build uncordon binary
-echo "Building uncordon binary..."
-go build -o ../../hack/tools/bin/kubectl-uncordoncluster ../../tools/uncordoncluster
+# Build fleet plugin binary
+echo "Building fleet kubectl-plugin binary..."
+go build -o ../../hack/tools/bin/kubectl-fleet ../../tools/fleet

test/e2e/setup_test.go

Lines changed: 4 additions & 7 deletions

@@ -173,8 +173,7 @@ var (
 )

 var (
-	drainBinaryPath    = filepath.Join("../../", "hack", "tools", "bin", "kubectl-draincluster")
-	uncordonBinaryPath = filepath.Join("../../", "hack", "tools", "bin", "kubectl-uncordoncluster")
+	fleetBinaryPath = filepath.Join("../../", "hack", "tools", "bin", "kubectl-fleet")
 )

 var (

@@ -376,11 +375,9 @@ func beforeSuiteForAllProcesses() {
 		allMemberClusterNames = append(allMemberClusterNames, allMemberClusters[i].ClusterName)
 	}

-	// Check if drain cluster and uncordon cluster binaries exist.
-	_, err := os.Stat(drainBinaryPath)
-	Expect(os.IsNotExist(err)).To(BeFalse(), fmt.Sprintf("drain binary not found at %s", drainBinaryPath))
-	_, err = os.Stat(uncordonBinaryPath)
-	Expect(os.IsNotExist(err)).To(BeFalse(), fmt.Sprintf("uncordon binary not found at %s", uncordonBinaryPath))
+	// Check if kubectl-fleet binary exists.
+	_, err := os.Stat(fleetBinaryPath)
+	Expect(os.IsNotExist(err)).To(BeFalse(), fmt.Sprintf("kubectl-fleet binary not found at %s", fleetBinaryPath))
 	})
 }

tools/fleet/README.md

Lines changed: 63 additions & 0 deletions

@@ -0,0 +1,63 @@
# kubectl-fleet

A unified kubectl plugin for KubeFleet cluster management operations.

## Description

`kubectl-fleet` consolidates the functionality of the separate `kubectl-draincluster` and `kubectl-uncordoncluster` plugins into a single plugin with subcommands.

## Installation

Build the plugin:
```bash
go build -o ./hack/tools/bin/kubectl-fleet ./tools/fleet/
```

Install the plugin:
```bash
sudo cp ./hack/tools/bin/kubectl-fleet /usr/local/bin/
chmod +x /usr/local/bin/kubectl-fleet
```

Verify installation:
```bash
kubectl plugin list | grep fleet
```

## Usage

### Drain a cluster
```bash
kubectl fleet drain --hubClusterContext=hub-context --clusterName=member-cluster-1
```

### Uncordon a cluster
```bash
kubectl fleet uncordon --hubClusterContext=hub-context --clusterName=member-cluster-1
```

## Subcommands

### drain
Drains a member cluster by:
1. Adding a cordon taint to prevent new resource propagation
2. Creating eviction objects for all existing resource placements
3. Waiting for evictions to complete

### uncordon
Uncordons a previously drained member cluster by removing the cordon taint, allowing resources to be propagated to the cluster again.

## Flags

- `--hubClusterContext`: kubectl context for the hub cluster (required)
- `--clusterName`: name of the member cluster to operate on (required)

## Examples

```bash
# Drain a cluster for maintenance
kubectl fleet drain --hubClusterContext=production-hub --clusterName=worker-node-1

# After maintenance, uncordon the cluster
kubectl fleet uncordon --hubClusterContext=production-hub --clusterName=worker-node-1
```
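
Editor's note: the plugin's entry point (the file that routes `drain` and `uncordon` to their implementations) is part of the commit but not included in this excerpt. The following is only a minimal sketch of how such a dispatcher could look; the flag-package parsing and the placeholder behavior are assumptions for illustration, not the committed code.

```go
// Hypothetical dispatcher sketch for kubectl-fleet; names and behavior are illustrative only.
package main

import (
	"flag"
	"fmt"
	"os"
)

func main() {
	if len(os.Args) < 2 {
		fmt.Fprintln(os.Stderr, "usage: kubectl-fleet <drain|uncordon> --hubClusterContext=<ctx> --clusterName=<name>")
		os.Exit(1)
	}
	sub := os.Args[1]
	if sub != "drain" && sub != "uncordon" {
		fmt.Fprintf(os.Stderr, "unknown subcommand %q\n", sub)
		os.Exit(1)
	}

	// Both subcommands take the same two required flags described in the README.
	fs := flag.NewFlagSet(sub, flag.ExitOnError)
	hubContext := fs.String("hubClusterContext", "", "kubectl context for the hub cluster (required)")
	clusterName := fs.String("clusterName", "", "name of the member cluster to operate on (required)")
	_ = fs.Parse(os.Args[2:])
	if *hubContext == "" || *clusterName == "" {
		fmt.Fprintln(os.Stderr, "--hubClusterContext and --clusterName are required")
		os.Exit(1)
	}

	// In the real plugin this is where the drain or uncordon logic would be invoked.
	fmt.Printf("would run %q against member cluster %q via hub context %q\n", sub, *clusterName, *hubContext)
}
```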

tools/fleet/drain.go

Lines changed: 212 additions & 0 deletions

@@ -0,0 +1,212 @@
/*
Copyright 2025 The KubeFleet Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package main

import (
	"context"
	"fmt"
	"log"

	k8errors "k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/apimachinery/pkg/util/uuid"
	"k8s.io/apimachinery/pkg/util/validation"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/util/retry"
	"sigs.k8s.io/controller-runtime/pkg/client"

	clusterv1beta1 "github.com/kubefleet-dev/kubefleet/apis/cluster/v1beta1"
	placementv1beta1 "github.com/kubefleet-dev/kubefleet/apis/placement/v1beta1"
	"github.com/kubefleet-dev/kubefleet/pkg/utils/condition"
	evictionutils "github.com/kubefleet-dev/kubefleet/pkg/utils/eviction"
	toolsutils "github.com/kubefleet-dev/kubefleet/tools/utils"
)

const (
	uuidLength                  = 8
	drainEvictionNameFormat     = "drain-eviction-%s-%s-%s"
	resourceIdentifierKeyFormat = "%s/%s/%s/%s/%s"
)

type drainHelper struct {
	hubClient   client.Client
	clusterName string
}

func (h *drainHelper) Drain(ctx context.Context) (bool, error) {
	if err := h.cordon(ctx); err != nil {
		return false, fmt.Errorf("failed to cordon member cluster %s: %w", h.clusterName, err)
	}
	log.Printf("Successfully cordoned member cluster %s by adding cordon taint", h.clusterName)

	crpNameMap, err := h.fetchClusterResourcePlacementNamesToEvict(ctx)
	if err != nil {
		return false, err
	}

	if len(crpNameMap) == 0 {
		log.Printf("There are currently no resources propagated to %s from fleet using ClusterResourcePlacement resources", h.clusterName)
		return true, nil
	}

	isDrainSuccessful := true
	for crpName := range crpNameMap {
		evictionName, err := generateDrainEvictionName(crpName, h.clusterName)
		if err != nil {
			return false, err
		}

		err = retry.OnError(retry.DefaultBackoff, func(err error) bool {
			return k8errors.IsAlreadyExists(err)
		}, func() error {
			eviction := placementv1beta1.ClusterResourcePlacementEviction{
				ObjectMeta: metav1.ObjectMeta{
					Name: evictionName,
				},
				Spec: placementv1beta1.PlacementEvictionSpec{
					PlacementName: crpName,
					ClusterName:   h.clusterName,
				},
			}
			return h.hubClient.Create(ctx, &eviction)
		})

		if err != nil {
			return false, fmt.Errorf("failed to create eviction %s for CRP %s targeting member cluster %s: %w", evictionName, crpName, h.clusterName, err)
		}

		log.Printf("Created eviction %s for CRP %s targeting member cluster %s", evictionName, crpName, h.clusterName)

		var eviction placementv1beta1.ClusterResourcePlacementEviction
		err = wait.ExponentialBackoffWithContext(ctx, retry.DefaultBackoff, func(ctx context.Context) (bool, error) {
			if err := h.hubClient.Get(ctx, types.NamespacedName{Name: evictionName}, &eviction); err != nil {
				return false, fmt.Errorf("failed to get eviction %s for CRP %s targeting member cluster %s: %w", evictionName, crpName, h.clusterName, err)
			}
			return evictionutils.IsEvictionInTerminalState(&eviction), nil
		})

		if err != nil {
			return false, fmt.Errorf("failed to wait for eviction %s for CRP %s targeting member cluster %s to reach terminal state: %w", evictionName, crpName, h.clusterName, err)
		}

		validCondition := eviction.GetCondition(string(placementv1beta1.PlacementEvictionConditionTypeValid))
		if validCondition != nil && validCondition.Status == metav1.ConditionFalse {
			if validCondition.Reason == condition.EvictionInvalidMissingCRPMessage ||
				validCondition.Reason == condition.EvictionInvalidDeletingCRPMessage ||
				validCondition.Reason == condition.EvictionInvalidMissingCRBMessage {
				log.Printf("eviction %s is invalid with reason %s for CRP %s targeting member cluster %s, but drain will succeed", evictionName, validCondition.Reason, crpName, h.clusterName)
				continue
			}
		}
		executedCondition := eviction.GetCondition(string(placementv1beta1.PlacementEvictionConditionTypeExecuted))
		if executedCondition == nil || executedCondition.Status == metav1.ConditionFalse {
			isDrainSuccessful = false
			log.Printf("eviction %s was not executed successfully for CRP %s targeting member cluster %s", evictionName, crpName, h.clusterName)
			continue
		}
		log.Printf("eviction %s was executed successfully for CRP %s targeting member cluster %s", evictionName, crpName, h.clusterName)

		clusterScopedResourceIdentifiers, err := h.collectClusterScopedResourcesSelectedByCRP(ctx, crpName)
		if err != nil {
			log.Printf("failed to collect cluster scoped resources selected by CRP %s: %v", crpName, err)
			continue
		}
		for _, resourceIdentifier := range clusterScopedResourceIdentifiers {
			log.Printf("evicted resource %s propagated by CRP %s targeting member cluster %s", generateResourceIdentifierKey(resourceIdentifier), crpName, h.clusterName)
		}
	}

	return isDrainSuccessful, nil
}

func (h *drainHelper) cordon(ctx context.Context) error {
	return retry.RetryOnConflict(retry.DefaultRetry, func() error {
		var mc clusterv1beta1.MemberCluster
		if err := h.hubClient.Get(ctx, types.NamespacedName{Name: h.clusterName}, &mc); err != nil {
			return err
		}

		for i := range mc.Spec.Taints {
			if mc.Spec.Taints[i] == toolsutils.CordonTaint {
				return nil
			}
		}

		mc.Spec.Taints = append(mc.Spec.Taints, toolsutils.CordonTaint)

		return h.hubClient.Update(ctx, &mc)
	})
}

func (h *drainHelper) fetchClusterResourcePlacementNamesToEvict(ctx context.Context) (map[string]bool, error) {
	var crbList placementv1beta1.ClusterResourceBindingList
	if err := h.hubClient.List(ctx, &crbList); err != nil {
		return map[string]bool{}, fmt.Errorf("failed to list cluster resource bindings: %w", err)
	}

	crpNameMap := make(map[string]bool)
	for i := range crbList.Items {
		crb := crbList.Items[i]
		if crb.Spec.TargetCluster == h.clusterName && crb.DeletionTimestamp == nil {
			crpName, ok := crb.GetLabels()[placementv1beta1.PlacementTrackingLabel]
			if !ok {
				return map[string]bool{}, fmt.Errorf("failed to get CRP name from binding %s", crb.Name)
			}
			crpNameMap[crpName] = true
		}
	}

	return crpNameMap, nil
}

func (h *drainHelper) collectClusterScopedResourcesSelectedByCRP(ctx context.Context, crpName string) ([]placementv1beta1.ResourceIdentifier, error) {
	var crp placementv1beta1.ClusterResourcePlacement
	if err := h.hubClient.Get(ctx, types.NamespacedName{Name: crpName}, &crp); err != nil {
		return nil, fmt.Errorf("failed to get ClusterResourcePlacement %s: %w", crpName, err)
	}

	var resourcesPropagated []placementv1beta1.ResourceIdentifier
	for _, selectedResource := range crp.Status.SelectedResources {
		if len(selectedResource.Namespace) == 0 {
			resourcesPropagated = append(resourcesPropagated, selectedResource)
		}
	}
	return resourcesPropagated, nil
}

func generateDrainEvictionName(crpName, targetCluster string) (string, error) {
	evictionName := fmt.Sprintf(drainEvictionNameFormat, crpName, targetCluster, uuid.NewUUID()[:uuidLength])

	if errs := validation.IsDNS1123Subdomain(evictionName); len(errs) != 0 {
		return "", fmt.Errorf("failed to format a qualified name for drain eviction object with CRP name %s, cluster name %s: %v", crpName, targetCluster, errs)
	}
	return evictionName, nil
}

func generateResourceIdentifierKey(r placementv1beta1.ResourceIdentifier) string {
	if len(r.Group) == 0 && len(r.Namespace) == 0 {
		return fmt.Sprintf(resourceIdentifierKeyFormat, "''", r.Version, r.Kind, "''", r.Name)
	}
	if len(r.Group) == 0 {
		return fmt.Sprintf(resourceIdentifierKeyFormat, "''", r.Version, r.Kind, r.Namespace, r.Name)
	}
	if len(r.Namespace) == 0 {
		return fmt.Sprintf(resourceIdentifierKeyFormat, r.Group, r.Version, r.Kind, "''", r.Name)
	}
	return fmt.Sprintf(resourceIdentifierKeyFormat, r.Group, r.Version, r.Kind, r.Namespace, r.Name)
}
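
Editor's note: drain.go only defines the helper; the command wiring that constructs the hub client and calls Drain lives in files outside this excerpt. The sketch below shows one plausible way to drive the helper above, assuming the KubeFleet API packages expose the usual generated AddToScheme helpers and that the hub client is built from the --hubClusterContext kubeconfig context; the runDrain name and this structure are illustrative, not the committed implementation.

```go
// Illustrative wiring only; not the committed entry point. Reuses drainHelper from drain.go above.
package main

import (
	"context"
	"fmt"
	"log"

	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/client-go/tools/clientcmd"
	"sigs.k8s.io/controller-runtime/pkg/client"

	clusterv1beta1 "github.com/kubefleet-dev/kubefleet/apis/cluster/v1beta1"
	placementv1beta1 "github.com/kubefleet-dev/kubefleet/apis/placement/v1beta1"
)

func runDrain(ctx context.Context, hubContext, clusterName string) error {
	// Build a rest.Config for the hub cluster from the requested kubeconfig context.
	cfg, err := clientcmd.NewNonInteractiveDeferredLoadingClientConfig(
		clientcmd.NewDefaultClientConfigLoadingRules(),
		&clientcmd.ConfigOverrides{CurrentContext: hubContext},
	).ClientConfig()
	if err != nil {
		return fmt.Errorf("failed to load kubeconfig for context %s: %w", hubContext, err)
	}

	// Register the fleet APIs the drain helper reads and writes (AddToScheme is assumed here).
	scheme := runtime.NewScheme()
	if err := clusterv1beta1.AddToScheme(scheme); err != nil {
		return err
	}
	if err := placementv1beta1.AddToScheme(scheme); err != nil {
		return err
	}

	hubClient, err := client.New(cfg, client.Options{Scheme: scheme})
	if err != nil {
		return fmt.Errorf("failed to create hub cluster client: %w", err)
	}

	helper := drainHelper{hubClient: hubClient, clusterName: clusterName}
	succeeded, err := helper.Drain(ctx)
	if err != nil {
		return err
	}
	if !succeeded {
		// Drain reports partial failure via its bool result; the caller decides how to react.
		log.Printf("drain of member cluster %s did not fully succeed; rerun drain or inspect the remaining evictions", clusterName)
	}
	return nil
}
```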
