
Commit 639e584

Add kcp native gc

Signed-off-by: Nelo-T. Wallus <[email protected]>

1 parent 1969089 commit 639e584

6 files changed: +723 -4 lines changed
Lines changed: 114 additions & 0 deletions
@@ -0,0 +1,114 @@
/*
Copyright 2025 The KCP Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package garbagecollector

import (
    "context"

    "github.com/go-logr/logr"

    "k8s.io/apimachinery/pkg/runtime/schema"
    utilruntime "k8s.io/apimachinery/pkg/util/runtime"
    "k8s.io/client-go/util/workqueue"

    kcpapiextensionsv1 "github.com/kcp-dev/client-go/apiextensions/informers/apiextensions/v1"
    kcpkubernetesclient "github.com/kcp-dev/client-go/kubernetes"
    kcpmetadataclient "github.com/kcp-dev/client-go/metadata"
    corev1alpha1informers "github.com/kcp-dev/sdk/client/informers/externalversions/core/v1alpha1"

    "github.com/kcp-dev/kcp/pkg/informer"
    "github.com/kcp-dev/kcp/pkg/logging"
    "github.com/kcp-dev/kcp/pkg/reconciler/dynamicrestmapper"
)

const NativeControllerName = "kcp-native-garbage-collector"

type Options struct {
    LogicalClusterInformer corev1alpha1informers.LogicalClusterClusterInformer
    CRDInformer            kcpapiextensionsv1.CustomResourceDefinitionClusterInformer
    DynRESTMapper          *dynamicrestmapper.DynamicRESTMapper
    Logger                 logr.Logger
    KubeClusterClient      kcpkubernetesclient.ClusterInterface
    MetadataClusterClient  kcpmetadataclient.ClusterInterface
    SharedInformerFactory  *informer.DiscoveringDynamicSharedInformerFactory
    InformersSynced        chan struct{}

    DeletionWorkers int
}

type GarbageCollector struct {
    options Options

    log logr.Logger

    graph *Graph

    handlerCancels map[schema.GroupVersionResource]func()

    deletionQueue workqueue.TypedRateLimitingInterface[*deletionItem]
}

func NewGarbageCollector(options Options) *GarbageCollector {
    gc := &GarbageCollector{}

    gc.options = options
    if gc.options.DeletionWorkers <= 0 {
        gc.options.DeletionWorkers = 2
    }

    gc.log = logging.WithReconciler(options.Logger, NativeControllerName)

    gc.graph = NewGraph()
    gc.handlerCancels = make(map[schema.GroupVersionResource]func())
    gc.deletionQueue = workqueue.NewTypedRateLimitingQueueWithConfig(
        workqueue.DefaultTypedControllerRateLimiter[*deletionItem](),
        workqueue.TypedRateLimitingQueueConfig[*deletionItem]{
            Name: ControllerName,
        },
    )

    return gc
}

func (gc *GarbageCollector) Start(ctx context.Context) {
    defer utilruntime.HandleCrash()
    defer gc.deletionQueue.ShutDown()

    // Wait for informers to be started and synced.
    //
    // TODO(ntnn): Without waiting the GC will fail. Specifically,
    // builtin APIs will work and the CRD handlers will register new
    // monitors for new resources _but_ the handlers for these resources
    // then do not fire.
    // That doesn't make a lot of sense to me because registering the
    // handlers and the caches being started and synced should be
    // independent.
    // I suspect that somewhere something in the informer factory is
    // swapped out without carrying the existing registrations over,
    // causing handlers registered before the swapping to not be
    // notified once the informers are started.
    <-gc.options.InformersSynced

    // Register handlers for builtin APIs and CRDs.
    deregister := gc.registerHandlers(ctx)
    defer deregister()

    // Run deletion workers.
    gc.startDeletion(ctx)

    <-ctx.Done()
}
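
Not part of the diff, but for orientation: a minimal caller sketch written against the Options struct and Start method above. The function name runExample and the goroutine body are hypothetical; the one real contract, visible in Start, is that it blocks on Options.InformersSynced, so that channel must be closed once the informer caches have synced.

package garbagecollector

import "context"

// runExample is a hypothetical caller sketch, not part of this commit.
// It illustrates the collector's lifecycle: construct it, signal
// informer readiness, then block in Start until shutdown.
func runExample(ctx context.Context, opts Options) {
    gc := NewGarbageCollector(opts)

    go func() {
        // ... start informer factories and wait for cache sync here ...

        // Start blocks on this channel before registering any handlers,
        // so it must be closed exactly once when the caches are synced.
        close(opts.InformersSynced)
    }()

    // Blocks until ctx is cancelled; the deletion queue is shut down
    // and resource handlers are deregistered on return.
    gc.Start(ctx)
}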
Lines changed: 229 additions & 0 deletions
@@ -0,0 +1,229 @@
/*
Copyright 2025 The KCP Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package garbagecollector

import (
    "context"
    "errors"
    "fmt"
    "time"

    apierrors "k8s.io/apimachinery/pkg/api/errors"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/types"
    "k8s.io/apimachinery/pkg/util/wait"
)

type deletionItem struct {
    reference           ObjectReference
    deletionPropagation metav1.DeletionPropagation
}

func (gc *GarbageCollector) startDeletion(ctx context.Context) {
    // TODO(ntnn): Scale workers dynamically.
    //
    // One option would be scaling based on the number of workspaces,
    // e.g. logarithmically on the number of workspaces on this shard
    // or the total number of workspaces, with lower and upper bounds
    // configurable.
    //
    // Scaling on queue length is another option, though that could be
    // more expensive than just running a larger number of deletion
    // worker goroutines.
    for range gc.options.DeletionWorkers {
        go wait.UntilWithContext(ctx, gc.runDeletionQueueWorker, time.Second)
    }
}

func (gc *GarbageCollector) runDeletionQueueWorker(ctx context.Context) {
    for gc.processDeletionQueue(ctx) {
    }
}

func (gc *GarbageCollector) processDeletionQueue(ctx context.Context) bool {
    di, shutdown := gc.deletionQueue.Get()
    if shutdown {
        return false
    }
    defer gc.deletionQueue.Done(di)

    gc.log.Info("processing deletion queue item", "object", di.reference)
    requeue, err := gc.processDeletionQueueItem(ctx, di)
    if err != nil {
        gc.log.Error(err, "error processing deletion queue item", "object", di.reference)
        gc.deletionQueue.AddRateLimited(di)
        return true
    }
    if requeue {
        gc.deletionQueue.Add(di)
        return true
    }

    // This is more of a failsafe: if processDeletionQueueItem returned
    // no error and no requeue, the object should be gone from the API
    // server and removable from the graph.
    //
    // If that is not the case, something went wrong; to be safe we
    // requeue the object for reprocessing.
    success, owned := gc.graph.Remove(di.reference)
    if !success || len(owned) > 0 {
        gc.log.Error(nil, "owned objects still exist, requeuing", "object", di.reference, "graphRemovalSuccess", success, "ownedCount", len(owned))
        gc.deletionQueue.AddRateLimited(di)
        return true
    }

    gc.deletionQueue.Forget(di)
    return true
}

func (gc *GarbageCollector) processDeletionQueueItem(ctx context.Context, di *deletionItem) (bool, error) {
    gvr, err := gc.GVR(di.reference)
    if err != nil {
        return false, fmt.Errorf("error getting GVR for object %v: %w", di.reference, err)
    }

    client := gc.options.MetadataClusterClient.Cluster(di.reference.ClusterName.Path()).
        Resource(gvr).
        Namespace(di.reference.Namespace)

    if di.deletionPropagation == "" {
        // Deletion propagation is empty. This is likely because the
        // object was queued for deletion as an owned object without
        // foreground deletion.
        // If the object is still available in the API server we
        // determine the propagation from there; otherwise we default
        // to background deletion.
        di.deletionPropagation = metav1.DeletePropagationBackground
        if latest, err := client.Get(ctx, di.reference.Name, metav1.GetOptions{}); err == nil {
            di.deletionPropagation = deletionPropagationFromFinalizers(latest)
        }
    }

    // Grab any owned objects from the graph.
    owned := gc.graph.Owned(di.reference)
    if len(owned) > 0 {
        switch di.deletionPropagation {
        case metav1.DeletePropagationOrphan:
            // Orphan owned resources and requeue until the graph lists no owned objects.
            if err := gc.orphanOwned(ctx, di.reference, owned); err != nil {
                return false, err
            }
            return true, nil
        case metav1.DeletePropagationForeground:
            // Owned resources should be foreground deleted. Enqueue them for
            // deletion and requeue the original object. This will requeue this
            // object until the owned objects are gone.
            for _, ownedRef := range owned {
                gc.log.Info("queuing owned object for deletion", "ownedObject", ownedRef, "ownerObject", di.reference)
                gc.deletionQueue.Add(&deletionItem{
                    reference: ownedRef,
                    // Propagate the foreground deletion down the chain.
                    deletionPropagation: metav1.DeletePropagationForeground,
                })
            }
            // Requeue the original object to check later if the owned objects are gone.
            gc.log.Info("owned objects exist, requeuing deletion", "object", di.reference, "ownedCount", len(owned))
            return true, nil
        case metav1.DeletePropagationBackground:
            // Owned resources should be background deleted. Enqueue them for
            // deletion, but do not requeue the original object as it can be
            // deleted right away.
            for _, ownedRef := range owned {
                gc.log.Info("queuing owned object for deletion", "ownedObject", ownedRef, "ownerObject", di.reference)
                gc.deletionQueue.Add(&deletionItem{
                    reference: ownedRef,
                    // Do not set an explicit deletion propagation. This
                    // will be discovered per object when it is processed.
                    // deletionPropagation: ,
                })
            }
        default:
            return false, fmt.Errorf("unknown deletion propagation policy %q for object %v with owned objects", di.deletionPropagation, di.reference)
        }
    }

    latest, err := client.Get(ctx, di.reference.Name, metav1.GetOptions{})
    if apierrors.IsNotFound(err) {
        // Object is already gone, report success.
        return false, nil
    }
    if err != nil {
        return false, err
    }

    // No owned objects. Proceed to delete the object.
    gc.log.Info("deleting object from API server", "object", di.reference)

    // TODO(ntnn): scrub other gc finalizers
    if hasFinalizer(latest, metav1.FinalizerOrphanDependents) {
        gc.log.Info("removing orphan finalizer from object", "object", di.reference)
        patch, err := patchRemoveFinalizer(latest, metav1.FinalizerOrphanDependents)
        if err != nil {
            return false, err
        }

        if _, err := client.Patch(ctx, di.reference.Name, types.StrategicMergePatchType, patch, metav1.PatchOptions{}); err != nil {
            return false, err
        }
    }

    preconditions := metav1.Preconditions{UID: &di.reference.UID}

    if err := client.Delete(ctx, di.reference.Name, metav1.DeleteOptions{
        Preconditions: &preconditions,
    }); err != nil && !apierrors.IsNotFound(err) {
        return false, err
    }

    // Deletion successful.
    return false, nil
}

func (gc *GarbageCollector) orphanOwned(ctx context.Context, or ObjectReference, owned []ObjectReference) error {
    // Remove the owner reference from all owned objects.
    var errs error
    for _, ownedRef := range owned {
        gvr, err := gc.GVR(ownedRef)
        if err != nil {
            errs = errors.Join(errs, err)
            continue
        }

        client := gc.options.MetadataClusterClient.
            Cluster(ownedRef.ClusterName.Path()).
            Resource(gvr).
            Namespace(ownedRef.Namespace)

        latest, err := client.Get(ctx, ownedRef.Name, metav1.GetOptions{})
        if err != nil {
            errs = errors.Join(errs, err)
            continue
        }

        patch, err := patchRemoveOwnerReference(latest, or.UID)
        if err != nil {
            errs = errors.Join(errs, err)
            continue
        }

        if _, err := client.Patch(ctx, ownedRef.Name, types.StrategicMergePatchType, patch, metav1.PatchOptions{}); err != nil {
            errs = errors.Join(errs, err)
            continue
        }
    }
    return errs
}
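
The helpers referenced above (hasFinalizer, patchRemoveFinalizer, patchRemoveOwnerReference, deletionPropagationFromFinalizers) are defined in other files of this commit. As a rough sketch of what the propagation lookup plausibly does, assuming it follows the upstream Kubernetes convention that the orphan and foregroundDeletion finalizers encode the requested policy (the function below is an assumption, not the commit's code):

package garbagecollector

import metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

// Hypothetical sketch, not the commit's actual helper: derive the
// requested deletion propagation from an object's finalizers, the way
// the upstream Kubernetes garbage collector interprets them.
func deletionPropagationFromFinalizersSketch(obj metav1.Object) metav1.DeletionPropagation {
    for _, f := range obj.GetFinalizers() {
        switch f {
        case metav1.FinalizerOrphanDependents: // "orphan"
            return metav1.DeletePropagationOrphan
        case metav1.FinalizerDeleteDependents: // "foregroundDeletion"
            return metav1.DeletePropagationForeground
        }
    }
    // No GC finalizer present: dependents are deleted in the background.
    return metav1.DeletePropagationBackground
}

Similarly, since orphanOwned sends a StrategicMergePatchType patch, patchRemoveOwnerReference presumably produces a strategic merge patch; ownerReferences is a merge list keyed by uid, so a "$patch": "delete" directive removes only the matching entry. A minimal hypothetical sketch (the real helper also takes the latest object):

package garbagecollector

import (
    "encoding/json"

    "k8s.io/apimachinery/pkg/types"
)

// Hypothetical sketch: build a strategic merge patch that deletes the
// owner reference with the given UID and leaves the rest of the
// ownerReferences list untouched.
func patchRemoveOwnerReferenceSketch(ownerUID types.UID) ([]byte, error) {
    return json.Marshal(map[string]any{
        "metadata": map[string]any{
            "ownerReferences": []map[string]any{
                {"$patch": "delete", "uid": string(ownerUID)},
            },
        },
    })
}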
