Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 58 additions & 16 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
kubeappproject "github.com/argoproj-labs/argocd-agent/internal/backend/kubernetes/appproject"
kubenamespace "github.com/argoproj-labs/argocd-agent/internal/backend/kubernetes/namespace"
kuberepository "github.com/argoproj-labs/argocd-agent/internal/backend/kubernetes/repository"
"github.com/argoproj-labs/argocd-agent/internal/cache"
"github.com/argoproj-labs/argocd-agent/internal/config"
"github.com/argoproj-labs/argocd-agent/internal/event"
"github.com/argoproj-labs/argocd-agent/internal/informer"
Expand All @@ -49,7 +50,6 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/watch"

appCache "github.com/argoproj-labs/argocd-agent/internal/cache"
"github.com/argoproj/argo-cd/v3/pkg/apis/application/v1alpha1"
cacheutil "github.com/argoproj/argo-cd/v3/util/cache"
appstatecache "github.com/argoproj/argo-cd/v3/util/cache/appstate"
Expand Down Expand Up @@ -104,6 +104,13 @@ type Agent struct {

cacheRefreshInterval time.Duration
clusterCache *appstatecache.Cache

// sourceCache is a cache of resources from the source. We use it to revert any changes made to the local resources.
sourceCache *cache.SourceCache

// deletions tracks valid deletions from the source.
// This is used to differentiate between valid and invalid deletions
deletions *manager.DeletionTracker
}

