Skip to content
Open
Show file tree
Hide file tree
Changes from 18 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions internal/backend/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@ import (
corev1 "k8s.io/api/core/v1"
)

type ContextKey string

const ForUpdateContextKey ContextKey = "forUpdate"

type ApplicationSelector struct {

// Labels is not currently implemented.
Expand Down
26 changes: 25 additions & 1 deletion internal/backend/kubernetes/application/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
appclientset "github.com/argoproj/argo-cd/v3/pkg/client/clientset/versioned"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/cache"
)

var _ backend.Application = &KubernetesBackend{}
Expand All @@ -43,18 +44,24 @@ type KubernetesBackend struct {
appClient appclientset.Interface
// appInformer is used to watch for change events for Argo CD Application resources on the cluster
appInformer informer.InformerInterface
// appLister is used to list Argo CD Application resources from the cache
appLister cache.GenericLister
// namespace is not currently read, is not guaranteed to be non-empty, and is not guaranteed to contain the source of Argo CD Application CRs in all cases
namespace string
usePatch bool
}

func NewKubernetesBackend(appClient appclientset.Interface, namespace string, appInformer informer.InformerInterface, usePatch bool) *KubernetesBackend {
return &KubernetesBackend{
be := &KubernetesBackend{
appClient: appClient,
appInformer: appInformer,
usePatch: usePatch,
namespace: namespace,
}
if specificInformer, ok := appInformer.(*informer.Informer[*v1alpha1.Application]); ok {
be.appLister = specificInformer.Lister()
}
return be
}

func (be *KubernetesBackend) List(ctx context.Context, selector backend.ApplicationSelector) ([]v1alpha1.Application, error) {
Expand Down Expand Up @@ -82,6 +89,23 @@ func (be *KubernetesBackend) Create(ctx context.Context, app *v1alpha1.Applicati
}

func (be *KubernetesBackend) Get(ctx context.Context, name string, namespace string) (*v1alpha1.Application, error) {
forUpdate, _ := ctx.Value(backend.ForUpdateContextKey).(bool)

if !forUpdate && be.appLister != nil && be.appInformer != nil && be.appInformer.HasSynced() {
Comment on lines +92 to +94
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you please explain this construct?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you please take a look at my explanation below?

namespaceLister := be.appLister.ByNamespace(namespace)
if namespaceLister != nil {
obj, err := namespaceLister.Get(name)
if err != nil {
return be.appClient.ArgoprojV1alpha1().Applications(namespace).Get(ctx, name, v1.GetOptions{})
}
app, ok := obj.(*v1alpha1.Application)
if !ok {
return nil, fmt.Errorf("object is not an Application: %T", obj)
}
return app.DeepCopy(), nil
}
}

return be.appClient.ArgoprojV1alpha1().Applications(namespace).Get(ctx, name, v1.GetOptions{})
}

Expand Down
112 changes: 108 additions & 4 deletions internal/backend/kubernetes/application/kubernetes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,18 @@ import (
"testing"

"github.com/argoproj-labs/argocd-agent/internal/backend"
"github.com/argoproj-labs/argocd-agent/internal/informer"
"github.com/argoproj/argo-cd/v3/pkg/apis/application/v1alpha1"
fakeappclient "github.com/argoproj/argo-cd/v3/pkg/client/clientset/versioned/fake"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/wI2L/jsondiff"
corev1 "k8s.io/api/core/v1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/tools/cache"
)

func Test_NewKubernetes(t *testing.T) {
Expand Down Expand Up @@ -110,22 +115,121 @@ func Test_Create(t *testing.T) {

func Test_Get(t *testing.T) {
apps := mkApps()
ctx := context.TODO()
t.Run("Get existing app", func(t *testing.T) {
fakeAppC := fakeappclient.NewSimpleClientset(apps...)
k := NewKubernetesBackend(fakeAppC, "", nil, true)
app, err := k.Get(context.TODO(), "app", "ns1")

inf, err := informer.NewInformer[*v1alpha1.Application](
ctx,
informer.WithListHandler[*v1alpha1.Application](func(ctx context.Context, options v1.ListOptions) (runtime.Object, error) {
return fakeAppC.ArgoprojV1alpha1().Applications("").List(ctx, options)
}),
informer.WithWatchHandler[*v1alpha1.Application](func(ctx context.Context, options v1.ListOptions) (watch.Interface, error) {
return fakeAppC.ArgoprojV1alpha1().Applications("").Watch(ctx, options)
}),
)
require.NoError(t, err)

go inf.Start(ctx)
require.NoError(t, inf.WaitForSync(ctx))

// Create the backend with the informer
backend := NewKubernetesBackend(fakeAppC, "", inf, true)

app, err := backend.Get(ctx, "app", "ns1")
assert.NoError(t, err)
assert.NotNil(t, app)
assert.Equal(t, "app", app.Name)
assert.Equal(t, "ns1", app.Namespace)

})
t.Run("Get non-existing app", func(t *testing.T) {
fakeAppC := fakeappclient.NewSimpleClientset(apps...)
k := NewKubernetesBackend(fakeAppC, "", nil, true)
app, err := k.Get(context.TODO(), "foo", "ns1")
inf, err := informer.NewInformer[*v1alpha1.Application](
ctx,
informer.WithListHandler[*v1alpha1.Application](func(ctx context.Context, options v1.ListOptions) (runtime.Object, error) {
return fakeAppC.ArgoprojV1alpha1().Applications("").List(ctx, options)
}),
informer.WithWatchHandler[*v1alpha1.Application](func(ctx context.Context, options v1.ListOptions) (watch.Interface, error) {
return fakeAppC.ArgoprojV1alpha1().Applications("").Watch(ctx, options)
}),
)
require.NoError(t, err)
go inf.Start(ctx)
require.NoError(t, inf.WaitForSync(ctx))

backend := NewKubernetesBackend(fakeAppC, "", inf, true)

app, err := backend.Get(ctx, "nonexistent", "ns1")
assert.ErrorContains(t, err, "not found")
assert.Equal(t, &v1alpha1.Application{}, app)

})

t.Run("Get returns type assertion error for invalid object", func(t *testing.T) {
fakeAppC := fakeappclient.NewSimpleClientset()

mockInf := &mockInformerWithInvalidType{}

backend := &KubernetesBackend{
appClient: fakeAppC,
appInformer: mockInf,
appLister: mockInf.Lister(),
}

app, err := backend.Get(ctx, "test", "ns1")
require.Error(t, err)
require.Nil(t, app)
assert.Contains(t, err.Error(), "object is not an Application")
})
}

type mockInformerWithInvalidType struct{}

func (m *mockInformerWithInvalidType) Start(ctx context.Context) error {
return nil
}

func (m *mockInformerWithInvalidType) WaitForSync(ctx context.Context) error {
return nil
}

func (m *mockInformerWithInvalidType) HasSynced() bool {
return true
}

func (m *mockInformerWithInvalidType) Stop() error {
return nil
}

func (m *mockInformerWithInvalidType) Lister() cache.GenericLister {
return &mockListerWithInvalidType{}
}

type mockListerWithInvalidType struct{}

func (m *mockListerWithInvalidType) List(selector labels.Selector) ([]runtime.Object, error) {
return nil, nil
}

func (m *mockListerWithInvalidType) Get(name string) (runtime.Object, error) {
return &corev1.ConfigMap{}, nil
}

func (m *mockListerWithInvalidType) ByNamespace(namespace string) cache.GenericNamespaceLister {
return &mockNamespaceListerWithInvalidType{}
}

type mockNamespaceListerWithInvalidType struct{}

func (m *mockNamespaceListerWithInvalidType) List(selector labels.Selector) ([]runtime.Object, error) {
return nil, nil
}

func (m *mockNamespaceListerWithInvalidType) Get(name string) (runtime.Object, error) {
return &corev1.ConfigMap{}, nil
}

func Test_Delete(t *testing.T) {
apps := mkApps()
t.Run("Delete existing app", func(t *testing.T) {
Expand Down
15 changes: 15 additions & 0 deletions internal/informer/informer.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/sirupsen/logrus"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/tools/cache"
)
Expand Down Expand Up @@ -57,6 +58,9 @@ type Informer[T runtime.Object] struct {

resType reflect.Type

// groupResource is the group and resource of the watched objects.
groupResource schema.GroupResource

// logger is this informer's logger.
logger *logrus.Entry

Expand Down Expand Up @@ -109,6 +113,12 @@ func NewInformer[T runtime.Object](ctx context.Context, opts ...InformerOption[T
i := &Informer[T]{}
var r T
i.resType = reflect.TypeOf(r)

i.groupResource = schema.GroupResource{
Group: "argoproj.io",
Resource: "applications",
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With this hard-coded, every informer would only ever have a lister for Applications. I think the groupResource must be set from the outside, potentially at initialization, depending on the type concrete type of the informer.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the feedback. You’re right — I removed the hard-coded groupResource.
Instead, I added WithGroupResource[T](group, resource string) as a new InformerOption, which allows each informer to be configured with the correct groupResource at initialization time based on its concrete type.


i.logger = logrus.NewEntry(logrus.StandardLogger()).WithFields(logrus.Fields{
"type": i.resType,
"module": "Informer",
Expand Down Expand Up @@ -287,3 +297,8 @@ func (i *Informer[T]) WaitForSync(ctx context.Context) error {
}
return nil
}

// Lister returns a GenericLister that can be used to list and get cached resources.
func (i *Informer[T]) Lister() cache.GenericLister {
return cache.NewGenericLister(i.informer.GetIndexer(), i.groupResource)
}
34 changes: 34 additions & 0 deletions internal/informer/informer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/watch"
)
Expand Down Expand Up @@ -233,6 +234,39 @@ func Test_InformerScope(t *testing.T) {

}

func Test_Lister(t *testing.T) {
t.Run("Lister returns GenericLister", func(t *testing.T) {
i := newInformer(t, "", apps[0], apps[1])
go i.Start(context.TODO())
require.NoError(t, i.WaitForSync(context.TODO()))
defer i.Stop()

lister := i.Lister()
require.NotNil(t, lister)

obj, err := lister.ByNamespace("argocd").Get("test1")
require.NoError(t, err)
require.NotNil(t, obj)

app, ok := obj.(*v1alpha1.Application)
require.True(t, ok)
assert.Equal(t, "test1", app.Name)
assert.Equal(t, "argocd", app.Namespace)
})

t.Run("Lister can list objects", func(t *testing.T) {
i := newInformer(t, "", apps[0], apps[1])
go i.Start(context.TODO())
require.NoError(t, i.WaitForSync(context.TODO()))
defer i.Stop()

lister := i.Lister()
objs, err := lister.List(labels.Everything())
require.NoError(t, err)
assert.Len(t, objs, 2)
})
}

func init() {
logrus.SetLevel(logrus.TraceLevel)
}
8 changes: 7 additions & 1 deletion internal/manager/application/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -555,8 +555,14 @@ func (m *ApplicationManager) Delete(ctx context.Context, namespace string, incom
// be returned.
func (m *ApplicationManager) update(ctx context.Context, upsert bool, incoming *v1alpha1.Application, updateFn updateTransformer, patchFn patchTransformer) (*v1alpha1.Application, error) {
var updated *v1alpha1.Application

if ctx == nil {
ctx = context.Background()
}
ctxForUpdate := context.WithValue(ctx, backend.ForUpdateContextKey, true)

err := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
existing, ierr := m.applicationBackend.Get(ctx, incoming.Name, incoming.Namespace)
existing, ierr := m.applicationBackend.Get(ctxForUpdate, incoming.Name, incoming.Namespace)
Copy link
Contributor Author

@juanxiu juanxiu Oct 9, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jannfis

The reason why forUpdate is necessary lies here. Kubernetes implements Optimistic Concurrency Control through resourceVersion. When an Update request is made, the API server compares the resourceVersion in the request with the one currently stored, and if they differ, it returns a Conflict.

The issue arises when using the informer cache during updates. Although the informer is synchronized with the API server, there is a slight lag, meaning that the cache may return a stale resourceVersion. As a result, the Update request fails, and even when using retry.RetryOnConflict() to retry, it keeps reading the same stale resourceVersion from the cache, causing repeated failures.

In contrast, when forUpdate=true, the object is fetched directly from the API server, guaranteeing the latest resourceVersion. This ensures that during retries, the latest version is retrieved, allowing the Update to succeed.

Therefore, we use the informer cache for regular reads to optimize performance, and direct API reads during updates to ensure correctness.

if ierr != nil {
if errors.IsNotFound(ierr) && upsert {
updated, ierr = m.Create(ctx, incoming)
Expand Down
13 changes: 13 additions & 0 deletions internal/manager/application/application_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,18 @@ func fakeInformer(t *testing.T, namespace string, objects ...runtime.Object) (*f
informer.WithNamespaceScope[*v1alpha1.Application](namespace),
)
require.NoError(t, err)

go func() {
err = informer.Start(context.Background())
if err != nil {
t.Fatalf("failed to start informer: %v", err)
}
}()

if err = informer.WaitForSync(context.Background()); err != nil {
t.Fatalf("failed to wait for informer sync: %v", err)
}

return appC, informer
}

Expand Down Expand Up @@ -210,6 +222,7 @@ func Test_ManagerUpdateManaged(t *testing.T) {
require.NoError(t, err)

updated, err := mgr.UpdateManagedApp(context.Background(), incoming)

require.NoError(t, err)
require.NotNil(t, updated)

Expand Down
Loading
Loading