Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,15 @@ func Register(plugins *admission.Plugins) {
func NewKubeValidatingAdmissionPolicy() *KubeValidatingAdmissionPolicy {
return &KubeValidatingAdmissionPolicy{
Handler: admission.NewHandler(admission.Connect, admission.Create, admission.Delete, admission.Update),
delegates: make(map[logicalcluster.Name]*stoppableValidatingAdmissionPolicy),
delegates: make(map[delegateKey]*stoppableValidatingAdmissionPolicy),
}
}

type delegateKey struct {
policyCluster logicalcluster.Name
targetCluster logicalcluster.Name
}

type KubeValidatingAdmissionPolicy struct {
*admission.Handler

Expand All @@ -83,7 +88,7 @@ type KubeValidatingAdmissionPolicy struct {
getAPIBindings func(clusterName logicalcluster.Name) ([]*apisv1alpha2.APIBinding, error)

delegatesLock sync.RWMutex
delegates map[logicalcluster.Name]*stoppableValidatingAdmissionPolicy
delegates map[delegateKey]*stoppableValidatingAdmissionPolicy

logicalClusterDeletionMonitorStarter sync.Once
}
Expand Down Expand Up @@ -120,8 +125,9 @@ func (k *KubeValidatingAdmissionPolicy) SetKcpInformers(local, global kcpinforme
k.delegatesLock.Lock()
defer k.delegatesLock.Unlock()

// Remove all delegates that involve this cluster (either as policy or target)
for key, delegate := range k.delegates {
if key == clName {
if key.policyCluster == clName || key.targetCluster == clName {
delete(k.delegates, key)
delegate.stop()
}
Expand Down Expand Up @@ -168,7 +174,7 @@ func (k *KubeValidatingAdmissionPolicy) Validate(ctx context.Context, a admissio
return err
}

delegate, err := k.getOrCreateDelegate(sourceCluster)
delegate, err := k.getOrCreateDelegate(sourceCluster, cluster.Name)
if err != nil {
return err
}
Expand All @@ -193,10 +199,16 @@ func (k *KubeValidatingAdmissionPolicy) getSourceClusterForGroupResource(cluster
return clusterName, nil
}

// getOrCreateDelegate creates an actual plugin for clusterName.
func (k *KubeValidatingAdmissionPolicy) getOrCreateDelegate(clusterName logicalcluster.Name) (*stoppableValidatingAdmissionPolicy, error) {
// getOrCreateDelegate creates an actual plugin for policyClusterName (where policies are defined).
// targetClusterName is the cluster where the object being validated resides.
func (k *KubeValidatingAdmissionPolicy) getOrCreateDelegate(policyClusterName, targetClusterName logicalcluster.Name) (*stoppableValidatingAdmissionPolicy, error) {
key := delegateKey{
policyCluster: policyClusterName,
targetCluster: targetClusterName,
}

k.delegatesLock.RLock()
delegate := k.delegates[clusterName]
delegate := k.delegates[key]
k.delegatesLock.RUnlock()

if delegate != nil {
Expand All @@ -206,7 +218,7 @@ func (k *KubeValidatingAdmissionPolicy) getOrCreateDelegate(clusterName logicalc
k.delegatesLock.Lock()
defer k.delegatesLock.Unlock()

delegate = k.delegates[clusterName]
delegate = k.delegates[key]
if delegate != nil {
return delegate, nil
}
Expand All @@ -229,29 +241,41 @@ func (k *KubeValidatingAdmissionPolicy) getOrCreateDelegate(clusterName logicalc
stop: cancel,
}

plugin.SetNamespaceInformer(k.localKubeSharedInformerFactory.Core().V1().Namespaces().Cluster(clusterName))
plugin.SetExternalKubeClientSet(k.kubeClusterClient.Cluster(clusterName.Path()))
// Use the target cluster's namespace informer, as that's where the objects being validated reside.
//
// The namespace informer points to where the TARGET OBJECT (being admitted) is located,
// NOT where the policy is defined.
//
// Flow:
// 1. Admission request arrives for an object in namespace "foo"
// 2. Matcher retrieves namespace "foo" using the informer: matcher.GetNamespace("foo")
// 3. Namespace is used for:
// - Label matching: Check if namespace labels match policy's namespaceSelector
// - CEL evaluation: Available as "namespaceObject" variable in policy expressions
// - Policy decisions: Evaluate rules based on target namespace metadata
plugin.SetNamespaceInformer(k.localKubeSharedInformerFactory.Core().V1().Namespaces().Cluster(targetClusterName))
plugin.SetExternalKubeClientSet(k.kubeClusterClient.Cluster(policyClusterName.Path()))

// TODO(ncdc): this is super inefficient to do per workspace
discoveryClient := memory.NewMemCacheClient(k.kubeClusterClient.Cluster(clusterName.Path()).Discovery())
discoveryClient := memory.NewMemCacheClient(k.kubeClusterClient.Cluster(policyClusterName.Path()).Discovery())
restMapper := restmapper.NewDeferredDiscoveryRESTMapper(discoveryClient)
plugin.SetRESTMapper(restMapper)

plugin.SetDynamicClient(k.dynamicClusterClient.Cluster(clusterName.Path()))
plugin.SetDynamicClient(k.dynamicClusterClient.Cluster(policyClusterName.Path()))
plugin.SetDrainedNotification(ctx.Done())
plugin.SetAuthorizer(k.authorizer)
plugin.SetClusterName(clusterName)
plugin.SetSourceFactory(func(_ informers.SharedInformerFactory, client kubernetes.Interface, dynamicClient dynamic.Interface, restMapper meta.RESTMapper, clusterName logicalcluster.Name) generic.Source[validating.PolicyHook] {
plugin.SetClusterName(policyClusterName)
plugin.SetSourceFactory(func(_ informers.SharedInformerFactory, client kubernetes.Interface, dynamicClient dynamic.Interface, restMapper meta.RESTMapper, cn logicalcluster.Name) generic.Source[validating.PolicyHook] {
return generic.NewPolicySource(
k.globalKubeSharedInformerFactory.Admissionregistration().V1().ValidatingAdmissionPolicies().Informer().Cluster(clusterName),
k.globalKubeSharedInformerFactory.Admissionregistration().V1().ValidatingAdmissionPolicyBindings().Informer().Cluster(clusterName),
k.globalKubeSharedInformerFactory.Admissionregistration().V1().ValidatingAdmissionPolicies().Informer().Cluster(cn),
k.globalKubeSharedInformerFactory.Admissionregistration().V1().ValidatingAdmissionPolicyBindings().Informer().Cluster(cn),
validating.NewValidatingAdmissionPolicyAccessor,
validating.NewValidatingAdmissionPolicyBindingAccessor,
validating.CompilePolicy,
nil,
dynamicClient,
restMapper,
clusterName,
cn,
)
})

Expand All @@ -260,7 +284,7 @@ func (k *KubeValidatingAdmissionPolicy) getOrCreateDelegate(clusterName logicalc
return nil, err
}

k.delegates[clusterName] = delegate
k.delegates[key] = delegate

return delegate, nil
}
Expand All @@ -269,19 +293,22 @@ func (k *KubeValidatingAdmissionPolicy) logicalClusterDeleted(clusterName logica
k.delegatesLock.Lock()
defer k.delegatesLock.Unlock()

delegate := k.delegates[clusterName]

logger := klog.Background().WithValues("clusterName", clusterName)

if delegate == nil {
logger.V(3).Info("received event to stop validating admission policy for logical cluster, but it wasn't in the map")
return
// Remove all delegates that involve this cluster (either as policy or target)
found := false
for key, delegate := range k.delegates {
if key.policyCluster == clusterName || key.targetCluster == clusterName {
logger.V(2).Info("stopping validating admission policy for logical cluster")
delete(k.delegates, key)
delegate.stop()
found = true
}
}

logger.V(2).Info("stopping validating admission policy for logical cluster")

delete(k.delegates, clusterName)
delegate.stop()
if !found {
logger.V(3).Info("received event to stop validating admission policy for logical cluster, but it wasn't in the map")
}
}

type stoppableValidatingAdmissionPolicy struct {
Expand Down