const defaultQueueName = "default"
Expand All @@ -129,7 +136,9 @@ type AgentOption func(*Agent) error
// options.
func NewAgent(ctx context.Context, client *kube.KubernetesClient, namespace string, opts ...AgentOption) (*Agent, error) {
a := &Agent{
version: version.New("argocd-agent"),
version: version.New("argocd-agent"),
deletions: manager.NewDeletionTracker(),
sourceCache: cache.NewSourceCache(),
}
a.infStopCh = make(chan struct{})
a.namespace = namespace
Expand Down Expand Up @@ -197,6 +206,7 @@ func NewAgent(ctx context.Context, client *kube.KubernetesClient, namespace stri
appManagerOpts := []application.ApplicationManagerOption{
application.WithRole(manager.ManagerRoleAgent),
application.WithMode(managerMode),
application.WithDeletionTracker(a.deletions),
}

if a.options.metricsPort > 0 {
Expand Down Expand Up @@ -319,21 +329,10 @@ func (a *Agent) Start(ctx context.Context) error {
a.context = infCtx
a.cancelFn = cancelFn

// For managed-agent we need to maintain a cache to keep applications in sync with last known state of
// principal in case agent is disconnected with principal or application in managed-cluster is modified.
// For managed-agent we need to maintain a cache to keep resources in sync with last known state of
// principal in case agent is disconnected with principal or resources in managed-cluster are modified.
if a.mode == types.AgentModeManaged {
log().Infof("Recreating application spec cache from existing resources on cluster")
appList, err := a.appManager.List(ctx, backend.ApplicationSelector{Namespaces: []string{a.namespace}})
if err != nil {
log().Errorf("Error while fetching list of applications: %v", err)
}

for _, app := range appList {
sourceUID, exists := app.Annotations[manager.SourceUIDAnnotation]
if exists {
appCache.SetApplicationSpec(ty.UID(sourceUID), app.Spec, log())
}
}
a.populateSourceCache(ctx)
}

if a.options.metricsPort > 0 {
Expand Down Expand Up @@ -482,3 +481,46 @@ func (a *Agent) healthzHandler(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusServiceUnavailable)
}
}

func (a *Agent) populateSourceCache(ctx context.Context) {
log().Infof("Recreating application spec cache from existing resources on cluster")
appList, err := a.appManager.List(ctx, backend.ApplicationSelector{Namespaces: []string{a.namespace}})
if err != nil {
log().Errorf("Error while fetching list of applications: %v", err)
}

for _, app := range appList {
sourceUID, exists := app.Annotations[manager.SourceUIDAnnotation]
if exists {
a.sourceCache.Application.Set(ty.UID(sourceUID), app.Spec)
}
}

log().Infof("Recreating appProject spec cache from existing resources on cluster")
appProjectList, err := a.projectManager.List(ctx, backend.AppProjectSelector{Namespace: a.namespace})
if err != nil {
log().Errorf("Error while fetching list of appProjects: %v", err)
}

for _, appProject := range appProjectList {
sourceUID, exists := appProject.Annotations[manager.SourceUIDAnnotation]
if exists {
a.sourceCache.AppProject.Set(ty.UID(sourceUID), appProject.Spec)
}
}

log().Infof("Recreating repository spec cache from existing resources on cluster")
repoList, err := a.repoManager.List(ctx, backend.RepositorySelector{Namespace: a.namespace})
if err != nil {
log().Errorf("Error while fetching list of repositories: %v", err)
}

for _, repo := range repoList {
sourceUID, exists := repo.Annotations[manager.SourceUIDAnnotation]
if exists {
a.sourceCache.Repository.Set(ty.UID(sourceUID), repo.Data)
}
}

log().Infof("Source cache populated successfully")
}
58 changes: 50 additions & 8 deletions agent/inbound.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"time"

"github.com/argoproj-labs/argocd-agent/internal/backend"
appCache "github.com/argoproj-labs/argocd-agent/internal/cache"
"github.com/argoproj-labs/argocd-agent/internal/checkpoint"
"github.com/argoproj-labs/argocd-agent/internal/event"
"github.com/argoproj-labs/argocd-agent/internal/manager"
Expand All @@ -30,6 +29,7 @@ import (
"github.com/sirupsen/logrus"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
ktypes "k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/dynamic"
)

Expand Down Expand Up @@ -459,7 +459,7 @@ func (a *Agent) createApplication(incoming *v1alpha1.Application) (*v1alpha1.App

if a.mode == types.AgentModeManaged {
// Store app spec in cache
appCache.SetApplicationSpec(incoming.UID, incoming.Spec, logCtx)
a.sourceCache.Application.Set(incoming.UID, incoming.Spec)
}

created, err := a.appManager.Create(a.context, incoming)
Expand Down Expand Up @@ -500,7 +500,7 @@ func (a *Agent) updateApplication(incoming *v1alpha1.Application) (*v1alpha1.App

// Update app spec in cache
logCtx.Tracef("Calling update spec for this event")
appCache.SetApplicationSpec(incoming.UID, incoming.Spec, logCtx)
a.sourceCache.Application.Set(incoming.UID, incoming.Spec)

napp, err = a.appManager.UpdateManagedApp(a.context, incoming)
case types.AgentModeAutonomous:
Expand Down Expand Up @@ -529,21 +529,30 @@ func (a *Agent) deleteApplication(app *v1alpha1.Application) error {

logCtx.Infof("Deleting application")

// Fetch the source UID of the existing app to mark it as expected deletion.
app, err := a.appManager.Get(a.context, app.Name, app.Namespace)
if err != nil {
return err
}

sourceUID := app.Annotations[manager.SourceUIDAnnotation]
a.deletions.MarkExpected(ktypes.UID(sourceUID))

deletionPropagation := backend.DeletePropagationBackground
err := a.appManager.Delete(a.context, a.namespace, app, &deletionPropagation)
err = a.appManager.Delete(a.context, a.namespace, app, &deletionPropagation)
if err != nil {
if apierrors.IsNotFound(err) {
logCtx.Debug("application is not found, perhaps it is already deleted")
if a.mode == types.AgentModeManaged {
appCache.DeleteApplicationSpec(app.UID, logCtx)
a.sourceCache.Application.Delete(app.UID)
}
return nil
}
return err
}

if a.mode == types.AgentModeManaged {
appCache.DeleteApplicationSpec(app.UID, logCtx)
a.sourceCache.Application.Delete(app.UID)
}

err = a.appManager.Unmanage(app.QualifiedName())
Expand Down Expand Up @@ -577,6 +586,8 @@ func (a *Agent) createAppProject(incoming *v1alpha1.AppProject) (*v1alpha1.AppPr

logCtx.Infof("Creating a new AppProject on behalf of an incoming event")

a.sourceCache.AppProject.Set(incoming.UID, incoming.Spec)

// Get rid of some fields that we do not want to have on the AppProject
// as we start fresh.
if incoming.Annotations != nil {
Expand Down Expand Up @@ -611,6 +622,8 @@ func (a *Agent) updateAppProject(incoming *v1alpha1.AppProject) (*v1alpha1.AppPr

logCtx.Infof("Updating appProject")

a.sourceCache.AppProject.Set(incoming.UID, incoming.Spec)

logCtx.Tracef("Calling update spec for this event")
return a.projectManager.UpdateAppProject(a.context, incoming)

Expand All @@ -632,15 +645,28 @@ func (a *Agent) deleteAppProject(project *v1alpha1.AppProject) error {

logCtx.Infof("Deleting appProject")

// Fetch the source UID of the existing appProject to mark it as expected deletion.
project, err := a.projectManager.Get(a.context, project.Name, project.Namespace)
if err != nil {
return err
}

sourceUID := project.Annotations[manager.SourceUIDAnnotation]
a.deletions.MarkExpected(ktypes.UID(sourceUID))

deletionPropagation := backend.DeletePropagationBackground
err := a.projectManager.Delete(a.context, project, &deletionPropagation)
err = a.projectManager.Delete(a.context, project, &deletionPropagation)
if err != nil {
if apierrors.IsNotFound(err) {
logCtx.Debug("appProject not found, perhaps it is already deleted")
a.sourceCache.AppProject.Delete(project.UID)
return nil
}
return err
}

a.sourceCache.AppProject.Delete(project.UID)

err = a.projectManager.Unmanage(project.Name)
if err != nil {
log().Warnf("Could not unmanage appProject %s: %v", project.Name, err)
Expand Down Expand Up @@ -675,6 +701,8 @@ func (a *Agent) createRepository(incoming *corev1.Secret) (*corev1.Secret, error
incoming.Annotations = make(map[string]string)
}

a.sourceCache.Repository.Set(incoming.UID, incoming.Data)

// Get rid of some fields that we do not want to have on the repository as we start fresh.
delete(incoming.Annotations, "kubectl.kubernetes.io/last-applied-configuration")

Expand Down Expand Up @@ -709,6 +737,8 @@ func (a *Agent) updateRepository(incoming *corev1.Secret) (*corev1.Secret, error

logCtx.Infof("Updating repository")

a.sourceCache.Repository.Set(incoming.UID, incoming.Data)

return a.repoManager.UpdateManagedRepository(a.context, incoming)
}

Expand All @@ -725,16 +755,28 @@ func (a *Agent) deleteRepository(repo *corev1.Secret) error {

logCtx.Infof("Deleting repository")

// Fetch the source UID of the existing repository to mark it as expected deletion.
repo, err := a.repoManager.Get(a.context, repo.Name, repo.Namespace)
if err != nil {
return err
}

sourceUID := repo.Annotations[manager.SourceUIDAnnotation]
a.deletions.MarkExpected(ktypes.UID(sourceUID))

deletionPropagation := backend.DeletePropagationBackground
err := a.repoManager.Delete(a.context, repo.Name, repo.Namespace, &deletionPropagation)
err = a.repoManager.Delete(a.context, repo.Name, repo.Namespace, &deletionPropagation)
if err != nil {
if apierrors.IsNotFound(err) {
logCtx.Debug("repository is not found, perhaps it is already deleted")
a.sourceCache.Repository.Delete(repo.UID)
return nil
}
return err
}

a.sourceCache.Repository.Delete(repo.UID)

err = a.repoManager.Unmanage(repo.Name)
if err != nil {
log().Warnf("Could not unmanage repository %s: %v", repo.Name, err)
Expand Down
Loading
Loading