Skip to content

Commit ea92e68

Browse files
authored
Merge pull request #8938 from killianmuldoon/pr-capim-upgrade-handling
✨ Enable Kubernetes upgrades in CAPIM
2 parents 383e1d0 + bd52d47 commit ea92e68

File tree

7 files changed

+278
-21
lines changed

7 files changed

+278
-21
lines changed

test/infrastructure/inmemory/internal/cloud/api/v1alpha1/etcdcluster_annotations.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,4 +34,7 @@ const (
3434
// By using this mechanism leadership can be forwarded to another pod with an atomic operation
3535
// (add/update of the annotation to the pod/etcd member we are forwarding leadership to).
3636
EtcdLeaderFromAnnotationName = "etcd.inmemory.infrastructure.cluster.x-k8s.io/leader-from"
37+
38+
// EtcdMemberRemoved is added to etcd pods which have been removed from the etcd cluster.
39+
EtcdMemberRemoved = "etcd.inmemory.infrastructure.cluster.x-k8s.io/member-removed"
3740
)

test/infrastructure/inmemory/internal/cloud/runtime/cache/client.go

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,12 @@ import (
2121
"time"
2222

2323
jsonpatch "github.com/evanphx/json-patch/v5"
24+
corev1 "k8s.io/api/core/v1"
2425
apierrors "k8s.io/apimachinery/pkg/api/errors"
2526
"k8s.io/apimachinery/pkg/api/meta"
2627
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2728
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
29+
"k8s.io/apimachinery/pkg/fields"
2830
"k8s.io/apimachinery/pkg/labels"
2931
"k8s.io/apimachinery/pkg/runtime"
3032
"k8s.io/apimachinery/pkg/runtime/schema"
@@ -117,6 +119,15 @@ func (c *cache) List(resourceGroup string, list client.ObjectList, opts ...clien
117119
}
118120
}
119121

