diff --git a/agent/agent.go b/agent/agent.go index b5e21542..87bc6b80 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -186,6 +186,7 @@ func NewAgent(ctx context.Context, client *kube.KubernetesClient, namespace stri informer.WithDeleteHandler[*v1alpha1.Application](a.addAppDeletionToQueue), informer.WithFilters[*v1alpha1.Application](a.DefaultAppFilterChain()), informer.WithNamespaceScope[*v1alpha1.Application](a.namespace), + informer.WithGroupResource[*v1alpha1.Application]("argoproj.io", "applications"), } appProjectManagerOption := []appproject.AppProjectManagerOption{ @@ -227,6 +228,7 @@ func NewAgent(ctx context.Context, client *kube.KubernetesClient, namespace stri informer.WithAddHandler[*v1alpha1.AppProject](a.addAppProjectCreationToQueue), informer.WithUpdateHandler[*v1alpha1.AppProject](a.addAppProjectUpdateToQueue), informer.WithDeleteHandler[*v1alpha1.AppProject](a.addAppProjectDeletionToQueue), + informer.WithGroupResource[*v1alpha1.AppProject]("argoproj.io", "appprojects"), } projInformer, err := informer.NewInformer(ctx, projInformerOptions...) @@ -263,6 +265,7 @@ func NewAgent(ctx context.Context, client *kube.KubernetesClient, namespace stri informer.WithUpdateHandler[*corev1.Secret](a.handleRepositoryUpdate), informer.WithDeleteHandler[*corev1.Secret](a.handleRepositoryDeletion), informer.WithFilters(kuberepository.DefaultFilterChain(a.namespace)), + informer.WithGroupResource[*corev1.Secret]("", "secrets"), } repoInformer, err := informer.NewInformer(ctx, repoInformerOptions...) diff --git a/internal/argocd/cluster/manager.go b/internal/argocd/cluster/manager.go index 07976d82..92b51132 100644 --- a/internal/argocd/cluster/manager.go +++ b/internal/argocd/cluster/manager.go @@ -108,6 +108,7 @@ func NewManager(ctx context.Context, namespace, redisAddress, redisPassword stri informer.WithUpdateHandler(m.onClusterUpdated), informer.WithDeleteHandler(m.onClusterDeleted), informer.WithFilters(m.filters), + informer.WithGroupResource[*v1.Secret]("", "secrets"), ) if err != nil { return nil, err diff --git a/internal/backend/interface.go b/internal/backend/interface.go index f3d35fe1..e694e291 100644 --- a/internal/backend/interface.go +++ b/internal/backend/interface.go @@ -25,6 +25,10 @@ import ( corev1 "k8s.io/api/core/v1" ) +type ContextKey string + +const ForUpdateContextKey ContextKey = "forUpdate" + type ApplicationSelector struct { // Labels is not currently implemented. diff --git a/internal/backend/kubernetes/application/kubernetes.go b/internal/backend/kubernetes/application/kubernetes.go index b7905cae..58fbb069 100644 --- a/internal/backend/kubernetes/application/kubernetes.go +++ b/internal/backend/kubernetes/application/kubernetes.go @@ -30,6 +30,7 @@ import ( appclientset "github.com/argoproj/argo-cd/v3/pkg/client/clientset/versioned" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/tools/cache" ) var _ backend.Application = &KubernetesBackend{} @@ -43,18 +44,24 @@ type KubernetesBackend struct { appClient appclientset.Interface // appInformer is used to watch for change events for Argo CD Application resources on the cluster appInformer informer.InformerInterface + // appLister is used to list Argo CD Application resources from the cache + appLister cache.GenericLister // namespace is not currently read, is not guaranteed to be non-empty, and is not guaranteed to contain the source of Argo CD Application CRs in all cases namespace string usePatch bool } func NewKubernetesBackend(appClient appclientset.Interface, namespace string, appInformer informer.InformerInterface, usePatch bool) *KubernetesBackend { - return &KubernetesBackend{ + be := &KubernetesBackend{ appClient: appClient, appInformer: appInformer, usePatch: usePatch, namespace: namespace, } + if specificInformer, ok := appInformer.(*informer.Informer[*v1alpha1.Application]); ok { + be.appLister = specificInformer.Lister() + } + return be } func (be *KubernetesBackend) List(ctx context.Context, selector backend.ApplicationSelector) ([]v1alpha1.Application, error) { @@ -82,6 +89,23 @@ func (be *KubernetesBackend) Create(ctx context.Context, app *v1alpha1.Applicati } func (be *KubernetesBackend) Get(ctx context.Context, name string, namespace string) (*v1alpha1.Application, error) { + forUpdate, _ := ctx.Value(backend.ForUpdateContextKey).(bool) + + if !forUpdate && be.appLister != nil && be.appInformer != nil && be.appInformer.HasSynced() { + namespaceLister := be.appLister.ByNamespace(namespace) + if namespaceLister != nil { + obj, err := namespaceLister.Get(name) + if err != nil { + return be.appClient.ArgoprojV1alpha1().Applications(namespace).Get(ctx, name, v1.GetOptions{}) + } + app, ok := obj.(*v1alpha1.Application) + if !ok { + return nil, fmt.Errorf("object is not an Application: %T", obj) + } + return app.DeepCopy(), nil + } + } + return be.appClient.ArgoprojV1alpha1().Applications(namespace).Get(ctx, name, v1.GetOptions{}) } diff --git a/internal/backend/kubernetes/application/kubernetes_test.go b/internal/backend/kubernetes/application/kubernetes_test.go index aed06fc0..89e177be 100644 --- a/internal/backend/kubernetes/application/kubernetes_test.go +++ b/internal/backend/kubernetes/application/kubernetes_test.go @@ -21,13 +21,18 @@ import ( "testing" "github.com/argoproj-labs/argocd-agent/internal/backend" + "github.com/argoproj-labs/argocd-agent/internal/informer" "github.com/argoproj/argo-cd/v3/pkg/apis/application/v1alpha1" fakeappclient "github.com/argoproj/argo-cd/v3/pkg/client/clientset/versioned/fake" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/wI2L/jsondiff" + corev1 "k8s.io/api/core/v1" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/client-go/tools/cache" ) func Test_NewKubernetes(t *testing.T) { @@ -110,22 +115,123 @@ func Test_Create(t *testing.T) { func Test_Get(t *testing.T) { apps := mkApps() + ctx := context.TODO() t.Run("Get existing app", func(t *testing.T) { fakeAppC := fakeappclient.NewSimpleClientset(apps...) - k := NewKubernetesBackend(fakeAppC, "", nil, true) - app, err := k.Get(context.TODO(), "app", "ns1") + + inf, err := informer.NewInformer[*v1alpha1.Application]( + ctx, + informer.WithListHandler[*v1alpha1.Application](func(ctx context.Context, options v1.ListOptions) (runtime.Object, error) { + return fakeAppC.ArgoprojV1alpha1().Applications("").List(ctx, options) + }), + informer.WithWatchHandler[*v1alpha1.Application](func(ctx context.Context, options v1.ListOptions) (watch.Interface, error) { + return fakeAppC.ArgoprojV1alpha1().Applications("").Watch(ctx, options) + }), + informer.WithGroupResource[*v1alpha1.Application]("argoproj.io", "applications"), + ) + require.NoError(t, err) + + go inf.Start(ctx) + require.NoError(t, inf.WaitForSync(ctx)) + + // Create the backend with the informer + backend := NewKubernetesBackend(fakeAppC, "", inf, true) + + app, err := backend.Get(ctx, "app", "ns1") assert.NoError(t, err) assert.NotNil(t, app) + assert.Equal(t, "app", app.Name) + assert.Equal(t, "ns1", app.Namespace) + }) t.Run("Get non-existing app", func(t *testing.T) { fakeAppC := fakeappclient.NewSimpleClientset(apps...) - k := NewKubernetesBackend(fakeAppC, "", nil, true) - app, err := k.Get(context.TODO(), "foo", "ns1") + inf, err := informer.NewInformer[*v1alpha1.Application]( + ctx, + informer.WithListHandler[*v1alpha1.Application](func(ctx context.Context, options v1.ListOptions) (runtime.Object, error) { + return fakeAppC.ArgoprojV1alpha1().Applications("").List(ctx, options) + }), + informer.WithWatchHandler[*v1alpha1.Application](func(ctx context.Context, options v1.ListOptions) (watch.Interface, error) { + return fakeAppC.ArgoprojV1alpha1().Applications("").Watch(ctx, options) + }), + informer.WithGroupResource[*v1alpha1.Application]("argoproj.io", "applications"), + ) + require.NoError(t, err) + go inf.Start(ctx) + require.NoError(t, inf.WaitForSync(ctx)) + + backend := NewKubernetesBackend(fakeAppC, "", inf, true) + + app, err := backend.Get(ctx, "nonexistent", "ns1") assert.ErrorContains(t, err, "not found") assert.Equal(t, &v1alpha1.Application{}, app) + + }) + + t.Run("Get returns type assertion error for invalid object", func(t *testing.T) { + fakeAppC := fakeappclient.NewSimpleClientset() + + mockInf := &mockInformerWithInvalidType{} + + backend := &KubernetesBackend{ + appClient: fakeAppC, + appInformer: mockInf, + appLister: mockInf.Lister(), + } + + app, err := backend.Get(ctx, "test", "ns1") + require.Error(t, err) + require.Nil(t, app) + assert.Contains(t, err.Error(), "object is not an Application") }) } +type mockInformerWithInvalidType struct{} + +func (m *mockInformerWithInvalidType) Start(ctx context.Context) error { + return nil +} + +func (m *mockInformerWithInvalidType) WaitForSync(ctx context.Context) error { + return nil +} + +func (m *mockInformerWithInvalidType) HasSynced() bool { + return true +} + +func (m *mockInformerWithInvalidType) Stop() error { + return nil +} + +func (m *mockInformerWithInvalidType) Lister() cache.GenericLister { + return &mockListerWithInvalidType{} +} + +type mockListerWithInvalidType struct{} + +func (m *mockListerWithInvalidType) List(selector labels.Selector) ([]runtime.Object, error) { + return nil, nil +} + +func (m *mockListerWithInvalidType) Get(name string) (runtime.Object, error) { + return &corev1.ConfigMap{}, nil +} + +func (m *mockListerWithInvalidType) ByNamespace(namespace string) cache.GenericNamespaceLister { + return &mockNamespaceListerWithInvalidType{} +} + +type mockNamespaceListerWithInvalidType struct{} + +func (m *mockNamespaceListerWithInvalidType) List(selector labels.Selector) ([]runtime.Object, error) { + return nil, nil +} + +func (m *mockNamespaceListerWithInvalidType) Get(name string) (runtime.Object, error) { + return &corev1.ConfigMap{}, nil +} + func Test_Delete(t *testing.T) { apps := mkApps() t.Run("Delete existing app", func(t *testing.T) { diff --git a/internal/informer/informer.go b/internal/informer/informer.go index 31799a33..86798a29 100644 --- a/internal/informer/informer.go +++ b/internal/informer/informer.go @@ -28,6 +28,7 @@ import ( "github.com/sirupsen/logrus" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/tools/cache" ) @@ -57,6 +58,9 @@ type Informer[T runtime.Object] struct { resType reflect.Type + // groupResource is the group and resource of the watched objects. + groupResource schema.GroupResource + // logger is this informer's logger. logger *logrus.Entry @@ -109,6 +113,7 @@ func NewInformer[T runtime.Object](ctx context.Context, opts ...InformerOption[T i := &Informer[T]{} var r T i.resType = reflect.TypeOf(r) + i.logger = logrus.NewEntry(logrus.StandardLogger()).WithFields(logrus.Fields{ "type": i.resType, "module": "Informer", @@ -287,3 +292,8 @@ func (i *Informer[T]) WaitForSync(ctx context.Context) error { } return nil } + +// Lister returns a GenericLister that can be used to list and get cached resources. +func (i *Informer[T]) Lister() cache.GenericLister { + return cache.NewGenericLister(i.informer.GetIndexer(), i.groupResource) +} diff --git a/internal/informer/informer_test.go b/internal/informer/informer_test.go index 39a7cdb6..e13e230e 100644 --- a/internal/informer/informer_test.go +++ b/internal/informer/informer_test.go @@ -27,6 +27,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/watch" ) @@ -233,6 +234,39 @@ func Test_InformerScope(t *testing.T) { } +func Test_Lister(t *testing.T) { + t.Run("Lister returns GenericLister", func(t *testing.T) { + i := newInformer(t, "", apps[0], apps[1]) + go i.Start(context.TODO()) + require.NoError(t, i.WaitForSync(context.TODO())) + defer i.Stop() + + lister := i.Lister() + require.NotNil(t, lister) + + obj, err := lister.ByNamespace("argocd").Get("test1") + require.NoError(t, err) + require.NotNil(t, obj) + + app, ok := obj.(*v1alpha1.Application) + require.True(t, ok) + assert.Equal(t, "test1", app.Name) + assert.Equal(t, "argocd", app.Namespace) + }) + + t.Run("Lister can list objects", func(t *testing.T) { + i := newInformer(t, "", apps[0], apps[1]) + go i.Start(context.TODO()) + require.NoError(t, i.WaitForSync(context.TODO())) + defer i.Stop() + + lister := i.Lister() + objs, err := lister.List(labels.Everything()) + require.NoError(t, err) + assert.Len(t, objs, 2) + }) +} + func init() { logrus.SetLevel(logrus.TraceLevel) } diff --git a/internal/informer/options.go b/internal/informer/options.go index 7ba63596..d363e510 100644 --- a/internal/informer/options.go +++ b/internal/informer/options.go @@ -23,6 +23,7 @@ import ( "github.com/prometheus/client_golang/prometheus" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/watch" ) @@ -105,3 +106,14 @@ func WithResyncPeriod[T runtime.Object](d time.Duration) InformerOption[T] { return nil } } + +// WithGroupResource sets the group and resource for the informer's lister. +func WithGroupResource[T runtime.Object](group, resource string) InformerOption[T] { + return func(i *Informer[T]) error { + i.groupResource = schema.GroupResource{ + Group: group, + Resource: resource, + } + return nil + } +} diff --git a/internal/manager/application/application.go b/internal/manager/application/application.go index 38177954..25007c06 100644 --- a/internal/manager/application/application.go +++ b/internal/manager/application/application.go @@ -555,8 +555,14 @@ func (m *ApplicationManager) Delete(ctx context.Context, namespace string, incom // be returned. func (m *ApplicationManager) update(ctx context.Context, upsert bool, incoming *v1alpha1.Application, updateFn updateTransformer, patchFn patchTransformer) (*v1alpha1.Application, error) { var updated *v1alpha1.Application + + if ctx == nil { + ctx = context.Background() + } + ctxForUpdate := context.WithValue(ctx, backend.ForUpdateContextKey, true) + err := retry.RetryOnConflict(retry.DefaultBackoff, func() error { - existing, ierr := m.applicationBackend.Get(ctx, incoming.Name, incoming.Namespace) + existing, ierr := m.applicationBackend.Get(ctxForUpdate, incoming.Name, incoming.Namespace) if ierr != nil { if errors.IsNotFound(ierr) && upsert { updated, ierr = m.Create(ctx, incoming) diff --git a/internal/manager/application/application_test.go b/internal/manager/application/application_test.go index b40d4a90..99a64f56 100644 --- a/internal/manager/application/application_test.go +++ b/internal/manager/application/application_test.go @@ -56,8 +56,21 @@ func fakeInformer(t *testing.T, namespace string, objects ...runtime.Object) (*f return appC.ArgoprojV1alpha1().Applications(namespace).Watch(ctx, opts) }), informer.WithNamespaceScope[*v1alpha1.Application](namespace), + informer.WithGroupResource[*v1alpha1.Application]("argoproj.io", "applications"), ) require.NoError(t, err) + + go func() { + err = informer.Start(context.Background()) + if err != nil { + t.Fatalf("failed to start informer: %v", err) + } + }() + + if err = informer.WaitForSync(context.Background()); err != nil { + t.Fatalf("failed to wait for informer sync: %v", err) + } + return appC, informer } @@ -210,6 +223,7 @@ func Test_ManagerUpdateManaged(t *testing.T) { require.NoError(t, err) updated, err := mgr.UpdateManagedApp(context.Background(), incoming) + require.NoError(t, err) require.NotNil(t, updated) diff --git a/internal/manager/appproject/appproject_test.go b/internal/manager/appproject/appproject_test.go index c79d420f..8a092fa2 100644 --- a/internal/manager/appproject/appproject_test.go +++ b/internal/manager/appproject/appproject_test.go @@ -49,6 +49,7 @@ func fakeProjManager(t *testing.T, namespace string, objects ...runtime.Object) informer.WithWatchHandler[*v1alpha1.AppProject](func(ctx context.Context, opts v1.ListOptions) (watch.Interface, error) { return client.ArgoprojV1alpha1().AppProjects(namespace).Watch(ctx, opts) }), + informer.WithGroupResource[*v1alpha1.AppProject]("argoproj.io", "appprojects"), ) assert.NoError(t, err) diff --git a/principal/event_test.go b/principal/event_test.go index 1dd16526..d38301c0 100644 --- a/principal/event_test.go +++ b/principal/event_test.go @@ -43,7 +43,7 @@ func Test_InvalidEvents(t *testing.T) { wq := wqmock.NewTypedRateLimitingInterface[*cloudevents.Event](t) wq.On("Get").Return(&ev, false) wq.On("Done", &ev) - s, err := NewServer(context.Background(), kube.NewKubernetesFakeClientWithApps("argocd"), "argocd", WithGeneratedTokenSigningKey()) + s, err := NewServer(context.Background(), kube.NewKubernetesFakeClientWithApps("argocd"), "argocd", WithGeneratedTokenSigningKey(), WithRedisProxyDisabled()) require.NoError(t, err) got, err := s.processRecvQueue(context.Background(), "foo", wq) assert.ErrorContains(t, err, "unknown target") @@ -57,7 +57,7 @@ func Test_InvalidEvents(t *testing.T) { wq := wqmock.NewTypedRateLimitingInterface[*cloudevents.Event](t) wq.On("Get").Return(&ev, false) wq.On("Done", &ev) - s, err := NewServer(context.Background(), kube.NewKubernetesFakeClientWithApps("argocd"), "argocd", WithGeneratedTokenSigningKey()) + s, err := NewServer(context.Background(), kube.NewKubernetesFakeClientWithApps("argocd"), "argocd", WithGeneratedTokenSigningKey(), WithRedisProxyDisabled()) require.NoError(t, err) got, err := s.processRecvQueue(context.Background(), "foo", wq) assert.ErrorContains(t, err, "unable to process event of type application") @@ -72,7 +72,7 @@ func Test_InvalidEvents(t *testing.T) { wq := wqmock.NewTypedRateLimitingInterface[*cloudevents.Event](t) wq.On("Get").Return(&ev, false) wq.On("Done", &ev) - s, err := NewServer(context.Background(), kube.NewKubernetesFakeClientWithApps("argocd"), "argocd", WithGeneratedTokenSigningKey()) + s, err := NewServer(context.Background(), kube.NewKubernetesFakeClientWithApps("argocd"), "argocd", WithGeneratedTokenSigningKey(), WithRedisProxyDisabled()) require.NoError(t, err) got, err := s.processRecvQueue(context.Background(), "foo", wq) assert.ErrorContains(t, err, "failed to unmarshal") @@ -88,7 +88,7 @@ func Test_CreateEvents(t *testing.T) { wq := wqmock.NewTypedRateLimitingInterface[*cloudevents.Event](t) wq.On("Get").Return(&ev, false) wq.On("Done", &ev) - s, err := NewServer(context.Background(), kube.NewKubernetesFakeClientWithApps("argocd"), "argocd", WithGeneratedTokenSigningKey()) + s, err := NewServer(context.Background(), kube.NewKubernetesFakeClientWithApps("argocd"), "argocd", WithGeneratedTokenSigningKey(), WithRedisProxyDisabled()) require.NoError(t, err) got, err := s.processRecvQueue(context.Background(), "foo", wq) assert.ErrorIs(t, err, event.ErrEventDiscarded) @@ -127,7 +127,8 @@ func Test_CreateEvents(t *testing.T) { wq := wqmock.NewTypedRateLimitingInterface[*cloudevents.Event](t) wq.On("Get").Return(&ev, false) wq.On("Done", &ev) - s, err := NewServer(context.Background(), fac, "argocd", WithGeneratedTokenSigningKey(), WithAutoNamespaceCreate(true, "", nil)) + s, err := NewServer(context.Background(), fac, "argocd", WithGeneratedTokenSigningKey(), WithAutoNamespaceCreate(true, "", nil), WithRedisProxyDisabled()) + s.Start(context.Background(), make(chan error)) s.clusterMgr.MapCluster("argocd", &v1alpha1.Cluster{Name: "argocd", Server: "https://argocd.com"}) require.NoError(t, err) s.setAgentMode("argocd", types.AgentModeAutonomous) @@ -180,7 +181,7 @@ func Test_CreateEvents(t *testing.T) { wq := wqmock.NewTypedRateLimitingInterface[*cloudevents.Event](t) wq.On("Get").Return(&ev, false) wq.On("Done", &ev) - s, err := NewServer(context.Background(), fac, "argocd", WithGeneratedTokenSigningKey(), WithAutoNamespaceCreate(true, "", nil)) + s, err := NewServer(context.Background(), fac, "argocd", WithGeneratedTokenSigningKey(), WithAutoNamespaceCreate(true, "", nil), WithRedisProxyDisabled()) require.NoError(t, err) s.clusterMgr.MapCluster("foo", &v1alpha1.Cluster{Name: "foo", Server: "https://foo.com"}) s.setAgentMode("foo", types.AgentModeAutonomous) @@ -246,14 +247,29 @@ func Test_CreateEvents(t *testing.T) { wq := wqmock.NewTypedRateLimitingInterface[*cloudevents.Event](t) wq.On("Get").Return(&ev, false) wq.On("Done", &ev) - s, err := NewServer(context.Background(), fac, "argocd", WithGeneratedTokenSigningKey()) + + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + s, err := NewServer(ctx, fac, "argocd", + WithGeneratedTokenSigningKey(), + WithRedisProxyDisabled(), + ) require.NoError(t, err) + + defer func() { + _ = s.Shutdown() + }() + + err = s.Start(ctx, make(chan error)) + require.NoError(t, err) + s.clusterMgr.MapCluster("foo", &v1alpha1.Cluster{Name: "foo", Server: "https://foo.com"}) s.setAgentMode("foo", types.AgentModeAutonomous) - got, err := s.processRecvQueue(context.Background(), "foo", wq) + got, err := s.processRecvQueue(ctx, "foo", wq) assert.Nil(t, err) require.Equal(t, ev, *got) - napp, err := fac.ApplicationsClientset.ArgoprojV1alpha1().Applications("foo").Get(context.TODO(), "test", v1.GetOptions{}) + napp, err := fac.ApplicationsClientset.ArgoprojV1alpha1().Applications("foo").Get(ctx, "test", v1.GetOptions{}) assert.NoError(t, err) require.NotNil(t, napp) assert.Empty(t, napp.OwnerReferences) @@ -325,14 +341,29 @@ func Test_UpdateEvents(t *testing.T) { wq := wqmock.NewTypedRateLimitingInterface[*cloudevents.Event](t) wq.On("Get").Return(&ev, false) wq.On("Done", &ev) - s, err := NewServer(context.Background(), fac, "argocd", WithGeneratedTokenSigningKey()) + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + s, err := NewServer(ctx, fac, "argocd", + WithGeneratedTokenSigningKey(), + WithRedisProxyDisabled(), + ) require.NoError(t, err) + + defer func() { + _ = s.Shutdown() + }() + + err = s.Start(ctx, make(chan error)) + require.NoError(t, err) + s.setAgentMode("foo", types.AgentModeAutonomous) s.clusterMgr.MapCluster("foo", &v1alpha1.Cluster{Name: "foo", Server: "https://foo.com"}) - got, err := s.processRecvQueue(context.Background(), "foo", wq) + got, err := s.processRecvQueue(ctx, "foo", wq) require.Equal(t, ev, *got) assert.NoError(t, err) - napp, err := fac.ApplicationsClientset.ArgoprojV1alpha1().Applications("foo").Get(context.TODO(), "test", v1.GetOptions{}) + napp, err := fac.ApplicationsClientset.ArgoprojV1alpha1().Applications("foo").Get(ctx, "test", v1.GetOptions{}) assert.NoError(t, err) require.NotNil(t, napp) assert.Equal(t, "HEAD", napp.Spec.Source.TargetRevision) @@ -376,7 +407,7 @@ func Test_UpdateEvents(t *testing.T) { wq := wqmock.NewTypedRateLimitingInterface[*cloudevents.Event](t) wq.On("Get").Return(&ev, false) wq.On("Done", &ev) - s, err := NewServer(context.Background(), fac, "argocd", WithGeneratedTokenSigningKey()) + s, err := NewServer(context.Background(), fac, "argocd", WithGeneratedTokenSigningKey(), WithRedisProxyDisabled()) require.NoError(t, err) s.setAgentMode("foo", types.AgentModeManaged) got, err := s.processRecvQueue(context.Background(), "foo", wq) @@ -409,11 +440,9 @@ func Test_DeleteEvents_ManagedMode(t *testing.T) { t.Run(test.name, func(t *testing.T) { delApp := &v1alpha1.Application{ ObjectMeta: v1.ObjectMeta{ - Name: "test", - Namespace: "foo", - Finalizers: []string{ - "test-finalizers", - }, + Name: "test", + Namespace: "foo", + Finalizers: []string{}, }, Spec: v1alpha1.ApplicationSpec{ Source: &v1alpha1.ApplicationSource{ @@ -436,7 +465,9 @@ func Test_DeleteEvents_ManagedMode(t *testing.T) { delApp.DeletionTimestamp = ptr.To(v1.Time{Time: time.Now()}) } - fac := kube.NewKubernetesFakeClientWithApps("argocd") + // Create fake client with the application already in it + fac := kube.NewKubernetesFakeClientWithApps("argocd", delApp) + ev := cloudevents.NewEvent() ev.SetDataSchema("application") ev.SetType(event.Delete.String()) @@ -444,30 +475,49 @@ func Test_DeleteEvents_ManagedMode(t *testing.T) { wq := wqmock.NewTypedRateLimitingInterface[*cloudevents.Event](t) wq.On("Get").Return(&ev, false) wq.On("Done", &ev) - s, err := NewServer(context.Background(), fac, "argocd", WithGeneratedTokenSigningKey()) + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + s, err := NewServer(ctx, fac, "argocd", + WithGeneratedTokenSigningKey(), + ) + require.NoError(t, err) + + defer func() { + _ = s.Shutdown() + }() + + err = s.Start(ctx, make(chan error)) require.NoError(t, err) + s.setAgentMode("foo", types.AgentModeManaged) - _, err = fac.ApplicationsClientset.ArgoprojV1alpha1().Applications(delApp.Namespace).Create(context.Background(), delApp, v1.CreateOptions{}) + var cachedApp *v1alpha1.Application + for i := 0; i < 20; i++ { + cachedApp, err = s.appManager.Get(ctx, delApp.Name, delApp.Namespace) + if err == nil { + break + } + time.Sleep(50 * time.Millisecond) + } + require.NoError(t, err) - got, err := s.processRecvQueue(context.Background(), "foo", wq) + if test.deletionTimestampSetOnPrincipal { + require.NotNil(t, cachedApp.DeletionTimestamp) + } + + got, err := s.processRecvQueue(ctx, "foo", wq) require.NoError(t, err) if test.deletionTimestampSetOnPrincipal { - // If deletionTimestamp is set on principal, the Application should be removed by the call require.NoError(t, err) require.Equal(t, ev, *got) - - // Verify Application is deleted - _, err = fac.ApplicationsClientset.ArgoprojV1alpha1().Applications(delApp.Namespace).Get(context.Background(), delApp.Name, v1.GetOptions{}) + _, err = fac.ApplicationsClientset.ArgoprojV1alpha1().Applications(delApp.Namespace).Get(ctx, delApp.Name, v1.GetOptions{}) require.True(t, apierrors.IsNotFound(err)) - } else { - // If deletionTimestamp is NOT set on principal, the Application should NOT be removed by the call require.NoError(t, err) - - // Verify Application is NOT deleted - _, err = fac.ApplicationsClientset.ArgoprojV1alpha1().Applications(delApp.Namespace).Get(context.Background(), delApp.Name, v1.GetOptions{}) + _, err = fac.ApplicationsClientset.ArgoprojV1alpha1().Applications(delApp.Namespace).Get(ctx, delApp.Name, v1.GetOptions{}) require.NoError(t, err) } }) @@ -478,7 +528,7 @@ func Test_DeleteEvents_ManagedMode(t *testing.T) { func Test_createNamespaceIfNotExists(t *testing.T) { t.Run("Namespace creation is not enabled", func(t *testing.T) { fac := kube.NewKubernetesFakeClientWithApps("argocd") - s, err := NewServer(context.Background(), fac, "argocd", WithGeneratedTokenSigningKey()) + s, err := NewServer(context.Background(), fac, "argocd", WithGeneratedTokenSigningKey(), WithRedisProxyDisabled()) require.NoError(t, err) created, err := s.createNamespaceIfNotExist(context.TODO(), "test") assert.False(t, created) @@ -486,7 +536,7 @@ func Test_createNamespaceIfNotExists(t *testing.T) { }) t.Run("Pattern matches and namespace is created", func(t *testing.T) { fac := kube.NewKubernetesFakeClientWithApps("argocd") - s, err := NewServer(context.Background(), fac, "argocd", WithGeneratedTokenSigningKey(), WithAutoNamespaceCreate(true, "^[a-z]+$", nil)) + s, err := NewServer(context.Background(), fac, "argocd", WithGeneratedTokenSigningKey(), WithAutoNamespaceCreate(true, "^[a-z]+$", nil), WithRedisProxyDisabled()) require.NoError(t, err) created, err := s.createNamespaceIfNotExist(context.TODO(), "test") assert.True(t, created) @@ -495,7 +545,7 @@ func Test_createNamespaceIfNotExists(t *testing.T) { t.Run("Pattern does not match and namespace is not created", func(t *testing.T) { fac := kube.NewKubernetesFakeClientWithApps("argocd") - s, err := NewServer(context.Background(), fac, "argocd", WithGeneratedTokenSigningKey(), WithAutoNamespaceCreate(true, "^[a]+$", nil)) + s, err := NewServer(context.Background(), fac, "argocd", WithGeneratedTokenSigningKey(), WithAutoNamespaceCreate(true, "^[a]+$", nil), WithRedisProxyDisabled()) require.NoError(t, err) created, err := s.createNamespaceIfNotExist(context.TODO(), "test") assert.False(t, created) @@ -504,7 +554,7 @@ func Test_createNamespaceIfNotExists(t *testing.T) { t.Run("Namespace is created only once", func(t *testing.T) { fac := kube.NewKubernetesFakeClientWithApps("argocd") - s, err := NewServer(context.Background(), fac, "argocd", WithGeneratedTokenSigningKey(), WithAutoNamespaceCreate(true, "", nil)) + s, err := NewServer(context.Background(), fac, "argocd", WithGeneratedTokenSigningKey(), WithAutoNamespaceCreate(true, "", nil), WithRedisProxyDisabled()) require.NoError(t, err) created, err := s.createNamespaceIfNotExist(context.TODO(), "test") assert.True(t, created) @@ -522,7 +572,7 @@ func Test_processAppProjectEvent(t *testing.T) { wq := wqmock.NewTypedRateLimitingInterface[*cloudevents.Event](t) wq.On("Get").Return(&ev, false) wq.On("Done", &ev) - s, err := NewServer(context.Background(), kube.NewKubernetesFakeClientWithApps("argocd"), "argocd", WithGeneratedTokenSigningKey()) + s, err := NewServer(context.Background(), kube.NewKubernetesFakeClientWithApps("argocd"), "argocd", WithGeneratedTokenSigningKey(), WithRedisProxyDisabled()) require.NoError(t, err) got, err := s.processRecvQueue(context.Background(), "foo", wq) require.Equal(t, ev, *got) @@ -568,7 +618,7 @@ func Test_processAppProjectEvent(t *testing.T) { wq.On("Get").Return(&ev, false) wq.On("Done", &ev) - s, err := NewServer(context.Background(), fac, "argocd", WithGeneratedTokenSigningKey()) + s, err := NewServer(context.Background(), fac, "argocd", WithGeneratedTokenSigningKey(), WithRedisProxyDisabled()) require.NoError(t, err) s.setAgentMode("foo", types.AgentModeAutonomous) @@ -629,7 +679,7 @@ func Test_processAppProjectEvent(t *testing.T) { wq.On("Get").Return(&ev, false) wq.On("Done", &ev) - s, err := NewServer(context.Background(), fac, "argocd", WithGeneratedTokenSigningKey()) + s, err := NewServer(context.Background(), fac, "argocd", WithGeneratedTokenSigningKey(), WithRedisProxyDisabled()) require.NoError(t, err) s.setAgentMode("foo", types.AgentModeAutonomous) @@ -673,7 +723,7 @@ func Test_processAppProjectEvent(t *testing.T) { wq := wqmock.NewTypedRateLimitingInterface[*cloudevents.Event](t) wq.On("Get").Return(&ev, false) wq.On("Done", &ev) - s, err := NewServer(context.Background(), fac, "argocd", WithGeneratedTokenSigningKey()) + s, err := NewServer(context.Background(), fac, "argocd", WithGeneratedTokenSigningKey(), WithRedisProxyDisabled()) require.NoError(t, err) s.setAgentMode("foo", types.AgentModeManaged) got, err := s.processRecvQueue(context.Background(), "foo", wq) @@ -748,7 +798,7 @@ func Test_processIncomingResourceResyncEvent(t *testing.T) { fakeClient := kube.NewKubernetesFakeClientWithApps(agentName) fakeClient.RestConfig = &rest.Config{} - s, err := NewServer(ctx, fakeClient, agentName, WithGeneratedTokenSigningKey()) + s, err := NewServer(ctx, fakeClient, agentName, WithGeneratedTokenSigningKey(), WithRedisProxyDisabled()) assert.Nil(t, err) err = s.queues.Create(agentName) @@ -880,7 +930,7 @@ func Test_processClusterCacheInfoUpdateEvent(t *testing.T) { // Create a server with fake client fac := kube.NewKubernetesFakeClientWithApps("argocd") - s, err := NewServer(context.Background(), fac, "argocd", WithGeneratedTokenSigningKey()) + s, err := NewServer(context.Background(), fac, "argocd", WithGeneratedTokenSigningKey(), WithRedisProxyDisabled()) require.NoError(t, err) s.setAgentMode(agentName, types.AgentModeAutonomous) @@ -908,7 +958,7 @@ func Test_processClusterCacheInfoUpdateEvent(t *testing.T) { // Create a server with fake client fac := kube.NewKubernetesFakeClientWithApps("argocd") - s, err := NewServer(context.Background(), fac, "argocd", WithGeneratedTokenSigningKey()) + s, err := NewServer(context.Background(), fac, "argocd", WithGeneratedTokenSigningKey(), WithRedisProxyDisabled()) require.NoError(t, err) s.setAgentMode(agentName, types.AgentModeAutonomous) diff --git a/principal/listen_test.go b/principal/listen_test.go index ff6ecc50..44bcdc37 100644 --- a/principal/listen_test.go +++ b/principal/listen_test.go @@ -144,20 +144,23 @@ func Test_Serve(t *testing.T) { templ := certTempl fakecerts.WriteSelfSignedCert(t, "rsa", path.Join(tempDir, "test-cert"), templ) - // We start a real (non-mocked) server - s, err := NewServer(context.TODO(), kube.NewKubernetesFakeClientWithApps(testNamespace), testNamespace, + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + s, err := NewServer(ctx, kube.NewKubernetesFakeClientWithApps(testNamespace), testNamespace, WithTLSKeyPairFromPath(path.Join(tempDir, "test-cert.crt"), path.Join(tempDir, "test-cert.key")), WithGeneratedTokenSigningKey(), WithListenerPort(0), WithListenerAddress("127.0.0.1"), WithShutDownGracePeriod(2*time.Second), WithGRPC(true), + WithRedisProxyDisabled(), WithInformerSyncTimeout(5*time.Second), ) require.NoError(t, err) errch := make(chan error) - err = s.Start(context.Background(), errch) + err = s.Start(ctx, errch) require.NoError(t, err) // Create and register authentication method diff --git a/principal/server.go b/principal/server.go index 51b3dc15..020f4757 100644 --- a/principal/server.go +++ b/principal/server.go @@ -234,6 +234,7 @@ func NewServer(ctx context.Context, kubeClient *kube.KubernetesClient, namespace informer.WithUpdateHandler[*v1alpha1.Application](s.updateAppCallback), informer.WithDeleteHandler[*v1alpha1.Application](s.deleteAppCallback), informer.WithFilters[*v1alpha1.Application](appFilters), + informer.WithGroupResource[*v1alpha1.Application]("argoproj.io", "applications"), } appManagerOpts := []application.ApplicationManagerOption{ @@ -251,6 +252,7 @@ func NewServer(ctx context.Context, kubeClient *kube.KubernetesClient, namespace informer.WithAddHandler[*v1alpha1.AppProject](s.newAppProjectCallback), informer.WithUpdateHandler[*v1alpha1.AppProject](s.updateAppProjectCallback), informer.WithDeleteHandler[*v1alpha1.AppProject](s.deleteAppProjectCallback), + informer.WithGroupResource[*v1alpha1.AppProject]("argoproj.io", "appprojects"), } projManagerOpts := []appproject.AppProjectManagerOption{ @@ -297,6 +299,7 @@ func NewServer(ctx context.Context, kubeClient *kube.KubernetesClient, namespace return kubeClient.Clientset.CoreV1().Namespaces().Watch(ctx, opts) }), informer.WithDeleteHandler[*corev1.Namespace](s.deleteNamespaceCallback), + informer.WithGroupResource[*corev1.Namespace]("", "namespaces"), } nsInformer, err := informer.NewInformer(ctx, nsInformerOpts...) @@ -316,6 +319,7 @@ func NewServer(ctx context.Context, kubeClient *kube.KubernetesClient, namespace informer.WithUpdateHandler[*corev1.Secret](s.updateRepositoryCallback), informer.WithDeleteHandler[*corev1.Secret](s.deleteRepositoryCallback), informer.WithFilters(kuberepository.DefaultFilterChain(s.namespace)), + informer.WithGroupResource[*corev1.Secret]("", "secrets"), } repoInformer, err := informer.NewInformer(ctx, repoInformerOpts...) diff --git a/principal/server_test.go b/principal/server_test.go index a2c4421e..784eebdf 100644 --- a/principal/server_test.go +++ b/principal/server_test.go @@ -52,6 +52,7 @@ func Test_ServerWithTLSConfig(t *testing.T) { s, err := NewServer(context.TODO(), kube.NewKubernetesFakeClientWithApps(testNamespace), testNamespace, WithTLSKeyPairFromPath(path.Join(tempDir, "test-cert.crt"), path.Join(tempDir, "test-cert.key")), WithGeneratedTokenSigningKey(), + WithRedisProxyDisabled(), ) require.NoError(t, err) tlsConfig, err := s.loadTLSConfig() @@ -62,6 +63,7 @@ func Test_ServerWithTLSConfig(t *testing.T) { s, err := NewServer(context.TODO(), kube.NewKubernetesFakeClientWithApps(testNamespace), testNamespace, WithTLSKeyPairFromPath(path.Join(tempDir, "other-cert.crt"), path.Join(tempDir, "other-cert.key")), WithGeneratedTokenSigningKey(), + WithRedisProxyDisabled(), ) require.NoError(t, err) tlsConfig, err := s.loadTLSConfig() @@ -73,6 +75,7 @@ func Test_ServerWithTLSConfig(t *testing.T) { s, err := NewServer(context.TODO(), kube.NewKubernetesFakeClientWithApps(testNamespace), testNamespace, WithTLSKeyPairFromPath("server_test.go", "server_test.go"), WithGeneratedTokenSigningKey(), + WithRedisProxyDisabled(), ) require.NoError(t, err) require.NotNil(t, s) @@ -84,7 +87,7 @@ func Test_ServerWithTLSConfig(t *testing.T) { func Test_NewServer(t *testing.T) { t.Run("Instantiate new server object with non-default options", func(t *testing.T) { - s, err := NewServer(context.TODO(), kube.NewKubernetesFakeClientWithApps(testNamespace), testNamespace, WithListenerAddress("0.0.0.0"), WithGeneratedTokenSigningKey()) + s, err := NewServer(context.TODO(), kube.NewKubernetesFakeClientWithApps(testNamespace), testNamespace, WithListenerAddress("0.0.0.0"), WithGeneratedTokenSigningKey(), WithRedisProxyDisabled()) assert.NoError(t, err) assert.NotNil(t, s) assert.NotEqual(t, defaultOptions(), s.options) @@ -92,11 +95,25 @@ func Test_NewServer(t *testing.T) { }) t.Run("Instantiate new server object with invalid option", func(t *testing.T) { - s, err := NewServer(context.TODO(), kube.NewKubernetesFakeClientWithApps(testNamespace), testNamespace, WithListenerPort(-1), WithGeneratedTokenSigningKey()) + s, err := NewServer(context.TODO(), kube.NewKubernetesFakeClientWithApps(testNamespace), testNamespace, WithListenerPort(-1), WithGeneratedTokenSigningKey(), WithRedisProxyDisabled()) assert.Error(t, err) assert.Nil(t, s) }) + t.Run("Redis proxy should be nil when disabled", func(t *testing.T) { + s, err := NewServer(context.TODO(), kube.NewKubernetesFakeClientWithApps(testNamespace), testNamespace, WithGeneratedTokenSigningKey(), WithRedisProxyDisabled()) + assert.NoError(t, err) + assert.NotNil(t, s) + assert.Nil(t, s.redisProxy) + }) + + t.Run("Redis proxy should be created when not disabled", func(t *testing.T) { + s, err := NewServer(context.TODO(), kube.NewKubernetesFakeClientWithApps(testNamespace), testNamespace, WithGeneratedTokenSigningKey()) + assert.NoError(t, err) + assert.NotNil(t, s) + assert.NotNil(t, s.redisProxy) + }) + t.Run("Informer sync timeout should be configurable", func(t *testing.T) { s, err := NewServer(context.TODO(), kube.NewKubernetesFakeClientWithApps(testNamespace), testNamespace, WithGeneratedTokenSigningKey(), WithInformerSyncTimeout(10*time.Second)) assert.NoError(t, err) @@ -115,6 +132,7 @@ func Test_NewServer(t *testing.T) { func Test_handleResyncOnConnect(t *testing.T) { s, err := NewServer(context.TODO(), kube.NewKubernetesFakeClientWithApps(testNamespace), testNamespace, WithGeneratedTokenSigningKey(), + WithRedisProxyDisabled(), ) s.kubeClient.RestConfig = &rest.Config{} require.NoError(t, err) @@ -167,6 +185,7 @@ func Test_handleResyncOnConnect(t *testing.T) { func Test_RunHandlersOnConnect(t *testing.T) { s, err := NewServer(context.TODO(), kube.NewKubernetesFakeClientWithApps(testNamespace), testNamespace, WithGeneratedTokenSigningKey(), + WithRedisProxyDisabled(), ) require.NoError(t, err) s.events = event.NewEventSource("test") @@ -191,6 +210,24 @@ func Test_RunHandlersOnConnect(t *testing.T) { assert.Equal(t, expected, got) } +func Test_ServerStartWithDefaultSyncTimeout(t *testing.T) { + s, err := NewServer(context.TODO(), kube.NewKubernetesFakeClientWithApps(testNamespace), testNamespace, + WithGeneratedTokenSigningKey(), + WithRedisProxyDisabled(), + ) + require.NoError(t, err) + s.kubeClient.RestConfig = &rest.Config{} + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + errch := make(chan error, 1) + err = s.Start(ctx, errch) + require.NoError(t, err) + + defer s.Shutdown() +} + func init() { logrus.SetLevel(logrus.TraceLevel) }