From dbfb8af79c9fd923ccaba41cdc9b419932d6ec7d Mon Sep 17 00:00:00 2001
From: Kunal1522
Date: Wed, 7 Jan 2026 22:00:18 +0530
Subject: [PATCH] Add disableCache flag to ApiServerSource for reduced API
connections This adds a DisableCache field to ApiServerSource that when
enabled: - Forces cluster-scoped watches instead of per-namespace watches -
Reduces API server connections from O(N) to O(1) for N namespaces -
Implements client-side namespace filtering via OriginalNamespaces field -
Prevents client-side throttling in high-namespace scenarios
---
config/core/resources/apiserversource.yaml | 4 +-
docs/eventing-api.md | 34 +++++
pkg/adapter/apiserver/adapter.go | 15 ++
pkg/adapter/apiserver/config.go | 9 ++
pkg/adapter/apiserver/delegate.go | 19 +++
.../apiserver/namespace_filter_test.go | 140 ++++++++++++++++++
pkg/apis/sources/v1/apiserver_types.go | 9 ++
pkg/apis/sources/v1/zz_generated.deepcopy.go | 5 +
.../apiserversource/apiserversource.go | 6 +
.../resources/receive_adapter.go | 20 ++-
.../receive_adapter_disable_cache_test.go | 80 ++++++++++
.../resources/receive_adapter_test.go | 6 +-
12 files changed, 336 insertions(+), 11 deletions(-)
create mode 100644 pkg/adapter/apiserver/namespace_filter_test.go
create mode 100644 pkg/reconciler/apiserversource/resources/receive_adapter_disable_cache_test.go
diff --git a/config/core/resources/apiserversource.yaml b/config/core/resources/apiserversource.yaml
index dafba1672d8..ccbab51de91 100644
--- a/config/core/resources/apiserversource.yaml
+++ b/config/core/resources/apiserversource.yaml
@@ -186,7 +186,9 @@ spec:
description: matchLabels is a map of {key,value} pairs. A single {key,value} in the matchLabels map is equivalent to an element of matchExpressions, whose key field is "key", the operator is "In", and the values array contains only "value". The requirements are ANDed.
type: object
x-kubernetes-preserve-unknown-fields: true
-
+ disableCache:
+ description: DisableCache, when set to true, configures the adapter to disable client-side caching of Kubernetes resources. This prevents the adapter from using resourceVersion-based caching which can cause throttling when tracking many resources across namespaces.
+ type: boolean
status:
type: object
properties:
diff --git a/docs/eventing-api.md b/docs/eventing-api.md
index 4f97e43dda8..02eb428bc09 100644
--- a/docs/eventing-api.md
+++ b/docs/eventing-api.md
@@ -7410,6 +7410,23 @@ evaluate to true, the event MUST be attempted to be delivered. Absence of
a filter or empty array implies a value of true.
+
+
+disableCache
+
+bool
+
+ |
+
+(Optional)
+ DisableCache, when set to true, configures the adapter to use cluster-scoped watches
+instead of per-namespace watches. This reduces the number of API connections from N
+(namespaces) to 1 per resource type, preventing client-side throttling when tracking
+resources across many namespaces. Events from non-selected namespaces are filtered
+client-side in the adapter.
+Defaults to false (per-namespace watches for better isolation).
+ |
+
@@ -8036,6 +8053,23 @@ evaluate to true, the event MUST be attempted to be delivered. Absence of
a filter or empty array implies a value of true.
+
+
+disableCache
+
+bool
+
+ |
+
+(Optional)
+ DisableCache, when set to true, configures the adapter to use cluster-scoped watches
+instead of per-namespace watches. This reduces the number of API connections from N
+(namespaces) to 1 per resource type, preventing client-side throttling when tracking
+resources across many namespaces. Events from non-selected namespaces are filtered
+client-side in the adapter.
+Defaults to false (per-namespace watches for better isolation).
+ |
+
ApiServerSourceStatus
diff --git a/pkg/adapter/apiserver/adapter.go b/pkg/adapter/apiserver/adapter.go
index 90128eb3ee6..1c439137d69 100644
--- a/pkg/adapter/apiserver/adapter.go
+++ b/pkg/adapter/apiserver/adapter.go
@@ -191,6 +191,19 @@ func (a *apiServerAdapter) startFailFast(ctx context.Context, stopCh <-chan stru
}
func (a *apiServerAdapter) setupDelegate() cache.Store {
+ // Build namespace filter set for DisableCache mode
+ var allowedNs map[string]struct{}
+ filterByNs := false
+ if a.config.DisableCache && len(a.config.OriginalNamespaces) > 0 && a.config.AllNamespaces {
+ // DisableCache forced cluster-scoped watches, so we need client-side filtering
+ filterByNs = true
+ allowedNs = make(map[string]struct{}, len(a.config.OriginalNamespaces))
+ for _, ns := range a.config.OriginalNamespaces {
+ allowedNs[ns] = struct{}{}
+ }
+ a.logger.Infof("DisableCache enabled: using cluster-scoped watches with client-side filtering for %d namespaces", len(a.config.OriginalNamespaces))
+ }
+
var delegate cache.Store = &resourceDelegate{
ce: a.ce,
source: a.source,
@@ -198,6 +211,8 @@ func (a *apiServerAdapter) setupDelegate() cache.Store {
ref: a.config.EventMode == v1.ReferenceMode,
apiServerSourceName: a.name,
filter: subscriptionsapi.NewAllFilter(subscriptionsapi.MaterializeFiltersList(a.logger.Desugar(), a.config.Filters)...),
+ allowedNamespaces: allowedNs,
+ filterByNamespace: filterByNs,
}
if a.config.ResourceOwner != nil {
a.logger.Infow("will be filtered",
diff --git a/pkg/adapter/apiserver/config.go b/pkg/adapter/apiserver/config.go
index 58831190a53..e03a1a8ab62 100644
--- a/pkg/adapter/apiserver/config.go
+++ b/pkg/adapter/apiserver/config.go
@@ -74,4 +74,13 @@ type Config struct {
// (via the features.knative.dev/apiserversource-skip-permissions-check annotation), and the ApiServerSource
// adapter should not keep trying to establish watches on resources that it perhaps does not have permissions for.
FailFast bool `json:"failFast,omitempty"`
+
+ // DisableCache when true, forces cluster-scoped watches to reduce API connections
+ // and prevent client-side throttling in high-namespace scenarios.
+ DisableCache bool `json:"disableCache,omitempty"`
+
+ // OriginalNamespaces holds the user's intended namespaces for filtering
+ // when DisableCache forces AllNamespaces mode. This allows cluster-scoped
+ // watches while still filtering events to only the desired namespaces.
+ OriginalNamespaces []string `json:"originalNamespaces,omitempty"`
}
diff --git a/pkg/adapter/apiserver/delegate.go b/pkg/adapter/apiserver/delegate.go
index 3bc362261e3..40de604ee09 100644
--- a/pkg/adapter/apiserver/delegate.go
+++ b/pkg/adapter/apiserver/delegate.go
@@ -22,6 +22,7 @@ import (
cloudevents "github.com/cloudevents/sdk-go/v2"
"github.com/google/uuid"
"go.uber.org/zap"
+ "k8s.io/apimachinery/pkg/api/meta"
"k8s.io/client-go/tools/cache"
"knative.dev/eventing/pkg/adapter/apiserver/events"
"knative.dev/eventing/pkg/eventfilter"
@@ -35,6 +36,12 @@ type resourceDelegate struct {
filter eventfilter.Filter
logger *zap.SugaredLogger
+
+ // Namespace filtering for DisableCache mode
+ // When DisableCache forces cluster-scoped watches, we filter events
+ // to only those from the originally selected namespaces
+ allowedNamespaces map[string]struct{}
+ filterByNamespace bool
}
var _ cache.Store = (*resourceDelegate)(nil)
@@ -57,6 +64,18 @@ func (a *resourceDelegate) Delete(obj interface{}) error {
type makeEventFunc func(string, string, interface{}, bool) (context.Context, cloudevents.Event, error)
func (a *resourceDelegate) handleKubernetesObject(makeEvent makeEventFunc, obj interface{}) error {
+ // Quick namespace filter FIRST (before expensive CloudEvent creation)
+ // This is O(1) map lookup when DisableCache forces cluster-scoped watches
+ if a.filterByNamespace {
+ acc, err := meta.Accessor(obj)
+ if err != nil {
+ return err
+ }
+ if _, ok := a.allowedNamespaces[acc.GetNamespace()]; !ok {
+ return nil // Drop event - not in allowed namespaces
+ }
+ }
+
ctx, event, err := makeEvent(a.source, a.apiServerSourceName, obj, a.ref)
if err != nil {
diff --git a/pkg/adapter/apiserver/namespace_filter_test.go b/pkg/adapter/apiserver/namespace_filter_test.go
new file mode 100644
index 00000000000..7a3bac42759
--- /dev/null
+++ b/pkg/adapter/apiserver/namespace_filter_test.go
@@ -0,0 +1,140 @@
+package apiserver
+
+import (
+ "testing"
+
+ "go.uber.org/zap"
+
+ adaptertest "knative.dev/eventing/pkg/adapter/v2/test"
+ eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1"
+ "knative.dev/eventing/pkg/eventfilter/subscriptionsapi"
+)
+
+// TestNamespaceFiltering tests that events from non-allowed namespaces are dropped
+func TestNamespaceFiltering(t *testing.T) {
+ ce := adaptertest.NewTestClient()
+
+ // Create delegate with namespace filter for "allowed-ns"
+ allowedNs := map[string]struct{}{
+ "allowed-ns": {},
+ }
+
+ logger := zap.NewExample().Sugar()
+ delegate := &resourceDelegate{
+ ce: ce,
+ source: "unit-test",
+ ref: false,
+ apiServerSourceName: "test-source",
+ logger: logger,
+ filter: subscriptionsapi.NewAllFilter(subscriptionsapi.MaterializeFiltersList(logger.Desugar(), []eventingv1.SubscriptionsAPIFilter{})...),
+ allowedNamespaces: allowedNs,
+ filterByNamespace: true,
+ }
+
+ // Test 1: Event from allowed namespace SHOULD be sent
+ allowedPod := simplePod("allowed-pod", "allowed-ns")
+ err := delegate.Add(allowedPod)
+ if err != nil {
+ t.Fatalf("unexpected error: %v", err)
+ }
+ if got := len(ce.Sent()); got != 1 {
+ t.Errorf("Expected 1 event to be sent for allowed namespace, got: %d", got)
+ }
+
+ // Reset before next test
+ ce.Reset()
+
+ // Test 2: Event from non-allowed namespace should NOT be sent
+ deniedPod := simplePod("denied-pod", "denied-ns")
+ err = delegate.Add(deniedPod)
+ if err != nil {
+ t.Fatalf("unexpected error: %v", err)
+ }
+ if got := len(ce.Sent()); got != 0 {
+ t.Errorf("Expected 0 events to be sent for denied namespace, got: %d", got)
+ }
+}
+
+// TestNamespaceFilteringDisabled tests that when filtering is disabled, all events are sent
+func TestNamespaceFilteringDisabled(t *testing.T) {
+ ce := adaptertest.NewTestClient()
+
+ logger := zap.NewExample().Sugar()
+ // Create delegate WITHOUT namespace filtering
+ delegate := &resourceDelegate{
+ ce: ce,
+ source: "unit-test",
+ ref: false,
+ apiServerSourceName: "test-source",
+ logger: logger,
+ filter: subscriptionsapi.NewAllFilter(subscriptionsapi.MaterializeFiltersList(logger.Desugar(), []eventingv1.SubscriptionsAPIFilter{})...),
+ filterByNamespace: false, // Filtering disabled
+ }
+
+ // Both namespaces should send events
+ pod1 := simplePod("pod-1", "ns-1")
+ err := delegate.Add(pod1)
+ if err != nil {
+ t.Fatalf("unexpected error: %v", err)
+ }
+ if got := len(ce.Sent()); got != 1 {
+ t.Errorf("Expected 1 event after first pod, got: %d", got)
+ }
+
+ pod2 := simplePod("pod-2", "ns-2")
+ err = delegate.Add(pod2)
+ if err != nil {
+ t.Fatalf("unexpected error: %v", err)
+ }
+ if got := len(ce.Sent()); got != 2 {
+ t.Errorf("Expected 2 events after second pod, got: %d", got)
+ }
+}
+
+// TestNamespaceFilteringMultipleAllowed tests multiple allowed namespaces
+func TestNamespaceFilteringMultipleAllowed(t *testing.T) {
+ ce := adaptertest.NewTestClient()
+
+ // Create delegate with multiple allowed namespaces
+ allowedNs := map[string]struct{}{
+ "prod-1": {},
+ "prod-2": {},
+ "prod-3": {},
+ }
+
+ logger := zap.NewExample().Sugar()
+ delegate := &resourceDelegate{
+ ce: ce,
+ source: "unit-test",
+ ref: false,
+ apiServerSourceName: "test-source",
+ logger: logger,
+ filter: subscriptionsapi.NewAllFilter(subscriptionsapi.MaterializeFiltersList(logger.Desugar(), []eventingv1.SubscriptionsAPIFilter{})...),
+ allowedNamespaces: allowedNs,
+ filterByNamespace: true,
+ }
+
+ // Test allowed namespaces - each should add one event
+ expectedCount := 0
+ for _, ns := range []string{"prod-1", "prod-2", "prod-3"} {
+ pod := simplePod("test-pod", ns)
+ err := delegate.Add(pod)
+ if err != nil {
+ t.Fatalf("unexpected error for namespace %s: %v", ns, err)
+ }
+ expectedCount++
+ if got := len(ce.Sent()); got != expectedCount {
+ t.Errorf("Expected %d events after namespace %s, got: %d", expectedCount, ns, got)
+ }
+ }
+
+ // Test denied namespace - count should stay the same
+ deniedPod := simplePod("test-pod", "dev-1")
+ err := delegate.Add(deniedPod)
+ if err != nil {
+ t.Fatalf("unexpected error: %v", err)
+ }
+ if got := len(ce.Sent()); got != expectedCount {
+ t.Errorf("Expected %d events (denied namespace shouldn't add), got: %d", expectedCount, got)
+ }
+}
diff --git a/pkg/apis/sources/v1/apiserver_types.go b/pkg/apis/sources/v1/apiserver_types.go
index e3d30765e8e..ce7ec282911 100644
--- a/pkg/apis/sources/v1/apiserver_types.go
+++ b/pkg/apis/sources/v1/apiserver_types.go
@@ -96,6 +96,15 @@ type ApiServerSourceSpec struct {
//
// +optional
Filters []eventingv1.SubscriptionsAPIFilter `json:"filters,omitempty"`
+
+ // DisableCache, when set to true, configures the adapter to use cluster-scoped watches
+ // instead of per-namespace watches. This reduces the number of API connections from N
+ // (namespaces) to 1 per resource type, preventing client-side throttling when tracking
+ // resources across many namespaces. Events from non-selected namespaces are filtered
+ // client-side in the adapter.
+ // Defaults to false (per-namespace watches for better isolation).
+ // +optional
+ DisableCache *bool `json:"disableCache,omitempty"`
}
// ApiServerSourceStatus defines the observed state of ApiServerSource
diff --git a/pkg/apis/sources/v1/zz_generated.deepcopy.go b/pkg/apis/sources/v1/zz_generated.deepcopy.go
index 8de185540fc..d9d4908f796 100644
--- a/pkg/apis/sources/v1/zz_generated.deepcopy.go
+++ b/pkg/apis/sources/v1/zz_generated.deepcopy.go
@@ -153,6 +153,11 @@ func (in *ApiServerSourceSpec) DeepCopyInto(out *ApiServerSourceSpec) {
(*in)[i].DeepCopyInto(&(*out)[i])
}
}
+ if in.DisableCache != nil {
+ in, out := &in.DisableCache, &out.DisableCache
+ *out = new(bool)
+ **out = **in
+ }
return
}
diff --git a/pkg/reconciler/apiserversource/apiserversource.go b/pkg/reconciler/apiserversource/apiserversource.go
index b51467033fb..30be9f3c1db 100644
--- a/pkg/reconciler/apiserversource/apiserversource.go
+++ b/pkg/reconciler/apiserversource/apiserversource.go
@@ -239,6 +239,11 @@ func (r *Reconciler) createReceiveAdapter(ctx context.Context, src *v1.ApiServer
featureFlags := feature.FromContext(ctx)
+ disableCache := false
+ if src.Spec.DisableCache != nil {
+ disableCache = *src.Spec.DisableCache
+ }
+
adapterArgs := resources.ReceiveAdapterArgs{
Image: r.receiveAdapterImage,
Source: src,
@@ -251,6 +256,7 @@ func (r *Reconciler) createReceiveAdapter(ctx context.Context, src *v1.ApiServer
AllNamespaces: allNamespaces,
NodeSelector: featureFlags.NodeSelector(),
FailFast: skipPermissions == "true",
+ DisableCache: disableCache,
}
expected, err := resources.MakeReceiveAdapter(&adapterArgs)
diff --git a/pkg/reconciler/apiserversource/resources/receive_adapter.go b/pkg/reconciler/apiserversource/resources/receive_adapter.go
index fc640aa7ced..582a8df01ae 100644
--- a/pkg/reconciler/apiserversource/resources/receive_adapter.go
+++ b/pkg/reconciler/apiserversource/resources/receive_adapter.go
@@ -51,6 +51,7 @@ type ReceiveAdapterArgs struct {
AllNamespaces bool
NodeSelector map[string]string
FailFast bool
+ DisableCache bool
}
// MakeReceiveAdapter generates (but does not insert into K8s) the Receive Adapter Deployment for
@@ -132,14 +133,19 @@ func MakeReceiveAdapter(args *ReceiveAdapterArgs) (*appsv1.Deployment, error) {
}
func makeEnv(args *ReceiveAdapterArgs) ([]corev1.EnvVar, error) {
+ // When DisableCache is true, force cluster-scoped watches to reduce API connections
+ allNamespaces := args.AllNamespaces || args.DisableCache
+
cfg := &apiserver.Config{
- Namespaces: args.Namespaces,
- Resources: make([]apiserver.ResourceWatch, 0, len(args.Source.Spec.Resources)),
- ResourceOwner: args.Source.Spec.ResourceOwner,
- EventMode: args.Source.Spec.EventMode,
- AllNamespaces: args.AllNamespaces,
- Filters: args.Source.Spec.Filters,
- FailFast: args.FailFast,
+ Namespaces: args.Namespaces,
+ Resources: make([]apiserver.ResourceWatch, 0, len(args.Source.Spec.Resources)),
+ ResourceOwner: args.Source.Spec.ResourceOwner,
+ EventMode: args.Source.Spec.EventMode,
+ AllNamespaces: allNamespaces,
+ Filters: args.Source.Spec.Filters,
+ FailFast: args.FailFast,
+ DisableCache: args.DisableCache,
+ OriginalNamespaces: args.Namespaces, // Store for client-side filtering
}
for _, r := range args.Source.Spec.Resources {
diff --git a/pkg/reconciler/apiserversource/resources/receive_adapter_disable_cache_test.go b/pkg/reconciler/apiserversource/resources/receive_adapter_disable_cache_test.go
new file mode 100644
index 00000000000..ebc09d99e18
--- /dev/null
+++ b/pkg/reconciler/apiserversource/resources/receive_adapter_disable_cache_test.go
@@ -0,0 +1,80 @@
+package resources
+
+import (
+ "encoding/json"
+ "testing"
+
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+
+ "knative.dev/eventing/pkg/adapter/apiserver"
+ v1 "knative.dev/eventing/pkg/apis/sources/v1"
+ "knative.dev/eventing/pkg/reconciler/source"
+)
+
+func TestMakeReceiveAdapterWithDisableCache(t *testing.T) {
+ name := "source-name"
+
+ testCases := []struct {
+ name string
+ disableCache bool
+ want bool
+ }{{
+ name: "DisableCache true",
+ disableCache: true,
+ want: true,
+ }, {
+ name: "DisableCache false",
+ disableCache: false,
+ want: false,
+ }}
+
+ for _, tc := range testCases {
+ t.Run(tc.name, func(t *testing.T) {
+ src := &v1.ApiServerSource{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: name,
+ Namespace: "source-namespace",
+ UID: "1234",
+ },
+ Spec: v1.ApiServerSourceSpec{
+ Resources: []v1.APIVersionKindSelector{{
+ APIVersion: "v1",
+ Kind: "Pod",
+ }},
+ EventMode: "Resource",
+ ServiceAccountName: "source-svc-acct",
+ },
+ }
+
+ args := &ReceiveAdapterArgs{
+ Image: "test-image",
+ Source: src,
+ Labels: Labels(src.Name),
+ SinkURI: "http://sink.example.com",
+ Configs: &source.EmptyVarsGenerator{},
+ Namespaces: []string{"default"},
+ DisableCache: tc.disableCache,
+ }
+
+ deployment, err := MakeReceiveAdapter(args)
+ if err != nil {
+ t.Fatalf("unexpected error: %v", err)
+ }
+
+ // Find K_SOURCE_CONFIG env var
+ var config apiserver.Config
+ for _, env := range deployment.Spec.Template.Spec.Containers[0].Env {
+ if env.Name == "K_SOURCE_CONFIG" {
+ if err := json.Unmarshal([]byte(env.Value), &config); err != nil {
+ t.Fatalf("failed to unmarshal K_SOURCE_CONFIG: %v", err)
+ }
+ break
+ }
+ }
+
+ if config.DisableCache != tc.want {
+ t.Errorf("DisableCache mismatch: got %v, want %v", config.DisableCache, tc.want)
+ }
+ })
+ }
+}
diff --git a/pkg/reconciler/apiserversource/resources/receive_adapter_test.go b/pkg/reconciler/apiserversource/resources/receive_adapter_test.go
index 446d464e92b..33f683c820f 100644
--- a/pkg/reconciler/apiserversource/resources/receive_adapter_test.go
+++ b/pkg/reconciler/apiserversource/resources/receive_adapter_test.go
@@ -152,7 +152,7 @@ O2dgzikq8iSy1BlRsVw=
Value: "sink-uri",
}, {
Name: "K_SOURCE_CONFIG",
- Value: `{"namespaces":["source-namespace"],"allNamespaces":false,"resources":[{"gvr":{"Group":"","Version":"","Resource":"namespaces"}},{"gvr":{"Group":"batch","Version":"v1","Resource":"jobs"}},{"gvr":{"Group":"","Version":"","Resource":"pods"},"selector":"test-key1=test-value1"}],"owner":{"apiVersion":"custom/v1","kind":"Parent"},"mode":"Resource"}`,
+ Value: `{"namespaces":["source-namespace"],"allNamespaces":false,"resources":[{"gvr":{"Group":"","Version":"","Resource":"namespaces"}},{"gvr":{"Group":"batch","Version":"v1","Resource":"jobs"}},{"gvr":{"Group":"","Version":"","Resource":"pods"},"selector":"test-key1=test-value1"}],"owner":{"apiVersion":"custom/v1","kind":"Parent"},"mode":"Resource","originalNamespaces":["source-namespace"]}`,
}, {
Name: "SYSTEM_NAMESPACE",
Value: "knative-testing",
@@ -269,12 +269,12 @@ Test certificate content here
{
name: "FailFast true",
failFast: true,
- expectedConfig: `{"namespaces":["source-namespace"],"allNamespaces":false,"resources":[{"gvr":{"Group":"","Version":"","Resource":"namespaces"}}],"mode":"Resource","failFast":true}`,
+ expectedConfig: `{"namespaces":["source-namespace"],"allNamespaces":false,"resources":[{"gvr":{"Group":"","Version":"","Resource":"namespaces"}}],"mode":"Resource","failFast":true,"originalNamespaces":["source-namespace"]}`,
},
{
name: "FailFast false",
failFast: false,
- expectedConfig: `{"namespaces":["source-namespace"],"allNamespaces":false,"resources":[{"gvr":{"Group":"","Version":"","Resource":"namespaces"}}],"mode":"Resource"}`,
+ expectedConfig: `{"namespaces":["source-namespace"],"allNamespaces":false,"resources":[{"gvr":{"Group":"","Version":"","Resource":"namespaces"}}],"mode":"Resource","originalNamespaces":["source-namespace"]}`,
},
}