122+
// TODO(killianmuldoon): Only the `spec.nodeName` field selector on Pods is supported; no other field selectors are implemented. This should return an error if an unsupported field selector is used.
123+
if pod, ok := obj.(*corev1.Pod); ok {
124+
if listOpts.FieldSelector != nil && !listOpts.FieldSelector.Empty() {
125+
if !listOpts.FieldSelector.Matches(fields.Set{"spec.nodeName": pod.Spec.NodeName}) {
126+
continue
127+
}
128+
}
129+
}
130+
120131
obj := obj.DeepCopyObject().(client.Object)
121132
switch list.(type) {
122133
case *unstructured.UnstructuredList:
@@ -272,11 +283,6 @@ func updateTrackerOwnerReferences(tracker *resourceGroupTracker, oldObj, newObj
272283
}
273284

274285
func (c *cache) Patch(resourceGroup string, obj client.Object, patch client.Patch) error {
275-
obj = obj.DeepCopyObject().(client.Object)
276-
if err := c.Get(resourceGroup, client.ObjectKeyFromObject(obj), obj); err != nil {
277-
return err
278-
}
279-
280286
patchData, err := patch.Data(obj)
281287
if err != nil {
282288
return apierrors.NewInternalError(err)

test/infrastructure/inmemory/internal/controllers/inmemorymachine_controller.go

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -428,6 +428,9 @@ func (r *InMemoryMachineReconciler) reconcileNormalETCD(ctx context.Context, clu
428428
"tier": "control-plane",
429429
},
430430
},
431+
Spec: corev1.PodSpec{
432+
NodeName: inMemoryMachine.Name,
433+
},
431434
Status: corev1.PodStatus{
432435
Phase: corev1.PodRunning,
433436
Conditions: []corev1.PodCondition{
@@ -444,7 +447,7 @@ func (r *InMemoryMachineReconciler) reconcileNormalETCD(ctx context.Context, clu
444447
}
445448

446449
// Gets info about the current etcd cluster, if any.
447-
info, err := r.inspectEtcd(ctx, cloudClient)
450+
info, err := r.getEtcdInfo(ctx, cloudClient)
448451
if err != nil {
449452
return ctrl.Result{}, err
450453
}
@@ -528,7 +531,7 @@ type etcdInfo struct {
528531
members sets.Set[string]
529532
}
530533

531-
func (r *InMemoryMachineReconciler) inspectEtcd(ctx context.Context, cloudClient cclient.Client) (etcdInfo, error) {
534+
func (r *InMemoryMachineReconciler) getEtcdInfo(ctx context.Context, cloudClient cclient.Client) (etcdInfo, error) {
532535
etcdPods := &corev1.PodList{}
533536
if err := cloudClient.List(ctx, etcdPods,
534537
client.InNamespace(metav1.NamespaceSystem),
@@ -548,6 +551,9 @@ func (r *InMemoryMachineReconciler) inspectEtcd(ctx context.Context, cloudClient
548551
}
549552
var leaderFrom time.Time
550553
for _, pod := range etcdPods.Items {
554+
if _, ok := pod.Annotations[cloudv1.EtcdMemberRemoved]; ok {
555+
continue
556+
}
551557
if info.clusterID == "" {
552558
info.clusterID = pod.Annotations[cloudv1.EtcdClusterIDAnnotationName]
553559
} else if pod.Annotations[cloudv1.EtcdClusterIDAnnotationName] != info.clusterID {
@@ -627,6 +633,9 @@ func (r *InMemoryMachineReconciler) reconcileNormalAPIServer(ctx context.Context
627633
"tier": "control-plane",
628634
},
629635
},
636+
Spec: corev1.PodSpec{
637+
NodeName: inMemoryMachine.Name,
638+
},
630639
Status: corev1.PodStatus{
631640
Phase: corev1.PodRunning,
632641
Conditions: []corev1.PodCondition{
@@ -713,6 +722,9 @@ func (r *InMemoryMachineReconciler) reconcileNormalScheduler(ctx context.Context
713722
"tier": "control-plane",
714723
},
715724
},
725+
Spec: corev1.PodSpec{
726+
NodeName: inMemoryMachine.Name,
727+
},
716728
Status: corev1.PodStatus{
717729
Phase: corev1.PodRunning,
718730
Conditions: []corev1.PodCondition{
@@ -758,6 +770,9 @@ func (r *InMemoryMachineReconciler) reconcileNormalControllerManager(ctx context
758770
"tier": "control-plane",
759771
},
760772
},
773+
Spec: corev1.PodSpec{
774+
NodeName: inMemoryMachine.Name,
775+
},
761776
Status: corev1.PodStatus{
762777
Phase: corev1.PodRunning,
763778
Conditions: []corev1.PodCondition{
@@ -832,9 +847,12 @@ func (r *InMemoryMachineReconciler) reconcileNormalKubeadmObjects(ctx context.Co
832847
Name: "kubeadm-config",
833848
Namespace: metav1.NamespaceSystem,
834849
},
850+
Data: map[string]string{
851+
"ClusterConfiguration": "",
852+
},
835853
}
836854
if err := cloudClient.Create(ctx, cm); err != nil && !apierrors.IsAlreadyExists(err) {
837-
return ctrl.Result{}, errors.Wrapf(err, "failed to create ubeadm-config ConfigMap")
855+
return ctrl.Result{}, errors.Wrapf(err, "failed to create kubeadm-config ConfigMap")
838856
}
839857

840858
return ctrl.Result{}, nil
@@ -1013,6 +1031,8 @@ func (r *InMemoryMachineReconciler) reconcileDeleteNode(ctx context.Context, clu
10131031
Name: inMemoryMachine.Name,
10141032
},
10151033
}
1034+
1035+
// TODO(killianmuldoon): check if we can drop this given that the MachineController is already draining pods and deleting nodes.
10161036
if err := cloudClient.Delete(ctx, node); err != nil && !apierrors.IsNotFound(err) {
10171037
return ctrl.Result{}, errors.Wrapf(err, "failed to delete Node")
10181038
}

test/infrastructure/inmemory/internal/server/api/handler.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ import (
3030
apierrors "k8s.io/apimachinery/pkg/api/errors"
3131
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
3232
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
33+
"k8s.io/apimachinery/pkg/fields"
3334
"k8s.io/apimachinery/pkg/runtime"
3435
"k8s.io/apimachinery/pkg/runtime/schema"
3536
"k8s.io/apimachinery/pkg/runtime/serializer"
@@ -252,6 +253,16 @@ func (h *apiServerHandler) apiV1List(req *restful.Request, resp *restful.Respons
252253
listOpts = append(listOpts, client.InNamespace(req.PathParameter("namespace")))
253254
}
254255

256+
// TODO: The only field selector which works is for `spec.nodeName` on pods.
257+
selector, err := fields.ParseSelector(req.QueryParameter("fieldSelector"))
258+
if err != nil {
259+
_ = resp.WriteErrorString(http.StatusInternalServerError, err.Error())
260+
return
261+
}
262+
if selector != nil {
263+
listOpts = append(listOpts, client.MatchingFieldsSelector{Selector: selector})
264+
}
265+
255266
if err := cloudClient.List(ctx, list, listOpts...); err != nil {
256267
_ = resp.WriteErrorString(http.StatusInternalServerError, err.Error())
257268
return
@@ -412,6 +423,10 @@ func (h *apiServerHandler) apiV1Patch(req *restful.Request, resp *restful.Respon
412423
obj.SetName(req.PathParameter("name"))
413424
obj.SetNamespace(req.PathParameter("namespace"))
414425

426+
if err := cloudClient.Get(ctx, client.ObjectKeyFromObject(obj), obj); err != nil {
427+
_ = resp.WriteErrorString(http.StatusInternalServerError, err.Error())
428+
return
429+
}
415430
if err := cloudClient.Patch(ctx, obj, patch); err != nil {
416431
_ = resp.WriteErrorString(http.StatusInternalServerError, err.Error())
417432
return

test/infrastructure/inmemory/internal/server/etcd/handler.go

Lines changed: 81 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -113,8 +113,48 @@ func (m *maintenanceServer) Snapshot(_ *pb.SnapshotRequest, _ pb.Maintenance_Sna
113113
return fmt.Errorf("not implemented: Snapshot")
114114
}
115115

116-
func (m *maintenanceServer) MoveLeader(_ context.Context, _ *pb.MoveLeaderRequest) (*pb.MoveLeaderResponse, error) {
117-
return nil, fmt.Errorf("not implemented: MoveLeader")
116+
func (m *maintenanceServer) MoveLeader(ctx context.Context, req *pb.MoveLeaderRequest) (*pb.MoveLeaderResponse, error) {
117+
out := new(pb.MoveLeaderResponse)
118+
resourceGroup, _, err := m.getResourceGroupAndMember(ctx)
119+
if err != nil {
120+
return nil, err
121+
}
122+
etcdPods := &corev1.PodList{}
123+
cloudClient := m.manager.GetResourceGroup(resourceGroup).GetClient()
124+
if err := cloudClient.List(ctx, etcdPods,
125+
client.InNamespace(metav1.NamespaceSystem),
126+
client.MatchingLabels{
127+
"component": "etcd",
128+
"tier": "control-plane"},
129+
); err != nil {
130+
return nil, errors.Wrap(err, "failed to list etcd members")
131+
}
132+
133+
if len(etcdPods.Items) == 0 {
134+
return nil, errors.New("failed to list etcd members: no etcd pods found")
135+
}
136+
137+
for i := range etcdPods.Items {
138+
pod := &etcdPods.Items[i]
139+
for k, v := range pod.GetAnnotations() {
140+
if k == cloudv1.EtcdMemberIDAnnotationName {
141+
target := strconv.FormatInt(int64(req.TargetID), 10)
142+
if v == target {
143+
updatedPod := pod.DeepCopy()
144+
annotations := updatedPod.GetAnnotations()
145+
annotations[cloudv1.EtcdLeaderFromAnnotationName] = time.Now().Format(time.RFC3339)
146+
updatedPod.SetAnnotations(annotations)
147+
err := cloudClient.Patch(ctx, updatedPod, client.MergeFrom(pod))
148+
if err != nil {
149+
return nil, err
150+
}
151+
return out, nil
152+
}
153+
}
154+
}
155+
}
156+
// If we reach this point leadership was not moved.
157+
return nil, errors.Errorf("etcd member with ID %d did not become the leader: expected etcd Pod not found", req.TargetID)
118158
}
119159

120160
func (m *maintenanceServer) Downgrade(_ context.Context, _ *pb.DowngradeRequest) (*pb.DowngradeResponse, error) {
@@ -130,8 +170,39 @@ func (c *clusterServerServer) MemberAdd(_ context.Context, _ *pb.MemberAddReques
130170
return nil, fmt.Errorf("not implemented: MemberAdd")
131171
}
132172

133-
func (c *clusterServerServer) MemberRemove(_ context.Context, _ *pb.MemberRemoveRequest) (*pb.MemberRemoveResponse, error) {
134-
return nil, fmt.Errorf("not implemented: MemberRemove")
173+
func (c *clusterServerServer) MemberRemove(ctx context.Context, req *pb.MemberRemoveRequest) (*pb.MemberRemoveResponse, error) {
174+
out := new(pb.MemberRemoveResponse)
175+
resourceGroup, _, err := c.getResourceGroupAndMember(ctx)
176+
if err != nil {
177+
return nil, err
178+
}
179+
cloudClient := c.manager.GetResourceGroup(resourceGroup).GetClient()
180+
181+
etcdPods := &corev1.PodList{}
182+
183+
if err := cloudClient.List(ctx, etcdPods,
184+
client.InNamespace(metav1.NamespaceSystem),
185+
client.MatchingLabels{
186+
"component": "etcd",
187+
"tier": "control-plane"},
188+
); err != nil {
189+
return nil, errors.Wrap(err, "failed to list etcd members")
190+
}
191+
192+
for i := range etcdPods.Items {
193+
pod := etcdPods.Items[i]
194+
memberID := pod.Annotations[cloudv1.EtcdMemberIDAnnotationName]
195+
if memberID != fmt.Sprintf("%d", req.ID) {
196+
continue
197+
}
198+
updatedPod := pod.DeepCopy()
199+
updatedPod.Annotations[cloudv1.EtcdMemberRemoved] = ""
200+
if err := cloudClient.Patch(ctx, updatedPod, client.MergeFrom(&pod)); err != nil {
201+
return nil, err
202+
}
203+
return out, nil
204+
}
205+
return nil, errors.Errorf("no etcd member with id %d found", req.ID)
135206
}
136207

137208
func (c *clusterServerServer) MemberUpdate(_ context.Context, _ *pb.MemberUpdateRequest) (*pb.MemberUpdateResponse, error) {
@@ -197,6 +268,12 @@ func (b *baseServer) inspectEtcd(ctx context.Context, cloudClient cclient.Client
197268
var leaderID int
198269
var leaderFrom time.Time
199270
for _, pod := range etcdPods.Items {
271+
if _, ok := pod.Annotations[cloudv1.EtcdMemberRemoved]; ok {
272+
if pod.Name == fmt.Sprintf("%s%s", "etcd-", etcdMember) {
273+
return nil, nil, errors.New("inspect called on etcd which has been removed")
274+
}
275+
continue
276+
}
200277
if clusterID == 0 {
201278
var err error
202279
clusterID, err = strconv.Atoi(pod.Annotations[cloudv1.EtcdClusterIDAnnotationName])
@@ -224,7 +301,6 @@ func (b *baseServer) inspectEtcd(ctx context.Context, cloudClient cclient.Client
224301
ClusterId: uint64(clusterID),
225302
MemberId: uint64(memberID),
226303
}
227-
228304
statusResponse.Header = memberList.Header
229305
}
230306
memberList.Members = append(memberList.Members, &pb.Member{

0 commit comments

Comments
 (0)