From 738429d8de97fbceba5053957eb36d10cc0bc321 Mon Sep 17 00:00:00 2001 From: Chetan Banavikalmutt Date: Wed, 1 Oct 2025 16:34:09 +0530 Subject: [PATCH 1/6] feat: revert appproject updates on peer clusters Signed-off-by: Chetan Banavikalmutt --- agent/agent.go | 67 +++++++-- agent/inbound.go | 17 ++- agent/outbound.go | 9 +- internal/cache/appcache.go | 71 ---------- internal/cache/appcache_test.go | 43 ------ internal/cache/resource_cache.go | 129 ++++++++++++++++++ internal/cache/resource_cache_test.go | 105 ++++++++++++++ internal/manager/application/application.go | 10 +- .../manager/application/application_test.go | 10 +- internal/manager/appproject/appproject.go | 40 ++++++ principal/callbacks.go | 46 ++++++- principal/callbacks_test.go | 7 +- principal/event.go | 16 ++- principal/server.go | 67 ++++++--- test/e2e/appcache_test.go | 75 ++++++++++ 15 files changed, 542 insertions(+), 170 deletions(-) delete mode 100644 internal/cache/appcache.go delete mode 100644 internal/cache/appcache_test.go create mode 100644 internal/cache/resource_cache.go create mode 100644 internal/cache/resource_cache_test.go diff --git a/agent/agent.go b/agent/agent.go index b5e21542..806b147e 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -28,6 +28,7 @@ import ( kubeappproject "github.com/argoproj-labs/argocd-agent/internal/backend/kubernetes/appproject" kubenamespace "github.com/argoproj-labs/argocd-agent/internal/backend/kubernetes/namespace" kuberepository "github.com/argoproj-labs/argocd-agent/internal/backend/kubernetes/repository" + "github.com/argoproj-labs/argocd-agent/internal/cache" "github.com/argoproj-labs/argocd-agent/internal/config" "github.com/argoproj-labs/argocd-agent/internal/event" "github.com/argoproj-labs/argocd-agent/internal/informer" @@ -49,7 +50,6 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/watch" - appCache "github.com/argoproj-labs/argocd-agent/internal/cache" "github.com/argoproj/argo-cd/v3/pkg/apis/application/v1alpha1" 
cacheutil "github.com/argoproj/argo-cd/v3/util/cache" appstatecache "github.com/argoproj/argo-cd/v3/util/cache/appstate" @@ -104,6 +104,9 @@ type Agent struct { cacheRefreshInterval time.Duration clusterCache *appstatecache.Cache + + // sourceCache is a cache of resources from the source. We use it to revert any changes made to the local resources. + sourceCache *cache.SourceCache } const defaultQueueName = "default" @@ -310,6 +313,8 @@ func NewAgent(ctx context.Context, client *kube.KubernetesClient, namespace stri } a.clusterCache = clusterCache + a.sourceCache = cache.NewSourceCache() + return a, nil } @@ -319,21 +324,10 @@ func (a *Agent) Start(ctx context.Context) error { a.context = infCtx a.cancelFn = cancelFn - // For managed-agent we need to maintain a cache to keep applications in sync with last known state of - // principal in case agent is disconnected with principal or application in managed-cluster is modified. + // For managed-agent we need to maintain a cache to keep resources in sync with last known state of + // principal in case agent is disconnected with principal or resources in managed-cluster are modified. 
if a.mode == types.AgentModeManaged { - log().Infof("Recreating application spec cache from existing resources on cluster") - appList, err := a.appManager.List(ctx, backend.ApplicationSelector{Namespaces: []string{a.namespace}}) - if err != nil { - log().Errorf("Error while fetching list of applications: %v", err) - } - - for _, app := range appList { - sourceUID, exists := app.Annotations[manager.SourceUIDAnnotation] - if exists { - appCache.SetApplicationSpec(ty.UID(sourceUID), app.Spec, log()) - } - } + a.populateSourceCache(ctx) } if a.options.metricsPort > 0 { @@ -482,3 +476,46 @@ func (a *Agent) healthzHandler(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusServiceUnavailable) } } + +func (a *Agent) populateSourceCache(ctx context.Context) { + log().Infof("Recreating application spec cache from existing resources on cluster") + appList, err := a.appManager.List(ctx, backend.ApplicationSelector{Namespaces: []string{a.namespace}}) + if err != nil { + log().Errorf("Error while fetching list of applications: %v", err) + } + + for _, app := range appList { + sourceUID, exists := app.Annotations[manager.SourceUIDAnnotation] + if exists { + a.sourceCache.Application.Set(ty.UID(sourceUID), app.Spec) + } + } + + log().Infof("Recreating appProject spec cache from existing resources on cluster") + appProjectList, err := a.projectManager.List(ctx, backend.AppProjectSelector{Namespace: a.namespace}) + if err != nil { + log().Errorf("Error while fetching list of appProjects: %v", err) + } + + for _, appProject := range appProjectList { + sourceUID, exists := appProject.Annotations[manager.SourceUIDAnnotation] + if exists { + a.sourceCache.AppProject.Set(ty.UID(sourceUID), appProject.Spec) + } + } + + log().Infof("Recreating repository spec cache from existing resources on cluster") + repoList, err := a.repoManager.List(ctx, backend.RepositorySelector{Namespace: a.namespace}) + if err != nil { + log().Errorf("Error while fetching list of repositories: 
%v", err) + } + + for _, repo := range repoList { + sourceUID, exists := repo.Annotations[manager.SourceUIDAnnotation] + if exists { + a.sourceCache.Repository.Set(ty.UID(sourceUID), repo.Data) + } + } + + log().Infof("Source cache populated successfully") +} diff --git a/agent/inbound.go b/agent/inbound.go index e6c7109b..1cea69f8 100644 --- a/agent/inbound.go +++ b/agent/inbound.go @@ -19,7 +19,6 @@ import ( "time" "github.com/argoproj-labs/argocd-agent/internal/backend" - appCache "github.com/argoproj-labs/argocd-agent/internal/cache" "github.com/argoproj-labs/argocd-agent/internal/checkpoint" "github.com/argoproj-labs/argocd-agent/internal/event" "github.com/argoproj-labs/argocd-agent/internal/manager" @@ -470,7 +469,7 @@ func (a *Agent) createApplication(incoming *v1alpha1.Application) (*v1alpha1.App if a.mode == types.AgentModeManaged { // Store app spec in cache - appCache.SetApplicationSpec(incoming.UID, incoming.Spec, logCtx) + a.sourceCache.Application.Set(incoming.UID, incoming.Spec) } created, err := a.appManager.Create(a.context, incoming) @@ -513,7 +512,7 @@ func (a *Agent) updateApplication(incoming *v1alpha1.Application) (*v1alpha1.App // Update app spec in cache logCtx.Tracef("Calling update spec for this event") - appCache.SetApplicationSpec(incoming.UID, incoming.Spec, logCtx) + a.sourceCache.Application.Set(incoming.UID, incoming.Spec) napp, err = a.appManager.UpdateManagedApp(a.context, incoming) case types.AgentModeAutonomous: @@ -550,7 +549,7 @@ func (a *Agent) deleteApplication(app *v1alpha1.Application) error { if apierrors.IsNotFound(err) { logCtx.Debug("application is not found, perhaps it is already deleted") if a.mode == types.AgentModeManaged { - appCache.DeleteApplicationSpec(app.UID, logCtx) + a.sourceCache.Application.Delete(app.UID) } return nil } @@ -558,7 +557,7 @@ func (a *Agent) deleteApplication(app *v1alpha1.Application) error { } if a.mode == types.AgentModeManaged { - appCache.DeleteApplicationSpec(app.UID, logCtx) + 
a.sourceCache.Application.Delete(app.UID) } err = a.appManager.Unmanage(app.QualifiedName()) @@ -595,6 +594,8 @@ func (a *Agent) createAppProject(incoming *v1alpha1.AppProject) (*v1alpha1.AppPr logCtx.Infof("Creating a new AppProject on behalf of an incoming event") + a.sourceCache.AppProject.Set(incoming.UID, incoming.Spec) + // Get rid of some fields that we do not want to have on the AppProject // as we start fresh. if incoming.Annotations != nil { @@ -632,6 +633,8 @@ func (a *Agent) updateAppProject(incoming *v1alpha1.AppProject) (*v1alpha1.AppPr logCtx.Infof("Updating appProject") + a.sourceCache.AppProject.Set(incoming.UID, incoming.Spec) + logCtx.Tracef("Calling update spec for this event") return a.projectManager.UpdateAppProject(a.context, incoming) @@ -661,10 +664,14 @@ func (a *Agent) deleteAppProject(project *v1alpha1.AppProject) error { if err != nil { if apierrors.IsNotFound(err) { logCtx.Debug("appProject not found, perhaps it is already deleted") + a.sourceCache.AppProject.Delete(project.UID) return nil } return err } + + a.sourceCache.AppProject.Delete(project.UID) + err = a.projectManager.Unmanage(project.Name) if err != nil { log().Warnf("Could not unmanage appProject %s: %v", project.Name, err) diff --git a/agent/outbound.go b/agent/outbound.go index 3606e99f..602eb77e 100644 --- a/agent/outbound.go +++ b/agent/outbound.go @@ -81,7 +81,7 @@ func (a *Agent) addAppUpdateToQueue(old *v1alpha1.Application, new *v1alpha1.App // Revert any direct modifications done in application on managed-cluster // because for managed-agent all changes should be done through principal - if reverted := a.appManager.RevertManagedAppChanges(a.context, new); reverted { + if reverted := a.appManager.RevertManagedAppChanges(a.context, new, a.sourceCache.Application); reverted { logCtx.Debugf("Modifications done in application: %s are reverted", new.Name) return } @@ -215,6 +215,13 @@ func (a *Agent) addAppProjectUpdateToQueue(old *v1alpha1.AppProject, new *v1alph return 
} + // Revert any direct modifications done to appProject on managed-cluster + // because for managed-agent all changes should be done through principal + if reverted := a.projectManager.RevertAppProjectChanges(a.context, new, a.sourceCache.AppProject); reverted { + logCtx.Debugf("Modifications done to appProject are reverted") + return + } + // Only send the update event when we're in autonomous mode if !a.mode.IsAutonomous() { return diff --git a/internal/cache/appcache.go b/internal/cache/appcache.go deleted file mode 100644 index 3512285f..00000000 --- a/internal/cache/appcache.go +++ /dev/null @@ -1,71 +0,0 @@ -// Copyright 2025 The argocd-agent Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package cache - -import ( - "sync" - - "github.com/argoproj/argo-cd/v3/pkg/apis/application/v1alpha1" - "github.com/sirupsen/logrus" - "k8s.io/apimachinery/pkg/types" -) - -// ApplicationSpecCache type is for caching Argo CD application spec to keep -// application in sync with last known state of principal application -type ApplicationSpecCache struct { - lock sync.RWMutex - - // key: source UID of app - // value: application spec - // - acquire 'lock' before accessing - appSpec map[types.UID]v1alpha1.ApplicationSpec -} - -// Initialize instance of ApplicationSpecCache. 
-var appSpecCache = &ApplicationSpecCache{ - appSpec: make(map[types.UID]v1alpha1.ApplicationSpec), -} - -// SetApplicationSpec inserts/updates app spec in cache -func SetApplicationSpec(sourceUID types.UID, app v1alpha1.ApplicationSpec, log *logrus.Entry) { - appSpecCache.lock.Lock() - defer appSpecCache.lock.Unlock() - - log.Tracef("Setting value in ApplicationSpec cache: '%s' %v", sourceUID, app) - - appSpecCache.appSpec[sourceUID] = app -} - -// GetApplicationSpec returns cached app spec -func GetApplicationSpec(sourceUID types.UID, log *logrus.Entry) (v1alpha1.ApplicationSpec, bool) { - - appSpecCache.lock.RLock() - defer appSpecCache.lock.RUnlock() - - appSpec, ok := appSpecCache.appSpec[sourceUID] - - log.Tracef("Retrieved value from ApplicationSpec cache: '%s' %v", sourceUID, appSpec) - return appSpec, ok -} - -// DeleteApplicationSpec removes app spec from cache -func DeleteApplicationSpec(sourceUID types.UID, log *logrus.Entry) { - appSpecCache.lock.Lock() - defer appSpecCache.lock.Unlock() - - log.Tracef("Deleting value from ApplicationSpec cache: '%s'", sourceUID) - - delete(appSpecCache.appSpec, sourceUID) -} diff --git a/internal/cache/appcache_test.go b/internal/cache/appcache_test.go deleted file mode 100644 index e8073ada..00000000 --- a/internal/cache/appcache_test.go +++ /dev/null @@ -1,43 +0,0 @@ -// Copyright 2025 The argocd-agent Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. 
- -package cache - -import ( - "context" - "testing" - - "github.com/argoproj/argo-cd/v3/pkg/apis/application/v1alpha1" - "github.com/sirupsen/logrus" - "github.com/stretchr/testify/assert" -) - -func Test_ApplicationSpecCache(t *testing.T) { - t.Run("Test ApplicationSpec Cache", func(t *testing.T) { - assert.Empty(t, appSpecCache.appSpec) - - log := logrus.New().WithContext(context.Background()) - - expectedAppSpec := v1alpha1.ApplicationSpec{Project: "test-project"} - SetApplicationSpec("test", expectedAppSpec, log) - assert.Equal(t, 1, len(appSpecCache.appSpec)) - - actualAppSpec, ok := GetApplicationSpec("test", log) - assert.True(t, ok) - assert.Equal(t, expectedAppSpec, actualAppSpec) - - DeleteApplicationSpec("test", log) - assert.Empty(t, appSpecCache.appSpec) - }) -} diff --git a/internal/cache/resource_cache.go b/internal/cache/resource_cache.go new file mode 100644 index 00000000..8c50cb36 --- /dev/null +++ b/internal/cache/resource_cache.go @@ -0,0 +1,129 @@ +// Copyright 2025 The argocd-agent Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package cache + +import ( + "sync" + + "github.com/argoproj-labs/argocd-agent/internal/logging" + "github.com/argoproj/argo-cd/v3/pkg/apis/application/v1alpha1" + "github.com/sirupsen/logrus" + "k8s.io/apimachinery/pkg/types" +) + +type SourceCache struct { + Application *ResourceCache[v1alpha1.ApplicationSpec] + AppProject *ResourceCache[v1alpha1.AppProjectSpec] + Repository *ResourceCache[map[string][]byte] +} + +func NewSourceCache() *SourceCache { + return &SourceCache{ + Application: newResourceCache[v1alpha1.ApplicationSpec]("ApplicationSpec"), + AppProject: newResourceCache[v1alpha1.AppProjectSpec]("AppProjectSpec"), + Repository: newResourceCache[map[string][]byte]("RepositorySpec"), + } +} + +// ResourceCache is used to cache the source resource spec. It is used to keep the resources on the peer in sync with the source. +type ResourceCache[T any] struct { + lock sync.RWMutex + items map[types.UID]T + name string + + log *logrus.Entry +} + +// newResourceCache creates a new resource cache +func newResourceCache[T any](name string) *ResourceCache[T] { + return &ResourceCache[T]{ + items: make(map[types.UID]T), + name: name, + log: logging.ModuleLogger("SourceCache").WithField("key", name), + } +} + +// Set inserts/updates a resource in the cache +func (c *ResourceCache[T]) Set(sourceUID types.UID, item T) { + c.lock.Lock() + defer c.lock.Unlock() + + c.log.Tracef("Setting value in source cache: '%s'", sourceUID) + + c.items[sourceUID] = item +} + +// Get retrieves a resource from the cache +func (c *ResourceCache[T]) Get(sourceUID types.UID) (T, bool) { + c.lock.RLock() + defer c.lock.RUnlock() + + item, ok := c.items[sourceUID] + + c.log.Tracef("Retrieved value from cache: '%s' (found: %v)", sourceUID, ok) + + return item, ok +} + +// Delete removes a resource from the cache +func (c *ResourceCache[T]) Delete(sourceUID types.UID) { + c.lock.Lock() + defer c.lock.Unlock() + + c.log.Tracef("Deleting value from cache: '%s'", sourceUID) + + delete(c.items, 
sourceUID) +} + +// Clear removes all items from the cache +func (c *ResourceCache[T]) Clear() { + c.lock.Lock() + defer c.lock.Unlock() + + c.log.Tracef("Clearing all values from cache") + + c.items = make(map[types.UID]T) +} + +// Len returns the number of items in the cache +func (c *ResourceCache[T]) Len() int { + c.lock.RLock() + defer c.lock.RUnlock() + return len(c.items) +} + +// Keys returns all keys in the cache +func (c *ResourceCache[T]) Keys() []types.UID { + c.lock.RLock() + defer c.lock.RUnlock() + + keys := make([]types.UID, 0, len(c.items)) + for key := range c.items { + keys = append(keys, key) + } + return keys +} + +// ForEach iterates over all items in the cache +func (c *ResourceCache[T]) ForEach(fn func(types.UID, T) bool) { + c.lock.RLock() + defer c.lock.RUnlock() + + for key, value := range c.items { + if !fn(key, value) { + break + } + } +} diff --git a/internal/cache/resource_cache_test.go b/internal/cache/resource_cache_test.go new file mode 100644 index 00000000..59c70ce0 --- /dev/null +++ b/internal/cache/resource_cache_test.go @@ -0,0 +1,105 @@ +// Copyright 2025 The argocd-agent Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package cache + +import ( + "testing" + + "github.com/argoproj/argo-cd/v3/pkg/apis/application/v1alpha1" + "github.com/stretchr/testify/assert" +) + +func Test_SourceResourceCache(t *testing.T) { + + sourceCache := NewSourceCache() + t.Run("Test ApplicationSpec Cache", func(t *testing.T) { + + appSpecCache := sourceCache.Application + assert.Empty(t, appSpecCache.items) + + expectedAppSpec := v1alpha1.ApplicationSpec{Project: "test-project"} + appSpecCache.Set("test", expectedAppSpec) + assert.Equal(t, 1, len(appSpecCache.items)) + + actualAppSpec, ok := appSpecCache.Get("test") + assert.True(t, ok) + assert.Equal(t, expectedAppSpec, actualAppSpec) + + expectedAppSpec.Project = "test-project-updated" + appSpecCache.Set("test", expectedAppSpec) + assert.Equal(t, 1, len(appSpecCache.items)) + + actualAppSpec, ok = appSpecCache.Get("test") + assert.True(t, ok) + assert.Equal(t, expectedAppSpec, actualAppSpec) + + appSpecCache.Delete("test") + assert.Empty(t, appSpecCache.items) + }) + + t.Run("Test AppProjectSpec Cache", func(t *testing.T) { + + appProjectSpecCache := sourceCache.AppProject + assert.Empty(t, appProjectSpecCache.items) + + expectedAppSpec := v1alpha1.AppProjectSpec{Description: "test-project"} + appProjectSpecCache.Set("test", expectedAppSpec) + assert.Equal(t, 1, len(appProjectSpecCache.items)) + + actualAppSpec, ok := appProjectSpecCache.Get("test") + assert.True(t, ok) + assert.Equal(t, expectedAppSpec, actualAppSpec) + + expectedAppSpec.Description = "test-project-updated" + appProjectSpecCache.Set("test", expectedAppSpec) + assert.Equal(t, 1, len(appProjectSpecCache.items)) + + actualAppSpec, ok = appProjectSpecCache.Get("test") + assert.True(t, ok) + assert.Equal(t, expectedAppSpec, actualAppSpec) + + appProjectSpecCache.Delete("test") + assert.Empty(t, appProjectSpecCache.items) + }) + + t.Run("Test RepositorySpec Cache", func(t *testing.T) { + + repositorySpecCache := sourceCache.Repository + assert.Empty(t, repositorySpecCache.items) + + 
expectedRepositorySpec := map[string][]byte{ + "test": {0x01, 0x02, 0x03}, + } + repositorySpecCache.Set("test", expectedRepositorySpec) + assert.Equal(t, 1, len(repositorySpecCache.items)) + + actualRepositorySpec, ok := repositorySpecCache.Get("test") + assert.True(t, ok) + assert.Equal(t, expectedRepositorySpec, actualRepositorySpec) + + expectedRepositorySpec = map[string][]byte{ + "test": {0x04, 0x05, 0x06}, + } + repositorySpecCache.Set("test", expectedRepositorySpec) + assert.Equal(t, 1, len(repositorySpecCache.items)) + + actualRepositorySpec, ok = repositorySpecCache.Get("test") + assert.True(t, ok) + assert.Equal(t, expectedRepositorySpec, actualRepositorySpec) + + repositorySpecCache.Delete("test") + assert.Empty(t, repositorySpecCache.items) + }) +} diff --git a/internal/manager/application/application.go b/internal/manager/application/application.go index 38177954..2b430c10 100644 --- a/internal/manager/application/application.go +++ b/internal/manager/application/application.go @@ -27,7 +27,7 @@ import ( "k8s.io/utils/ptr" "github.com/argoproj-labs/argocd-agent/internal/backend" - appCache "github.com/argoproj-labs/argocd-agent/internal/cache" + "github.com/argoproj-labs/argocd-agent/internal/cache" "github.com/argoproj-labs/argocd-agent/internal/logging/logfields" "github.com/argoproj-labs/argocd-agent/internal/manager" "github.com/argoproj/argo-cd/v3/pkg/apis/application/v1alpha1" @@ -616,7 +616,7 @@ func (m *ApplicationManager) List(ctx context.Context, selector backend.Applicat // RevertManagedAppChanges compares the actual spec with expected spec stored in cache, // if actual spec doesn't match with cache, then it is reverted to be in sync with cache, which is same as principal. 
-func (m *ApplicationManager) RevertManagedAppChanges(ctx context.Context, app *v1alpha1.Application) bool { +func (m *ApplicationManager) RevertManagedAppChanges(ctx context.Context, app *v1alpha1.Application, appCache *cache.ResourceCache[v1alpha1.ApplicationSpec]) bool { logCtx := log().WithFields(logrus.Fields{ "component": "RevertManagedAppChanges", "application": app.QualifiedName(), @@ -625,7 +625,7 @@ func (m *ApplicationManager) RevertManagedAppChanges(ctx context.Context, app *v sourceUID, exists := app.Annotations[manager.SourceUIDAnnotation] if exists && m.mode == manager.ManagerModeManaged { - if cachedAppSpec, ok := appCache.GetApplicationSpec(ty.UID(sourceUID), logCtx); ok { + if cachedAppSpec, ok := appCache.Get(ty.UID(sourceUID)); ok { logCtx.Debugf("Application %s is available in agent cache", app.Name) if isEqual := reflect.DeepEqual(cachedAppSpec, app.Spec); !isEqual { @@ -646,7 +646,7 @@ func (m *ApplicationManager) RevertManagedAppChanges(ctx context.Context, app *v // RevertAutonomousAppChanges compares the actual spec with expected spec stored in cache, // if actual spec doesn't match with cache, then it is reverted to be in sync with cache, which is same as agent cluster. 
-func (m *ApplicationManager) RevertAutonomousAppChanges(ctx context.Context, app *v1alpha1.Application) bool { +func (m *ApplicationManager) RevertAutonomousAppChanges(ctx context.Context, app *v1alpha1.Application, appCache *cache.ResourceCache[v1alpha1.ApplicationSpec]) bool { logCtx := log().WithFields(logrus.Fields{ "component": "RevertAutonomousAppChanges", "application": app.QualifiedName(), @@ -658,7 +658,7 @@ func (m *ApplicationManager) RevertAutonomousAppChanges(ctx context.Context, app return false } - if cachedAppSpec, ok := appCache.GetApplicationSpec(ty.UID(sourceUID), logCtx); ok { + if cachedAppSpec, ok := appCache.Get(ty.UID(sourceUID)); ok { logCtx.Debugf("Application %s is available in agent cache", app.Name) if isEqual := reflect.DeepEqual(cachedAppSpec, app.Spec); !isEqual { diff --git a/internal/manager/application/application_test.go b/internal/manager/application/application_test.go index b40d4a90..a642e376 100644 --- a/internal/manager/application/application_test.go +++ b/internal/manager/application/application_test.go @@ -35,7 +35,7 @@ import ( fakeappclient "github.com/argoproj/argo-cd/v3/pkg/client/clientset/versioned/fake" "k8s.io/apimachinery/pkg/api/errors" - appCache "github.com/argoproj-labs/argocd-agent/internal/cache" + "github.com/argoproj-labs/argocd-agent/internal/cache" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" @@ -756,16 +756,18 @@ func Test_RevertManagedAppChanges(t *testing.T) { mgr, err := NewApplicationManager(be, "argocd", WithMode(manager.ManagerModeManaged), WithRole(manager.ManagerRoleAgent)) require.NoError(t, err) + sourceCache := cache.NewSourceCache() + // Store app spec in cache - appCache.SetApplicationSpec("some_uid", app.Spec, log()) + sourceCache.Application.Set("some_uid", app.Spec) - reverted := mgr.RevertManagedAppChanges(context.Background(), app) + reverted := mgr.RevertManagedAppChanges(context.Background(), app, 
sourceCache.Application) require.False(t, reverted) // Update app spec app.Spec.Project = "test1" - reverted = mgr.RevertManagedAppChanges(context.Background(), app) + reverted = mgr.RevertManagedAppChanges(context.Background(), app, sourceCache.Application) require.True(t, reverted) }) } diff --git a/internal/manager/appproject/appproject.go b/internal/manager/appproject/appproject.go index cfbe8d50..58e9a038 100644 --- a/internal/manager/appproject/appproject.go +++ b/internal/manager/appproject/appproject.go @@ -19,10 +19,12 @@ import ( "encoding/json" "fmt" "net/url" + "reflect" "strings" "time" "github.com/argoproj-labs/argocd-agent/internal/backend" + "github.com/argoproj-labs/argocd-agent/internal/cache" "github.com/argoproj-labs/argocd-agent/internal/logging" "github.com/argoproj-labs/argocd-agent/internal/manager" "github.com/argoproj/argo-cd/v3/pkg/apis/application/v1alpha1" @@ -31,6 +33,7 @@ import ( "github.com/wI2L/jsondiff" "k8s.io/apimachinery/pkg/api/errors" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/util/retry" ) @@ -363,6 +366,43 @@ func (m *AppProjectManager) CompareSourceUID(ctx context.Context, incoming *v1al return true, string(incoming.UID) == sourceUID, nil } +// RevertAppProjectChanges compares the actual spec with expected spec stored in cache, +// if actual spec doesn't match with cache, then it is reverted to be in sync with cache, which is same as the source cluster. 
+func (m *AppProjectManager) RevertAppProjectChanges(ctx context.Context, project *v1alpha1.AppProject, projectCache *cache.ResourceCache[v1alpha1.AppProjectSpec]) bool { + logCtx := log().WithFields(logrus.Fields{ + "component": "RevertAppProjectChanges", + "appProject": project.Name, + "resourceVersion": project.ResourceVersion, + }) + + sourceUID, exists := project.Annotations[manager.SourceUIDAnnotation] + if !exists { + return false + } + + if cachedSpec, ok := projectCache.Get(types.UID(sourceUID)); ok { + logCtx.Debugf("AppProject %s is available in agent cache", project.Name) + + if isEqual := reflect.DeepEqual(cachedSpec, project.Spec); !isEqual { + project.Spec = cachedSpec + logCtx.Infof("Reverting modifications done in appProject: %s", project.Name) + if _, err := m.UpdateAppProject(ctx, project); err != nil { + logCtx.Errorf("Unable to revert modifications done in appProject: %s. Error: %v", project.Name, err) + return false + } + return true + } else { + logCtx.Trace(cachedSpec) + logCtx.Trace(project.Spec) + logCtx.Debugf("AppProject %s is already in sync with source cache", project.Name) + } + } else { + logCtx.Errorf("AppProject %s is not available in agent cache", project.Name) + } + + return false +} + // DoesAgentMatchWithProject checks if the agent name matches the given AppProject. // We match the agent to an AppProject if: // 1. 
The agent name matches any one of the destination names OR diff --git a/principal/callbacks.go b/principal/callbacks.go index c51e290e..0368e4a0 100644 --- a/principal/callbacks.go +++ b/principal/callbacks.go @@ -87,7 +87,7 @@ func (s *Server) updateAppCallback(old *v1alpha1.Application, new *v1alpha1.Appl } // Revert modifications on autonomous agent applications - if s.appManager.RevertAutonomousAppChanges(s.ctx, new) { + if s.appManager.RevertAutonomousAppChanges(s.ctx, new, s.sourceCache.Application) { logCtx.Trace("Modifications to the application are reverted") return } @@ -219,8 +219,13 @@ func (s *Server) updateAppProjectCallback(old *v1alpha1.AppProject, new *v1alpha // Check if this AppProject was created by an autonomous agent if isResourceFromAutonomousAgent(new) { - logCtx.Debugf("Discarding event, because the appProject is managed by an autonomous agent") - return + logCtx.Trace("Reverting modifications on autonomous agent appProjects/////////////////") + // Revert modifications on autonomous agent appProjects + if s.projectManager.RevertAppProjectChanges(s.ctx, new, s.sourceCache.AppProject) { + logCtx.Trace("Modifications to the appProject are reverted*************") + return + } + logCtx.Trace("No revertions!!!!!!!!!!!!!") } if len(new.Finalizers) > 0 && len(new.Finalizers) != len(old.Finalizers) { @@ -263,6 +268,14 @@ func (s *Server) deleteAppProjectCallback(outbound *v1alpha1.AppProject) { "appproject_name": outbound.Name, }) + // Revert user-initiated deletion on autonomous agent appProjects + if isResourceFromAutonomousAgent(outbound) { + if s.revertUserInitiatedProjectDeletion(outbound, logCtx) { + logCtx.Trace("Deleted appProject is recreated") + } + return + } + s.resources.Remove(outbound.Namespace, resources.NewResourceKeyFromAppProject(outbound)) // Check if this AppProject was created by an autonomous agent by examining its name prefix @@ -644,3 +657,30 @@ func (s *Server) revertUserInitiatedDeletion(outbound *v1alpha1.Application, 
log return true } + +func (s *Server) revertUserInitiatedProjectDeletion(outbound *v1alpha1.AppProject, logCtx *logrus.Entry) bool { + appKey := fmt.Sprintf("%s/%s", outbound.Namespace, outbound.Name) + + // Check if this deletion was expected (agent-initiated) + if s.isExpectedDeletion(appKey) { + logCtx.Debugf("Expected deletion from autonomous agent - allowing it to proceed") + // This is a legitimate deletion from the agent, let it proceed normally + return false + } + + logCtx.Warnf("Unauthorized deletion detected for autonomous agent application - recreating") + // This is an unauthorized deletion (user-initiated), recreate the appProject + app := outbound.DeepCopy() + app.ResourceVersion = "" + app.DeletionTimestamp = nil + sourceUID := outbound.Annotations[manager.SourceUIDAnnotation] + app.SetUID(k8stypes.UID(sourceUID)) + _, err := s.projectManager.Create(s.ctx, app) + if err != nil { + logCtx.WithError(err).Error("failed to recreate application after unauthorized deletion") + } else { + logCtx.Infof("Recreated application %s after unauthorized deletion", outbound.Name) + } + + return true +} diff --git a/principal/callbacks_test.go b/principal/callbacks_test.go index dcfca927..9f8a5ab3 100644 --- a/principal/callbacks_test.go +++ b/principal/callbacks_test.go @@ -1263,6 +1263,7 @@ func TestServer_updateAppCallback(t *testing.T) { events: event.NewEventSource("test"), namespaceMap: map[string]types.AgentMode{"autonomous-agent": types.AgentModeAutonomous}, appManager: appManager, + sourceCache: cache.NewSourceCache(), } err = s.queues.Create("autonomous-agent") @@ -1299,6 +1300,7 @@ func TestServer_updateAppCallback(t *testing.T) { events: event.NewEventSource("test"), namespaceMap: map[string]types.AgentMode{"autonomous-agent": types.AgentModeAutonomous}, appManager: appManager, + sourceCache: cache.NewSourceCache(), } err = s.queues.Create("autonomous-agent") @@ -1324,8 +1326,8 @@ func TestServer_updateAppCallback(t *testing.T) { } sourceUID := 
k8stypes.UID(oldApp.Annotations[manager.SourceUIDAnnotation]) - cache.SetApplicationSpec(sourceUID, oldApp.Spec, log()) - defer cache.DeleteApplicationSpec(sourceUID, log()) + s.sourceCache.Application.Set(sourceUID, oldApp.Spec) + defer s.sourceCache.Application.Delete(sourceUID) mockBackend.On("Get", mock.Anything, mock.Anything, mock.Anything).Return(oldApp, nil) mockBackend.On("SupportsPatch").Return(true) @@ -1349,6 +1351,7 @@ func TestServer_updateAppCallback(t *testing.T) { events: event.NewEventSource("test"), namespaceMap: map[string]types.AgentMode{"autonomous-agent": types.AgentModeAutonomous}, appManager: appManager, + sourceCache: cache.NewSourceCache(), } err = s.queues.Create("autonomous-agent") diff --git a/principal/event.go b/principal/event.go index 89fd38d4..d9d11806 100644 --- a/principal/event.go +++ b/principal/event.go @@ -20,7 +20,6 @@ import ( "time" "github.com/argoproj-labs/argocd-agent/internal/backend" - "github.com/argoproj-labs/argocd-agent/internal/cache" "github.com/argoproj-labs/argocd-agent/internal/checkpoint" "github.com/argoproj-labs/argocd-agent/internal/event" "github.com/argoproj-labs/argocd-agent/internal/kube" @@ -179,7 +178,7 @@ func (s *Server) processApplicationEvent(ctx context.Context, agentName string, return event.ErrEventDiscarded } - cache.SetApplicationSpec(incoming.UID, incoming.Spec, logCtx) + s.sourceCache.Application.Set(incoming.UID, incoming.Spec) incoming.SetNamespace(agentName) _, err := s.appManager.Create(ctx, incoming) @@ -201,7 +200,7 @@ func (s *Server) processApplicationEvent(ctx context.Context, agentName string, return event.NewEventNotAllowedErr("event type not allowed when mode is not autonomous") } - cache.SetApplicationSpec(incoming.UID, incoming.Spec, logCtx) + s.sourceCache.Application.Set(incoming.UID, incoming.Spec) _, err := s.appManager.UpdateAutonomousApp(ctx, agentName, incoming) if err != nil { @@ -215,7 +214,7 @@ func (s *Server) processApplicationEvent(ctx context.Context, 
agentName string, return event.NewEventNotAllowedErr("event type not allowed when mode is not managed") } - cache.SetApplicationSpec(incoming.UID, incoming.Spec, logCtx) + s.sourceCache.Application.Set(incoming.UID, incoming.Spec) _, err := s.appManager.UpdateStatus(ctx, agentName, incoming) if err != nil { @@ -226,7 +225,7 @@ func (s *Server) processApplicationEvent(ctx context.Context, agentName string, case event.Delete.String(): if agentMode.IsAutonomous() { - cache.DeleteApplicationSpec(incoming.UID, logCtx) + s.sourceCache.Application.Delete(incoming.UID) // Mark this deletion as expected and perform the deletion appKey := fmt.Sprintf("%s/%s", agentName, incoming.Name) @@ -334,6 +333,9 @@ func (s *Server) processAppProjectEvent(ctx context.Context, agentName string, e case event.Create.String(): if agentMode.IsAutonomous() { incoming.SetNamespace(s.namespace) + + s.sourceCache.AppProject.Set(incoming.UID, incoming.Spec) + _, err := s.projectManager.Create(ctx, incoming) if err != nil { return fmt.Errorf("could not create app-project %s: %w", incoming.Name, err) @@ -351,6 +353,8 @@ func (s *Server) processAppProjectEvent(ctx context.Context, agentName string, e incoming.SetNamespace(s.namespace) + s.sourceCache.AppProject.Set(incoming.UID, incoming.Spec) + _, err := s.projectManager.UpdateAppProject(ctx, incoming) if err != nil { return fmt.Errorf("could not update app-project %s: %w", incoming.Name, err) @@ -364,6 +368,8 @@ func (s *Server) processAppProjectEvent(ctx context.Context, agentName string, e incoming.SetNamespace(s.namespace) + s.sourceCache.AppProject.Delete(incoming.UID) + deletionPropagation := backend.DeletePropagationForeground err := s.projectManager.Delete(ctx, incoming, &deletionPropagation) if err != nil { diff --git a/principal/server.go b/principal/server.go index 51b3dc15..8f4ed8c5 100644 --- a/principal/server.go +++ b/principal/server.go @@ -32,7 +32,7 @@ import ( kubeappproject 
"github.com/argoproj-labs/argocd-agent/internal/backend/kubernetes/appproject" kubenamespace "github.com/argoproj-labs/argocd-agent/internal/backend/kubernetes/namespace" kuberepository "github.com/argoproj-labs/argocd-agent/internal/backend/kubernetes/repository" - appCache "github.com/argoproj-labs/argocd-agent/internal/cache" + "github.com/argoproj-labs/argocd-agent/internal/cache" "github.com/argoproj-labs/argocd-agent/internal/config" "github.com/argoproj-labs/argocd-agent/internal/event" "github.com/argoproj-labs/argocd-agent/internal/filter" @@ -157,6 +157,8 @@ type Server struct { handlersOnConnect []handlersOnConnect eventWriters *event.EventWritersMap + + sourceCache *cache.SourceCache } type handlersOnConnect func(agent types.Agent) error @@ -191,6 +193,7 @@ func NewServer(ctx context.Context, kubeClient *kube.KubernetesClient, namespace repoToAgents: NewMapToSet(), projectToRepos: NewMapToSet(), expectedDeletions: make(map[string]bool), + sourceCache: cache.NewSourceCache(), } s.ctx, s.ctxCancel = context.WithCancel(ctx) @@ -425,20 +428,9 @@ func (s *Server) Start(ctx context.Context, errch chan error) error { log().Infof("Starting %s (server) v%s (allowed_namespaces=%v)", s.version.Name(), s.version.Version(), s.options.namespaces) } - // We need to maintain a cache to keep applications in sync with last known state of - // autonomous-agent in case it is disconnected with agent or application on the control-plane is modified. 
- log().Infof("Recreating application spec cache from existing resources on cluster") - appList, err := s.appManager.List(ctx, backend.ApplicationSelector{}) - if err != nil { - log().Errorf("Error while fetching list of applications: %v", err) - } - - for _, app := range appList { - sourceUID, exists := app.Annotations[manager.SourceUIDAnnotation] - if exists { - appCache.SetApplicationSpec(k8stypes.UID(sourceUID), app.Spec, log()) - } - } + // We need to maintain a cache to keep resources in sync with last known state of + // autonomous-agent in case it is disconnected with agent or resources on the control-plane are modified. + s.populateSourceCache(ctx) if s.options.serveGRPC { if err := s.serveGRPC(ctx, s.metrics, errch); err != nil { @@ -461,7 +453,7 @@ func (s *Server) Start(ctx context.Context, errch chan error) error { go s.RunHandlersOnConnect(s.ctx) - err = s.StartEventProcessor(s.ctx) + err := s.StartEventProcessor(s.ctx) if err != nil { return nil } @@ -909,3 +901,46 @@ func (s *Server) healthzHandler(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "text/plain") w.WriteHeader(http.StatusOK) } + +func (s *Server) populateSourceCache(ctx context.Context) { + log().Infof("Recreating application spec cache from existing resources on cluster") + appList, err := s.appManager.List(ctx, backend.ApplicationSelector{Namespaces: []string{s.namespace}}) + if err != nil { + log().Errorf("Error while fetching list of applications: %v", err) + } + + for _, app := range appList { + sourceUID, exists := app.Annotations[manager.SourceUIDAnnotation] + if exists { + s.sourceCache.Application.Set(k8stypes.UID(sourceUID), app.Spec) + } + } + + log().Infof("Recreating appProject spec cache from existing resources on cluster") + appProjectList, err := s.projectManager.List(ctx, backend.AppProjectSelector{Namespace: s.namespace}) + if err != nil { + log().Errorf("Error while fetching list of appProjects: %v", err) + } + + for _, appProject := range 
appProjectList { + sourceUID, exists := appProject.Annotations[manager.SourceUIDAnnotation] + if exists { + s.sourceCache.AppProject.Set(k8stypes.UID(sourceUID), appProject.Spec) + } + } + + log().Infof("Recreating repository spec cache from existing resources on cluster") + repoList, err := s.repoManager.List(ctx, backend.RepositorySelector{Namespace: s.namespace}) + if err != nil { + log().Errorf("Error while fetching list of repositories: %v", err) + } + + for _, repo := range repoList { + sourceUID, exists := repo.Annotations[manager.SourceUIDAnnotation] + if exists { + s.sourceCache.Repository.Set(k8stypes.UID(sourceUID), repo.Data) + } + } + + log().Infof("Source cache populated successfully") +} diff --git a/test/e2e/appcache_test.go b/test/e2e/appcache_test.go index 5705ae12..a9fed26f 100644 --- a/test/e2e/appcache_test.go +++ b/test/e2e/appcache_test.go @@ -258,6 +258,81 @@ func (suite *CacheTestSuite) Test_RevertAutonomousAppDeletion() { }, 30*time.Second, 1*time.Second) } +func (suite *CacheTestSuite) Test_RevertAppProjectUpdatesManagedMode() { + requires := suite.Require() + + // Create an appProject on the control-plane cluster and ensure it is synced to the workload cluster + appProject := sampleAppProject() + + err := suite.PrincipalClient.Create(suite.Ctx, appProject, metav1.CreateOptions{}) + requires.NoError(err) + + projKey := types.NamespacedName{Name: appProject.Name, Namespace: "argocd"} + + requires.Eventually(func() bool { + appProject := argoapp.AppProject{} + err := suite.ManagedAgentClient.Get(suite.Ctx, projKey, &appProject, metav1.GetOptions{}) + if err != nil { + fmt.Println("error getting appProject", err) + return false + } + return true + }, 30*time.Second, 1*time.Second) + + // Modify the appProject on the workload cluster and ensure the change is reverted to be in sync with the control-plane + err = suite.ManagedAgentClient.EnsureAppProjectUpdate(suite.Ctx, projKey, func(appProject *argoapp.AppProject) error { + 
appProject.Spec.Description = "random" + return nil + }, metav1.UpdateOptions{}) + requires.NoError(err) + + requires.Eventually(func() bool { + got := argoapp.AppProject{} + err := suite.ManagedAgentClient.Get(suite.Ctx, projKey, &got, metav1.GetOptions{}) + fmt.Println("err", err) + fmt.Println("got", got.Spec.Description) + return err == nil && reflect.DeepEqual(appProject.Spec.Description, got.Spec.Description) + }, 30*time.Second, 1*time.Second) +} + +func (suite *CacheTestSuite) Test_RevertAppProjectUpdatesAutonomousMode() { + requires := suite.Require() + + // Create an appProject on the workload cluster and ensure it is synced to the control-plane + appProject := sampleAppProject() + + err := suite.AutonomousAgentClient.Create(suite.Ctx, appProject, metav1.CreateOptions{}) + requires.NoError(err) + + autonomousProjName := "agent-autonomous-" + appProject.Name + principalKey := types.NamespacedName{Name: autonomousProjName, Namespace: "argocd"} + + requires.Eventually(func() bool { + appProject := argoapp.AppProject{} + err := suite.PrincipalClient.Get(suite.Ctx, principalKey, &appProject, metav1.GetOptions{}) + if err != nil { + fmt.Println("error getting appProject", err) + return false + } + return true + }, 30*time.Second, 1*time.Second) + + // Modify the appProject on the control-plane and ensure the change is reverted to be in sync with the agent + err = suite.PrincipalClient.EnsureAppProjectUpdate(suite.Ctx, principalKey, func(appProject *argoapp.AppProject) error { + appProject.Spec.Description = "random" + return nil + }, metav1.UpdateOptions{}) + requires.NoError(err) + + requires.Eventually(func() bool { + got := argoapp.AppProject{} + err := suite.PrincipalClient.Get(suite.Ctx, principalKey, &got, metav1.GetOptions{}) + fmt.Println("err", err) + fmt.Println("got", got.Spec.Description) + return err == nil && reflect.DeepEqual(appProject.Spec.Description, got.Spec.Description) + }, 30*time.Second, 1*time.Second) +} + func createApp(ctx 
context.Context, client fixture.KubeClient, requires *require.Assertions, opts ...struct{ Name, Namespace string }) argoapp.Application { // If opts are provided, use them, otherwise use default values name := "guestbook" From c13f8024baa9c17160b73961e3884964e31ce482 Mon Sep 17 00:00:00 2001 From: Chetan Banavikalmutt Date: Tue, 7 Oct 2025 17:27:01 +0530 Subject: [PATCH 2/6] revert updates to repository Signed-off-by: Chetan Banavikalmutt Assisted-by: Cursor --- agent/inbound.go | 29 +++-- agent/outbound.go | 37 +++++++ internal/cache/resource_cache.go | 33 ++---- internal/manager/appproject/appproject.go | 2 - internal/manager/manager.go | 62 +++++++++++ internal/manager/manager_test.go | 125 ++++++++++++++++++++++ internal/manager/repository/repository.go | 38 +++++++ principal/callbacks.go | 66 +----------- principal/callbacks_test.go | 79 ++++---------- principal/event.go | 11 +- principal/server.go | 52 +++------ test/e2e/appcache_test.go | 104 +++++++++++++++++- test/e2e/fixture/fixture.go | 18 +++- 13 files changed, 452 insertions(+), 204 deletions(-) diff --git a/agent/inbound.go b/agent/inbound.go index 1cea69f8..03921f57 100644 --- a/agent/inbound.go +++ b/agent/inbound.go @@ -543,23 +543,24 @@ func (a *Agent) deleteApplication(app *v1alpha1.Application) error { logCtx.Infof("Deleting application") + if a.mode == types.AgentModeManaged { + a.sourceCache.Application.Delete(app.UID) + } + deletionPropagation := backend.DeletePropagationBackground err := a.appManager.Delete(a.context, a.namespace, app, &deletionPropagation) if err != nil { if apierrors.IsNotFound(err) { logCtx.Debug("application is not found, perhaps it is already deleted") - if a.mode == types.AgentModeManaged { - a.sourceCache.Application.Delete(app.UID) - } return nil } + // Restore the cache if the deletion fails + if a.mode == types.AgentModeManaged { + a.sourceCache.Application.Set(app.UID, app.Spec) + } return err } - if a.mode == types.AgentModeManaged { - 
a.sourceCache.Application.Delete(app.UID) - } - err = a.appManager.Unmanage(app.QualifiedName()) if err != nil { log().Warnf("Could not unmanage app %s: %v", app.QualifiedName(), err) @@ -659,6 +660,8 @@ func (a *Agent) deleteAppProject(project *v1alpha1.AppProject) error { logCtx.Infof("Deleting appProject") + a.sourceCache.AppProject.Delete(project.UID) + deletionPropagation := backend.DeletePropagationBackground err := a.projectManager.Delete(a.context, project, &deletionPropagation) if err != nil { @@ -667,11 +670,11 @@ func (a *Agent) deleteAppProject(project *v1alpha1.AppProject) error { a.sourceCache.AppProject.Delete(project.UID) return nil } + // Restore the cache if the deletion fails + a.sourceCache.AppProject.Set(project.UID, project.Spec) return err } - a.sourceCache.AppProject.Delete(project.UID) - err = a.projectManager.Unmanage(project.Name) if err != nil { log().Warnf("Could not unmanage appProject %s: %v", project.Name, err) @@ -709,6 +712,8 @@ func (a *Agent) createRepository(incoming *corev1.Secret) (*corev1.Secret, error incoming.Annotations = make(map[string]string) } + a.sourceCache.Repository.Set(incoming.UID, incoming.Data) + // Get rid of some fields that we do not want to have on the repository as we start fresh. 
delete(incoming.Annotations, "kubectl.kubernetes.io/last-applied-configuration") @@ -745,6 +750,8 @@ func (a *Agent) updateRepository(incoming *corev1.Secret) (*corev1.Secret, error logCtx.Infof("Updating repository") + a.sourceCache.Repository.Set(incoming.UID, incoming.Data) + return a.repoManager.UpdateManagedRepository(a.context, incoming) } @@ -763,6 +770,8 @@ func (a *Agent) deleteRepository(repo *corev1.Secret) error { logCtx.Infof("Deleting repository") + a.sourceCache.Repository.Delete(repo.UID) + deletionPropagation := backend.DeletePropagationBackground err := a.repoManager.Delete(a.context, repo.Name, repo.Namespace, &deletionPropagation) if err != nil { @@ -770,6 +779,8 @@ func (a *Agent) deleteRepository(repo *corev1.Secret) error { logCtx.Debug("repository is not found, perhaps it is already deleted") return nil } + // Restore the cache if the deletion fails + a.sourceCache.Repository.Set(repo.UID, repo.Data) return err } diff --git a/agent/outbound.go b/agent/outbound.go index 602eb77e..3e403093 100644 --- a/agent/outbound.go +++ b/agent/outbound.go @@ -19,12 +19,14 @@ import ( "github.com/argoproj-labs/argocd-agent/internal/event" "github.com/argoproj-labs/argocd-agent/internal/logging/logfields" + "github.com/argoproj-labs/argocd-agent/internal/manager" "github.com/argoproj-labs/argocd-agent/internal/resources" "github.com/argoproj-labs/argocd-agent/pkg/types" "github.com/argoproj/argo-cd/v3/pkg/apis/application/v1alpha1" cacheutil "github.com/argoproj/argo-cd/v3/util/cache" "github.com/sirupsen/logrus" corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) // addAppCreationToQueue processes a new application event originating from the @@ -116,6 +118,13 @@ func (a *Agent) addAppDeletionToQueue(app *v1alpha1.Application) { logCtx := log().WithField(logfields.Event, "DeleteApp").WithField(logfields.Application, app.QualifiedName()) logCtx.Debugf("Delete app event") + if isResourceFromPrincipal(app) { + if 
manager.RevertUserInitiatedDeletion(a.context, app, a.sourceCache.Application, a.appManager, logCtx) { + logCtx.Trace("Deleted app is recreated") + return + } + } + a.resources.Remove(resources.NewResourceKeyFromApp(app)) if !a.appManager.IsManaged(app.QualifiedName()) { @@ -248,6 +257,13 @@ func (a *Agent) addAppProjectDeletionToQueue(appProject *v1alpha1.AppProject) { logCtx.Debugf("Delete appProject event") + if isResourceFromPrincipal(appProject) { + if manager.RevertUserInitiatedDeletion(a.context, appProject, a.sourceCache.AppProject, a.projectManager, logCtx) { + logCtx.Trace("Deleted appProject is recreated") + return + } + } + a.resources.Remove(resources.NewResourceKeyFromAppProject(appProject)) // Only send the deletion event when we're in autonomous mode @@ -347,6 +363,11 @@ func (a *Agent) handleRepositoryUpdate(old, new *corev1.Secret) { a.watchLock.Lock() defer a.watchLock.Unlock() + if a.repoManager.RevertRepositoryChanges(a.context, new, a.sourceCache.Repository) { + logCtx.Debugf("Modifications done to repository are reverted") + return + } + if a.mode.IsAutonomous() { logCtx.Debugf("Skipping repository event because the agent is not in managed mode") return @@ -371,6 +392,11 @@ func (a *Agent) handleRepositoryDeletion(repo *corev1.Secret) { logCtx.Debugf("Delete repository event") + if manager.RevertUserInitiatedDeletion(a.context, repo, a.sourceCache.Repository, a.repoManager, logCtx) { + logCtx.Trace("Deleted repository is recreated") + return + } + if a.mode.IsAutonomous() { logCtx.Debugf("Skipping repository event because the agent is not in managed mode") return @@ -388,3 +414,14 @@ func (a *Agent) handleRepositoryDeletion(repo *corev1.Secret) { return } } + +// isResourceFromPrincipal checks if a Kubernetes resource was created by the principal +// by examining if it has the source UID annotation. 
+func isResourceFromPrincipal(resource metav1.Object) bool { + annotations := resource.GetAnnotations() + if annotations == nil { + return false + } + _, ok := annotations[manager.SourceUIDAnnotation] + return ok +} diff --git a/internal/cache/resource_cache.go b/internal/cache/resource_cache.go index 8c50cb36..da647824 100644 --- a/internal/cache/resource_cache.go +++ b/internal/cache/resource_cache.go @@ -77,6 +77,15 @@ func (c *ResourceCache[T]) Get(sourceUID types.UID) (T, bool) { return item, ok } +// Contains checks if a resource is in the cache +func (c *ResourceCache[T]) Contains(sourceUID types.UID) bool { + c.lock.RLock() + defer c.lock.RUnlock() + + _, ok := c.items[sourceUID] + return ok +} + // Delete removes a resource from the cache func (c *ResourceCache[T]) Delete(sourceUID types.UID) { c.lock.Lock() @@ -103,27 +112,3 @@ func (c *ResourceCache[T]) Len() int { defer c.lock.RUnlock() return len(c.items) } - -// Keys returns all keys in the cache -func (c *ResourceCache[T]) Keys() []types.UID { - c.lock.RLock() - defer c.lock.RUnlock() - - keys := make([]types.UID, 0, len(c.items)) - for key := range c.items { - keys = append(keys, key) - } - return keys -} - -// ForEach iterates over all items in the cache -func (c *ResourceCache[T]) ForEach(fn func(types.UID, T) bool) { - c.lock.RLock() - defer c.lock.RUnlock() - - for key, value := range c.items { - if !fn(key, value) { - break - } - } -} diff --git a/internal/manager/appproject/appproject.go b/internal/manager/appproject/appproject.go index 58e9a038..7d0007b0 100644 --- a/internal/manager/appproject/appproject.go +++ b/internal/manager/appproject/appproject.go @@ -392,8 +392,6 @@ func (m *AppProjectManager) RevertAppProjectChanges(ctx context.Context, project } return true } else { - logCtx.Trace(cachedSpec) - logCtx.Trace(project.Spec) logCtx.Debugf("AppProject %s is already in sync with source cache", project.Name) } } else { diff --git a/internal/manager/manager.go b/internal/manager/manager.go 
index 27f7380b..c59b4074 100644 --- a/internal/manager/manager.go +++ b/internal/manager/manager.go @@ -15,8 +15,14 @@ package manager import ( + "context" "fmt" "sync" + + "github.com/sirupsen/logrus" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" ) type ManagerRole int @@ -187,3 +193,59 @@ func (o *ObservedResources) Len() int { defer o.mu.RUnlock() return len(o.observed) } + +type kubeResource interface { + runtime.Object + metav1.Object +} + +type resourceCache[R kubeResource] interface { + Contains(uid types.UID) bool +} + +type resourceManager[R kubeResource] interface { + Create(ctx context.Context, obj R) (R, error) +} + +// RevertUserInitiatedDeletion detects if a resource deletion was unauthorized and recreates the resource. +// Returns true if the resource was recreated, false otherwise. +func RevertUserInitiatedDeletion[R kubeResource](ctx context.Context, + outbound R, + resCache resourceCache[R], + mgr resourceManager[R], + logCtx *logrus.Entry, +) bool { + + logCtx = logCtx.WithFields(logrus.Fields{ + "resource": outbound.GetName(), + "kind": outbound.GetObjectKind().GroupVersionKind().Kind, + }) + + sourceUID, exists := outbound.GetAnnotations()[SourceUIDAnnotation] + if !exists { + logCtx.Errorf("Source UID annotation not found for resource") + return false + } + + // If the resource is not in the cache, it means it was deleted by an incoming delete event. + // So no need to recreate it. 
+ if !resCache.Contains(types.UID(sourceUID)) { + logCtx.Debugf("Expected deletion detected - allowing it to proceed") + return false + } + + logCtx.Warnf("Unauthorized deletion detected - recreating") + // This is an unauthorized deletion (user-initiated), recreate the resource + resource := outbound.DeepCopyObject().(R) + resource.SetResourceVersion("") + resource.SetDeletionTimestamp(nil) + resource.SetUID(types.UID(sourceUID)) + _, err := mgr.Create(ctx, resource) + if err != nil { + logCtx.WithError(err).Error("failed to recreate resource after unauthorized deletion") + } else { + logCtx.Infof("Recreated resource after unauthorized deletion") + } + + return true +} diff --git a/internal/manager/manager_test.go b/internal/manager/manager_test.go index 907dda5e..dd3a1e97 100644 --- a/internal/manager/manager_test.go +++ b/internal/manager/manager_test.go @@ -15,9 +15,17 @@ package manager import ( + "context" "testing" + "github.com/argoproj-labs/argocd-agent/internal/cache" + argoapp "github.com/argoproj/argo-cd/v3/pkg/apis/application/v1alpha1" + "github.com/sirupsen/logrus" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" ) func Test_IsPrincipal(t *testing.T) { @@ -42,3 +50,120 @@ func Test_IsManaged(t *testing.T) { assert.True(t, m.IsManaged()) assert.False(t, m.IsAutonomous()) } + +type fakeManager[R kubeResource] struct { + created R + retErr error +} + +func (f *fakeManager[R]) Create(ctx context.Context, obj R) (R, error) { + f.created = obj + return obj, f.retErr +} + +func newLogger() *logrus.Entry { + l := logrus.New() + l.SetLevel(logrus.DebugLevel) + return logrus.NewEntry(l) +} + +func TestRevertUserInitiatedDeletion(t *testing.T) { + t.Run("Application", func(t *testing.T) { + requires := require.New(t) + + // No annotation -> no recreate + app := &argoapp.Application{ + ObjectMeta: metav1.ObjectMeta{ + Name: 
"app", + Namespace: "ns", + }, + } + mgr := &fakeManager[*argoapp.Application]{} + sc := cache.NewSourceCache() + ok := RevertUserInitiatedDeletion(context.Background(), app, sc.Application, mgr, newLogger()) + requires.False(ok) + requires.Nil(mgr.created) + + // With annotation but cache miss -> no recreate + app.Annotations = map[string]string{SourceUIDAnnotation: string(types.UID("u1"))} + mgr = &fakeManager[*argoapp.Application]{} + ok = RevertUserInitiatedDeletion(context.Background(), app, sc.Application, mgr, newLogger()) + requires.False(ok) + requires.Nil(mgr.created) + + // With annotation and cache hit -> recreate + app.Annotations = map[string]string{SourceUIDAnnotation: string(types.UID("u2"))} + sc.Application.Set(types.UID("u2"), argoapp.ApplicationSpec{}) + mgr = &fakeManager[*argoapp.Application]{} + ok = RevertUserInitiatedDeletion(context.Background(), app, sc.Application, mgr, newLogger()) + requires.True(ok) + requires.NotNil(mgr.created) + requires.Equal(types.UID("u2"), mgr.created.GetUID()) + requires.Equal("", mgr.created.GetResourceVersion()) + requires.Nil(mgr.created.GetDeletionTimestamp()) + }) + + t.Run("AppProject", func(t *testing.T) { + requires := require.New(t) + + proj := &argoapp.AppProject{ + ObjectMeta: metav1.ObjectMeta{ + Name: "proj", + Namespace: "argocd", + }, + } + sc := cache.NewSourceCache() + mgr := &fakeManager[*argoapp.AppProject]{} + ok := RevertUserInitiatedDeletion(context.Background(), proj, sc.AppProject, mgr, newLogger()) + requires.False(ok) + requires.Nil(mgr.created) + + proj.Annotations = map[string]string{SourceUIDAnnotation: string(types.UID("p1"))} + mgr = &fakeManager[*argoapp.AppProject]{} + ok = RevertUserInitiatedDeletion(context.Background(), proj, sc.AppProject, mgr, newLogger()) + requires.False(ok) + requires.Nil(mgr.created) + + proj.Annotations = map[string]string{SourceUIDAnnotation: string(types.UID("p2"))} + sc.AppProject.Set(types.UID("p2"), argoapp.AppProjectSpec{}) + mgr = 
&fakeManager[*argoapp.AppProject]{} + ok = RevertUserInitiatedDeletion(context.Background(), proj, sc.AppProject, mgr, newLogger()) + requires.True(ok) + requires.NotNil(mgr.created) + requires.Equal(types.UID("p2"), mgr.created.GetUID()) + requires.Equal("", mgr.created.GetResourceVersion()) + requires.Nil(mgr.created.GetDeletionTimestamp()) + }) + + t.Run("Repository Secret", func(t *testing.T) { + requires := require.New(t) + + repo := &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: "repo", + Namespace: "argocd", + }, + } + sc := cache.NewSourceCache() + mgr := &fakeManager[*corev1.Secret]{} + ok := RevertUserInitiatedDeletion(context.Background(), repo, sc.Repository, mgr, newLogger()) + requires.False(ok) + requires.Nil(mgr.created) + + repo.Annotations = map[string]string{SourceUIDAnnotation: string(types.UID("r1"))} + mgr = &fakeManager[*corev1.Secret]{} + ok = RevertUserInitiatedDeletion(context.Background(), repo, sc.Repository, mgr, newLogger()) + requires.False(ok) + requires.Nil(mgr.created) + + repo.Annotations = map[string]string{SourceUIDAnnotation: string(types.UID("r2"))} + sc.Repository.Set(types.UID("r2"), map[string][]byte{"k": {}}) + mgr = &fakeManager[*corev1.Secret]{} + ok = RevertUserInitiatedDeletion(context.Background(), repo, sc.Repository, mgr, newLogger()) + requires.True(ok) + requires.NotNil(mgr.created) + requires.Equal(types.UID("r2"), mgr.created.GetUID()) + requires.Equal("", mgr.created.GetResourceVersion()) + requires.Nil(mgr.created.GetDeletionTimestamp()) + }) +} diff --git a/internal/manager/repository/repository.go b/internal/manager/repository/repository.go index ffb212c3..27784044 100644 --- a/internal/manager/repository/repository.go +++ b/internal/manager/repository/repository.go @@ -18,15 +18,18 @@ import ( "context" "encoding/json" "fmt" + "reflect" "time" "github.com/argoproj-labs/argocd-agent/internal/backend" + "github.com/argoproj-labs/argocd-agent/internal/cache" 
"github.com/argoproj-labs/argocd-agent/internal/manager" "github.com/sirupsen/logrus" "github.com/wI2L/jsondiff" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/util/retry" ) @@ -229,6 +232,41 @@ func (m *RepositoryManager) update(ctx context.Context, upsert bool, incoming *c return updated, err } +// RevertRepositoryChanges compares the actual spec with expected spec stored in cache, +// if actual spec doesn't match with cache, then it is reverted to be in sync with cache, which is same as the source cluster. +func (m *RepositoryManager) RevertRepositoryChanges(ctx context.Context, repo *corev1.Secret, repoCache *cache.ResourceCache[map[string][]byte]) bool { + logCtx := log().WithFields(logrus.Fields{ + "component": "RevertRepositoryChanges", + "repository": repo.Name, + "resourceVersion": repo.ResourceVersion, + }) + + sourceUID, exists := repo.Annotations[manager.SourceUIDAnnotation] + if !exists { + return false + } + + if cachedData, ok := repoCache.Get(types.UID(sourceUID)); ok { + logCtx.Debugf("Repository is available in agent cache") + + if isEqual := reflect.DeepEqual(cachedData, repo.Data); !isEqual { + repo.Data = cachedData + logCtx.Infof("Reverting modifications done to repository") + if _, err := m.UpdateManagedRepository(ctx, repo); err != nil { + logCtx.Errorf("Unable to revert modifications done in repository. 
Error: %v", err) + return false + } + return true + } else { + logCtx.Debugf("Repository is already in sync with source cache") + } + } else { + logCtx.Errorf("Repository is not available in agent cache") + } + + return false +} + func log() *logrus.Entry { return logrus.WithField("component", "RepositoryManager") } diff --git a/principal/callbacks.go b/principal/callbacks.go index 0368e4a0..b8ba749d 100644 --- a/principal/callbacks.go +++ b/principal/callbacks.go @@ -15,8 +15,6 @@ package principal import ( - "fmt" - "github.com/argoproj-labs/argocd-agent/internal/event" "github.com/argoproj-labs/argocd-agent/internal/manager" "github.com/argoproj-labs/argocd-agent/internal/manager/appproject" @@ -27,7 +25,6 @@ import ( corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - k8stypes "k8s.io/apimachinery/pkg/types" ) // newAppCallback is executed when a new application event was emitted from @@ -128,7 +125,7 @@ func (s *Server) deleteAppCallback(outbound *v1alpha1.Application) { // Revert user-initiated deletion on autonomous agent applications if isResourceFromAutonomousAgent(outbound) { - if s.revertUserInitiatedDeletion(outbound, logCtx) { + if manager.RevertUserInitiatedDeletion(s.ctx, outbound, s.sourceCache.Application, s.appManager, logCtx) { logCtx.Trace("Deleted application is recreated") return } @@ -219,13 +216,10 @@ func (s *Server) updateAppProjectCallback(old *v1alpha1.AppProject, new *v1alpha // Check if this AppProject was created by an autonomous agent if isResourceFromAutonomousAgent(new) { - logCtx.Trace("Reverting modifications on autonomous agent appProjects/////////////////") // Revert modifications on autonomous agent appProjects if s.projectManager.RevertAppProjectChanges(s.ctx, new, s.sourceCache.AppProject) { - logCtx.Trace("Modifications to the appProject are reverted*************") return } - logCtx.Trace("No revertions!!!!!!!!!!!!!") } if len(new.Finalizers) > 0 && 
len(new.Finalizers) != len(old.Finalizers) { @@ -270,10 +264,10 @@ func (s *Server) deleteAppProjectCallback(outbound *v1alpha1.AppProject) { // Revert user-initiated deletion on autonomous agent applications if isResourceFromAutonomousAgent(outbound) { - if s.revertUserInitiatedProjectDeletion(outbound, logCtx) { + if manager.RevertUserInitiatedDeletion(s.ctx, outbound, s.sourceCache.AppProject, s.projectManager, logCtx) { logCtx.Trace("Deleted appProject is recreated") + return } - return } s.resources.Remove(outbound.Namespace, resources.NewResourceKeyFromAppProject(outbound)) @@ -630,57 +624,3 @@ func isResourceFromAutonomousAgent(resource metav1.Object) bool { _, ok := annotations[manager.SourceUIDAnnotation] return ok } - -func (s *Server) revertUserInitiatedDeletion(outbound *v1alpha1.Application, logCtx *logrus.Entry) bool { - appKey := fmt.Sprintf("%s/%s", outbound.Namespace, outbound.Name) - - // Check if this deletion was expected (agent-initiated) - if s.isExpectedDeletion(appKey) { - logCtx.Debugf("Expected deletion from autonomous agent - allowing it to proceed") - // This is a legitimate deletion from the agent, let it proceed normally - return false - } - - logCtx.Warnf("Unauthorized deletion detected for autonomous agent application - recreating") - // This is an unauthorized deletion (user-initiated), recreate the app - app := outbound.DeepCopy() - app.ResourceVersion = "" - app.DeletionTimestamp = nil - sourceUID := outbound.Annotations[manager.SourceUIDAnnotation] - app.SetUID(k8stypes.UID(sourceUID)) - _, err := s.appManager.Create(s.ctx, app) - if err != nil { - logCtx.WithError(err).Error("failed to recreate application after unauthorized deletion") - } else { - logCtx.Infof("Recreated application %s after unauthorized deletion", outbound.Name) - } - - return true -} - -func (s *Server) revertUserInitiatedProjectDeletion(outbound *v1alpha1.AppProject, logCtx *logrus.Entry) bool { - appKey := fmt.Sprintf("%s/%s", outbound.Namespace, 
outbound.Name) - - // Check if this deletion was expected (agent-initiated) - if s.isExpectedDeletion(appKey) { - logCtx.Debugf("Expected deletion from autonomous agent - allowing it to proceed") - // This is a legitimate deletion from the agent, let it proceed normally - return false - } - - logCtx.Warnf("Unauthorized deletion detected for autonomous agent application - recreating") - // This is an unauthorized deletion (user-initiated), recreate the app - app := outbound.DeepCopy() - app.ResourceVersion = "" - app.DeletionTimestamp = nil - sourceUID := outbound.Annotations[manager.SourceUIDAnnotation] - app.SetUID(k8stypes.UID(sourceUID)) - _, err := s.projectManager.Create(s.ctx, app) - if err != nil { - logCtx.WithError(err).Error("failed to recreate application after unauthorized deletion") - } else { - logCtx.Infof("Recreated application %s after unauthorized deletion", outbound.Name) - } - - return true -} diff --git a/principal/callbacks_test.go b/principal/callbacks_test.go index 9f8a5ab3..dd84e313 100644 --- a/principal/callbacks_test.go +++ b/principal/callbacks_test.go @@ -996,12 +996,11 @@ func TestServer_deleteRepositoryCallback(t *testing.T) { func TestServer_deleteAppCallback_AutonomousAgent(t *testing.T) { tests := []struct { - name string - app *v1alpha1.Application - expectedDeleted bool // Whether deletion was expected (agent-initiated) - shouldRecreate bool // Whether app should be recreated (unauthorized deletion) - shouldError bool // Whether recreation should fail - setupMocks func(*mocks.Application) + name string + app *v1alpha1.Application + shouldRecreate bool // Whether app should be recreated (unauthorized deletion) + shouldError bool // Whether recreation should fail + setupMocks func(*mocks.Application) }{ { name: "legitimate deletion from autonomous agent - should allow", @@ -1018,9 +1017,8 @@ func TestServer_deleteAppCallback_AutonomousAgent(t *testing.T) { Project: "default", }, }, - expectedDeleted: true, - shouldRecreate: false, 
- shouldError: false, + shouldRecreate: false, + shouldError: false, setupMocks: func(mockBackend *mocks.Application) { // No mocks needed - deletion should proceed normally }, @@ -1040,9 +1038,8 @@ func TestServer_deleteAppCallback_AutonomousAgent(t *testing.T) { Project: "default", }, }, - expectedDeleted: false, - shouldRecreate: true, - shouldError: false, + shouldRecreate: true, + shouldError: false, setupMocks: func(mockBackend *mocks.Application) { // Mock successful recreation mockBackend.On("Create", mock.Anything, mock.MatchedBy(func(app *v1alpha1.Application) bool { @@ -1074,9 +1071,8 @@ func TestServer_deleteAppCallback_AutonomousAgent(t *testing.T) { Project: "default", }, }, - expectedDeleted: false, - shouldRecreate: true, - shouldError: true, + shouldRecreate: true, + shouldError: true, setupMocks: func(mockBackend *mocks.Application) { // Mock recreation failure mockBackend.On("Create", mock.Anything, mock.Anything).Return(nil, errors.NewInternalError(fmt.Errorf("creation failed"))) @@ -1095,9 +1091,8 @@ func TestServer_deleteAppCallback_AutonomousAgent(t *testing.T) { Project: "default", }, }, - expectedDeleted: false, - shouldRecreate: false, - shouldError: false, + shouldRecreate: false, + shouldError: false, setupMocks: func(mockBackend *mocks.Application) { // No mocks needed - should follow normal deletion flow }, @@ -1116,22 +1111,21 @@ func TestServer_deleteAppCallback_AutonomousAgent(t *testing.T) { // Create server with dependencies s := &Server{ - ctx: context.Background(), - queues: queue.NewSendRecvQueues(), - events: event.NewEventSource("test"), - resources: resources.NewAgentResources(), - appManager: appManager, - expectedDeletions: make(map[string]bool), + ctx: context.Background(), + queues: queue.NewSendRecvQueues(), + events: event.NewEventSource("test"), + resources: resources.NewAgentResources(), + appManager: appManager, + sourceCache: cache.NewSourceCache(), } // Create send queue for the agent err = 
s.queues.Create(tt.app.Namespace) require.NoError(t, err) - // Pre-mark deletion as expected if needed - if tt.expectedDeleted { - appKey := fmt.Sprintf("%s/%s", tt.app.Namespace, tt.app.Name) - s.markExpectedDeletion(appKey) + if tt.shouldRecreate { + sourceUID := tt.app.Annotations[manager.SourceUIDAnnotation] + s.sourceCache.Application.Set(k8stypes.UID(sourceUID), tt.app.Spec) } // Execute the callback @@ -1152,39 +1146,10 @@ func TestServer_deleteAppCallback_AutonomousAgent(t *testing.T) { // If not autonomous agent app, normal deletion flow should add event to queue assert.Equal(t, 1, sendQ.Len(), "Queue should contain delete event for normal apps") } - - // Verify expected deletion tracking is cleaned up - appKey := fmt.Sprintf("%s/%s", tt.app.Namespace, tt.app.Name) - s.expectedDeletionsLock.RLock() - _, stillTracked := s.expectedDeletions[appKey] - s.expectedDeletionsLock.RUnlock() - assert.False(t, stillTracked, "Expected deletion should be cleaned up after processing") - - mockAppBackend.AssertExpectations(t) }) } } -// TestServer_ExpectedDeletionTracking tests the core deletion tracking functionality -func TestServer_ExpectedDeletionTracking(t *testing.T) { - s := &Server{ - expectedDeletions: make(map[string]bool), - } - - appKey := "autonomous-agent/test-app" - // Initially not expected - assert.False(t, s.isExpectedDeletion(appKey)) - - // Mark as expected - s.markExpectedDeletion(appKey) - - // Should now be expected and consumed - assert.True(t, s.isExpectedDeletion(appKey)) - - // Should be cleaned up after check - assert.False(t, s.isExpectedDeletion(appKey)) -} - func TestServer_updateAppCallback(t *testing.T) { t.Run("managed agent update sends event to queue", func(t *testing.T) { mockBackend := &mocks.Application{} diff --git a/principal/event.go b/principal/event.go index d9d11806..6891f477 100644 --- a/principal/event.go +++ b/principal/event.go @@ -227,16 +227,14 @@ func (s *Server) processApplicationEvent(ctx context.Context, agentName 
string, s.sourceCache.Application.Delete(incoming.UID) - // Mark this deletion as expected and perform the deletion - appKey := fmt.Sprintf("%s/%s", agentName, incoming.Name) - s.markExpectedDeletion(appKey) - deletionPropagation := backend.DeletePropagationForeground err = s.appManager.Delete(ctx, agentName, incoming, &deletionPropagation) if err != nil { if kerrors.IsNotFound(err) { return nil } + // Restore the cache if the deletion fails + s.sourceCache.Application.Set(incoming.UID, incoming.Spec) return fmt.Errorf("could not delete application %s: %w", incoming.QualifiedName(), err) } logCtx.Infof("Deleted application %s", incoming.QualifiedName()) @@ -373,6 +371,11 @@ func (s *Server) processAppProjectEvent(ctx context.Context, agentName string, e deletionPropagation := backend.DeletePropagationForeground err := s.projectManager.Delete(ctx, incoming, &deletionPropagation) if err != nil { + if kerrors.IsNotFound(err) { + return nil + } + // Restore the cache if the deletion fails + s.sourceCache.AppProject.Set(incoming.UID, incoming.Spec) return fmt.Errorf("could not delete app-project %s: %w", incoming.Name, err) } logCtx.Infof("Deleted app-project %s", incoming.Name) diff --git a/principal/server.go b/principal/server.go index 8f4ed8c5..4ccf2d34 100644 --- a/principal/server.go +++ b/principal/server.go @@ -146,11 +146,6 @@ type Server struct { resources *resources.AgentResources // resyncStatus indicates whether an agent has been resyned after the principal restarts resyncStatus *resyncStatus - // expectedDeletions tracks applications that are expected to be deleted (agent-initiated) - // This is used to differentiate between user-initiated and agent-initiated deletions - expectedDeletions map[string]bool - // expectedDeletionsLock protects the expectedDeletions map - expectedDeletionsLock sync.RWMutex // notifyOnConnect will notify to run the handlers when an agent connects to the principal notifyOnConnect chan types.Agent // handlers to run when an agent 
connects to the principal @@ -180,20 +175,19 @@ const defaultRedisProxyListenerAddr = "0.0.0.0:6379" func NewServer(ctx context.Context, kubeClient *kube.KubernetesClient, namespace string, opts ...ServerOption) (*Server, error) { s := &Server{ - options: defaultOptions(), - queues: queue.NewSendRecvQueues(), - namespace: namespace, - noauth: noAuthEndpoints, - version: version.New("argocd-agent"), - kubeClient: kubeClient, - resyncStatus: newResyncStatus(), - resources: resources.NewAgentResources(), - notifyOnConnect: make(chan types.Agent), - eventWriters: event.NewEventWritersMap(), - repoToAgents: NewMapToSet(), - projectToRepos: NewMapToSet(), - expectedDeletions: make(map[string]bool), - sourceCache: cache.NewSourceCache(), + options: defaultOptions(), + queues: queue.NewSendRecvQueues(), + namespace: namespace, + noauth: noAuthEndpoints, + version: version.New("argocd-agent"), + kubeClient: kubeClient, + resyncStatus: newResyncStatus(), + resources: resources.NewAgentResources(), + notifyOnConnect: make(chan types.Agent), + eventWriters: event.NewEventWritersMap(), + repoToAgents: NewMapToSet(), + projectToRepos: NewMapToSet(), + sourceCache: cache.NewSourceCache(), } s.ctx, s.ctxCancel = context.WithCancel(ctx) @@ -850,26 +844,6 @@ func (s *Server) TokenIssuerForE2EOnly() issuer.Issuer { return s.issuer } -// markExpectedDeletion marks an application as expected to be deleted (agent-initiated) -func (s *Server) markExpectedDeletion(appKey string) { - s.expectedDeletionsLock.Lock() - defer s.expectedDeletionsLock.Unlock() - s.expectedDeletions[appKey] = true - log().WithField("app_key", appKey).Trace("Marked application for expected deletion") -} - -// isExpectedDeletion checks if an application deletion was expected and removes it from tracking -func (s *Server) isExpectedDeletion(appKey string) bool { - s.expectedDeletionsLock.Lock() - defer s.expectedDeletionsLock.Unlock() - expected, exists := s.expectedDeletions[appKey] - if exists { - 
delete(s.expectedDeletions, appKey) - log().WithField("app_key", appKey).Trace("Expected deletion observed") - } - return expected -} - func log() *logrus.Entry { return logging.ModuleLogger("server") } diff --git a/test/e2e/appcache_test.go b/test/e2e/appcache_test.go index a9fed26f..218a97cb 100644 --- a/test/e2e/appcache_test.go +++ b/test/e2e/appcache_test.go @@ -25,6 +25,7 @@ import ( argoapp "github.com/argoproj/argo-cd/v3/pkg/apis/application/v1alpha1" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" + corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" ) @@ -92,6 +93,21 @@ func (suite *CacheTestSuite) Test_RevertManagedClusterChanges() { app.Spec.Destination.Name = "in-cluster" app.Spec.Destination.Server = "" validateAppReverted(suite.Ctx, suite.ManagedAgentClient, &app, agentKey, requires, suite.T()) + + // Case 4: Delete the application directly from the managed-cluster, + // but it should be recreated to be in sync with principal + agentApp := argoapp.Application{ + ObjectMeta: metav1.ObjectMeta{ + Name: app.Name, + Namespace: "argocd", + }, + Spec: app.Spec, + } + + err := suite.ManagedAgentClient.Delete(suite.Ctx, &agentApp, metav1.DeleteOptions{}) + requires.NoError(err) + + validateAppReverted(suite.Ctx, suite.ManagedAgentClient, &app, agentKey, requires, suite.T()) } // This test validates the scenario when agent is disconnected with principal and then user tries @@ -289,8 +305,16 @@ func (suite *CacheTestSuite) Test_RevertAppProjectUpdatesManagedMode() { requires.Eventually(func() bool { got := argoapp.AppProject{} err := suite.ManagedAgentClient.Get(suite.Ctx, projKey, &got, metav1.GetOptions{}) - fmt.Println("err", err) - fmt.Println("got", got.Spec.Description) + return err == nil && reflect.DeepEqual(appProject.Spec.Description, got.Spec.Description) + }, 30*time.Second, 1*time.Second) + + // Delete the appProject from the workload cluster and ensure it is recreated 
to be in sync with the control-plane + err = suite.ManagedAgentClient.Delete(suite.Ctx, appProject, metav1.DeleteOptions{}) + requires.NoError(err) + + requires.Eventually(func() bool { + got := argoapp.AppProject{} + err := suite.ManagedAgentClient.Get(suite.Ctx, projKey, &got, metav1.GetOptions{}) return err == nil && reflect.DeepEqual(appProject.Spec.Description, got.Spec.Description) }, 30*time.Second, 1*time.Second) } @@ -327,10 +351,82 @@ func (suite *CacheTestSuite) Test_RevertAppProjectUpdatesAutonomousMode() { requires.Eventually(func() bool { got := argoapp.AppProject{} err := suite.PrincipalClient.Get(suite.Ctx, principalKey, &got, metav1.GetOptions{}) - fmt.Println("err", err) - fmt.Println("got", got.Spec.Description) return err == nil && reflect.DeepEqual(appProject.Spec.Description, got.Spec.Description) }, 30*time.Second, 1*time.Second) + + // Delete the appProject from the control-plane and ensure it is recreated to be in sync with the agent + principalAppProject := argoapp.AppProject{ + ObjectMeta: metav1.ObjectMeta{ + Name: autonomousProjName, + Namespace: "argocd", + }, + Spec: appProject.Spec, + } + err = suite.PrincipalClient.Delete(suite.Ctx, &principalAppProject, metav1.DeleteOptions{}) + requires.NoError(err) + + requires.Eventually(func() bool { + got := argoapp.AppProject{} + err := suite.PrincipalClient.Get(suite.Ctx, principalKey, &got, metav1.GetOptions{}) + return err == nil && reflect.DeepEqual(appProject.Spec.Description, got.Spec.Description) + }, 30*time.Second, 1*time.Second) +} + +func (suite *CacheTestSuite) Test_RevertRepositoryUpdatesManagedMode() { + requires := suite.Require() + + // Create an appProject on the control-plane cluster and ensure it is synced to the workload cluster + appProject := sampleAppProject() + + err := suite.PrincipalClient.Create(suite.Ctx, appProject, metav1.CreateOptions{}) + requires.NoError(err) + + projKey := types.NamespacedName{Name: appProject.Name, Namespace: "argocd"} + + 
requires.Eventually(func() bool { + appProject := argoapp.AppProject{} + err := suite.ManagedAgentClient.Get(suite.Ctx, projKey, &appProject, metav1.GetOptions{}) + if err != nil { + fmt.Println("error getting appProject", err) + return false + } + return true + }, 30*time.Second, 1*time.Second) + + // Create a repository on the control-plane cluster and ensure it is synced to the workload cluster + repository := sampleRepository() + err = suite.PrincipalClient.Create(suite.Ctx, repository, metav1.CreateOptions{}) + requires.NoError(err) + repoKey := types.NamespacedName{Name: repository.Name, Namespace: "argocd"} + + requires.Eventually(func() bool { + repository := corev1.Secret{} + err := suite.ManagedAgentClient.Get(suite.Ctx, repoKey, &repository, metav1.GetOptions{}) + return err == nil + }, 30*time.Second, 1*time.Second) + + // Modify the repository on the workload cluster and ensure the change is reverted to be in sync with the control-plane + err = suite.ManagedAgentClient.EnsureRepositoryUpdate(suite.Ctx, repoKey, func(repository *corev1.Secret) error { + repository.Data["url"] = []byte("https://github.com/example/repo-updated.git") + return nil + }, metav1.UpdateOptions{}) + requires.NoError(err) + + requires.Eventually(func() bool { + got := corev1.Secret{} + err := suite.ManagedAgentClient.Get(suite.Ctx, repoKey, &got, metav1.GetOptions{}) + return err == nil && reflect.DeepEqual(repository.Data["url"], got.Data["url"]) + }, 30*time.Second, 1*time.Second) + + // Delete the repository from the workload cluster and ensure it is recreated to be in sync with the control-plane + err = suite.ManagedAgentClient.Delete(suite.Ctx, repository, metav1.DeleteOptions{}) + requires.NoError(err) + + requires.Eventually(func() bool { + got := corev1.Secret{} + err := suite.ManagedAgentClient.Get(suite.Ctx, repoKey, &got, metav1.GetOptions{}) + return err == nil && reflect.DeepEqual(repository.Data["url"], got.Data["url"]) + }, 30*time.Second, 1*time.Second) } func 
createApp(ctx context.Context, client fixture.KubeClient, requires *require.Assertions, opts ...struct{ Name, Namespace string }) argoapp.Application { diff --git a/test/e2e/fixture/fixture.go b/test/e2e/fixture/fixture.go index d1cc701e..6afc5bf8 100644 --- a/test/e2e/fixture/fixture.go +++ b/test/e2e/fixture/fixture.go @@ -259,6 +259,14 @@ func CleanUp(ctx context.Context, principalClient KubeClient, managedAgentClient } } + isFromSource := func(annotations map[string]string) bool { + if annotations == nil { + return false + } + _, ok := annotations[manager.SourceUIDAnnotation] + return ok + } + // Delete all appProjects from the autonomous agent appProjectList := argoapp.AppProjectList{} err = autonomousAgentClient.List(ctx, "argocd", &appProjectList, metav1.ListOptions{}) @@ -266,7 +274,8 @@ func CleanUp(ctx context.Context, principalClient KubeClient, managedAgentClient return err } for _, appProject := range appProjectList.Items { - if appProject.Name == appproject.DefaultAppProjectName { + if appProject.Name == appproject.DefaultAppProjectName || + isFromSource(appProject.GetAnnotations()) { continue } @@ -295,7 +304,8 @@ func CleanUp(ctx context.Context, principalClient KubeClient, managedAgentClient return err } for _, appProject := range appProjectList.Items { - if appProject.Name == appproject.DefaultAppProjectName { + if appProject.Name == appproject.DefaultAppProjectName || + isFromSource(appProject.GetAnnotations()) { continue } @@ -348,6 +358,10 @@ func CleanUp(ctx context.Context, principalClient KubeClient, managedAgentClient return err } for _, repo := range repoList.Items { + if isFromSource(repo.GetAnnotations()) { + continue + } + err = EnsureDeletion(ctx, principalClient, &repo) if err != nil { return err From 9306f9d0af385525c0cbcd18b524d42239c31c9b Mon Sep 17 00:00:00 2001 From: Chetan Banavikalmutt Date: Wed, 8 Oct 2025 12:27:16 +0530 Subject: [PATCH 3/6] retry update if there a conflict Signed-off-by: Chetan Banavikalmutt --- 
test/e2e/fixture/fixture.go | 47 +++++++++++++++++++++++++------------ 1 file changed, 32 insertions(+), 15 deletions(-) diff --git a/test/e2e/fixture/fixture.go b/test/e2e/fixture/fixture.go index 6afc5bf8..63d3d95f 100644 --- a/test/e2e/fixture/fixture.go +++ b/test/e2e/fixture/fixture.go @@ -31,6 +31,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/util/retry" ) const ( @@ -129,8 +130,10 @@ func EnsureDeletion(ctx context.Context, kclient KubeClient, obj KubeObject) err // After X seconds, give up waiting for the child objects to be deleted, and remove any finalizers on the object if len(obj.GetFinalizers()) > 0 { - obj.SetFinalizers(nil) - if err := kclient.Update(ctx, obj, metav1.UpdateOptions{}); err != nil { + err := EnsureUpdate(ctx, kclient, obj, func(obj KubeObject) { + obj.SetFinalizers(nil) + }) + if err != nil { return err } } @@ -167,6 +170,33 @@ func WaitForDeletion(ctx context.Context, kclient KubeClient, obj KubeObject) er return fmt.Errorf("WaitForDeletion: timeout waiting for deletion of %s/%s", key.Namespace, key.Name) } +// EnsureUpdate will ensure that the object is updated by retrying if there is a conflict. 
+func EnsureUpdate(ctx context.Context, kclient KubeClient, obj KubeObject, updateFn func(obj KubeObject)) error { + return retry.RetryOnConflict(retry.DefaultRetry, func() error { + key := types.NamespacedName{Name: obj.GetName(), Namespace: obj.GetNamespace()} + err := kclient.Get(ctx, key, obj, metav1.GetOptions{}) + if err != nil { + if errors.IsNotFound(err) { + return nil + } + return err + } + + // Apply the update function to the object + updateFn(obj) + + err = kclient.Update(ctx, obj, metav1.UpdateOptions{}) + if err != nil { + if errors.IsNotFound(err) { + return nil + } + return err + } + + return nil + }) +} + func CleanUp(ctx context.Context, principalClient KubeClient, managedAgentClient KubeClient, autonomousAgentClient KubeClient, clusterDetails *ClusterDetails) error { var list argoapp.ApplicationList @@ -375,19 +405,6 @@ func CleanUp(ctx context.Context, principalClient KubeClient, managedAgentClient } } - // Delete all repositories from the autonomous agent - repoList = corev1.SecretList{} - err = autonomousAgentClient.List(ctx, "argocd", &repoList, repoListOpts) - if err != nil { - return err - } - for _, repo := range repoList.Items { - err = EnsureDeletion(ctx, autonomousAgentClient, &repo) - if err != nil { - return err - } - } - // Delete all repositories from the managed agent repoList = corev1.SecretList{} err = managedAgentClient.List(ctx, "argocd", &repoList, repoListOpts) From 42dca77578eb039b73b32f39f3b24e972c6b9657 Mon Sep 17 00:00:00 2001 From: Chetan Banavikalmutt Date: Wed, 8 Oct 2025 20:36:16 +0530 Subject: [PATCH 4/6] use a deletion tracker to revert invalid deletions Signed-off-by: Chetan Banavikalmutt Assisted-by: Cursor --- agent/agent.go | 11 ++++-- agent/inbound.go | 37 +++++++++++-------- agent/outbound.go | 6 +-- internal/manager/application/application.go | 16 ++++++++ internal/manager/manager.go | 41 +++++++++++++++++++-- internal/manager/manager_test.go | 35 +++++++++--------- principal/callbacks.go | 4 +- 
principal/callbacks_test.go | 7 +++- principal/event.go | 16 ++++---- principal/server.go | 6 +++ 10 files changed, 124 insertions(+), 55 deletions(-) diff --git a/agent/agent.go b/agent/agent.go index 806b147e..3ec0cf4a 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -107,6 +107,10 @@ type Agent struct { // sourceCache is a cache of resources from the source. We use it to revert any changes made to the local resources. sourceCache *cache.SourceCache + + // deletions tracks valid deletions from the source. + // This is used to differentiate between valid and invalid deletions + deletions *manager.DeletionTracker } const defaultQueueName = "default" @@ -132,7 +136,9 @@ type AgentOption func(*Agent) error // options. func NewAgent(ctx context.Context, client *kube.KubernetesClient, namespace string, opts ...AgentOption) (*Agent, error) { a := &Agent{ - version: version.New("argocd-agent"), + version: version.New("argocd-agent"), + deletions: manager.NewDeletionTracker(), + sourceCache: cache.NewSourceCache(), } a.infStopCh = make(chan struct{}) a.namespace = namespace @@ -200,6 +206,7 @@ func NewAgent(ctx context.Context, client *kube.KubernetesClient, namespace stri appManagerOpts := []application.ApplicationManagerOption{ application.WithRole(manager.ManagerRoleAgent), application.WithMode(managerMode), + application.WithDeletionTracker(a.deletions), } if a.options.metricsPort > 0 { @@ -313,8 +320,6 @@ func NewAgent(ctx context.Context, client *kube.KubernetesClient, namespace stri } a.clusterCache = clusterCache - a.sourceCache = cache.NewSourceCache() - return a, nil } diff --git a/agent/inbound.go b/agent/inbound.go index 03921f57..aa9609f8 100644 --- a/agent/inbound.go +++ b/agent/inbound.go @@ -178,6 +178,9 @@ func (a *Agent) processIncomingApplication(ev *event.Event) error { logCtx.Errorf("Error updating application: %v", err) } case event.Delete: + if a.mode == types.AgentModeManaged { + a.deletions.MarkExpected(incomingApp.UID) + } err = 
a.deleteApplication(incomingApp) if err != nil { logCtx.Errorf("Error deleting application: %v", err) @@ -261,6 +264,9 @@ func (a *Agent) processIncomingAppProject(ev *event.Event) error { logCtx.Errorf("Error updating appproject: %v", err) } case event.Delete: + if a.mode == types.AgentModeManaged { + a.deletions.MarkExpected(incomingAppProject.UID) + } err = a.deleteAppProject(incomingAppProject) if err != nil { logCtx.Errorf("Error deleting appproject: %v", err) @@ -346,6 +352,9 @@ func (a *Agent) processIncomingRepository(ev *event.Event) error { } case event.Delete: + if a.mode == types.AgentModeManaged { + a.deletions.MarkExpected(incomingRepo.UID) + } err = a.deleteRepository(incomingRepo) if err != nil { logCtx.Errorf("Error deleting repository: %v", err) @@ -543,24 +552,23 @@ func (a *Agent) deleteApplication(app *v1alpha1.Application) error { logCtx.Infof("Deleting application") - if a.mode == types.AgentModeManaged { - a.sourceCache.Application.Delete(app.UID) - } - deletionPropagation := backend.DeletePropagationBackground err := a.appManager.Delete(a.context, a.namespace, app, &deletionPropagation) if err != nil { if apierrors.IsNotFound(err) { logCtx.Debug("application is not found, perhaps it is already deleted") + if a.mode == types.AgentModeManaged { + a.sourceCache.Application.Delete(app.UID) + } return nil } - // Restore the cache if the deletion fails - if a.mode == types.AgentModeManaged { - a.sourceCache.Application.Set(app.UID, app.Spec) - } return err } + if a.mode == types.AgentModeManaged { + a.sourceCache.Application.Delete(app.UID) + } + err = a.appManager.Unmanage(app.QualifiedName()) if err != nil { log().Warnf("Could not unmanage app %s: %v", app.QualifiedName(), err) @@ -660,8 +668,6 @@ func (a *Agent) deleteAppProject(project *v1alpha1.AppProject) error { logCtx.Infof("Deleting appProject") - a.sourceCache.AppProject.Delete(project.UID) - deletionPropagation := backend.DeletePropagationBackground err := 
a.projectManager.Delete(a.context, project, &deletionPropagation) if err != nil { @@ -670,11 +676,11 @@ func (a *Agent) deleteAppProject(project *v1alpha1.AppProject) error { a.sourceCache.AppProject.Delete(project.UID) return nil } - // Restore the cache if the deletion fails - a.sourceCache.AppProject.Set(project.UID, project.Spec) return err } + a.sourceCache.AppProject.Delete(project.UID) + err = a.projectManager.Unmanage(project.Name) if err != nil { log().Warnf("Could not unmanage appProject %s: %v", project.Name, err) @@ -770,20 +776,19 @@ func (a *Agent) deleteRepository(repo *corev1.Secret) error { logCtx.Infof("Deleting repository") - a.sourceCache.Repository.Delete(repo.UID) - deletionPropagation := backend.DeletePropagationBackground err := a.repoManager.Delete(a.context, repo.Name, repo.Namespace, &deletionPropagation) if err != nil { if apierrors.IsNotFound(err) { logCtx.Debug("repository is not found, perhaps it is already deleted") + a.sourceCache.Repository.Delete(repo.UID) return nil } - // Restore the cache if the deletion fails - a.sourceCache.Repository.Set(repo.UID, repo.Data) return err } + a.sourceCache.Repository.Delete(repo.UID) + err = a.repoManager.Unmanage(repo.Name) if err != nil { log().Warnf("Could not unmanage repository %s: %v", repo.Name, err) diff --git a/agent/outbound.go b/agent/outbound.go index 3e403093..bf96384a 100644 --- a/agent/outbound.go +++ b/agent/outbound.go @@ -119,7 +119,7 @@ func (a *Agent) addAppDeletionToQueue(app *v1alpha1.Application) { logCtx.Debugf("Delete app event") if isResourceFromPrincipal(app) { - if manager.RevertUserInitiatedDeletion(a.context, app, a.sourceCache.Application, a.appManager, logCtx) { + if manager.RevertUserInitiatedDeletion(a.context, app, a.deletions, a.appManager, logCtx) { logCtx.Trace("Deleted app is recreated") return } @@ -258,7 +258,7 @@ func (a *Agent) addAppProjectDeletionToQueue(appProject *v1alpha1.AppProject) { logCtx.Debugf("Delete appProject event") if 
isResourceFromPrincipal(appProject) { - if manager.RevertUserInitiatedDeletion(a.context, appProject, a.sourceCache.AppProject, a.projectManager, logCtx) { + if manager.RevertUserInitiatedDeletion(a.context, appProject, a.deletions, a.projectManager, logCtx) { logCtx.Trace("Deleted appProject is recreated") return } @@ -392,7 +392,7 @@ func (a *Agent) handleRepositoryDeletion(repo *corev1.Secret) { logCtx.Debugf("Delete repository event") - if manager.RevertUserInitiatedDeletion(a.context, repo, a.sourceCache.Repository, a.repoManager, logCtx) { + if manager.RevertUserInitiatedDeletion(a.context, repo, a.deletions, a.repoManager, logCtx) { logCtx.Trace("Deleted repository is recreated") return } diff --git a/internal/manager/application/application.go b/internal/manager/application/application.go index 2b430c10..a446ac4e 100644 --- a/internal/manager/application/application.go +++ b/internal/manager/application/application.go @@ -58,6 +58,8 @@ type ApplicationManager struct { manager.ManagedResources // ObservedResources, key is qualified name of the application, value is the Application's .metadata.resourceValue field manager.ObservedResources + // deletions tracks valid deletions from the source. + deletions *manager.DeletionTracker } // ApplicationManagerOption is a callback function to set an option to the Application @@ -85,6 +87,13 @@ func WithMode(mode manager.ManagerMode) ApplicationManagerOption { } } +// WithDeletionTracker is used to track valid deletions from the source. +func WithDeletionTracker(d *manager.DeletionTracker) ApplicationManagerOption { + return func(m *ApplicationManager) { + m.deletions = d + } +} + // NewApplicationManager initializes and returns a new Manager with the given backend and // options. 
func NewApplicationManager(be backend.Application, namespace string, opts ...ApplicationManagerOption) (*ApplicationManager, error) { @@ -258,6 +267,13 @@ func (m *ApplicationManager) UpdateManagedApp(ctx context.Context, incoming *v1a if deletionTimestampChanged { logCtx.Infof("deletionTimestamp of managed agent changed from nil to non-nil, so deleting Application") + // Mark this as a valid deletion so the callback does not treat it as a user-initiated deletion. + if m.deletions != nil { + if v, ok := updated.Annotations[manager.SourceUIDAnnotation]; ok { + m.deletions.MarkExpected(ty.UID(v)) + } + } + if err := m.applicationBackend.Delete(ctx, incoming.Name, incoming.Namespace, ptr.To(backend.DeletePropagationForeground)); err != nil { return nil, err } diff --git a/internal/manager/manager.go b/internal/manager/manager.go index c59b4074..b30dfee1 100644 --- a/internal/manager/manager.go +++ b/internal/manager/manager.go @@ -211,7 +211,7 @@ type resourceManager[R kubeResource] interface { // Returns true if the resource was recreated, false otherwise. func RevertUserInitiatedDeletion[R kubeResource](ctx context.Context, outbound R, - resCache resourceCache[R], + deletions *DeletionTracker, mgr resourceManager[R], logCtx *logrus.Entry, ) bool { @@ -227,9 +227,8 @@ func RevertUserInitiatedDeletion[R kubeResource](ctx context.Context, return false } - // If the resource is not in the cache, it means it was deleted by an incoming delete event. - // So no need to recreate it. - if !resCache.Contains(types.UID(sourceUID)) { + // Check if this deletion is coming from the source + if deletions.IsExpected(types.UID(sourceUID)) { logCtx.Debugf("Expected deletion detected - allowing it to proceed") return false } @@ -249,3 +248,37 @@ func RevertUserInitiatedDeletion[R kubeResource](ctx context.Context, return true } + +// DeletionTracker tracks expected deletions from the source. 
+type DeletionTracker struct { + mu sync.RWMutex + expected map[types.UID]bool +} + +func NewDeletionTracker() *DeletionTracker { + return &DeletionTracker{ + expected: make(map[types.UID]bool), + } +} + +func (d *DeletionTracker) MarkExpected(uid types.UID) { + d.mu.Lock() + defer d.mu.Unlock() + d.expected[uid] = true +} + +func (d *DeletionTracker) IsExpected(uid types.UID) bool { + d.mu.Lock() + defer d.mu.Unlock() + _, exists := d.expected[uid] + if exists { + delete(d.expected, uid) + } + return exists +} + +func (d *DeletionTracker) Unmark(uid types.UID) { + d.mu.Lock() + defer d.mu.Unlock() + delete(d.expected, uid) +} diff --git a/internal/manager/manager_test.go b/internal/manager/manager_test.go index dd3a1e97..81ae5a33 100644 --- a/internal/manager/manager_test.go +++ b/internal/manager/manager_test.go @@ -18,7 +18,6 @@ import ( "context" "testing" - "github.com/argoproj-labs/argocd-agent/internal/cache" argoapp "github.com/argoproj/argo-cd/v3/pkg/apis/application/v1alpha1" "github.com/sirupsen/logrus" "github.com/stretchr/testify/assert" @@ -79,23 +78,23 @@ func TestRevertUserInitiatedDeletion(t *testing.T) { }, } mgr := &fakeManager[*argoapp.Application]{} - sc := cache.NewSourceCache() - ok := RevertUserInitiatedDeletion(context.Background(), app, sc.Application, mgr, newLogger()) + deletions := NewDeletionTracker() + ok := RevertUserInitiatedDeletion(context.Background(), app, deletions, mgr, newLogger()) requires.False(ok) requires.Nil(mgr.created) - // With annotation but cache miss -> no recreate + // With annotation but valid deletion -> no recreate app.Annotations = map[string]string{SourceUIDAnnotation: string(types.UID("u1"))} mgr = &fakeManager[*argoapp.Application]{} - ok = RevertUserInitiatedDeletion(context.Background(), app, sc.Application, mgr, newLogger()) + deletions.MarkExpected(types.UID("u1")) + ok = RevertUserInitiatedDeletion(context.Background(), app, deletions, mgr, newLogger()) requires.False(ok) requires.Nil(mgr.created) - // 
With annotation and cache hit -> recreate + // With annotation and invalid deletion -> recreate app.Annotations = map[string]string{SourceUIDAnnotation: string(types.UID("u2"))} - sc.Application.Set(types.UID("u2"), argoapp.ApplicationSpec{}) mgr = &fakeManager[*argoapp.Application]{} - ok = RevertUserInitiatedDeletion(context.Background(), app, sc.Application, mgr, newLogger()) + ok = RevertUserInitiatedDeletion(context.Background(), app, deletions, mgr, newLogger()) requires.True(ok) requires.NotNil(mgr.created) requires.Equal(types.UID("u2"), mgr.created.GetUID()) @@ -112,22 +111,22 @@ func TestRevertUserInitiatedDeletion(t *testing.T) { Namespace: "argocd", }, } - sc := cache.NewSourceCache() + deletions := NewDeletionTracker() mgr := &fakeManager[*argoapp.AppProject]{} - ok := RevertUserInitiatedDeletion(context.Background(), proj, sc.AppProject, mgr, newLogger()) + ok := RevertUserInitiatedDeletion(context.Background(), proj, deletions, mgr, newLogger()) requires.False(ok) requires.Nil(mgr.created) proj.Annotations = map[string]string{SourceUIDAnnotation: string(types.UID("p1"))} mgr = &fakeManager[*argoapp.AppProject]{} - ok = RevertUserInitiatedDeletion(context.Background(), proj, sc.AppProject, mgr, newLogger()) + deletions.MarkExpected(types.UID("p1")) + ok = RevertUserInitiatedDeletion(context.Background(), proj, deletions, mgr, newLogger()) requires.False(ok) requires.Nil(mgr.created) proj.Annotations = map[string]string{SourceUIDAnnotation: string(types.UID("p2"))} - sc.AppProject.Set(types.UID("p2"), argoapp.AppProjectSpec{}) mgr = &fakeManager[*argoapp.AppProject]{} - ok = RevertUserInitiatedDeletion(context.Background(), proj, sc.AppProject, mgr, newLogger()) + ok = RevertUserInitiatedDeletion(context.Background(), proj, deletions, mgr, newLogger()) requires.True(ok) requires.NotNil(mgr.created) requires.Equal(types.UID("p2"), mgr.created.GetUID()) @@ -144,22 +143,22 @@ func TestRevertUserInitiatedDeletion(t *testing.T) { Namespace: "argocd", }, } - 
sc := cache.NewSourceCache() + deletions := NewDeletionTracker() mgr := &fakeManager[*corev1.Secret]{} - ok := RevertUserInitiatedDeletion(context.Background(), repo, sc.Repository, mgr, newLogger()) + ok := RevertUserInitiatedDeletion(context.Background(), repo, deletions, mgr, newLogger()) requires.False(ok) requires.Nil(mgr.created) repo.Annotations = map[string]string{SourceUIDAnnotation: string(types.UID("r1"))} mgr = &fakeManager[*corev1.Secret]{} - ok = RevertUserInitiatedDeletion(context.Background(), repo, sc.Repository, mgr, newLogger()) + deletions.MarkExpected(types.UID("r1")) + ok = RevertUserInitiatedDeletion(context.Background(), repo, deletions, mgr, newLogger()) requires.False(ok) requires.Nil(mgr.created) repo.Annotations = map[string]string{SourceUIDAnnotation: string(types.UID("r2"))} - sc.Repository.Set(types.UID("r2"), map[string][]byte{"k": {}}) mgr = &fakeManager[*corev1.Secret]{} - ok = RevertUserInitiatedDeletion(context.Background(), repo, sc.Repository, mgr, newLogger()) + ok = RevertUserInitiatedDeletion(context.Background(), repo, deletions, mgr, newLogger()) requires.True(ok) requires.NotNil(mgr.created) requires.Equal(types.UID("r2"), mgr.created.GetUID()) diff --git a/principal/callbacks.go b/principal/callbacks.go index b8ba749d..acb798f7 100644 --- a/principal/callbacks.go +++ b/principal/callbacks.go @@ -125,7 +125,7 @@ func (s *Server) deleteAppCallback(outbound *v1alpha1.Application) { // Revert user-initiated deletion on autonomous agent applications if isResourceFromAutonomousAgent(outbound) { - if manager.RevertUserInitiatedDeletion(s.ctx, outbound, s.sourceCache.Application, s.appManager, logCtx) { + if manager.RevertUserInitiatedDeletion(s.ctx, outbound, s.deletions, s.appManager, logCtx) { logCtx.Trace("Deleted application is recreated") return } @@ -264,7 +264,7 @@ func (s *Server) deleteAppProjectCallback(outbound *v1alpha1.AppProject) { // Revert user-initiated deletion on autonomous agent applications if 
isResourceFromAutonomousAgent(outbound) { - if manager.RevertUserInitiatedDeletion(s.ctx, outbound, s.sourceCache.AppProject, s.projectManager, logCtx) { + if manager.RevertUserInitiatedDeletion(s.ctx, outbound, s.deletions, s.projectManager, logCtx) { logCtx.Trace("Deleted appProject is recreated") return } diff --git a/principal/callbacks_test.go b/principal/callbacks_test.go index dd84e313..08b9547e 100644 --- a/principal/callbacks_test.go +++ b/principal/callbacks_test.go @@ -1117,15 +1117,18 @@ func TestServer_deleteAppCallback_AutonomousAgent(t *testing.T) { resources: resources.NewAgentResources(), appManager: appManager, sourceCache: cache.NewSourceCache(), + deletions: manager.NewDeletionTracker(), } // Create send queue for the agent err = s.queues.Create(tt.app.Namespace) require.NoError(t, err) + sourceUID := tt.app.Annotations[manager.SourceUIDAnnotation] if tt.shouldRecreate { - sourceUID := tt.app.Annotations[manager.SourceUIDAnnotation] - s.sourceCache.Application.Set(k8stypes.UID(sourceUID), tt.app.Spec) + s.deletions.Unmark(k8stypes.UID(sourceUID)) + } else { + s.deletions.MarkExpected(k8stypes.UID(sourceUID)) } // Execute the callback diff --git a/principal/event.go b/principal/event.go index 6891f477..39131d64 100644 --- a/principal/event.go +++ b/principal/event.go @@ -224,19 +224,20 @@ func (s *Server) processApplicationEvent(ctx context.Context, agentName string, // App deletion case event.Delete.String(): if agentMode.IsAutonomous() { - - s.sourceCache.Application.Delete(incoming.UID) + s.deletions.MarkExpected(incoming.UID) deletionPropagation := backend.DeletePropagationForeground err = s.appManager.Delete(ctx, agentName, incoming, &deletionPropagation) if err != nil { if kerrors.IsNotFound(err) { + s.sourceCache.Application.Delete(incoming.UID) return nil } - // Restore the cache if the deletion fails - s.sourceCache.Application.Set(incoming.UID, incoming.Spec) return fmt.Errorf("could not delete application %s: %w", 
incoming.QualifiedName(), err) } + + s.sourceCache.Application.Delete(incoming.UID) + logCtx.Infof("Deleted application %s", incoming.QualifiedName()) } else if agentMode.IsManaged() { @@ -366,18 +367,19 @@ func (s *Server) processAppProjectEvent(ctx context.Context, agentName string, e incoming.SetNamespace(s.namespace) - s.sourceCache.AppProject.Delete(incoming.UID) + s.deletions.MarkExpected(incoming.UID) deletionPropagation := backend.DeletePropagationForeground err := s.projectManager.Delete(ctx, incoming, &deletionPropagation) if err != nil { if kerrors.IsNotFound(err) { + s.sourceCache.AppProject.Delete(incoming.UID) return nil } - // Restore the cache if the deletion fails - s.sourceCache.AppProject.Set(incoming.UID, incoming.Spec) return fmt.Errorf("could not delete app-project %s: %w", incoming.Name, err) } + + s.sourceCache.AppProject.Delete(incoming.UID) logCtx.Infof("Deleted app-project %s", incoming.Name) default: return fmt.Errorf("unable to process event of type %s", ev.Type()) diff --git a/principal/server.go b/principal/server.go index 4ccf2d34..d836c0cb 100644 --- a/principal/server.go +++ b/principal/server.go @@ -153,7 +153,12 @@ type Server struct { eventWriters *event.EventWritersMap + // sourceCache is a cache of resources from the source. We use it to revert any changes made to the local resources. sourceCache *cache.SourceCache + + // deletions tracks valid deletions from the source. 
+ // This is used to differentiate between valid and invalid deletions + deletions *manager.DeletionTracker } type handlersOnConnect func(agent types.Agent) error @@ -188,6 +193,7 @@ func NewServer(ctx context.Context, kubeClient *kube.KubernetesClient, namespace repoToAgents: NewMapToSet(), projectToRepos: NewMapToSet(), sourceCache: cache.NewSourceCache(), + deletions: manager.NewDeletionTracker(), } s.ctx, s.ctxCancel = context.WithCancel(ctx) From ea2554df6e3a91240e8f6430805d9a271cfdea9d Mon Sep 17 00:00:00 2001 From: Chetan Banavikalmutt Date: Thu, 9 Oct 2025 16:04:12 +0530 Subject: [PATCH 5/6] use source UID of the existing resource Signed-off-by: Chetan Banavikalmutt --- agent/inbound.go | 43 ++++++++++++++++------- agent/inbound_test.go | 40 ++++++++++----------- internal/manager/manager.go | 4 --- internal/manager/repository/repository.go | 4 +++ internal/resync/resync.go | 3 ++ 5 files changed, 58 insertions(+), 36 deletions(-) diff --git a/agent/inbound.go b/agent/inbound.go index aa9609f8..5860c1bb 100644 --- a/agent/inbound.go +++ b/agent/inbound.go @@ -29,6 +29,7 @@ import ( "github.com/sirupsen/logrus" corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" + ktypes "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/dynamic" ) @@ -178,9 +179,6 @@ func (a *Agent) processIncomingApplication(ev *event.Event) error { logCtx.Errorf("Error updating application: %v", err) } case event.Delete: - if a.mode == types.AgentModeManaged { - a.deletions.MarkExpected(incomingApp.UID) - } err = a.deleteApplication(incomingApp) if err != nil { logCtx.Errorf("Error deleting application: %v", err) @@ -264,9 +262,6 @@ func (a *Agent) processIncomingAppProject(ev *event.Event) error { logCtx.Errorf("Error updating appproject: %v", err) } case event.Delete: - if a.mode == types.AgentModeManaged { - a.deletions.MarkExpected(incomingAppProject.UID) - } err = a.deleteAppProject(incomingAppProject) if err != nil { logCtx.Errorf("Error deleting appproject: 
%v", err) @@ -352,9 +347,6 @@ func (a *Agent) processIncomingRepository(ev *event.Event) error { } case event.Delete: - if a.mode == types.AgentModeManaged { - a.deletions.MarkExpected(incomingRepo.UID) - } err = a.deleteRepository(incomingRepo) if err != nil { logCtx.Errorf("Error deleting repository: %v", err) @@ -552,8 +544,17 @@ func (a *Agent) deleteApplication(app *v1alpha1.Application) error { logCtx.Infof("Deleting application") + // Fetch the source UID of the existing app to mark it as expected deletion. + app, err := a.appManager.Get(a.context, app.Name, app.Namespace) + if err != nil { + return err + } + + sourceUID := app.Annotations[manager.SourceUIDAnnotation] + a.deletions.MarkExpected(ktypes.UID(sourceUID)) + deletionPropagation := backend.DeletePropagationBackground - err := a.appManager.Delete(a.context, a.namespace, app, &deletionPropagation) + err = a.appManager.Delete(a.context, a.namespace, app, &deletionPropagation) if err != nil { if apierrors.IsNotFound(err) { logCtx.Debug("application is not found, perhaps it is already deleted") @@ -668,8 +669,17 @@ func (a *Agent) deleteAppProject(project *v1alpha1.AppProject) error { logCtx.Infof("Deleting appProject") + // Fetch the source UID of the existing appProject to mark it as expected deletion. 
+ project, err := a.projectManager.Get(a.context, project.Name, project.Namespace) + if err != nil { + return err + } + + sourceUID := project.Annotations[manager.SourceUIDAnnotation] + a.deletions.MarkExpected(ktypes.UID(sourceUID)) + deletionPropagation := backend.DeletePropagationBackground - err := a.projectManager.Delete(a.context, project, &deletionPropagation) + err = a.projectManager.Delete(a.context, project, &deletionPropagation) if err != nil { if apierrors.IsNotFound(err) { logCtx.Debug("appProject not found, perhaps it is already deleted") @@ -776,8 +786,17 @@ func (a *Agent) deleteRepository(repo *corev1.Secret) error { logCtx.Infof("Deleting repository") + // Fetch the source UID of the existing repository to mark it as expected deletion. + repo, err := a.repoManager.Get(a.context, repo.Name, repo.Namespace) + if err != nil { + return err + } + + sourceUID := repo.Annotations[manager.SourceUIDAnnotation] + a.deletions.MarkExpected(ktypes.UID(sourceUID)) + deletionPropagation := backend.DeletePropagationBackground - err := a.repoManager.Delete(a.context, repo.Name, repo.Namespace, &deletionPropagation) + err = a.repoManager.Delete(a.context, repo.Name, repo.Namespace, &deletionPropagation) if err != nil { if apierrors.IsNotFound(err) { logCtx.Debug("repository is not found, perhaps it is already deleted") diff --git a/agent/inbound_test.go b/agent/inbound_test.go index 10d0dbce..be964133 100644 --- a/agent/inbound_test.go +++ b/agent/inbound_test.go @@ -171,7 +171,7 @@ func Test_ProcessIncomingAppWithUIDMismatch(t *testing.T) { // Check if the API calls were made in the same order: // compare the UID, delete old app, and create a new app. 
- expectedCalls := []string{"Get", "Delete", "Create"} + expectedCalls := []string{"Get", "Get", "Delete", "Create"} gotCalls := []string{} for _, call := range be.Calls { gotCalls = append(gotCalls, call.Method) @@ -179,7 +179,7 @@ func Test_ProcessIncomingAppWithUIDMismatch(t *testing.T) { require.Equal(t, expectedCalls, gotCalls) // Check if the new app has the updated source UID annotation. - appInterface := be.Calls[2].ReturnArguments[0] + appInterface := be.Calls[3].ReturnArguments[0] latestApp, ok := appInterface.(*v1alpha1.Application) require.True(t, ok) require.Equal(t, string(incomingApp.UID), latestApp.Annotations[manager.SourceUIDAnnotation]) @@ -229,7 +229,7 @@ func Test_ProcessIncomingAppWithUIDMismatch(t *testing.T) { // Check if the API calls were made in the same order: // compare the UID, delete old app, and create a new app. - expectedCalls := []string{"Get", "Delete", "Create"} + expectedCalls := []string{"Get", "Get", "Delete", "Create"} gotCalls := []string{} for _, call := range be.Calls { gotCalls = append(gotCalls, call.Method) @@ -237,7 +237,7 @@ func Test_ProcessIncomingAppWithUIDMismatch(t *testing.T) { require.Equal(t, expectedCalls, gotCalls) // Check if the new app has the updated source UID annotation. 
- appInterface := be.Calls[2].ReturnArguments[0] + appInterface := be.Calls[3].ReturnArguments[0] latestApp, ok := appInterface.(*v1alpha1.Application) require.True(t, ok) @@ -334,7 +334,7 @@ func Test_ProcessIncomingAppWithUIDMismatch(t *testing.T) { // Check if the API calls were made in the same order: // compare the UID and delete old app - expectedCalls := []string{"Get", "Delete"} + expectedCalls := []string{"Get", "Get", "Delete"} gotCalls := []string{} for _, call := range be.Calls { gotCalls = append(gotCalls, call.Method) @@ -414,7 +414,7 @@ func Test_ProcessIncomingAppProjectWithUIDMismatch(t *testing.T) { // Check if the API calls were made in the same order: // compare the UID, delete old appProject, and create a new appProject. - expectedCalls := []string{"Get", "Delete", "Create"} + expectedCalls := []string{"Get", "Get", "Delete", "Create"} gotCalls := []string{} for _, call := range be.Calls { gotCalls = append(gotCalls, call.Method) @@ -422,7 +422,7 @@ func Test_ProcessIncomingAppProjectWithUIDMismatch(t *testing.T) { require.Equal(t, expectedCalls, gotCalls) // Check if the new app has the updated source UID annotation. - appInterface := be.Calls[2].ReturnArguments[0] + appInterface := be.Calls[3].ReturnArguments[0] latestAppProject, ok := appInterface.(*v1alpha1.AppProject) require.True(t, ok) require.Equal(t, string(incomingAppProject.UID), latestAppProject.Annotations[manager.SourceUIDAnnotation]) @@ -472,7 +472,7 @@ func Test_ProcessIncomingAppProjectWithUIDMismatch(t *testing.T) { // Check if the API calls were made in the same order: // compare the UID, delete old appProject, and create a new appProject. 
- expectedCalls := []string{"Get", "Delete", "Create"} + expectedCalls := []string{"Get", "Get", "Delete", "Create"} gotCalls := []string{} for _, call := range be.Calls { gotCalls = append(gotCalls, call.Method) @@ -480,7 +480,7 @@ func Test_ProcessIncomingAppProjectWithUIDMismatch(t *testing.T) { require.Equal(t, expectedCalls, gotCalls) // Check if the new appProject has the updated source UID annotation. - appInterface := be.Calls[2].ReturnArguments[0] + appInterface := be.Calls[3].ReturnArguments[0] latestAppProject, ok := appInterface.(*v1alpha1.AppProject) require.True(t, ok) @@ -537,7 +537,7 @@ func Test_ProcessIncomingAppProjectWithUIDMismatch(t *testing.T) { // Check if the API calls were made in the same order: // compare the UID and delete old appProject - expectedCalls := []string{"Get", "Delete"} + expectedCalls := []string{"Get", "Get", "Delete"} gotCalls := []string{} for _, call := range be.Calls { gotCalls = append(gotCalls, call.Method) @@ -584,7 +584,7 @@ func Test_ProcessIncomingAppProjectWithUIDMismatch(t *testing.T) { require.NoError(t, err) // Verify the sequence: Get (to compare UID), Delete (existing), Create (new) - expectedCalls := []string{"Get", "Delete", "Create"} + expectedCalls := []string{"Get", "Get", "Delete", "Create"} gotCalls := []string{} for _, call := range beMissing.Calls { gotCalls = append(gotCalls, call.Method) @@ -592,7 +592,7 @@ func Test_ProcessIncomingAppProjectWithUIDMismatch(t *testing.T) { require.Equal(t, expectedCalls, gotCalls) // Verify the created AppProject has the correct source UID annotation - appInterface := beMissing.Calls[2].ReturnArguments[0] + appInterface := beMissing.Calls[3].ReturnArguments[0] latestAppProject, ok := appInterface.(*v1alpha1.AppProject) require.True(t, ok) require.Equal(t, string(incomingAppProject.UID), latestAppProject.Annotations[manager.SourceUIDAnnotation]) @@ -635,7 +635,7 @@ func Test_ProcessIncomingAppProjectWithUIDMismatch(t *testing.T) { require.NoError(t, err) // Verify 
the sequence: Get (to compare UID), Delete (existing), Create (new) - expectedCalls := []string{"Get", "Delete", "Create"} + expectedCalls := []string{"Get", "Get", "Delete", "Create"} gotCalls := []string{} for _, call := range beMissing.Calls { gotCalls = append(gotCalls, call.Method) @@ -643,7 +643,7 @@ func Test_ProcessIncomingAppProjectWithUIDMismatch(t *testing.T) { require.Equal(t, expectedCalls, gotCalls) // Verify the created AppProject has the correct source UID annotation - appInterface := beMissing.Calls[2].ReturnArguments[0] + appInterface := beMissing.Calls[3].ReturnArguments[0] latestAppProject, ok := appInterface.(*v1alpha1.AppProject) require.True(t, ok) require.Equal(t, string(incomingAppProject.UID), latestAppProject.Annotations[manager.SourceUIDAnnotation]) @@ -684,7 +684,7 @@ func Test_ProcessIncomingAppProjectWithUIDMismatch(t *testing.T) { require.NoError(t, err) // Verify the sequence: Get (to compare UID), Delete (existing) - expectedCalls := []string{"Get", "Delete"} + expectedCalls := []string{"Get", "Get", "Delete"} gotCalls := []string{} for _, call := range beMissing.Calls { gotCalls = append(gotCalls, call.Method) @@ -1141,7 +1141,7 @@ func Test_ProcessIncomingRepositoryWithUIDMismatch(t *testing.T) { // Check if the API calls were made in the same order: // compare the UID, delete old repo, and create a new repo. - expectedCalls := []string{"Get", "Delete", "Create"} + expectedCalls := []string{"Get", "Get", "Delete", "Create"} gotCalls := []string{} for _, call := range be.Calls { gotCalls = append(gotCalls, call.Method) @@ -1149,7 +1149,7 @@ func Test_ProcessIncomingRepositoryWithUIDMismatch(t *testing.T) { require.Equal(t, expectedCalls, gotCalls) // Check if the new repo has the updated source UID annotation. 
- repoInterface := be.Calls[2].ReturnArguments[0] + repoInterface := be.Calls[3].ReturnArguments[0] latestRepo, ok := repoInterface.(*corev1.Secret) require.True(t, ok) require.Equal(t, string(incomingRepo.UID), latestRepo.Annotations[manager.SourceUIDAnnotation]) @@ -1228,7 +1228,7 @@ func Test_ProcessIncomingRepositoryWithUIDMismatch(t *testing.T) { // Check if the API calls were made in the same order: // compare the UID, delete old repo, and create a new repo. - expectedCalls := []string{"Get", "Delete", "Create"} + expectedCalls := []string{"Get", "Get", "Delete", "Create"} gotCalls := []string{} for _, call := range be.Calls { gotCalls = append(gotCalls, call.Method) @@ -1236,7 +1236,7 @@ func Test_ProcessIncomingRepositoryWithUIDMismatch(t *testing.T) { require.Equal(t, expectedCalls, gotCalls) // Check if the new repository has the same source UID annotation as the incoming repository. - repoInterface := be.Calls[2].ReturnArguments[0] + repoInterface := be.Calls[3].ReturnArguments[0] latestRepo, ok := repoInterface.(*corev1.Secret) require.True(t, ok) @@ -1299,7 +1299,7 @@ func Test_ProcessIncomingRepositoryWithUIDMismatch(t *testing.T) { // Check if the API calls were made in the same order: // compare the UID and delete old repository - expectedCalls := []string{"Get", "Delete"} + expectedCalls := []string{"Get", "Get", "Delete"} gotCalls := []string{} for _, call := range be.Calls { gotCalls = append(gotCalls, call.Method) diff --git a/internal/manager/manager.go b/internal/manager/manager.go index b30dfee1..b86b410e 100644 --- a/internal/manager/manager.go +++ b/internal/manager/manager.go @@ -199,10 +199,6 @@ type kubeResource interface { metav1.Object } -type resourceCache[R kubeResource] interface { - Contains(uid types.UID) bool -} - type resourceManager[R kubeResource] interface { Create(ctx context.Context, obj R) (R, error) } diff --git a/internal/manager/repository/repository.go b/internal/manager/repository/repository.go index 
27784044..0575a944 100644 --- a/internal/manager/repository/repository.go +++ b/internal/manager/repository/repository.go @@ -118,6 +118,10 @@ func (m *RepositoryManager) List(ctx context.Context, selector backend.Repositor return m.backend.List(ctx, selector) } +func (m *RepositoryManager) Get(ctx context.Context, name, namespace string) (*corev1.Secret, error) { + return m.backend.Get(ctx, name, namespace) +} + // UpdateManagedRepository updates the Repository resource on the agent when it is in // managed mode. // diff --git a/internal/resync/resync.go b/internal/resync/resync.go index 8379352a..61a472a0 100644 --- a/internal/resync/resync.go +++ b/internal/resync/resync.go @@ -34,6 +34,7 @@ import ( v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime/schema" + ktypes "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/dynamic" "k8s.io/client-go/util/workqueue" ) @@ -382,6 +383,7 @@ func (r *RequestHandler) handleDeletedResource(logCtx *logrus.Entry, reqUpdate * ObjectMeta: v1.ObjectMeta{ Name: reqUpdate.Name, Namespace: reqUpdate.Namespace, + UID: ktypes.UID(reqUpdate.UID), }, } @@ -395,6 +397,7 @@ func (r *RequestHandler) handleDeletedResource(logCtx *logrus.Entry, reqUpdate * ObjectMeta: v1.ObjectMeta{ Name: reqUpdate.Name, Namespace: reqUpdate.Namespace, + UID: ktypes.UID(reqUpdate.UID), }, } ev := r.events.AppProjectEvent(event.Delete, appProject) From 0f1b8e17c340a9760b611f82c65ab38586f075da Mon Sep 17 00:00:00 2001 From: Chetan Banavikalmutt Date: Thu, 9 Oct 2025 16:43:10 +0530 Subject: [PATCH 6/6] don't cleanup repos with UID annotation Signed-off-by: Chetan Banavikalmutt --- agent/inbound_test.go | 12 ++++++++++++ test/e2e/fixture/fixture.go | 31 +++++++++++++++---------------- 2 files changed, 27 insertions(+), 16 deletions(-) diff --git a/agent/inbound_test.go b/agent/inbound_test.go index be964133..f17a71fd 100644 --- a/agent/inbound_test.go +++ b/agent/inbound_test.go @@ 
-1003,6 +1003,9 @@ func Test_UpdateAppProject(t *testing.T) { ObjectMeta: v1.ObjectMeta{ Name: "test-project", Namespace: "wrong-namespace", // This should be overridden + Annotations: map[string]string{ + manager.SourceUIDAnnotation: "uid-1", + }, }, Spec: v1alpha1.AppProjectSpec{ SourceNamespaces: []string{"default"}, @@ -1012,6 +1015,9 @@ func Test_UpdateAppProject(t *testing.T) { // Set up project as managed a.projectManager.Manage(project.Name) + getMock := be.On("Get", mock.Anything, mock.Anything, mock.Anything).Return(project, nil) + defer getMock.Unset() + // Mock the backend Delete method and capture the namespace passed to it var capturedNamespace string deleteMock := be.On("Delete", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Run(func(args mock.Arguments) { @@ -1041,6 +1047,9 @@ func Test_UpdateAppProject(t *testing.T) { ObjectMeta: v1.ObjectMeta{ Name: "test-project", Namespace: "principal-namespace", // Different from agent + Annotations: map[string]string{ + manager.SourceUIDAnnotation: "uid-1", + }, }, Spec: v1alpha1.AppProjectSpec{ SourceNamespaces: []string{"default"}, @@ -1065,6 +1074,9 @@ func Test_UpdateAppProject(t *testing.T) { // Test Delete a.projectManager.Manage(project.Name) + getMock := be.On("Get", mock.Anything, mock.Anything, mock.Anything).Return(project, nil) + defer getMock.Unset() + var capturedDeleteNamespace string deleteMock := be.On("Delete", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Run(func(args mock.Arguments) { capturedDeleteNamespace = args[2].(string) diff --git a/test/e2e/fixture/fixture.go b/test/e2e/fixture/fixture.go index 63d3d95f..1ca74b4f 100644 --- a/test/e2e/fixture/fixture.go +++ b/test/e2e/fixture/fixture.go @@ -289,14 +289,6 @@ func CleanUp(ctx context.Context, principalClient KubeClient, managedAgentClient } } - isFromSource := func(annotations map[string]string) bool { - if annotations == nil { - return false - } - _, ok := annotations[manager.SourceUIDAnnotation] - return 
ok - } - // Delete all appProjects from the autonomous agent appProjectList := argoapp.AppProjectList{} err = autonomousAgentClient.List(ctx, "argocd", &appProjectList, metav1.ListOptions{}) @@ -304,8 +296,7 @@ func CleanUp(ctx context.Context, principalClient KubeClient, managedAgentClient return err } for _, appProject := range appProjectList.Items { - if appProject.Name == appproject.DefaultAppProjectName || - isFromSource(appProject.GetAnnotations()) { + if appProject.Name == appproject.DefaultAppProjectName { continue } @@ -334,8 +325,7 @@ func CleanUp(ctx context.Context, principalClient KubeClient, managedAgentClient return err } for _, appProject := range appProjectList.Items { - if appProject.Name == appproject.DefaultAppProjectName || - isFromSource(appProject.GetAnnotations()) { + if appProject.Name == appproject.DefaultAppProjectName { continue } @@ -388,10 +378,6 @@ func CleanUp(ctx context.Context, principalClient KubeClient, managedAgentClient return err } for _, repo := range repoList.Items { - if isFromSource(repo.GetAnnotations()) { - continue - } - err = EnsureDeletion(ctx, principalClient, &repo) if err != nil { return err @@ -405,6 +391,19 @@ func CleanUp(ctx context.Context, principalClient KubeClient, managedAgentClient } } + // Delete all repositories from the autonomous agent + repoList = corev1.SecretList{} + err = autonomousAgentClient.List(ctx, "argocd", &repoList, repoListOpts) + if err != nil { + return err + } + for _, repo := range repoList.Items { + err = EnsureDeletion(ctx, autonomousAgentClient, &repo) + if err != nil { + return err + } + } + // Delete all repositories from the managed agent repoList = corev1.SecretList{} err = managedAgentClient.List(ctx, "argocd", &repoList, repoListOpts)