2 changes: 1 addition & 1 deletion Makefile
@@ -316,7 +316,7 @@ ifdef SUITES
SUITES_ARG = --suites $(SUITES)
COMPLETE_SUITES_ARG = -args $(SUITES_ARG)
endif
TEST_FEATURE_GATES ?= WorkspaceMounts=true,CacheAPIs=true,WorkspaceAuthentication=true
TEST_FEATURE_GATES ?= WorkspaceMounts=true,CacheAPIs=true,WorkspaceAuthentication=true,KcpNativeGarbageCollector=false
PROXY_FEATURE_GATES ?= $(TEST_FEATURE_GATES)

.PHONY: test-e2e
8 changes: 8 additions & 0 deletions pkg/features/kcp_features.go
@@ -60,6 +60,11 @@ const (
// users into workspaces from foreign OIDC issuers. This feature can be individually enabled on each shard and
// the front-proxy.
WorkspaceAuthentication featuregate.Feature = "WorkspaceAuthentication"

// owner: @ntnn
// alpha: v0.1
// Enables the kcp-native garbage collector. When disabled, kcp relies on a modified version of the upstream garbage collector.
KcpNativeGarbageCollector featuregate.Feature = "KcpNativeGarbageCollector"
)
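// Illustrative note (not part of this change): like the gates above, this gate
// is toggled with the comma-separated Key=bool feature-gate syntax, e.g.
// KcpNativeGarbageCollector=true; the Makefile change in this PR sets it to
// false in TEST_FEATURE_GATES for e2e runs.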

// DefaultFeatureGate exposes the upstream feature gate, but with our gate setting applied.
@@ -141,6 +146,9 @@ var defaultVersionedGenericControlPlaneFeatureGates = map[featuregate.Feature]fe
WorkspaceAuthentication: {
{Version: version.MustParse("1.32"), Default: false, PreRelease: featuregate.Alpha},
},
KcpNativeGarbageCollector: {
{Version: version.MustParse("1.34"), Default: false, PreRelease: featuregate.Alpha},
},
// inherited features from generic apiserver, relisted here to get a conflict if it is changed
// unintentionally on either side:
genericfeatures.APIResponseCompression: {
114 changes: 114 additions & 0 deletions pkg/reconciler/garbagecollector/gc.go
@@ -0,0 +1,114 @@
/*
Copyright 2025 The KCP Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package garbagecollector

import (
"context"

"github.com/go-logr/logr"

"k8s.io/apimachinery/pkg/runtime/schema"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/util/workqueue"

kcpapiextensionsv1 "github.com/kcp-dev/client-go/apiextensions/informers/apiextensions/v1"
kcpkubernetesclient "github.com/kcp-dev/client-go/kubernetes"
kcpmetadataclient "github.com/kcp-dev/client-go/metadata"
corev1alpha1informers "github.com/kcp-dev/sdk/client/informers/externalversions/core/v1alpha1"

"github.com/kcp-dev/kcp/pkg/informer"
"github.com/kcp-dev/kcp/pkg/logging"
"github.com/kcp-dev/kcp/pkg/reconciler/dynamicrestmapper"
)

// NativeControllerName is the name of the kcp-native garbage collector controller.
const NativeControllerName = "kcp-native-garbage-collector"

// Options holds the informers, clients, and settings used to construct the garbage collector.
type Options struct {
LogicalClusterInformer corev1alpha1informers.LogicalClusterClusterInformer
CRDInformer kcpapiextensionsv1.CustomResourceDefinitionClusterInformer
DynRESTMapper *dynamicrestmapper.DynamicRESTMapper
Logger logr.Logger
KubeClusterClient kcpkubernetesclient.ClusterInterface
MetadataClusterClient kcpmetadataclient.ClusterInterface
SharedInformerFactory *informer.DiscoveringDynamicSharedInformerFactory
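// InformersSynced is expected to be closed (or signalled) once the shared
// informer factory has been started and its caches have synced; Start blocks
// on it before registering handlers.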
InformersSynced chan struct{}

DeletionWorkers int
}

// GarbageCollector is the kcp-native garbage collector. It maintains an
// ownership graph and a deletion queue, and propagates deletions to owned
// objects according to the deletion propagation policy.
type GarbageCollector struct {
options Options

log logr.Logger

// graph tracks ownership relationships between object references.
graph *Graph

// handlerCancels maps each watched GVR to a cancel function for the
// handler registered for that resource.
handlerCancels map[schema.GroupVersionResource]func()

// deletionQueue holds object references queued for deletion together with
// the deletion propagation to apply to their owned objects.
deletionQueue workqueue.TypedRateLimitingInterface[*deletionItem]
}

// NewGarbageCollector constructs a garbage collector from the given options.
func NewGarbageCollector(options Options) *GarbageCollector {
gc := &GarbageCollector{}

gc.options = options
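// Default to a small, fixed number of deletion workers when none is
// configured; see the scaling TODO in startDeletion.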
if gc.options.DeletionWorkers <= 0 {
gc.options.DeletionWorkers = 2
}

gc.log = logging.WithReconciler(options.Logger, NativeControllerName)

gc.graph = NewGraph()
gc.handlerCancels = make(map[schema.GroupVersionResource]func())
gc.deletionQueue = workqueue.NewTypedRateLimitingQueueWithConfig(
workqueue.DefaultTypedControllerRateLimiter[*deletionItem](),
workqueue.TypedRateLimitingQueueConfig[*deletionItem]{
Name: ControllerName,
},
)

return gc
}

// Start runs the garbage collector. It blocks until the given context is cancelled.
func (gc *GarbageCollector) Start(ctx context.Context) {
defer utilruntime.HandleCrash()
defer gc.deletionQueue.ShutDown()

// Wait for informers to be started and synced.
//
// TODO(ntnn): Without waiting, the GC will fail. Specifically,
// builtin APIs will work and the CRD handlers will register new
// monitors for new resources _but_ the handlers for these resources
// then do not fire.
// That doesn't make a lot of sense to me because registering the
// handlers and the caches being started and synced should be
// independent.
// I suspect that somewhere something in the informer factory is
// swapped out without carrying the existing registrations over,
// causing handlers registered before the swapping to not be
// notified once the informers are started.
<-gc.options.InformersSynced

// Register handlers for builtin APIs and CRDs.
deregister := gc.registerHandlers(ctx)
defer deregister()

// Run deletion workers.
gc.startDeletion(ctx)

<-ctx.Done()
}
229 changes: 229 additions & 0 deletions pkg/reconciler/garbagecollector/gc_deletion.go
@@ -0,0 +1,229 @@
/*
Copyright 2025 The KCP Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package garbagecollector

import (
"context"
"errors"
"fmt"
"time"

apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
)

// deletionItem is a single unit of work on the deletion queue: the object to
// delete and the deletion propagation to apply to its owned objects.
type deletionItem struct {
reference ObjectReference
deletionPropagation metav1.DeletionPropagation
}

func (gc *GarbageCollector) startDeletion(ctx context.Context) {
// TODO(ntnn): Scale workers dynamically.
//
// One option would be scaling based on the number of workspaces,
// e.g. logarithmic scaling based on the number of workspaces on this
// shard or the total number of workspaces, with configurable lower and
// upper bounds.
//
// Scaling based on queue length is another option, though that could be
// more expensive than just having a larger number of deletion worker
// goroutines running.
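//
// A purely illustrative sketch of such a heuristic (names are hypothetical,
// not part of this change):
//
//	workers := min(maxWorkers, max(minWorkers, int(math.Log2(float64(workspaceCount))+1)))
//
// i.e. clamp a logarithm of the workspace count between configurable bounds.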
for range gc.options.DeletionWorkers {
go wait.UntilWithContext(ctx, gc.runDeletionQueueWorker, time.Second)
}
}

func (gc *GarbageCollector) runDeletionQueueWorker(ctx context.Context) {
for gc.processDeletionQueue(ctx) {
}
}

func (gc *GarbageCollector) processDeletionQueue(ctx context.Context) bool {
di, shutdown := gc.deletionQueue.Get()
if shutdown {
return false
}
defer gc.deletionQueue.Done(di)

gc.log.Info("processing deletion queue item", "object", di.reference)
requeue, err := gc.processDeletionQueueItem(ctx, di)
if err != nil {
gc.log.Error(err, "error processing deletion queue item", "object", di.reference)
gc.deletionQueue.AddRateLimited(di)
return true
}
if requeue {
gc.deletionQueue.Add(di)
return true
}

// This is more of a failsafe. If processDeletionQueueItem returned no
// error and no requeue, the object should be gone from the API server
// and removable from the graph.
//
// If this is not the case, something went wrong; to be safe, we requeue
// the object for reprocessing.
success, owned := gc.graph.Remove(di.reference)
if !success || len(owned) > 0 {
gc.log.Error(nil, "owned objects still exist, requeuing", "object", di.reference, "graphRemovalSuccess", success, "ownedCount", len(owned))
gc.deletionQueue.AddRateLimited(di)
return true
}

gc.deletionQueue.Forget(di)
return true
}

// processDeletionQueueItem handles a single deletion item. It reports whether
// the item should be requeued and returns an error if processing failed.
func (gc *GarbageCollector) processDeletionQueueItem(ctx context.Context, di *deletionItem) (bool, error) {
gvr, err := gc.GVR(di.reference)
if err != nil {
return false, fmt.Errorf("error getting GVR for object %v: %w", di.reference, err)
}

client := gc.options.MetadataClusterClient.Cluster(di.reference.ClusterName.Path()).
Resource(gvr).
Namespace(di.reference.Namespace)

if string(di.deletionPropagation) == "" {
// Deletion propagation is empty. This is likely because the
// object was queued for deletion as an owned object without
// foreground deletion.
// If the object is still available in the API server, we determine the
// propagation from there; otherwise we default to background deletion.
di.deletionPropagation = metav1.DeletePropagationBackground
if latest, err := client.Get(ctx, di.reference.Name, metav1.GetOptions{}); err == nil {
di.deletionPropagation = deletionPropagationFromFinalizers(latest)
}
}

// Grab any owned objects from the graph
owned := gc.graph.Owned(di.reference)
if len(owned) > 0 {
switch di.deletionPropagation {
case metav1.DeletePropagationOrphan:
// Orphan owned resources and requeue until the graph lists no owned objects.
if err := gc.orphanOwned(ctx, di.reference, owned); err != nil {
return false, err
}
return true, nil
case metav1.DeletePropagationForeground:
// Owned resources should be foreground deleted. Enqueue them for
// deletion and requeue the original object. This will requeue this
// object until owned objects are gone.
for _, ownedRef := range owned {
gc.log.Info("queuing owned object for deletion", "ownedObject", ownedRef, "ownerObject", di.reference)
gc.deletionQueue.Add(&deletionItem{
reference: ownedRef,
// Propagate the foreground deletion down the chain.
deletionPropagation: metav1.DeletePropagationForeground,
})
}
// Requeue the original object to check later if owned objects are gone.
gc.log.Info("owned objects exist, requeuing deletion", "object", di.reference, "ownedCount", len(owned))
return true, nil
case metav1.DeletePropagationBackground:
// Owned resources should be background deleted. Enqueue them for
// deletion, but do not requeue the original object as it can be
// deleted right away.
for _, ownedRef := range owned {
gc.log.Info("queuing owned object for deletion", "ownedObject", ownedRef, "ownerObject", di.reference)
gc.deletionQueue.Add(&deletionItem{
reference: ownedRef,
// Do not set an explicit deletion propagation. This
// will be discovered per object when it is processed.
// deletionPropagation: ,
})
}
default:
return false, fmt.Errorf("unknown deletion propagation policy %q for object %v with owned objects", di.deletionPropagation, di.reference)
}
}

latest, err := client.Get(ctx, di.reference.Name, metav1.GetOptions{})
if apierrors.IsNotFound(err) {
// Object is already gone, report success.
return false, nil
}
if err != nil {
return false, err
}

// No owned objects. Proceed to delete the object.
gc.log.Info("deleting object from API server", "object", di.reference)

// TODO(ntnn): scrub other gc finalizers
if hasFinalizer(latest, metav1.FinalizerOrphanDependents) {
gc.log.Info("removing orphan finalizer from object", "object", di.reference)
patch, err := patchRemoveFinalizer(latest, metav1.FinalizerOrphanDependents)
if err != nil {
return false, err
}

if _, err := client.Patch(ctx, di.reference.Name, types.StrategicMergePatchType, patch, metav1.PatchOptions{}); err != nil {
return false, err
}
}

preconditions := metav1.Preconditions{UID: &di.reference.UID}

if err := client.Delete(ctx, di.reference.Name, metav1.DeleteOptions{
Preconditions: &preconditions,
}); err != nil && !apierrors.IsNotFound(err) {
return false, err
}

// Deletion successful.
return false, nil
}

func (gc *GarbageCollector) orphanOwned(ctx context.Context, or ObjectReference, owned []ObjectReference) error {
// Remove owner reference from all owned objects.
var errs error
for _, ownedRef := range owned {
gvr, err := gc.GVR(ownedRef)
if err != nil {
errs = errors.Join(errs, err)
continue
}

client := gc.options.MetadataClusterClient.
Cluster(ownedRef.ClusterName.Path()).
Resource(gvr).
Namespace(ownedRef.Namespace)

latest, err := client.Get(ctx, ownedRef.Name, metav1.GetOptions{})
if err != nil {
errs = errors.Join(errs, err)
continue
}

patch, err := patchRemoveOwnerReference(latest, or.UID)
if err != nil {
errs = errors.Join(errs, err)
continue
}

if _, err := client.Patch(ctx, ownedRef.Name, types.StrategicMergePatchType, patch, metav1.PatchOptions{}); err != nil {
errs = errors.Join(errs, err)
continue
}
}
return errs
}