From 3512dba94b31b05772d0ce54904d583937b897e4 Mon Sep 17 00:00:00 2001 From: yeonsoo Date: Wed, 20 Aug 2025 12:29:05 +0900 Subject: [PATCH 01/18] feat: add Lister() to expose informer cache bia GenericLister Signed-off-by: yeonsoo --- internal/informer/informer.go | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/internal/informer/informer.go b/internal/informer/informer.go index 31799a33..022563d0 100644 --- a/internal/informer/informer.go +++ b/internal/informer/informer.go @@ -28,6 +28,7 @@ import ( "github.com/sirupsen/logrus" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/tools/cache" ) @@ -57,6 +58,9 @@ type Informer[T runtime.Object] struct { resType reflect.Type + // groupResource is the group and resource of the watched objects. + groupResource schema.GroupResource + // logger is this informer's logger. logger *logrus.Entry @@ -109,6 +113,13 @@ func NewInformer[T runtime.Object](ctx context.Context, opts ...InformerOption[T i := &Informer[T]{} var r T i.resType = reflect.TypeOf(r) + + // groupResource is the group and resource of the watched objects. + i.groupResource = schema.GroupResource{ + Group: "argoproj.io", + Resource: "applications", + } + i.logger = logrus.NewEntry(logrus.StandardLogger()).WithFields(logrus.Fields{ "type": i.resType, "module": "Informer", @@ -287,3 +298,8 @@ func (i *Informer[T]) WaitForSync(ctx context.Context) error { } return nil } + +// Lister returns a GenericLister that can be used to list and get cached resources. +func (i *Informer[T]) Lister() cache.GenericLister { + return cache.NewGenericLister(i.informer.GetIndexer(), i.groupResource) +} From 7de11645ae9376a73f0a107923e62104dcfec0ea Mon Sep 17 00:00:00 2001 From: yeonsoo Date: Wed, 20 Aug 2025 12:29:34 +0900 Subject: [PATCH 02/18] feat: use informr cache in KubernetesBackend Get method Signed-off-by: yeonsoo --- .../kubernetes/application/kubernetes.go | 19 +++++++++++++++++-- 1 file changed, 17 insertions(+), 2 deletions(-) diff --git a/internal/backend/kubernetes/application/kubernetes.go b/internal/backend/kubernetes/application/kubernetes.go index b7905cae..c48347be 100644 --- a/internal/backend/kubernetes/application/kubernetes.go +++ b/internal/backend/kubernetes/application/kubernetes.go @@ -30,6 +30,7 @@ import ( appclientset "github.com/argoproj/argo-cd/v3/pkg/client/clientset/versioned" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/tools/cache" ) var _ backend.Application = &KubernetesBackend{} @@ -43,18 +44,24 @@ type KubernetesBackend struct { appClient appclientset.Interface // appInformer is used to watch for change events for Argo CD Application resources on the cluster appInformer informer.InformerInterface + // appLister is used to list Argo CD Application resources from the cache + appLister cache.GenericLister // namespace is not currently read, is not guaranteed to be non-empty, and is not guaranteed to contain the source of Argo CD Application CRs in all cases namespace string usePatch bool } func NewKubernetesBackend(appClient appclientset.Interface, namespace string, appInformer informer.InformerInterface, usePatch bool) *KubernetesBackend { - return &KubernetesBackend{ + be := &KubernetesBackend{ appClient: appClient, appInformer: appInformer, usePatch: usePatch, namespace: namespace, } + if specificInformer, ok := appInformer.(*informer.Informer[*v1alpha1.Application]); ok { + be.appLister = specificInformer.Lister() + } + return be } func (be *KubernetesBackend) List(ctx context.Context, selector backend.ApplicationSelector) ([]v1alpha1.Application, error) { @@ -82,7 +89,15 @@ func (be *KubernetesBackend) Create(ctx context.Context, app *v1alpha1.Applicati } func (be *KubernetesBackend) Get(ctx context.Context, name string, namespace string) (*v1alpha1.Application, error) { - return be.appClient.ArgoprojV1alpha1().Applications(namespace).Get(ctx, name, v1.GetOptions{}) + obj, err := be.appLister.ByNamespace(namespace).Get(name) + if err != nil { + return nil, err + } + app, ok := obj.(*v1alpha1.Application) + if !ok { + return nil, fmt.Errorf("object is not an Application: %T", obj) + } + return app, nil } func (be *KubernetesBackend) Delete(ctx context.Context, name string, namespace string, deletionPropagation *backend.DeletionPropagation) error { From 87fb31126549d31ab841b816c9b2a6725227964e Mon Sep 17 00:00:00 2001 From: yeonsoo Date: Wed, 20 Aug 2025 15:48:27 +0900 Subject: [PATCH 03/18] test: add unit tests for Get method using informer cache Signed-off-by: yeonsoo --- .../kubernetes/application/kubernetes_test.go | 52 ++++++++++++++++--- 1 file changed, 46 insertions(+), 6 deletions(-) diff --git a/internal/backend/kubernetes/application/kubernetes_test.go b/internal/backend/kubernetes/application/kubernetes_test.go index aed06fc0..ae8d76e0 100644 --- a/internal/backend/kubernetes/application/kubernetes_test.go +++ b/internal/backend/kubernetes/application/kubernetes_test.go @@ -21,6 +21,7 @@ import ( "testing" "github.com/argoproj-labs/argocd-agent/internal/backend" + "github.com/argoproj-labs/argocd-agent/internal/informer" "github.com/argoproj/argo-cd/v3/pkg/apis/application/v1alpha1" fakeappclient "github.com/argoproj/argo-cd/v3/pkg/client/clientset/versioned/fake" "github.com/stretchr/testify/assert" @@ -28,6 +29,7 @@ import ( "github.com/wI2L/jsondiff" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/watch" ) func Test_NewKubernetes(t *testing.T) { @@ -110,19 +112,57 @@ func Test_Create(t *testing.T) { func Test_Get(t *testing.T) { apps := mkApps() + ctx := context.TODO() t.Run("Get existing app", func(t *testing.T) { fakeAppC := fakeappclient.NewSimpleClientset(apps...) - k := NewKubernetesBackend(fakeAppC, "", nil, true) - app, err := k.Get(context.TODO(), "app", "ns1") + + inf, err := informer.NewInformer[*v1alpha1.Application]( + ctx, + informer.WithListHandler[*v1alpha1.Application](func(ctx context.Context, options v1.ListOptions) (runtime.Object, error) { + return fakeAppC.ArgoprojV1alpha1().Applications("").List(ctx, options) + }), + informer.WithWatchHandler[*v1alpha1.Application](func(ctx context.Context, options v1.ListOptions) (watch.Interface, error) { + return fakeAppC.ArgoprojV1alpha1().Applications("").Watch(ctx, options) + }), + ) + require.NoError(t, err) + + go inf.Start(ctx) + require.NoError(t, inf.WaitForSync(ctx)) + + // Create the backend with the informer + backend := NewKubernetesBackend(fakeAppC, "", inf, true) + + app, err := backend.Get(ctx, "app", "ns1") assert.NoError(t, err) assert.NotNil(t, app) + assert.Equal(t, "app", app.Name) + assert.Equal(t, "ns1", app.Namespace) + }) t.Run("Get non-existing app", func(t *testing.T) { fakeAppC := fakeappclient.NewSimpleClientset(apps...) - k := NewKubernetesBackend(fakeAppC, "", nil, true) - app, err := k.Get(context.TODO(), "foo", "ns1") - assert.ErrorContains(t, err, "not found") - assert.Equal(t, &v1alpha1.Application{}, app) + inf, err := informer.NewInformer[*v1alpha1.Application]( + ctx, + informer.WithListHandler[*v1alpha1.Application](func(ctx context.Context, options v1.ListOptions) (runtime.Object, error) { + return fakeAppC.ArgoprojV1alpha1().Applications("").List(ctx, options) + }), + informer.WithWatchHandler[*v1alpha1.Application](func(ctx context.Context, options v1.ListOptions) (watch.Interface, error) { + return fakeAppC.ArgoprojV1alpha1().Applications("").Watch(ctx, options) + }), + ) + require.NoError(t, err) + // Start the informer + go inf.Start(ctx) + require.NoError(t, inf.WaitForSync(ctx)) + + // Create the backend with the informer + backend := NewKubernetesBackend(fakeAppC, "", inf, true) + + app, err := backend.Get(ctx, "nonexistent", "ns1") + require.Error(t, err) + require.Nil(t, app) + }) } From c0b9baafc77d508fee2e4303c4d0d00fc1fa6667 Mon Sep 17 00:00:00 2001 From: yeonsoo Date: Thu, 21 Aug 2025 15:02:41 +0900 Subject: [PATCH 04/18] fix: Return deep copy of Application object in Get() Signed-off-by: yeonsoo --- internal/backend/kubernetes/application/kubernetes.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/internal/backend/kubernetes/application/kubernetes.go b/internal/backend/kubernetes/application/kubernetes.go index c48347be..efc44667 100644 --- a/internal/backend/kubernetes/application/kubernetes.go +++ b/internal/backend/kubernetes/application/kubernetes.go @@ -97,7 +97,8 @@ func (be *KubernetesBackend) Get(ctx context.Context, name string, namespace str if !ok { return nil, fmt.Errorf("object is not an Application: %T", obj) } - return app, nil + // Return a deep copy to prevent mutations + return app.DeepCopy(), nil } func (be *KubernetesBackend) Delete(ctx context.Context, name string, namespace string, deletionPropagation *backend.DeletionPropagation) error { From 3a22c60c432f09f6d03253be865ca3162539e346 Mon Sep 17 00:00:00 2001 From: yeonsoo Date: Sat, 23 Aug 2025 16:55:37 +0900 Subject: [PATCH 05/18] test: add informer start Signed-off-by: yeonsoo --- .../manager/application/application_test.go | 20 +++++++++++++++++++ principal/event_test.go | 4 ++++ 2 files changed, 24 insertions(+) diff --git a/internal/manager/application/application_test.go b/internal/manager/application/application_test.go index b40d4a90..81152fcb 100644 --- a/internal/manager/application/application_test.go +++ b/internal/manager/application/application_test.go @@ -58,6 +58,19 @@ func fakeInformer(t *testing.T, namespace string, objects ...runtime.Object) (*f informer.WithNamespaceScope[*v1alpha1.Application](namespace), ) require.NoError(t, err) + + go func() { + err = informer.Start(context.Background()) + if err != nil { + t.Fatalf("failed to start informer: %v", err) + } + }() + + // cache.WaitForCacheSync(context.Background().Done(), informer.HasSynced) + if err = informer.WaitForSync(context.Background()); err != nil { + t.Fatalf("failed to wait for informer sync: %v", err) + } + return appC, informer } @@ -69,6 +82,7 @@ func fakeAppManager(t *testing.T, objects ...runtime.Object) (*fakeappclient.Cli am, err := NewApplicationManager(be, "argocd") assert.NoError(t, err) + // go am.StartBackend(context.Background()) return appC, am } @@ -209,7 +223,13 @@ func Test_ManagerUpdateManaged(t *testing.T) { mgr, err := NewApplicationManager(be, "argocd", WithMode(manager.ManagerModeManaged), WithRole(manager.ManagerRoleAgent)) require.NoError(t, err) + // ctx, cancel := context.WithCancel(context.Background()) + // defer cancel() + // go mgr.StartBackend(ctx) + // cache.WaitForCacheSync(ctx.Done(), ai.HasSynced) + updated, err := mgr.UpdateManagedApp(context.Background(), incoming) + require.NoError(t, err) require.NotNil(t, updated) diff --git a/principal/event_test.go b/principal/event_test.go index 1dd16526..79411d94 100644 --- a/principal/event_test.go +++ b/principal/event_test.go @@ -128,6 +128,7 @@ func Test_CreateEvents(t *testing.T) { wq.On("Get").Return(&ev, false) wq.On("Done", &ev) s, err := NewServer(context.Background(), fac, "argocd", WithGeneratedTokenSigningKey(), WithAutoNamespaceCreate(true, "", nil)) + s.Start(context.Background(), make(chan error)) s.clusterMgr.MapCluster("argocd", &v1alpha1.Cluster{Name: "argocd", Server: "https://argocd.com"}) require.NoError(t, err) s.setAgentMode("argocd", types.AgentModeAutonomous) @@ -247,6 +248,7 @@ func Test_CreateEvents(t *testing.T) { wq.On("Get").Return(&ev, false) wq.On("Done", &ev) s, err := NewServer(context.Background(), fac, "argocd", WithGeneratedTokenSigningKey()) + s.Start(context.Background(), make(chan error)) require.NoError(t, err) s.clusterMgr.MapCluster("foo", &v1alpha1.Cluster{Name: "foo", Server: "https://foo.com"}) s.setAgentMode("foo", types.AgentModeAutonomous) @@ -327,6 +329,7 @@ func Test_UpdateEvents(t *testing.T) { wq.On("Done", &ev) s, err := NewServer(context.Background(), fac, "argocd", WithGeneratedTokenSigningKey()) require.NoError(t, err) + s.Start(context.Background(), make(chan error)) s.setAgentMode("foo", types.AgentModeAutonomous) s.clusterMgr.MapCluster("foo", &v1alpha1.Cluster{Name: "foo", Server: "https://foo.com"}) got, err := s.processRecvQueue(context.Background(), "foo", wq) @@ -446,6 +449,7 @@ func Test_DeleteEvents_ManagedMode(t *testing.T) { wq.On("Done", &ev) s, err := NewServer(context.Background(), fac, "argocd", WithGeneratedTokenSigningKey()) require.NoError(t, err) + s.setAgentMode("foo", types.AgentModeManaged) _, err = fac.ApplicationsClientset.ArgoprojV1alpha1().Applications(delApp.Namespace).Create(context.Background(), delApp, v1.CreateOptions{}) From da532147c205efe5fac00b06ccc65d434b194236 Mon Sep 17 00:00:00 2001 From: yeonsoo Date: Wed, 8 Oct 2025 02:56:27 +0900 Subject: [PATCH 06/18] fix: flaky principal tests Add configurable informer sync timeout and proper context handling to resolve intermittent test failures. Signed-off-by: yeonsoo --- principal/event_test.go | 102 +++++++++++++++++++++++++++++---------- principal/listen_test.go | 9 ++-- principal/options.go | 28 +++++++---- principal/server.go | 17 +++++-- 4 files changed, 113 insertions(+), 43 deletions(-) diff --git a/principal/event_test.go b/principal/event_test.go index 79411d94..7beca826 100644 --- a/principal/event_test.go +++ b/principal/event_test.go @@ -247,15 +247,32 @@ func Test_CreateEvents(t *testing.T) { wq := wqmock.NewTypedRateLimitingInterface[*cloudevents.Event](t) wq.On("Get").Return(&ev, false) wq.On("Done", &ev) - s, err := NewServer(context.Background(), fac, "argocd", WithGeneratedTokenSigningKey()) - s.Start(context.Background(), make(chan error)) + + // Use context with timeout to prevent hanging + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + // Create server with short sync timeout for testing + s, err := NewServer(ctx, fac, "argocd", + WithGeneratedTokenSigningKey(), + WithInformerSyncTimeout(5*time.Second), + ) require.NoError(t, err) + + // Ensure cleanup + defer func() { + _ = s.Shutdown() + }() + + err = s.Start(ctx, make(chan error)) + require.NoError(t, err) + s.clusterMgr.MapCluster("foo", &v1alpha1.Cluster{Name: "foo", Server: "https://foo.com"}) s.setAgentMode("foo", types.AgentModeAutonomous) - got, err := s.processRecvQueue(context.Background(), "foo", wq) + got, err := s.processRecvQueue(ctx, "foo", wq) assert.Nil(t, err) require.Equal(t, ev, *got) - napp, err := fac.ApplicationsClientset.ArgoprojV1alpha1().Applications("foo").Get(context.TODO(), "test", v1.GetOptions{}) + napp, err := fac.ApplicationsClientset.ArgoprojV1alpha1().Applications("foo").Get(ctx, "test", v1.GetOptions{}) assert.NoError(t, err) require.NotNil(t, napp) assert.Empty(t, napp.OwnerReferences) @@ -327,15 +344,29 @@ func Test_UpdateEvents(t *testing.T) { wq := wqmock.NewTypedRateLimitingInterface[*cloudevents.Event](t) wq.On("Get").Return(&ev, false) wq.On("Done", &ev) - s, err := NewServer(context.Background(), fac, "argocd", WithGeneratedTokenSigningKey()) + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + s, err := NewServer(ctx, fac, "argocd", + WithGeneratedTokenSigningKey(), + WithInformerSyncTimeout(2*time.Second), + ) require.NoError(t, err) - s.Start(context.Background(), make(chan error)) + + defer func() { + _ = s.Shutdown() + }() + + err = s.Start(ctx, make(chan error)) + require.NoError(t, err) + s.setAgentMode("foo", types.AgentModeAutonomous) s.clusterMgr.MapCluster("foo", &v1alpha1.Cluster{Name: "foo", Server: "https://foo.com"}) - got, err := s.processRecvQueue(context.Background(), "foo", wq) + got, err := s.processRecvQueue(ctx, "foo", wq) require.Equal(t, ev, *got) assert.NoError(t, err) - napp, err := fac.ApplicationsClientset.ArgoprojV1alpha1().Applications("foo").Get(context.TODO(), "test", v1.GetOptions{}) + napp, err := fac.ApplicationsClientset.ArgoprojV1alpha1().Applications("foo").Get(ctx, "test", v1.GetOptions{}) assert.NoError(t, err) require.NotNil(t, napp) assert.Equal(t, "HEAD", napp.Spec.Source.TargetRevision) @@ -412,11 +443,9 @@ func Test_DeleteEvents_ManagedMode(t *testing.T) { t.Run(test.name, func(t *testing.T) { delApp := &v1alpha1.Application{ ObjectMeta: v1.ObjectMeta{ - Name: "test", - Namespace: "foo", - Finalizers: []string{ - "test-finalizers", - }, + Name: "test", + Namespace: "foo", + Finalizers: []string{}, }, Spec: v1alpha1.ApplicationSpec{ Source: &v1alpha1.ApplicationSource{ @@ -439,7 +468,9 @@ func Test_DeleteEvents_ManagedMode(t *testing.T) { delApp.DeletionTimestamp = ptr.To(v1.Time{Time: time.Now()}) } - fac := kube.NewKubernetesFakeClientWithApps("argocd") + // Create fake client with the application already in it + fac := kube.NewKubernetesFakeClientWithApps("argocd", delApp) + ev := cloudevents.NewEvent() ev.SetDataSchema("application") ev.SetType(event.Delete.String()) @@ -447,31 +478,50 @@ func Test_DeleteEvents_ManagedMode(t *testing.T) { wq := wqmock.NewTypedRateLimitingInterface[*cloudevents.Event](t) wq.On("Get").Return(&ev, false) wq.On("Done", &ev) - s, err := NewServer(context.Background(), fac, "argocd", WithGeneratedTokenSigningKey()) + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + s, err := NewServer(ctx, fac, "argocd", + WithGeneratedTokenSigningKey(), + WithInformerSyncTimeout(2*time.Second), + ) + require.NoError(t, err) + + defer func() { + _ = s.Shutdown() + }() + + err = s.Start(ctx, make(chan error)) require.NoError(t, err) s.setAgentMode("foo", types.AgentModeManaged) - _, err = fac.ApplicationsClientset.ArgoprojV1alpha1().Applications(delApp.Namespace).Create(context.Background(), delApp, v1.CreateOptions{}) + var cachedApp *v1alpha1.Application + for i := 0; i < 20; i++ { + cachedApp, err = s.appManager.Get(ctx, delApp.Name, delApp.Namespace) + if err == nil { + break + } + time.Sleep(50 * time.Millisecond) + } + require.NoError(t, err) - got, err := s.processRecvQueue(context.Background(), "foo", wq) + if test.deletionTimestampSetOnPrincipal { + require.NotNil(t, cachedApp.DeletionTimestamp) + } + + got, err := s.processRecvQueue(ctx, "foo", wq) require.NoError(t, err) if test.deletionTimestampSetOnPrincipal { - // If deletionTimestamp is set on principal, the Application should be removed by the call require.NoError(t, err) require.Equal(t, ev, *got) - - // Verify Application is deleted - _, err = fac.ApplicationsClientset.ArgoprojV1alpha1().Applications(delApp.Namespace).Get(context.Background(), delApp.Name, v1.GetOptions{}) + _, err = fac.ApplicationsClientset.ArgoprojV1alpha1().Applications(delApp.Namespace).Get(ctx, delApp.Name, v1.GetOptions{}) require.True(t, apierrors.IsNotFound(err)) - } else { - // If deletionTimestamp is NOT set on principal, the Application should NOT be removed by the call require.NoError(t, err) - - // Verify Application is NOT deleted - _, err = fac.ApplicationsClientset.ArgoprojV1alpha1().Applications(delApp.Namespace).Get(context.Background(), delApp.Name, v1.GetOptions{}) + _, err = fac.ApplicationsClientset.ArgoprojV1alpha1().Applications(delApp.Namespace).Get(ctx, delApp.Name, v1.GetOptions{}) require.NoError(t, err) } }) diff --git a/principal/listen_test.go b/principal/listen_test.go index c2e1a11f..86525fd0 100644 --- a/principal/listen_test.go +++ b/principal/listen_test.go @@ -144,19 +144,22 @@ func Test_Serve(t *testing.T) { templ := certTempl fakecerts.WriteSelfSignedCert(t, "rsa", path.Join(tempDir, "test-cert"), templ) - // We start a real (non-mocked) server - s, err := NewServer(context.TODO(), kube.NewKubernetesFakeClientWithApps(testNamespace), testNamespace, + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + s, err := NewServer(ctx, kube.NewKubernetesFakeClientWithApps(testNamespace), testNamespace, WithTLSKeyPairFromPath(path.Join(tempDir, "test-cert.crt"), path.Join(tempDir, "test-cert.key")), WithGeneratedTokenSigningKey(), WithListenerPort(0), WithListenerAddress("127.0.0.1"), WithShutDownGracePeriod(2*time.Second), WithGRPC(true), + WithInformerSyncTimeout(5*time.Second), ) require.NoError(t, err) errch := make(chan error) - err = s.Start(context.Background(), errch) + err = s.Start(ctx, errch) require.NoError(t, err) // Create and register authentication method diff --git a/principal/options.go b/principal/options.go index 1b108d42..1508d61d 100644 --- a/principal/options.go +++ b/principal/options.go @@ -72,9 +72,10 @@ type ServerOptions struct { rootCa *x509.CertPool clientCertSubjectMatch bool redisAddress string - redisPassword string - redisCompressionType cacheutil.RedisCompressionType - healthzPort int + redisPassword string + redisCompressionType cacheutil.RedisCompressionType + healthzPort int + informerSyncTimeout time.Duration } type ServerOption func(o *Server) error @@ -82,12 +83,13 @@ type ServerOption func(o *Server) error // defaultOptions returns a set of default options for the server func defaultOptions() *ServerOptions { return &ServerOptions{ - port: 443, - address: "", - tlsMinVersion: tls.VersionTLS13, - unauthMethods: make(map[string]bool), - eventProcessors: 10, - rootCa: x509.NewCertPool(), + port: 443, + address: "", + tlsMinVersion: tls.VersionTLS13, + unauthMethods: make(map[string]bool), + eventProcessors: 10, + rootCa: x509.NewCertPool(), + informerSyncTimeout: 60 * time.Second } } @@ -415,6 +417,14 @@ func WithWebSocket(enableWebSocket bool) ServerOption { } } +// WithInformerSyncTimeout sets the informer sync timeout duration. +func WithInformerSyncTimeout(timeout time.Duration) ServerOption { + return func(o *Server) error { + o.options.informerSyncTimeout = timeout + return nil + } +} + func WithResourceProxyEnabled(enabled bool) ServerOption { return func(o *Server) error { o.resourceProxyEnabled = enabled diff --git a/principal/server.go b/principal/server.go index 626a63ea..e7ef1dab 100644 --- a/principal/server.go +++ b/principal/server.go @@ -337,7 +337,9 @@ func NewServer(ctx context.Context, kubeClient *kube.KubernetesClient, namespace s.resourceProxyListenAddr = defaultResourceProxyListenerAddr } - s.redisProxy = redisproxy.New(defaultRedisProxyListenerAddr, s.options.redisAddress, s.sendSynchronousRedisMessageToAgent) + if s.options.redisAddress != "" { + s.redisProxy = redisproxy.New(defaultRedisProxyListenerAddr, s.options.redisAddress, s.sendSynchronousRedisMessageToAgent) + } // Instantiate our ResourceProxy to intercept Kubernetes requests from Argo // CD's API server. @@ -508,17 +510,22 @@ func (s *Server) Start(ctx context.Context, errch chan error) error { s.events = event.NewEventSource(s.options.serverName) - if err := s.appManager.EnsureSynced(waitForSyncedDuration); err != nil { + syncTimeout := s.options.informerSyncTimeout + if syncTimeout == 0 { + syncTimeout = waitForSyncedDuration + } + + if err := s.appManager.EnsureSynced(syncTimeout); err != nil { return fmt.Errorf("unable to sync Application informer: %w", err) } log().Infof("Application informer synced and ready") - if err := s.projectManager.EnsureSynced(waitForSyncedDuration); err != nil { + if err := s.projectManager.EnsureSynced(syncTimeout); err != nil { return fmt.Errorf("unable to sync AppProject informer: %w", err) } log().Infof("AppProject informer synced and ready") - if err := s.repoManager.EnsureSynced(waitForSyncedDuration); err != nil { + if err := s.repoManager.EnsureSynced(syncTimeout); err != nil { return fmt.Errorf("unable to sync Repository informer: %w", err) } log().Infof("Repository informer synced and ready") @@ -538,7 +545,7 @@ func (s *Server) Start(ctx context.Context, errch chan error) error { if err != nil { return err } - if err := s.namespaceManager.EnsureSynced(waitForSyncedDuration); err != nil { + if err := s.namespaceManager.EnsureSynced(syncTimeout); err != nil { return fmt.Errorf("unable to sync Namespace informer: %w", err) } log().Infof("Namespace informer synced and ready") From 53100da024cd5d62ac25dd0a8e6ac1fb81e69e2c Mon Sep 17 00:00:00 2001 From: yeonsoo Date: Wed, 8 Oct 2025 03:01:39 +0900 Subject: [PATCH 07/18] fix: syntax error in options.go Signed-off-by: yeonsoo --- principal/options.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/principal/options.go b/principal/options.go index 1508d61d..157dd4ee 100644 --- a/principal/options.go +++ b/principal/options.go @@ -72,10 +72,10 @@ type ServerOptions struct { rootCa *x509.CertPool clientCertSubjectMatch bool redisAddress string - redisPassword string - redisCompressionType cacheutil.RedisCompressionType - healthzPort int - informerSyncTimeout time.Duration + redisPassword string + redisCompressionType cacheutil.RedisCompressionType + healthzPort int + informerSyncTimeout time.Duration } type ServerOption func(o *Server) error @@ -89,7 +89,7 @@ func defaultOptions() *ServerOptions { unauthMethods: make(map[string]bool), eventProcessors: 10, rootCa: x509.NewCertPool(), - informerSyncTimeout: 60 * time.Second + informerSyncTimeout: 60 * time.Second, } } From 705aed2fa7c31d41d5bf42669be414ba30e2a013 Mon Sep 17 00:00:00 2001 From: yeonsoo Date: Wed, 8 Oct 2025 14:00:51 +0900 Subject: [PATCH 08/18] fix: flaky principal tests Add configurable informer timeout and Redis proxy disable for tests. Signed-off-by: yeonsoo --- principal/event_test.go | 41 ++++++++++++++++++++-------------------- principal/listen_test.go | 1 + principal/options.go | 9 +++++++++ principal/server.go | 2 +- principal/server_test.go | 9 +++++++-- 5 files changed, 38 insertions(+), 24 deletions(-) diff --git a/principal/event_test.go b/principal/event_test.go index 7beca826..78c8daa7 100644 --- a/principal/event_test.go +++ b/principal/event_test.go @@ -43,7 +43,7 @@ func Test_InvalidEvents(t *testing.T) { wq := wqmock.NewTypedRateLimitingInterface[*cloudevents.Event](t) wq.On("Get").Return(&ev, false) wq.On("Done", &ev) - s, err := NewServer(context.Background(), kube.NewKubernetesFakeClientWithApps("argocd"), "argocd", WithGeneratedTokenSigningKey()) + s, err := NewServer(context.Background(), kube.NewKubernetesFakeClientWithApps("argocd"), "argocd", WithGeneratedTokenSigningKey(), WithRedisProxyDisabled()) require.NoError(t, err) got, err := s.processRecvQueue(context.Background(), "foo", wq) assert.ErrorContains(t, err, "unknown target") @@ -57,7 +57,7 @@ func Test_InvalidEvents(t *testing.T) { wq := wqmock.NewTypedRateLimitingInterface[*cloudevents.Event](t) wq.On("Get").Return(&ev, false) wq.On("Done", &ev) - s, err := NewServer(context.Background(), kube.NewKubernetesFakeClientWithApps("argocd"), "argocd", WithGeneratedTokenSigningKey()) + s, err := NewServer(context.Background(), kube.NewKubernetesFakeClientWithApps("argocd"), "argocd", WithGeneratedTokenSigningKey(), WithRedisProxyDisabled()) require.NoError(t, err) got, err := s.processRecvQueue(context.Background(), "foo", wq) assert.ErrorContains(t, err, "unable to process event of type application") @@ -72,7 +72,7 @@ func Test_InvalidEvents(t *testing.T) { wq := wqmock.NewTypedRateLimitingInterface[*cloudevents.Event](t) wq.On("Get").Return(&ev, false) wq.On("Done", &ev) - s, err := NewServer(context.Background(), kube.NewKubernetesFakeClientWithApps("argocd"), "argocd", WithGeneratedTokenSigningKey()) + s, err := NewServer(context.Background(), kube.NewKubernetesFakeClientWithApps("argocd"), "argocd", WithGeneratedTokenSigningKey(), WithRedisProxyDisabled()) require.NoError(t, err) got, err := s.processRecvQueue(context.Background(), "foo", wq) assert.ErrorContains(t, err, "failed to unmarshal") @@ -88,7 +88,7 @@ func Test_CreateEvents(t *testing.T) { wq := wqmock.NewTypedRateLimitingInterface[*cloudevents.Event](t) wq.On("Get").Return(&ev, false) wq.On("Done", &ev) - s, err := NewServer(context.Background(), kube.NewKubernetesFakeClientWithApps("argocd"), "argocd", WithGeneratedTokenSigningKey()) + s, err := NewServer(context.Background(), kube.NewKubernetesFakeClientWithApps("argocd"), "argocd", WithGeneratedTokenSigningKey(), WithRedisProxyDisabled()) require.NoError(t, err) got, err := s.processRecvQueue(context.Background(), "foo", wq) assert.ErrorIs(t, err, event.ErrEventDiscarded) @@ -127,7 +127,7 @@ func Test_CreateEvents(t *testing.T) { wq := wqmock.NewTypedRateLimitingInterface[*cloudevents.Event](t) wq.On("Get").Return(&ev, false) wq.On("Done", &ev) - s, err := NewServer(context.Background(), fac, "argocd", WithGeneratedTokenSigningKey(), WithAutoNamespaceCreate(true, "", nil)) + s, err := NewServer(context.Background(), fac, "argocd", WithGeneratedTokenSigningKey(), WithAutoNamespaceCreate(true, "", nil), WithRedisProxyDisabled()) s.Start(context.Background(), make(chan error)) s.clusterMgr.MapCluster("argocd", &v1alpha1.Cluster{Name: "argocd", Server: "https://argocd.com"}) require.NoError(t, err) @@ -181,7 +181,7 @@ func Test_CreateEvents(t *testing.T) { wq := wqmock.NewTypedRateLimitingInterface[*cloudevents.Event](t) wq.On("Get").Return(&ev, false) wq.On("Done", &ev) - s, err := NewServer(context.Background(), fac, "argocd", WithGeneratedTokenSigningKey(), WithAutoNamespaceCreate(true, "", nil)) + s, err := NewServer(context.Background(), fac, "argocd", WithGeneratedTokenSigningKey(), WithAutoNamespaceCreate(true, "", nil), WithRedisProxyDisabled()) require.NoError(t, err) s.clusterMgr.MapCluster("foo", &v1alpha1.Cluster{Name: "foo", Server: "https://foo.com"}) s.setAgentMode("foo", types.AgentModeAutonomous) @@ -248,18 +248,16 @@ func Test_CreateEvents(t *testing.T) { wq.On("Get").Return(&ev, false) wq.On("Done", &ev) - // Use context with timeout to prevent hanging ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() - // Create server with short sync timeout for testing s, err := NewServer(ctx, fac, "argocd", WithGeneratedTokenSigningKey(), WithInformerSyncTimeout(5*time.Second), + WithRedisProxyDisabled(), ) require.NoError(t, err) - // Ensure cleanup defer func() { _ = s.Shutdown() }() @@ -351,6 +349,7 @@ func Test_UpdateEvents(t *testing.T) { s, err := NewServer(ctx, fac, "argocd", WithGeneratedTokenSigningKey(), WithInformerSyncTimeout(2*time.Second), + WithRedisProxyDisabled(), ) require.NoError(t, err) @@ -410,7 +409,7 @@ func Test_UpdateEvents(t *testing.T) { wq := wqmock.NewTypedRateLimitingInterface[*cloudevents.Event](t) wq.On("Get").Return(&ev, false) wq.On("Done", &ev) - s, err := NewServer(context.Background(), fac, "argocd", WithGeneratedTokenSigningKey()) + s, err := NewServer(context.Background(), fac, "argocd", WithGeneratedTokenSigningKey(), WithRedisProxyDisabled()) require.NoError(t, err) s.setAgentMode("foo", types.AgentModeManaged) got, err := s.processRecvQueue(context.Background(), "foo", wq) @@ -532,7 +531,7 @@ func Test_DeleteEvents_ManagedMode(t *testing.T) { func Test_createNamespaceIfNotExists(t *testing.T) { t.Run("Namespace creation is not enabled", func(t *testing.T) { fac := kube.NewKubernetesFakeClientWithApps("argocd") - s, err := NewServer(context.Background(), fac, "argocd", WithGeneratedTokenSigningKey()) + s, err := NewServer(context.Background(), fac, "argocd", WithGeneratedTokenSigningKey(), WithRedisProxyDisabled()) require.NoError(t, err) created, err := s.createNamespaceIfNotExist(context.TODO(), "test") assert.False(t, created) @@ -540,7 +539,7 @@ func Test_createNamespaceIfNotExists(t *testing.T) { }) t.Run("Pattern matches and namespace is created", func(t *testing.T) { fac := kube.NewKubernetesFakeClientWithApps("argocd") - s, err := NewServer(context.Background(), fac, "argocd", WithGeneratedTokenSigningKey(), WithAutoNamespaceCreate(true, "^[a-z]+$", nil)) + s, err := NewServer(context.Background(), fac, "argocd", WithGeneratedTokenSigningKey(), WithAutoNamespaceCreate(true, "^[a-z]+$", nil), WithRedisProxyDisabled()) require.NoError(t, err) created, err := s.createNamespaceIfNotExist(context.TODO(), "test") assert.True(t, created) @@ -549,7 +548,7 @@ func Test_createNamespaceIfNotExists(t *testing.T) { t.Run("Pattern does not match and namespace is not created", func(t *testing.T) { fac := kube.NewKubernetesFakeClientWithApps("argocd") - s, err := NewServer(context.Background(), fac, "argocd", WithGeneratedTokenSigningKey(), WithAutoNamespaceCreate(true, "^[a]+$", nil)) + s, err := NewServer(context.Background(), fac, "argocd", WithGeneratedTokenSigningKey(), WithAutoNamespaceCreate(true, "^[a]+$", nil), WithRedisProxyDisabled()) require.NoError(t, err) created, err := s.createNamespaceIfNotExist(context.TODO(), "test") assert.False(t, created) @@ -558,7 +557,7 @@ func Test_createNamespaceIfNotExists(t *testing.T) { t.Run("Namespace is created only once", func(t *testing.T) { fac := kube.NewKubernetesFakeClientWithApps("argocd") - s, err := NewServer(context.Background(), fac, "argocd", WithGeneratedTokenSigningKey(), WithAutoNamespaceCreate(true, "", nil)) + s, err := NewServer(context.Background(), fac, "argocd", WithGeneratedTokenSigningKey(), WithAutoNamespaceCreate(true, "", nil), WithRedisProxyDisabled()) require.NoError(t, err) created, err := s.createNamespaceIfNotExist(context.TODO(), "test") assert.True(t, created) @@ -576,7 +575,7 @@ func Test_processAppProjectEvent(t *testing.T) { wq := wqmock.NewTypedRateLimitingInterface[*cloudevents.Event](t) wq.On("Get").Return(&ev, false) wq.On("Done", &ev) - s, err := NewServer(context.Background(), kube.NewKubernetesFakeClientWithApps("argocd"), "argocd", WithGeneratedTokenSigningKey()) + s, err := NewServer(context.Background(), kube.NewKubernetesFakeClientWithApps("argocd"), "argocd", WithGeneratedTokenSigningKey(), WithRedisProxyDisabled()) require.NoError(t, err) got, err := s.processRecvQueue(context.Background(), "foo", wq) require.Equal(t, ev, *got) @@ -622,7 +621,7 @@ func Test_processAppProjectEvent(t *testing.T) { wq.On("Get").Return(&ev, false) wq.On("Done", &ev) - s, err := NewServer(context.Background(), fac, "argocd", WithGeneratedTokenSigningKey()) + s, err := NewServer(context.Background(), fac, "argocd", WithGeneratedTokenSigningKey(), WithRedisProxyDisabled()) require.NoError(t, err) s.setAgentMode("foo", types.AgentModeAutonomous) @@ -683,7 +682,7 @@ func Test_processAppProjectEvent(t *testing.T) { wq.On("Get").Return(&ev, false) wq.On("Done", &ev) - s, err := NewServer(context.Background(), fac, "argocd", WithGeneratedTokenSigningKey()) + s, err := NewServer(context.Background(), fac, "argocd", WithGeneratedTokenSigningKey(), WithRedisProxyDisabled()) require.NoError(t, err) s.setAgentMode("foo", types.AgentModeAutonomous) @@ -727,7 +726,7 @@ func Test_processAppProjectEvent(t *testing.T) { wq := wqmock.NewTypedRateLimitingInterface[*cloudevents.Event](t) wq.On("Get").Return(&ev, false) wq.On("Done", &ev) - s, err := NewServer(context.Background(), fac, "argocd", WithGeneratedTokenSigningKey()) + s, err := NewServer(context.Background(), fac, "argocd", WithGeneratedTokenSigningKey(), WithRedisProxyDisabled()) require.NoError(t, err) s.setAgentMode("foo", types.AgentModeManaged) got, err := s.processRecvQueue(context.Background(), "foo", wq) @@ -802,7 +801,7 @@ func Test_processIncomingResourceResyncEvent(t *testing.T) { fakeClient := kube.NewKubernetesFakeClientWithApps(agentName) fakeClient.RestConfig = &rest.Config{} - s, err := NewServer(ctx, fakeClient, agentName, WithGeneratedTokenSigningKey()) + s, err := NewServer(ctx, fakeClient, agentName, WithGeneratedTokenSigningKey(), WithRedisProxyDisabled()) assert.Nil(t, err) err = s.queues.Create(agentName) @@ -934,7 +933,7 @@ func Test_processClusterCacheInfoUpdateEvent(t *testing.T) { // Create a server with fake client fac := kube.NewKubernetesFakeClientWithApps("argocd") - s, err := NewServer(context.Background(), fac, "argocd", WithGeneratedTokenSigningKey()) + s, err := NewServer(context.Background(), fac, "argocd", WithGeneratedTokenSigningKey(), WithRedisProxyDisabled()) require.NoError(t, err) s.setAgentMode(agentName, types.AgentModeAutonomous) @@ -962,7 +961,7 @@ func Test_processClusterCacheInfoUpdateEvent(t *testing.T) { // Create a server with fake client fac := kube.NewKubernetesFakeClientWithApps("argocd") - s, err := NewServer(context.Background(), fac, "argocd", WithGeneratedTokenSigningKey()) + s, err := NewServer(context.Background(), fac, "argocd", WithGeneratedTokenSigningKey(), WithRedisProxyDisabled()) require.NoError(t, err) s.setAgentMode(agentName, types.AgentModeAutonomous) diff --git a/principal/listen_test.go b/principal/listen_test.go index 86525fd0..a019e607 100644 --- a/principal/listen_test.go +++ b/principal/listen_test.go @@ -155,6 +155,7 @@ func Test_Serve(t *testing.T) { WithShutDownGracePeriod(2*time.Second), WithGRPC(true), WithInformerSyncTimeout(5*time.Second), + WithRedisProxyDisabled(), ) require.NoError(t, err) errch := make(chan error) diff --git a/principal/options.go b/principal/options.go index 157dd4ee..d27c03ac 100644 --- a/principal/options.go +++ b/principal/options.go @@ -76,6 +76,7 @@ type ServerOptions struct { redisCompressionType cacheutil.RedisCompressionType healthzPort int informerSyncTimeout time.Duration + redisProxyDisabled bool } type ServerOption func(o *Server) error @@ -425,6 +426,14 @@ func WithInformerSyncTimeout(timeout time.Duration) ServerOption { } } +// WithRedisProxyDisabled disables the Redis proxy for testing. +func WithRedisProxyDisabled() ServerOption { + return func(o *Server) error { + o.options.redisProxyDisabled = true + return nil + } +} + func WithResourceProxyEnabled(enabled bool) ServerOption { return func(o *Server) error { o.resourceProxyEnabled = enabled diff --git a/principal/server.go b/principal/server.go index e7ef1dab..6aadb07e 100644 --- a/principal/server.go +++ b/principal/server.go @@ -337,7 +337,7 @@ func NewServer(ctx context.Context, kubeClient *kube.KubernetesClient, namespace s.resourceProxyListenAddr = defaultResourceProxyListenerAddr } - if s.options.redisAddress != "" { + if !s.options.redisProxyDisabled { s.redisProxy = redisproxy.New(defaultRedisProxyListenerAddr, s.options.redisAddress, s.sendSynchronousRedisMessageToAgent) } diff --git a/principal/server_test.go b/principal/server_test.go index dfae05fb..15514e16 100644 --- a/principal/server_test.go +++ b/principal/server_test.go @@ -52,6 +52,7 @@ func Test_ServerWithTLSConfig(t *testing.T) { s, err := NewServer(context.TODO(), kube.NewKubernetesFakeClientWithApps(testNamespace), testNamespace, WithTLSKeyPairFromPath(path.Join(tempDir, "test-cert.crt"), path.Join(tempDir, "test-cert.key")), WithGeneratedTokenSigningKey(), + WithRedisProxyDisabled(), ) require.NoError(t, err) tlsConfig, err := s.loadTLSConfig() @@ -62,6 +63,7 @@ func Test_ServerWithTLSConfig(t *testing.T) { s, err := NewServer(context.TODO(), kube.NewKubernetesFakeClientWithApps(testNamespace), testNamespace, WithTLSKeyPairFromPath(path.Join(tempDir, "other-cert.crt"), path.Join(tempDir, "other-cert.key")), WithGeneratedTokenSigningKey(), + WithRedisProxyDisabled(), ) require.NoError(t, err) tlsConfig, err := s.loadTLSConfig() @@ -73,6 +75,7 @@ func Test_ServerWithTLSConfig(t *testing.T) { s, err := NewServer(context.TODO(), kube.NewKubernetesFakeClientWithApps(testNamespace), testNamespace, WithTLSKeyPairFromPath("server_test.go", "server_test.go"), WithGeneratedTokenSigningKey(), + WithRedisProxyDisabled(), ) require.NoError(t, err) require.NotNil(t, s) @@ -84,7 +87,7 @@ func Test_ServerWithTLSConfig(t *testing.T) { func Test_NewServer(t *testing.T) { t.Run("Instantiate new server object with non-default options", func(t *testing.T) { - s, err := NewServer(context.TODO(), kube.NewKubernetesFakeClientWithApps(testNamespace), testNamespace, WithListenerAddress("0.0.0.0"), WithGeneratedTokenSigningKey()) + s, err := NewServer(context.TODO(), kube.NewKubernetesFakeClientWithApps(testNamespace), testNamespace, WithListenerAddress("0.0.0.0"), WithGeneratedTokenSigningKey(), WithRedisProxyDisabled()) assert.NoError(t, err) assert.NotNil(t, s) assert.NotEqual(t, defaultOptions(), s.options) @@ -92,7 +95,7 @@ func Test_NewServer(t *testing.T) { }) t.Run("Instantiate new server object with invalid option", func(t *testing.T) { - s, err := NewServer(context.TODO(), kube.NewKubernetesFakeClientWithApps(testNamespace), testNamespace, WithListenerPort(-1), WithGeneratedTokenSigningKey()) + s, err := NewServer(context.TODO(), kube.NewKubernetesFakeClientWithApps(testNamespace), testNamespace, WithListenerPort(-1), WithGeneratedTokenSigningKey(), WithRedisProxyDisabled()) assert.Error(t, err) assert.Nil(t, s) }) @@ -101,6 +104,7 @@ func Test_NewServer(t *testing.T) { func Test_handleResyncOnConnect(t *testing.T) { s, err := NewServer(context.TODO(), kube.NewKubernetesFakeClientWithApps(testNamespace), testNamespace, WithGeneratedTokenSigningKey(), + WithRedisProxyDisabled(), ) s.kubeClient.RestConfig = &rest.Config{} require.NoError(t, err) @@ -153,6 +157,7 @@ func Test_handleResyncOnConnect(t *testing.T) { func Test_RunHandlersOnConnect(t *testing.T) { s, err := NewServer(context.TODO(), kube.NewKubernetesFakeClientWithApps(testNamespace), testNamespace, WithGeneratedTokenSigningKey(), + WithRedisProxyDisabled(), ) require.NoError(t, err) s.events = event.NewEventSource("test") From 4933a062d199aededf049dc7df135c61ad193ea5 Mon Sep 17 00:00:00 2001 From: yeonsoo Date: Wed, 8 Oct 2025 15:07:52 +0900 Subject: [PATCH 09/18] fix: flaky tests and e2e stability Add configurable informer timeout and Redis proxy disable for tests. Implement cache-first with API server fallback in Get method. Signed-off-by: yeonsoo --- .../kubernetes/application/kubernetes.go | 21 +++--- .../kubernetes/application/kubernetes_test.go | 69 ++++++++++++++++++- internal/informer/informer_test.go | 34 +++++++++ principal/options_test.go | 15 ++++ principal/server_test.go | 48 +++++++++++++ 5 files changed, 175 insertions(+), 12 deletions(-) diff --git a/internal/backend/kubernetes/application/kubernetes.go b/internal/backend/kubernetes/application/kubernetes.go index efc44667..7e74a17c 100644 --- a/internal/backend/kubernetes/application/kubernetes.go +++ b/internal/backend/kubernetes/application/kubernetes.go @@ -89,16 +89,19 @@ func (be *KubernetesBackend) Create(ctx context.Context, app *v1alpha1.Applicati } func (be *KubernetesBackend) Get(ctx context.Context, name string, namespace string) (*v1alpha1.Application, error) { - obj, err := be.appLister.ByNamespace(namespace).Get(name) - if err != nil { - return nil, err - } - app, ok := obj.(*v1alpha1.Application) - if !ok { - return nil, fmt.Errorf("object is not an Application: %T", obj) + if be.appLister != nil { + obj, err := be.appLister.ByNamespace(namespace).Get(name) + if err != nil { + return be.appClient.ArgoprojV1alpha1().Applications(namespace).Get(ctx, name, v1.GetOptions{}) + } + app, ok := obj.(*v1alpha1.Application) + if !ok { + return nil, fmt.Errorf("object is not an Application: %T", obj) + } + return app.DeepCopy(), nil } - // Return a deep copy to prevent mutations - return app.DeepCopy(), nil + + return be.appClient.ArgoprojV1alpha1().Applications(namespace).Get(ctx, name, v1.GetOptions{}) } func (be *KubernetesBackend) Delete(ctx context.Context, name string, namespace string, deletionPropagation *backend.DeletionPropagation) error { diff --git a/internal/backend/kubernetes/application/kubernetes_test.go b/internal/backend/kubernetes/application/kubernetes_test.go index ae8d76e0..8209aceb 100644 --- a/internal/backend/kubernetes/application/kubernetes_test.go +++ b/internal/backend/kubernetes/application/kubernetes_test.go @@ -27,9 +27,12 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/wI2L/jsondiff" + corev1 "k8s.io/api/core/v1" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/watch" + "k8s.io/client-go/tools/cache" ) func Test_NewKubernetes(t *testing.T) { @@ -152,20 +155,80 @@ func Test_Get(t *testing.T) { }), ) require.NoError(t, err) - // Start the informer go inf.Start(ctx) require.NoError(t, inf.WaitForSync(ctx)) - // Create the backend with the informer backend := NewKubernetesBackend(fakeAppC, "", inf, true) app, err := backend.Get(ctx, "nonexistent", "ns1") + assert.ErrorContains(t, err, "not found") + assert.Equal(t, &v1alpha1.Application{}, app) + + }) + + t.Run("Get returns type assertion error for invalid object", func(t *testing.T) { + fakeAppC := fakeappclient.NewSimpleClientset() + + mockInf := &mockInformerWithInvalidType{} + + backend := &KubernetesBackend{ + appClient: fakeAppC, + appLister: mockInf.Lister(), + } + + app, err := backend.Get(ctx, "test", "ns1") require.Error(t, err) require.Nil(t, app) - + assert.Contains(t, err.Error(), "object is not an Application") }) } +type mockInformerWithInvalidType struct{} + +func (m *mockInformerWithInvalidType) Start(ctx context.Context) error { + return nil +} + +func (m *mockInformerWithInvalidType) WaitForSync(ctx context.Context) error { + return nil +} + +func (m *mockInformerWithInvalidType) HasSynced() bool { + return true +} + +func (m *mockInformerWithInvalidType) Stop() error { + return nil +} + +func (m *mockInformerWithInvalidType) Lister() cache.GenericLister { + return &mockListerWithInvalidType{} +} + +type mockListerWithInvalidType struct{} + +func (m *mockListerWithInvalidType) List(selector labels.Selector) ([]runtime.Object, error) { + return nil, nil +} + +func (m *mockListerWithInvalidType) Get(name string) (runtime.Object, error) { + return &corev1.ConfigMap{}, nil +} + +func (m *mockListerWithInvalidType) ByNamespace(namespace string) cache.GenericNamespaceLister { + return &mockNamespaceListerWithInvalidType{} +} + +type mockNamespaceListerWithInvalidType struct{} + +func (m *mockNamespaceListerWithInvalidType) List(selector labels.Selector) ([]runtime.Object, error) { + return nil, nil +} + +func (m *mockNamespaceListerWithInvalidType) Get(name string) (runtime.Object, error) { + return &corev1.ConfigMap{}, nil +} + func Test_Delete(t *testing.T) { apps := mkApps() t.Run("Delete existing app", func(t *testing.T) { diff --git a/internal/informer/informer_test.go b/internal/informer/informer_test.go index 39a7cdb6..e13e230e 100644 --- a/internal/informer/informer_test.go +++ b/internal/informer/informer_test.go @@ -27,6 +27,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/watch" ) @@ -233,6 +234,39 @@ func Test_InformerScope(t *testing.T) { } +func Test_Lister(t *testing.T) { + t.Run("Lister returns GenericLister", func(t *testing.T) { + i := newInformer(t, "", apps[0], apps[1]) + go i.Start(context.TODO()) + require.NoError(t, i.WaitForSync(context.TODO())) + defer i.Stop() + + lister := i.Lister() + require.NotNil(t, lister) + + obj, err := lister.ByNamespace("argocd").Get("test1") + require.NoError(t, err) + require.NotNil(t, obj) + + app, ok := obj.(*v1alpha1.Application) + require.True(t, ok) + assert.Equal(t, "test1", app.Name) + assert.Equal(t, "argocd", app.Namespace) + }) + + t.Run("Lister can list objects", func(t *testing.T) { + i := newInformer(t, "", apps[0], apps[1]) + go i.Start(context.TODO()) + require.NoError(t, i.WaitForSync(context.TODO())) + defer i.Stop() + + lister := i.Lister() + objs, err := lister.List(labels.Everything()) + require.NoError(t, err) + assert.Len(t, objs, 2) + }) +} + func init() { logrus.SetLevel(logrus.TraceLevel) } diff --git a/principal/options_test.go b/principal/options_test.go index 8a63e506..dd570a5c 100644 --- a/principal/options_test.go +++ b/principal/options_test.go @@ -17,6 +17,7 @@ package principal import ( "crypto/tls" "testing" + "time" "github.com/stretchr/testify/assert" ) @@ -90,3 +91,17 @@ func Test_WithMinimumTLSVersion(t *testing.T) { } }) } + +func Test_WithInformerSyncTimeout(t *testing.T) { + s := &Server{options: &ServerOptions{}} + err := WithInformerSyncTimeout(5 * time.Second)(s) + assert.NoError(t, err) + assert.Equal(t, 5*time.Second, s.options.informerSyncTimeout) +} + +func Test_WithRedisProxyDisabled(t *testing.T) { + s := &Server{options: &ServerOptions{}} + err := WithRedisProxyDisabled()(s) + assert.NoError(t, err) + assert.True(t, s.options.redisProxyDisabled) +} diff --git a/principal/server_test.go b/principal/server_test.go index 15514e16..8ebbd2ba 100644 --- a/principal/server_test.go +++ b/principal/server_test.go @@ -99,6 +99,34 @@ func Test_NewServer(t *testing.T) { assert.Error(t, err) assert.Nil(t, s) }) + + t.Run("Redis proxy should be nil when disabled", func(t *testing.T) { + s, err := NewServer(context.TODO(), kube.NewKubernetesFakeClientWithApps(testNamespace), testNamespace, WithGeneratedTokenSigningKey(), WithRedisProxyDisabled()) + assert.NoError(t, err) + assert.NotNil(t, s) + assert.Nil(t, s.redisProxy) + }) + + t.Run("Redis proxy should be created when not disabled", func(t *testing.T) { + s, err := NewServer(context.TODO(), kube.NewKubernetesFakeClientWithApps(testNamespace), testNamespace, WithGeneratedTokenSigningKey()) + assert.NoError(t, err) + assert.NotNil(t, s) + assert.NotNil(t, s.redisProxy) + }) + + t.Run("Informer sync timeout should be configurable", func(t *testing.T) { + s, err := NewServer(context.TODO(), kube.NewKubernetesFakeClientWithApps(testNamespace), testNamespace, WithGeneratedTokenSigningKey(), WithInformerSyncTimeout(10*time.Second), WithRedisProxyDisabled()) + assert.NoError(t, err) + assert.NotNil(t, s) + assert.Equal(t, 10*time.Second, s.options.informerSyncTimeout) + }) + + t.Run("Informer sync timeout should default to 60s when not set", func(t *testing.T) { + s, err := NewServer(context.TODO(), kube.NewKubernetesFakeClientWithApps(testNamespace), testNamespace, WithGeneratedTokenSigningKey(), WithRedisProxyDisabled()) + assert.NoError(t, err) + assert.NotNil(t, s) + assert.Equal(t, 60*time.Second, s.options.informerSyncTimeout) + }) } func Test_handleResyncOnConnect(t *testing.T) { @@ -182,6 +210,26 @@ func Test_RunHandlersOnConnect(t *testing.T) { assert.Equal(t, expected, got) } +func Test_ServerStartWithDefaultSyncTimeout(t *testing.T) { + s, err := NewServer(context.TODO(), kube.NewKubernetesFakeClientWithApps(testNamespace), testNamespace, + WithGeneratedTokenSigningKey(), + WithRedisProxyDisabled(), + ) + require.NoError(t, err) + s.kubeClient.RestConfig = &rest.Config{} + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + errch := make(chan error, 1) + err = s.Start(ctx, errch) + require.NoError(t, err) + + defer s.Shutdown() + + assert.Equal(t, 60*time.Second, s.options.informerSyncTimeout) +} + func init() { logrus.SetLevel(logrus.TraceLevel) } From 88a1c1445c852dc4ba8aeb4fd7a4f1e966ec8eb9 Mon Sep 17 00:00:00 2001 From: yeonsoo Date: Wed, 8 Oct 2025 16:15:47 +0900 Subject: [PATCH 10/18] fix: prevent panic in Get when namespace lister is nil Signed-off-by: yeonsoo --- .../kubernetes/application/kubernetes.go | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/internal/backend/kubernetes/application/kubernetes.go b/internal/backend/kubernetes/application/kubernetes.go index 7e74a17c..0137158c 100644 --- a/internal/backend/kubernetes/application/kubernetes.go +++ b/internal/backend/kubernetes/application/kubernetes.go @@ -90,15 +90,18 @@ func (be *KubernetesBackend) Create(ctx context.Context, app *v1alpha1.Applicati func (be *KubernetesBackend) Get(ctx context.Context, name string, namespace string) (*v1alpha1.Application, error) { if be.appLister != nil { - obj, err := be.appLister.ByNamespace(namespace).Get(name) - if err != nil { - return be.appClient.ArgoprojV1alpha1().Applications(namespace).Get(ctx, name, v1.GetOptions{}) - } - app, ok := obj.(*v1alpha1.Application) - if !ok { - return nil, fmt.Errorf("object is not an Application: %T", obj) + namespaceLister := be.appLister.ByNamespace(namespace) + if namespaceLister != nil { + obj, err := namespaceLister.Get(name) + if err != nil { + return be.appClient.ArgoprojV1alpha1().Applications(namespace).Get(ctx, name, v1.GetOptions{}) + } + app, ok := obj.(*v1alpha1.Application) + if !ok { + return nil, fmt.Errorf("object is not an Application: %T", obj) + } + return app.DeepCopy(), nil } - return app.DeepCopy(), nil } return be.appClient.ArgoprojV1alpha1().Applications(namespace).Get(ctx, name, v1.GetOptions{}) From eee800dfe1c1700cc679ac47a70b3c28a76e7a8c Mon Sep 17 00:00:00 2001 From: yeonsoo Date: Wed, 8 Oct 2025 17:16:29 +0900 Subject: [PATCH 11/18] feat: use informer cache for Get with context-based API routing - Get() uses cache by default, API server when forUpdate=true - update() sets forUpdate context to ensure latest ResourceVersion - Prevents update conflicts while improving read performance Signed-off-by: yeonsoo --- internal/backend/interface.go | 4 ++++ internal/backend/kubernetes/application/kubernetes.go | 4 +++- internal/manager/application/application.go | 5 ++++- 3 files changed, 11 insertions(+), 2 deletions(-) diff --git a/internal/backend/interface.go b/internal/backend/interface.go index f3d35fe1..e694e291 100644 --- a/internal/backend/interface.go +++ b/internal/backend/interface.go @@ -25,6 +25,10 @@ import ( corev1 "k8s.io/api/core/v1" ) +type ContextKey string + +const ForUpdateContextKey ContextKey = "forUpdate" + type ApplicationSelector struct { // Labels is not currently implemented. diff --git a/internal/backend/kubernetes/application/kubernetes.go b/internal/backend/kubernetes/application/kubernetes.go index 0137158c..ef95de20 100644 --- a/internal/backend/kubernetes/application/kubernetes.go +++ b/internal/backend/kubernetes/application/kubernetes.go @@ -89,7 +89,9 @@ func (be *KubernetesBackend) Create(ctx context.Context, app *v1alpha1.Applicati } func (be *KubernetesBackend) Get(ctx context.Context, name string, namespace string) (*v1alpha1.Application, error) { - if be.appLister != nil { + forUpdate, _ := ctx.Value(backend.ForUpdateContextKey).(bool) + + if !forUpdate && be.appLister != nil && be.appInformer.HasSynced() { namespaceLister := be.appLister.ByNamespace(namespace) if namespaceLister != nil { obj, err := namespaceLister.Get(name) diff --git a/internal/manager/application/application.go b/internal/manager/application/application.go index 38177954..1f6f802e 100644 --- a/internal/manager/application/application.go +++ b/internal/manager/application/application.go @@ -555,8 +555,11 @@ func (m *ApplicationManager) Delete(ctx context.Context, namespace string, incom // be returned. func (m *ApplicationManager) update(ctx context.Context, upsert bool, incoming *v1alpha1.Application, updateFn updateTransformer, patchFn patchTransformer) (*v1alpha1.Application, error) { var updated *v1alpha1.Application + + ctxForUpdate := context.WithValue(ctx, backend.ForUpdateContextKey, true) + err := retry.RetryOnConflict(retry.DefaultBackoff, func() error { - existing, ierr := m.applicationBackend.Get(ctx, incoming.Name, incoming.Namespace) + existing, ierr := m.applicationBackend.Get(ctxForUpdate, incoming.Name, incoming.Namespace) if ierr != nil { if errors.IsNotFound(ierr) && upsert { updated, ierr = m.Create(ctx, incoming) From 2620360de9044dc0e8950c731f1608329aca8c02 Mon Sep 17 00:00:00 2001 From: yeonsoo Date: Wed, 8 Oct 2025 17:51:41 +0900 Subject: [PATCH 12/18] feat: add nil check Signed-off-by: yeonsoo --- internal/manager/application/application.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/internal/manager/application/application.go b/internal/manager/application/application.go index 1f6f802e..25007c06 100644 --- a/internal/manager/application/application.go +++ b/internal/manager/application/application.go @@ -556,6 +556,9 @@ func (m *ApplicationManager) Delete(ctx context.Context, namespace string, incom func (m *ApplicationManager) update(ctx context.Context, upsert bool, incoming *v1alpha1.Application, updateFn updateTransformer, patchFn patchTransformer) (*v1alpha1.Application, error) { var updated *v1alpha1.Application + if ctx == nil { + ctx = context.Background() + } ctxForUpdate := context.WithValue(ctx, backend.ForUpdateContextKey, true) err := retry.RetryOnConflict(retry.DefaultBackoff, func() error { From 3edf88ae695a3d72078dd52888334e4444fdf4b5 Mon Sep 17 00:00:00 2001 From: yeonsoo Date: Wed, 8 Oct 2025 18:28:08 +0900 Subject: [PATCH 13/18] feat: set appInformer for test backend Signed-off-by: yeonsoo --- internal/backend/kubernetes/application/kubernetes.go | 2 +- internal/backend/kubernetes/application/kubernetes_test.go | 5 +++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/internal/backend/kubernetes/application/kubernetes.go b/internal/backend/kubernetes/application/kubernetes.go index ef95de20..58fbb069 100644 --- a/internal/backend/kubernetes/application/kubernetes.go +++ b/internal/backend/kubernetes/application/kubernetes.go @@ -91,7 +91,7 @@ func (be *KubernetesBackend) Create(ctx context.Context, app *v1alpha1.Applicati func (be *KubernetesBackend) Get(ctx context.Context, name string, namespace string) (*v1alpha1.Application, error) { forUpdate, _ := ctx.Value(backend.ForUpdateContextKey).(bool) - if !forUpdate && be.appLister != nil && be.appInformer.HasSynced() { + if !forUpdate && be.appLister != nil && be.appInformer != nil && be.appInformer.HasSynced() { namespaceLister := be.appLister.ByNamespace(namespace) if namespaceLister != nil { obj, err := namespaceLister.Get(name) diff --git a/internal/backend/kubernetes/application/kubernetes_test.go b/internal/backend/kubernetes/application/kubernetes_test.go index 8209aceb..45a6345c 100644 --- a/internal/backend/kubernetes/application/kubernetes_test.go +++ b/internal/backend/kubernetes/application/kubernetes_test.go @@ -172,8 +172,9 @@ func Test_Get(t *testing.T) { mockInf := &mockInformerWithInvalidType{} backend := &KubernetesBackend{ - appClient: fakeAppC, - appLister: mockInf.Lister(), + appClient: fakeAppC, + appInformer: mockInf, + appLister: mockInf.Lister(), } app, err := backend.Get(ctx, "test", "ns1") From df1199a4368f327c3323b4a54a74b063e6abec9c Mon Sep 17 00:00:00 2001 From: yeonsoo Date: Wed, 8 Oct 2025 19:19:47 +0900 Subject: [PATCH 14/18] remove: unneccesary comments Signed-off-by: yeonsoo --- internal/informer/informer.go | 1 - internal/manager/application/application_test.go | 7 ------- 2 files changed, 8 deletions(-) diff --git a/internal/informer/informer.go b/internal/informer/informer.go index 022563d0..aa3809fb 100644 --- a/internal/informer/informer.go +++ b/internal/informer/informer.go @@ -114,7 +114,6 @@ func NewInformer[T runtime.Object](ctx context.Context, opts ...InformerOption[T var r T i.resType = reflect.TypeOf(r) - // groupResource is the group and resource of the watched objects. i.groupResource = schema.GroupResource{ Group: "argoproj.io", Resource: "applications", diff --git a/internal/manager/application/application_test.go b/internal/manager/application/application_test.go index 81152fcb..63e40933 100644 --- a/internal/manager/application/application_test.go +++ b/internal/manager/application/application_test.go @@ -66,7 +66,6 @@ func fakeInformer(t *testing.T, namespace string, objects ...runtime.Object) (*f } }() - // cache.WaitForCacheSync(context.Background().Done(), informer.HasSynced) if err = informer.WaitForSync(context.Background()); err != nil { t.Fatalf("failed to wait for informer sync: %v", err) } @@ -82,7 +81,6 @@ func fakeAppManager(t *testing.T, objects ...runtime.Object) (*fakeappclient.Cli am, err := NewApplicationManager(be, "argocd") assert.NoError(t, err) - // go am.StartBackend(context.Background()) return appC, am } @@ -223,11 +221,6 @@ func Test_ManagerUpdateManaged(t *testing.T) { mgr, err := NewApplicationManager(be, "argocd", WithMode(manager.ManagerModeManaged), WithRole(manager.ManagerRoleAgent)) require.NoError(t, err) - // ctx, cancel := context.WithCancel(context.Background()) - // defer cancel() - // go mgr.StartBackend(ctx) - // cache.WaitForCacheSync(ctx.Done(), ai.HasSynced) - updated, err := mgr.UpdateManagedApp(context.Background(), incoming) require.NoError(t, err) From 16a082d5d159bcd2a3137280cfba195a62353631 Mon Sep 17 00:00:00 2001 From: yeonsoo Date: Thu, 9 Oct 2025 03:00:52 +0900 Subject: [PATCH 15/18] feat: remove informer sync timeout configuration Signed-off-by: yeonsoo --- principal/event_test.go | 3 --- principal/listen_test.go | 1 - principal/options.go | 22 ++++++---------------- principal/options_test.go | 8 -------- principal/server.go | 13 ++++--------- principal/server_test.go | 16 ---------------- 6 files changed, 10 insertions(+), 53 deletions(-) diff --git a/principal/event_test.go b/principal/event_test.go index 78c8daa7..d38301c0 100644 --- a/principal/event_test.go +++ b/principal/event_test.go @@ -253,7 +253,6 @@ func Test_CreateEvents(t *testing.T) { s, err := NewServer(ctx, fac, "argocd", WithGeneratedTokenSigningKey(), - WithInformerSyncTimeout(5*time.Second), WithRedisProxyDisabled(), ) require.NoError(t, err) @@ -348,7 +347,6 @@ func Test_UpdateEvents(t *testing.T) { s, err := NewServer(ctx, fac, "argocd", WithGeneratedTokenSigningKey(), - WithInformerSyncTimeout(2*time.Second), WithRedisProxyDisabled(), ) require.NoError(t, err) @@ -483,7 +481,6 @@ func Test_DeleteEvents_ManagedMode(t *testing.T) { s, err := NewServer(ctx, fac, "argocd", WithGeneratedTokenSigningKey(), - WithInformerSyncTimeout(2*time.Second), ) require.NoError(t, err) diff --git a/principal/listen_test.go b/principal/listen_test.go index a019e607..93a83092 100644 --- a/principal/listen_test.go +++ b/principal/listen_test.go @@ -154,7 +154,6 @@ func Test_Serve(t *testing.T) { WithListenerAddress("127.0.0.1"), WithShutDownGracePeriod(2*time.Second), WithGRPC(true), - WithInformerSyncTimeout(5*time.Second), WithRedisProxyDisabled(), ) require.NoError(t, err) diff --git a/principal/options.go b/principal/options.go index d27c03ac..a16aaed7 100644 --- a/principal/options.go +++ b/principal/options.go @@ -75,7 +75,6 @@ type ServerOptions struct { redisPassword string redisCompressionType cacheutil.RedisCompressionType healthzPort int - informerSyncTimeout time.Duration redisProxyDisabled bool } @@ -84,13 +83,12 @@ type ServerOption func(o *Server) error // defaultOptions returns a set of default options for the server func defaultOptions() *ServerOptions { return &ServerOptions{ - port: 443, - address: "", - tlsMinVersion: tls.VersionTLS13, - unauthMethods: make(map[string]bool), - eventProcessors: 10, - rootCa: x509.NewCertPool(), - informerSyncTimeout: 60 * time.Second, + port: 443, + address: "", + tlsMinVersion: tls.VersionTLS13, + unauthMethods: make(map[string]bool), + eventProcessors: 10, + rootCa: x509.NewCertPool(), } } @@ -418,14 +416,6 @@ func WithWebSocket(enableWebSocket bool) ServerOption { } } -// WithInformerSyncTimeout sets the informer sync timeout duration. -func WithInformerSyncTimeout(timeout time.Duration) ServerOption { - return func(o *Server) error { - o.options.informerSyncTimeout = timeout - return nil - } -} - // WithRedisProxyDisabled disables the Redis proxy for testing. func WithRedisProxyDisabled() ServerOption { return func(o *Server) error { diff --git a/principal/options_test.go b/principal/options_test.go index dd570a5c..9f26691e 100644 --- a/principal/options_test.go +++ b/principal/options_test.go @@ -17,7 +17,6 @@ package principal import ( "crypto/tls" "testing" - "time" "github.com/stretchr/testify/assert" ) @@ -92,13 +91,6 @@ func Test_WithMinimumTLSVersion(t *testing.T) { }) } -func Test_WithInformerSyncTimeout(t *testing.T) { - s := &Server{options: &ServerOptions{}} - err := WithInformerSyncTimeout(5 * time.Second)(s) - assert.NoError(t, err) - assert.Equal(t, 5*time.Second, s.options.informerSyncTimeout) -} - func Test_WithRedisProxyDisabled(t *testing.T) { s := &Server{options: &ServerOptions{}} err := WithRedisProxyDisabled()(s) diff --git a/principal/server.go b/principal/server.go index 6aadb07e..d8183dfe 100644 --- a/principal/server.go +++ b/principal/server.go @@ -510,22 +510,17 @@ func (s *Server) Start(ctx context.Context, errch chan error) error { s.events = event.NewEventSource(s.options.serverName) - syncTimeout := s.options.informerSyncTimeout - if syncTimeout == 0 { - syncTimeout = waitForSyncedDuration - } - - if err := s.appManager.EnsureSynced(syncTimeout); err != nil { + if err := s.appManager.EnsureSynced(waitForSyncedDuration); err != nil { return fmt.Errorf("unable to sync Application informer: %w", err) } log().Infof("Application informer synced and ready") - if err := s.projectManager.EnsureSynced(syncTimeout); err != nil { + if err := s.projectManager.EnsureSynced(waitForSyncedDuration); err != nil { return fmt.Errorf("unable to sync AppProject informer: %w", err) } log().Infof("AppProject informer synced and ready") - if err := s.repoManager.EnsureSynced(syncTimeout); err != nil { + if err := s.repoManager.EnsureSynced(waitForSyncedDuration); err != nil { return fmt.Errorf("unable to sync Repository informer: %w", err) } log().Infof("Repository informer synced and ready") @@ -545,7 +540,7 @@ func (s *Server) Start(ctx context.Context, errch chan error) error { if err != nil { return err } - if err := s.namespaceManager.EnsureSynced(syncTimeout); err != nil { + if err := s.namespaceManager.EnsureSynced(waitForSyncedDuration); err != nil { return fmt.Errorf("unable to sync Namespace informer: %w", err) } log().Infof("Namespace informer synced and ready") diff --git a/principal/server_test.go b/principal/server_test.go index 8ebbd2ba..b2f22c3f 100644 --- a/principal/server_test.go +++ b/principal/server_test.go @@ -113,20 +113,6 @@ func Test_NewServer(t *testing.T) { assert.NotNil(t, s) assert.NotNil(t, s.redisProxy) }) - - t.Run("Informer sync timeout should be configurable", func(t *testing.T) { - s, err := NewServer(context.TODO(), kube.NewKubernetesFakeClientWithApps(testNamespace), testNamespace, WithGeneratedTokenSigningKey(), WithInformerSyncTimeout(10*time.Second), WithRedisProxyDisabled()) - assert.NoError(t, err) - assert.NotNil(t, s) - assert.Equal(t, 10*time.Second, s.options.informerSyncTimeout) - }) - - t.Run("Informer sync timeout should default to 60s when not set", func(t *testing.T) { - s, err := NewServer(context.TODO(), kube.NewKubernetesFakeClientWithApps(testNamespace), testNamespace, WithGeneratedTokenSigningKey(), WithRedisProxyDisabled()) - assert.NoError(t, err) - assert.NotNil(t, s) - assert.Equal(t, 60*time.Second, s.options.informerSyncTimeout) - }) } func Test_handleResyncOnConnect(t *testing.T) { @@ -226,8 +212,6 @@ func Test_ServerStartWithDefaultSyncTimeout(t *testing.T) { require.NoError(t, err) defer s.Shutdown() - - assert.Equal(t, 60*time.Second, s.options.informerSyncTimeout) } func init() { From 9864fe1096d267300dc93b1ffc9e8067a111ea6d Mon Sep 17 00:00:00 2001 From: yeonsoo Date: Thu, 9 Oct 2025 12:59:23 +0900 Subject: [PATCH 16/18] fix: syntax error resolve Signed-off-by: yeonsoo --- principal/options.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/principal/options.go b/principal/options.go index 155e39c1..f9164fa8 100644 --- a/principal/options.go +++ b/principal/options.go @@ -422,6 +422,10 @@ func WithWebSocket(enableWebSocket bool) ServerOption { func WithRedisProxyDisabled() ServerOption { return func(o *Server) error { o.options.redisProxyDisabled = true + return nil + } +} + // WithInformerSyncTimeout sets the informer sync timeout duration. func WithInformerSyncTimeout(timeout time.Duration) ServerOption { return func(o *Server) error { From 66231cc686bc9e538c01898a66a2c4844569bf34 Mon Sep 17 00:00:00 2001 From: yeonsoo Date: Thu, 9 Oct 2025 13:11:28 +0900 Subject: [PATCH 17/18] fix: syntax error resolve Signed-off-by: yeonsoo --- principal/server_test.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/principal/server_test.go b/principal/server_test.go index 3b98db6a..784eebdf 100644 --- a/principal/server_test.go +++ b/principal/server_test.go @@ -112,6 +112,8 @@ func Test_NewServer(t *testing.T) { assert.NoError(t, err) assert.NotNil(t, s) assert.NotNil(t, s.redisProxy) + }) + t.Run("Informer sync timeout should be configurable", func(t *testing.T) { s, err := NewServer(context.TODO(), kube.NewKubernetesFakeClientWithApps(testNamespace), testNamespace, WithGeneratedTokenSigningKey(), WithInformerSyncTimeout(10*time.Second)) assert.NoError(t, err) From 693ab36a8ec4dec605d19147383a6813bab829d8 Mon Sep 17 00:00:00 2001 From: yeonsoo Date: Thu, 9 Oct 2025 21:39:16 +0900 Subject: [PATCH 18/18] fix: add WithGroupResource option to informer Lister() for all resources Signed-off-by: yeonsoo --- agent/agent.go | 3 +++ internal/argocd/cluster/manager.go | 1 + .../kubernetes/application/kubernetes_test.go | 2 ++ internal/informer/informer.go | 5 ----- internal/informer/options.go | 12 ++++++++++++ internal/manager/application/application_test.go | 1 + internal/manager/appproject/appproject_test.go | 1 + principal/server.go | 4 ++++ 8 files changed, 24 insertions(+), 5 deletions(-) diff --git a/agent/agent.go b/agent/agent.go index b5e21542..87bc6b80 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -186,6 +186,7 @@ func NewAgent(ctx context.Context, client *kube.KubernetesClient, namespace stri informer.WithDeleteHandler[*v1alpha1.Application](a.addAppDeletionToQueue), informer.WithFilters[*v1alpha1.Application](a.DefaultAppFilterChain()), informer.WithNamespaceScope[*v1alpha1.Application](a.namespace), + informer.WithGroupResource[*v1alpha1.Application]("argoproj.io", "applications"), } appProjectManagerOption := []appproject.AppProjectManagerOption{ @@ -227,6 +228,7 @@ func NewAgent(ctx context.Context, client *kube.KubernetesClient, namespace stri informer.WithAddHandler[*v1alpha1.AppProject](a.addAppProjectCreationToQueue), informer.WithUpdateHandler[*v1alpha1.AppProject](a.addAppProjectUpdateToQueue), informer.WithDeleteHandler[*v1alpha1.AppProject](a.addAppProjectDeletionToQueue), + informer.WithGroupResource[*v1alpha1.AppProject]("argoproj.io", "appprojects"), } projInformer, err := informer.NewInformer(ctx, projInformerOptions...) @@ -263,6 +265,7 @@ func NewAgent(ctx context.Context, client *kube.KubernetesClient, namespace stri informer.WithUpdateHandler[*corev1.Secret](a.handleRepositoryUpdate), informer.WithDeleteHandler[*corev1.Secret](a.handleRepositoryDeletion), informer.WithFilters(kuberepository.DefaultFilterChain(a.namespace)), + informer.WithGroupResource[*corev1.Secret]("", "secrets"), } repoInformer, err := informer.NewInformer(ctx, repoInformerOptions...) diff --git a/internal/argocd/cluster/manager.go b/internal/argocd/cluster/manager.go index 07976d82..92b51132 100644 --- a/internal/argocd/cluster/manager.go +++ b/internal/argocd/cluster/manager.go @@ -108,6 +108,7 @@ func NewManager(ctx context.Context, namespace, redisAddress, redisPassword stri informer.WithUpdateHandler(m.onClusterUpdated), informer.WithDeleteHandler(m.onClusterDeleted), informer.WithFilters(m.filters), + informer.WithGroupResource[*v1.Secret]("", "secrets"), ) if err != nil { return nil, err diff --git a/internal/backend/kubernetes/application/kubernetes_test.go b/internal/backend/kubernetes/application/kubernetes_test.go index 45a6345c..89e177be 100644 --- a/internal/backend/kubernetes/application/kubernetes_test.go +++ b/internal/backend/kubernetes/application/kubernetes_test.go @@ -127,6 +127,7 @@ func Test_Get(t *testing.T) { informer.WithWatchHandler[*v1alpha1.Application](func(ctx context.Context, options v1.ListOptions) (watch.Interface, error) { return fakeAppC.ArgoprojV1alpha1().Applications("").Watch(ctx, options) }), + informer.WithGroupResource[*v1alpha1.Application]("argoproj.io", "applications"), ) require.NoError(t, err) @@ -153,6 +154,7 @@ func Test_Get(t *testing.T) { informer.WithWatchHandler[*v1alpha1.Application](func(ctx context.Context, options v1.ListOptions) (watch.Interface, error) { return fakeAppC.ArgoprojV1alpha1().Applications("").Watch(ctx, options) }), + informer.WithGroupResource[*v1alpha1.Application]("argoproj.io", "applications"), ) require.NoError(t, err) go inf.Start(ctx) diff --git a/internal/informer/informer.go b/internal/informer/informer.go index aa3809fb..86798a29 100644 --- a/internal/informer/informer.go +++ b/internal/informer/informer.go @@ -114,11 +114,6 @@ func NewInformer[T runtime.Object](ctx context.Context, opts ...InformerOption[T var r T i.resType = reflect.TypeOf(r) - i.groupResource = schema.GroupResource{ - Group: "argoproj.io", - Resource: "applications", - } - i.logger = logrus.NewEntry(logrus.StandardLogger()).WithFields(logrus.Fields{ "type": i.resType, "module": "Informer", diff --git a/internal/informer/options.go b/internal/informer/options.go index 7ba63596..d363e510 100644 --- a/internal/informer/options.go +++ b/internal/informer/options.go @@ -23,6 +23,7 @@ import ( "github.com/prometheus/client_golang/prometheus" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/watch" ) @@ -105,3 +106,14 @@ func WithResyncPeriod[T runtime.Object](d time.Duration) InformerOption[T] { return nil } } + +// WithGroupResource sets the group and resource for the informer's lister. +func WithGroupResource[T runtime.Object](group, resource string) InformerOption[T] { + return func(i *Informer[T]) error { + i.groupResource = schema.GroupResource{ + Group: group, + Resource: resource, + } + return nil + } +} diff --git a/internal/manager/application/application_test.go b/internal/manager/application/application_test.go index 63e40933..99a64f56 100644 --- a/internal/manager/application/application_test.go +++ b/internal/manager/application/application_test.go @@ -56,6 +56,7 @@ func fakeInformer(t *testing.T, namespace string, objects ...runtime.Object) (*f return appC.ArgoprojV1alpha1().Applications(namespace).Watch(ctx, opts) }), informer.WithNamespaceScope[*v1alpha1.Application](namespace), + informer.WithGroupResource[*v1alpha1.Application]("argoproj.io", "applications"), ) require.NoError(t, err) diff --git a/internal/manager/appproject/appproject_test.go b/internal/manager/appproject/appproject_test.go index c79d420f..8a092fa2 100644 --- a/internal/manager/appproject/appproject_test.go +++ b/internal/manager/appproject/appproject_test.go @@ -49,6 +49,7 @@ func fakeProjManager(t *testing.T, namespace string, objects ...runtime.Object) informer.WithWatchHandler[*v1alpha1.AppProject](func(ctx context.Context, opts v1.ListOptions) (watch.Interface, error) { return client.ArgoprojV1alpha1().AppProjects(namespace).Watch(ctx, opts) }), + informer.WithGroupResource[*v1alpha1.AppProject]("argoproj.io", "appprojects"), ) assert.NoError(t, err) diff --git a/principal/server.go b/principal/server.go index 51b3dc15..020f4757 100644 --- a/principal/server.go +++ b/principal/server.go @@ -234,6 +234,7 @@ func NewServer(ctx context.Context, kubeClient *kube.KubernetesClient, namespace informer.WithUpdateHandler[*v1alpha1.Application](s.updateAppCallback), informer.WithDeleteHandler[*v1alpha1.Application](s.deleteAppCallback), informer.WithFilters[*v1alpha1.Application](appFilters), + informer.WithGroupResource[*v1alpha1.Application]("argoproj.io", "applications"), } appManagerOpts := []application.ApplicationManagerOption{ @@ -251,6 +252,7 @@ func NewServer(ctx context.Context, kubeClient *kube.KubernetesClient, namespace informer.WithAddHandler[*v1alpha1.AppProject](s.newAppProjectCallback), informer.WithUpdateHandler[*v1alpha1.AppProject](s.updateAppProjectCallback), informer.WithDeleteHandler[*v1alpha1.AppProject](s.deleteAppProjectCallback), + informer.WithGroupResource[*v1alpha1.AppProject]("argoproj.io", "appprojects"), } projManagerOpts := []appproject.AppProjectManagerOption{ @@ -297,6 +299,7 @@ func NewServer(ctx context.Context, kubeClient *kube.KubernetesClient, namespace return kubeClient.Clientset.CoreV1().Namespaces().Watch(ctx, opts) }), informer.WithDeleteHandler[*corev1.Namespace](s.deleteNamespaceCallback), + informer.WithGroupResource[*corev1.Namespace]("", "namespaces"), } nsInformer, err := informer.NewInformer(ctx, nsInformerOpts...) @@ -316,6 +319,7 @@ func NewServer(ctx context.Context, kubeClient *kube.KubernetesClient, namespace informer.WithUpdateHandler[*corev1.Secret](s.updateRepositoryCallback), informer.WithDeleteHandler[*corev1.Secret](s.deleteRepositoryCallback), informer.WithFilters(kuberepository.DefaultFilterChain(s.namespace)), + informer.WithGroupResource[*corev1.Secret]("", "secrets"), } repoInformer, err := informer.NewInformer(ctx, repoInformerOpts...)