Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion config/core/resources/apiserversource.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,9 @@ spec:
description: matchLabels is a map of {key,value} pairs. A single {key,value} in the matchLabels map is equivalent to an element of matchExpressions, whose key field is "key", the operator is "In", and the values array contains only "value". The requirements are ANDed.
type: object
x-kubernetes-preserve-unknown-fields: true

disableCache:
description: DisableCache, when set to true, configures the adapter to disable client-side caching of Kubernetes resources. This prevents the adapter from using resourceVersion-based caching which can cause throttling when tracking many resources across namespaces.
type: boolean
status:
type: object
properties:
Expand Down
34 changes: 34 additions & 0 deletions docs/eventing-api.md
Original file line number Diff line number Diff line change
Expand Up @@ -7410,6 +7410,23 @@ evaluate to true, the event MUST be attempted to be delivered. Absence of
a filter or empty array implies a value of true.</p>
</td>
</tr>
<tr>
<td>
<code>disableCache</code><br/>
<em>
bool
</em>
</td>
<td>
<em>(Optional)</em>
<p>DisableCache, when set to true, configures the adapter to use cluster-scoped watches
instead of per-namespace watches. This reduces the number of API connections from N
(namespaces) to 1 per resource type, preventing client-side throttling when tracking
resources across many namespaces. Events from non-selected namespaces are filtered
client-side in the adapter.
Defaults to false (per-namespace watches for better isolation).</p>
</td>
</tr>
</table>
</td>
</tr>
Expand Down Expand Up @@ -8036,6 +8053,23 @@ evaluate to true, the event MUST be attempted to be delivered. Absence of
a filter or empty array implies a value of true.</p>
</td>
</tr>
<tr>
<td>
<code>disableCache</code><br/>
<em>
bool
</em>
</td>
<td>
<em>(Optional)</em>
<p>DisableCache, when set to true, configures the adapter to use cluster-scoped watches
instead of per-namespace watches. This reduces the number of API connections from N
(namespaces) to 1 per resource type, preventing client-side throttling when tracking
resources across many namespaces. Events from non-selected namespaces are filtered
client-side in the adapter.
Defaults to false (per-namespace watches for better isolation).</p>
</td>
</tr>
</tbody>
</table>
<h3 id="sources.knative.dev/v1.ApiServerSourceStatus">ApiServerSourceStatus
Expand Down
15 changes: 15 additions & 0 deletions pkg/adapter/apiserver/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,13 +191,28 @@ func (a *apiServerAdapter) startFailFast(ctx context.Context, stopCh <-chan stru
}

func (a *apiServerAdapter) setupDelegate() cache.Store {
// Build namespace filter set for DisableCache mode
var allowedNs map[string]struct{}
filterByNs := false
if a.config.DisableCache && len(a.config.OriginalNamespaces) > 0 && a.config.AllNamespaces {
// DisableCache forced cluster-scoped watches, so we need client-side filtering
filterByNs = true
allowedNs = make(map[string]struct{}, len(a.config.OriginalNamespaces))
for _, ns := range a.config.OriginalNamespaces {
allowedNs[ns] = struct{}{}
}
a.logger.Infof("DisableCache enabled: using cluster-scoped watches with client-side filtering for %d namespaces", len(a.config.OriginalNamespaces))
}

var delegate cache.Store = &resourceDelegate{
ce: a.ce,
source: a.source,
logger: a.logger,
ref: a.config.EventMode == v1.ReferenceMode,
apiServerSourceName: a.name,
filter: subscriptionsapi.NewAllFilter(subscriptionsapi.MaterializeFiltersList(a.logger.Desugar(), a.config.Filters)...),
allowedNamespaces: allowedNs,
filterByNamespace: filterByNs,
}
if a.config.ResourceOwner != nil {
a.logger.Infow("will be filtered",
Expand Down
9 changes: 9 additions & 0 deletions pkg/adapter/apiserver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,4 +74,13 @@ type Config struct {
// (via the features.knative.dev/apiserversource-skip-permissions-check annotation), and the ApiServerSource
// adapter should not keep trying to establish watches on resources that it perhaps does not have permissions for.
FailFast bool `json:"failFast,omitempty"`

// DisableCache when true, forces cluster-scoped watches to reduce API connections
// and prevent client-side throttling in high-namespace scenarios.
DisableCache bool `json:"disableCache,omitempty"`

// OriginalNamespaces holds the user's intended namespaces for filtering
// when DisableCache forces AllNamespaces mode. This allows cluster-scoped
// watches while still filtering events to only the desired namespaces.
OriginalNamespaces []string `json:"originalNamespaces,omitempty"`
}
19 changes: 19 additions & 0 deletions pkg/adapter/apiserver/delegate.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
cloudevents "github.com/cloudevents/sdk-go/v2"
"github.com/google/uuid"
"go.uber.org/zap"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/client-go/tools/cache"
"knative.dev/eventing/pkg/adapter/apiserver/events"
"knative.dev/eventing/pkg/eventfilter"
Expand All @@ -35,6 +36,12 @@ type resourceDelegate struct {
filter eventfilter.Filter

logger *zap.SugaredLogger

// Namespace filtering for DisableCache mode
// When DisableCache forces cluster-scoped watches, we filter events
// to only those from the originally selected namespaces
allowedNamespaces map[string]struct{}
filterByNamespace bool
}

var _ cache.Store = (*resourceDelegate)(nil)
Expand All @@ -57,6 +64,18 @@ func (a *resourceDelegate) Delete(obj interface{}) error {
type makeEventFunc func(string, string, interface{}, bool) (context.Context, cloudevents.Event, error)

func (a *resourceDelegate) handleKubernetesObject(makeEvent makeEventFunc, obj interface{}) error {
// Quick namespace filter FIRST (before expensive CloudEvent creation)
// This is O(1) map lookup when DisableCache forces cluster-scoped watches
if a.filterByNamespace {
acc, err := meta.Accessor(obj)
if err != nil {
return err
}
if _, ok := a.allowedNamespaces[acc.GetNamespace()]; !ok {
return nil // Drop event - not in allowed namespaces
}
}

ctx, event, err := makeEvent(a.source, a.apiServerSourceName, obj, a.ref)

if err != nil {
Expand Down
140 changes: 140 additions & 0 deletions pkg/adapter/apiserver/namespace_filter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
package apiserver

import (
"testing"

"go.uber.org/zap"

adaptertest "knative.dev/eventing/pkg/adapter/v2/test"
eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1"
"knative.dev/eventing/pkg/eventfilter/subscriptionsapi"
)

// TestNamespaceFiltering tests that events from non-allowed namespaces are dropped
func TestNamespaceFiltering(t *testing.T) {
ce := adaptertest.NewTestClient()

// Create delegate with namespace filter for "allowed-ns"
allowedNs := map[string]struct{}{
"allowed-ns": {},
}

logger := zap.NewExample().Sugar()
delegate := &resourceDelegate{
ce: ce,
source: "unit-test",
ref: false,
apiServerSourceName: "test-source",
logger: logger,
filter: subscriptionsapi.NewAllFilter(subscriptionsapi.MaterializeFiltersList(logger.Desugar(), []eventingv1.SubscriptionsAPIFilter{})...),
allowedNamespaces: allowedNs,
filterByNamespace: true,
}

// Test 1: Event from allowed namespace SHOULD be sent
allowedPod := simplePod("allowed-pod", "allowed-ns")
err := delegate.Add(allowedPod)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if got := len(ce.Sent()); got != 1 {
t.Errorf("Expected 1 event to be sent for allowed namespace, got: %d", got)
}

// Reset before next test
ce.Reset()

// Test 2: Event from non-allowed namespace should NOT be sent
deniedPod := simplePod("denied-pod", "denied-ns")
err = delegate.Add(deniedPod)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if got := len(ce.Sent()); got != 0 {
t.Errorf("Expected 0 events to be sent for denied namespace, got: %d", got)
}
}

// TestNamespaceFilteringDisabled tests that when filtering is disabled, all events are sent
func TestNamespaceFilteringDisabled(t *testing.T) {
ce := adaptertest.NewTestClient()

logger := zap.NewExample().Sugar()
// Create delegate WITHOUT namespace filtering
delegate := &resourceDelegate{
ce: ce,
source: "unit-test",
ref: false,
apiServerSourceName: "test-source",
logger: logger,
filter: subscriptionsapi.NewAllFilter(subscriptionsapi.MaterializeFiltersList(logger.Desugar(), []eventingv1.SubscriptionsAPIFilter{})...),
filterByNamespace: false, // Filtering disabled
}

// Both namespaces should send events
pod1 := simplePod("pod-1", "ns-1")
err := delegate.Add(pod1)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if got := len(ce.Sent()); got != 1 {
t.Errorf("Expected 1 event after first pod, got: %d", got)
}

pod2 := simplePod("pod-2", "ns-2")
err = delegate.Add(pod2)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if got := len(ce.Sent()); got != 2 {
t.Errorf("Expected 2 events after second pod, got: %d", got)
}
}

// TestNamespaceFilteringMultipleAllowed tests multiple allowed namespaces
func TestNamespaceFilteringMultipleAllowed(t *testing.T) {
ce := adaptertest.NewTestClient()

// Create delegate with multiple allowed namespaces
allowedNs := map[string]struct{}{
"prod-1": {},
"prod-2": {},
"prod-3": {},
}

logger := zap.NewExample().Sugar()
delegate := &resourceDelegate{
ce: ce,
source: "unit-test",
ref: false,
apiServerSourceName: "test-source",
logger: logger,
filter: subscriptionsapi.NewAllFilter(subscriptionsapi.MaterializeFiltersList(logger.Desugar(), []eventingv1.SubscriptionsAPIFilter{})...),
allowedNamespaces: allowedNs,
filterByNamespace: true,
}

// Test allowed namespaces - each should add one event
expectedCount := 0
for _, ns := range []string{"prod-1", "prod-2", "prod-3"} {
pod := simplePod("test-pod", ns)
err := delegate.Add(pod)
if err != nil {
t.Fatalf("unexpected error for namespace %s: %v", ns, err)
}
expectedCount++
if got := len(ce.Sent()); got != expectedCount {
t.Errorf("Expected %d events after namespace %s, got: %d", expectedCount, ns, got)
}
}

// Test denied namespace - count should stay the same
deniedPod := simplePod("test-pod", "dev-1")
err := delegate.Add(deniedPod)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if got := len(ce.Sent()); got != expectedCount {
t.Errorf("Expected %d events (denied namespace shouldn't add), got: %d", expectedCount, got)
}
}
9 changes: 9 additions & 0 deletions pkg/apis/sources/v1/apiserver_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,15 @@ type ApiServerSourceSpec struct {
//
// +optional
Filters []eventingv1.SubscriptionsAPIFilter `json:"filters,omitempty"`

// DisableCache, when set to true, configures the adapter to use cluster-scoped watches
// instead of per-namespace watches. This reduces the number of API connections from N
// (namespaces) to 1 per resource type, preventing client-side throttling when tracking
// resources across many namespaces. Events from non-selected namespaces are filtered
// client-side in the adapter.
// Defaults to false (per-namespace watches for better isolation).
// +optional
DisableCache *bool `json:"disableCache,omitempty"`
}

// ApiServerSourceStatus defines the observed state of ApiServerSource
Expand Down
5 changes: 5 additions & 0 deletions pkg/apis/sources/v1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions pkg/reconciler/apiserversource/apiserversource.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,11 @@ func (r *Reconciler) createReceiveAdapter(ctx context.Context, src *v1.ApiServer

featureFlags := feature.FromContext(ctx)

disableCache := false
if src.Spec.DisableCache != nil {
disableCache = *src.Spec.DisableCache
}

adapterArgs := resources.ReceiveAdapterArgs{
Image: r.receiveAdapterImage,
Source: src,
Expand All @@ -251,6 +256,7 @@ func (r *Reconciler) createReceiveAdapter(ctx context.Context, src *v1.ApiServer
AllNamespaces: allNamespaces,
NodeSelector: featureFlags.NodeSelector(),
FailFast: skipPermissions == "true",
DisableCache: disableCache,
}

expected, err := resources.MakeReceiveAdapter(&adapterArgs)
Expand Down
20 changes: 13 additions & 7 deletions pkg/reconciler/apiserversource/resources/receive_adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ type ReceiveAdapterArgs struct {
AllNamespaces bool
NodeSelector map[string]string
FailFast bool
DisableCache bool
}

// MakeReceiveAdapter generates (but does not insert into K8s) the Receive Adapter Deployment for
Expand Down Expand Up @@ -132,14 +133,19 @@ func MakeReceiveAdapter(args *ReceiveAdapterArgs) (*appsv1.Deployment, error) {
}

func makeEnv(args *ReceiveAdapterArgs) ([]corev1.EnvVar, error) {
// When DisableCache is true, force cluster-scoped watches to reduce API connections
allNamespaces := args.AllNamespaces || args.DisableCache

cfg := &apiserver.Config{
Namespaces: args.Namespaces,
Resources: make([]apiserver.ResourceWatch, 0, len(args.Source.Spec.Resources)),
ResourceOwner: args.Source.Spec.ResourceOwner,
EventMode: args.Source.Spec.EventMode,
AllNamespaces: args.AllNamespaces,
Filters: args.Source.Spec.Filters,
FailFast: args.FailFast,
Namespaces: args.Namespaces,
Resources: make([]apiserver.ResourceWatch, 0, len(args.Source.Spec.Resources)),
ResourceOwner: args.Source.Spec.ResourceOwner,
EventMode: args.Source.Spec.EventMode,
AllNamespaces: allNamespaces,
Filters: args.Source.Spec.Filters,
FailFast: args.FailFast,
DisableCache: args.DisableCache,
OriginalNamespaces: args.Namespaces, // Store for client-side filtering
}

for _, r := range args.Source.Spec.Resources {
Expand Down
Loading