Skip to content

Commit 829de82

Browse files
authored
Merge branch 'argoproj-labs:main' into main
2 parents 5cbd95c + e9c382b commit 829de82

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

61 files changed

+7301
-887
lines changed

agent/agent.go

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,17 +22,20 @@ import (
2222
"sync/atomic"
2323
"time"
2424

25+
"github.com/argoproj-labs/argocd-agent/internal/argocd/cluster"
2526
"github.com/argoproj-labs/argocd-agent/internal/backend"
2627
kubeapp "github.com/argoproj-labs/argocd-agent/internal/backend/kubernetes/application"
2728
kubeappproject "github.com/argoproj-labs/argocd-agent/internal/backend/kubernetes/appproject"
2829
kubenamespace "github.com/argoproj-labs/argocd-agent/internal/backend/kubernetes/namespace"
30+
kuberepository "github.com/argoproj-labs/argocd-agent/internal/backend/kubernetes/repository"
2931
"github.com/argoproj-labs/argocd-agent/internal/event"
3032
"github.com/argoproj-labs/argocd-agent/internal/informer"
3133
"github.com/argoproj-labs/argocd-agent/internal/kube"
3234
"github.com/argoproj-labs/argocd-agent/internal/logging"
3335
"github.com/argoproj-labs/argocd-agent/internal/manager"
3436
"github.com/argoproj-labs/argocd-agent/internal/manager/application"
3537
"github.com/argoproj-labs/argocd-agent/internal/manager/appproject"
38+
"github.com/argoproj-labs/argocd-agent/internal/manager/repository"
3639
"github.com/argoproj-labs/argocd-agent/internal/metrics"
3740
"github.com/argoproj-labs/argocd-agent/internal/queue"
3841
"github.com/argoproj-labs/argocd-agent/internal/resources"
@@ -47,6 +50,8 @@ import (
4750

4851
appCache "github.com/argoproj-labs/argocd-agent/internal/cache"
4952
"github.com/argoproj/argo-cd/v3/pkg/apis/application/v1alpha1"
53+
cacheutil "github.com/argoproj/argo-cd/v3/util/cache"
54+
appstatecache "github.com/argoproj/argo-cd/v3/util/cache/appstate"
5055
ty "k8s.io/apimachinery/pkg/types"
5156
)
5257

@@ -69,6 +74,7 @@ type Agent struct {
6974
remote *client.Remote
7075
appManager *application.ApplicationManager
7176
projectManager *appproject.AppProjectManager
77+
repoManager *repository.RepositoryManager
7278
namespaceManager *kubenamespace.KubernetesBackend
7379
mode types.AgentMode
7480
// queues is a queue of create/update/delete events to send to the principal
@@ -94,6 +100,9 @@ type Agent struct {
94100

95101
// enableResourceProxy determines if the agent should proxy resources to the principal
96102
enableResourceProxy bool
103+
104+
cacheRefreshInterval time.Duration
105+
clusterCache *appstatecache.Cache
97106
}
98107

99108
const defaultQueueName = "default"
@@ -242,6 +251,27 @@ func NewAgent(ctx context.Context, client *kube.KubernetesClient, namespace stri
242251
return nil, err
243252
}
244253

254+
repoInformerOptions := []informer.InformerOption[*corev1.Secret]{
255+
informer.WithListHandler[*corev1.Secret](func(ctx context.Context, opts v1.ListOptions) (runtime.Object, error) {
256+
return client.Clientset.CoreV1().Secrets(a.namespace).List(ctx, opts)
257+
}),
258+
informer.WithWatchHandler[*corev1.Secret](func(ctx context.Context, opts v1.ListOptions) (watch.Interface, error) {
259+
return client.Clientset.CoreV1().Secrets(a.namespace).Watch(ctx, opts)
260+
}),
261+
informer.WithAddHandler[*corev1.Secret](a.handleRepositoryCreation),
262+
informer.WithUpdateHandler[*corev1.Secret](a.handleRepositoryUpdate),
263+
informer.WithDeleteHandler[*corev1.Secret](a.handleRepositoryDeletion),
264+
informer.WithFilters(kuberepository.DefaultFilterChain(a.namespace)),
265+
}
266+
267+
repoInformer, err := informer.NewInformer(ctx, repoInformerOptions...)
268+
if err != nil {
269+
return nil, fmt.Errorf("could not instantiate repository informer: %w", err)
270+
}
271+
272+
repoBackened := kuberepository.NewKubernetesBackend(client.Clientset, a.namespace, repoInformer, true)
273+
a.repoManager = repository.NewManager(repoBackened, a.namespace, true)
274+
245275
nsInformerOpts := []informer.InformerOption[*corev1.Namespace]{
246276
informer.WithListHandler[*corev1.Namespace](func(ctx context.Context, opts v1.ListOptions) (runtime.Object, error) {
247277
return client.Clientset.CoreV1().Namespaces().List(ctx, opts)
@@ -273,6 +303,13 @@ func NewAgent(ctx context.Context, client *kube.KubernetesClient, namespace stri
273303
connMap: map[string]connectionEntry{},
274304
}
275305

306+
clusterCache, err := cluster.NewClusterCacheInstance(ctx, client.Clientset,
307+
a.namespace, a.redisProxyMsgHandler.redisAddress, cacheutil.RedisCompressionGZip)
308+
if err != nil {
309+
return nil, fmt.Errorf("failed to create cluster cache instance: %v", err)
310+
}
311+
a.clusterCache = clusterCache
312+
276313
return a, nil
277314
}
278315

@@ -329,6 +366,12 @@ func (a *Agent) Start(ctx context.Context) error {
329366
return fmt.Errorf("failed to sync applications: %w", err)
330367
}
331368

369+
// Wait for the appProject informer to be synced
370+
err = a.projectManager.EnsureSynced(waitForSyncedDuration)
371+
if err != nil {
372+
return fmt.Errorf("failed to sync appProjects: %w", err)
373+
}
374+
332375
// The namespace informer lives in its own go routine
333376
go func() {
334377
if err := a.namespaceManager.StartInformer(a.context); err != nil {
@@ -343,6 +386,20 @@ func (a *Agent) Start(ctx context.Context) error {
343386
}
344387
log().Infof("Namespace informer synced and ready")
345388

389+
// Start the Repository backend in the background
390+
go func() {
391+
if err := a.repoManager.StartBackend(a.context); err != nil {
392+
log().WithError(err).Error("Repository backend has exited non-successfully")
393+
} else {
394+
log().Info("Repository backend has exited")
395+
}
396+
}()
397+
398+
if err = a.repoManager.EnsureSynced(waitForSyncedDuration); err != nil {
399+
return fmt.Errorf("unable to sync Repository informer: %w", err)
400+
}
401+
log().Infof("Repository informer synced and ready")
402+
346403
if a.options.healthzPort > 0 {
347404
// Endpoint to check if the agent is up and running
348405
http.HandleFunc("/healthz", a.healthzHandler)
@@ -353,6 +410,23 @@ func (a *Agent) Start(ctx context.Context) error {
353410
go http.ListenAndServe(healthzAddr, nil)
354411
}
355412

413+
// Start the background process of periodic sync of cluster cache info.
414+
// This will send periodic updates of Application, Resource and API counts to principal.
415+
if a.mode == types.AgentModeManaged {
416+
go func() {
417+
ticker := time.NewTicker(a.cacheRefreshInterval)
418+
defer ticker.Stop()
419+
for {
420+
select {
421+
case <-ticker.C:
422+
a.addClusterCacheInfoUpdateToQueue()
423+
case <-a.context.Done():
424+
return
425+
}
426+
}
427+
}()
428+
}
429+
356430
if a.remote != nil {
357431
a.remote.SetClientMode(a.mode)
358432
// TODO: Right now, maintainConnection always returns nil. Revisit

agent/agent_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ import (
2929

3030
func newAgent(t *testing.T) *Agent {
3131
t.Helper()
32-
kubec := kube.NewKubernetesFakeClientWithApps()
32+
kubec := kube.NewKubernetesFakeClientWithApps("argocd")
3333
remote, err := client.NewRemote("127.0.0.1", 8080)
3434
require.NoError(t, err)
3535
agent, err := NewAgent(context.TODO(), kubec, "argocd", WithRemote(remote))
@@ -38,7 +38,7 @@ func newAgent(t *testing.T) *Agent {
3838
}
3939

4040
func Test_NewAgent(t *testing.T) {
41-
kubec := kube.NewKubernetesFakeClientWithApps()
41+
kubec := kube.NewKubernetesFakeClientWithApps("agent")
4242
agent, err := NewAgent(context.TODO(), kubec, "agent", WithRemote(&client.Remote{}))
4343
require.NotNil(t, agent)
4444
require.NoError(t, err)

agent/inbound.go

Lines changed: 181 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import (
2828
"github.com/argoproj-labs/argocd-agent/pkg/types"
2929
"github.com/argoproj/argo-cd/v3/pkg/apis/application/v1alpha1"
3030
"github.com/sirupsen/logrus"
31+
corev1 "k8s.io/api/core/v1"
3132
apierrors "k8s.io/apimachinery/pkg/api/errors"
3233
"k8s.io/client-go/dynamic"
3334
)
@@ -57,6 +58,8 @@ func (a *Agent) processIncomingEvent(ev *event.Event) error {
5758
err = a.processIncomingApplication(ev)
5859
case event.TargetAppProject:
5960
err = a.processIncomingAppProject(ev)
61+
case event.TargetRepository:
62+
err = a.processIncomingRepository(ev)
6063
case event.TargetResource:
6164
err = a.processIncomingResourceRequest(ev)
6265
case event.TargetResourceResync:
@@ -264,6 +267,88 @@ func (a *Agent) processIncomingAppProject(ev *event.Event) error {
264267
return err
265268
}
266269

270+
func (a *Agent) processIncomingRepository(ev *event.Event) error {
271+
logCtx := log().WithFields(logrus.Fields{
272+
"method": "processIncomingEvents",
273+
})
274+
275+
incomingRepo, err := ev.Repository()
276+
if err != nil {
277+
return err
278+
}
279+
280+
var exists, sourceUIDMatch bool
281+
282+
// Source UID annotation is not present for repos on the autonomous agent since it is the source of truth.
283+
if a.mode == types.AgentModeManaged {
284+
exists, sourceUIDMatch, err = a.repoManager.CompareSourceUID(a.context, incomingRepo)
285+
if err != nil {
286+
return fmt.Errorf("failed to compare the source UID of app: %w", err)
287+
}
288+
}
289+
290+
switch ev.Type() {
291+
case event.Create:
292+
if exists {
293+
if sourceUIDMatch {
294+
logCtx.Debug("Received a Create event for an existing repository. Updating the existing repository")
295+
_, err := a.updateRepository(incomingRepo)
296+
if err != nil {
297+
return fmt.Errorf("could not update the existing repository: %w", err)
298+
}
299+
return nil
300+
} else {
301+
logCtx.Debug("Repository already exists with a different source UID. Deleting the existing repository")
302+
if err := a.deleteRepository(incomingRepo); err != nil {
303+
return fmt.Errorf("could not delete existing repository prior to creation: %w", err)
304+
}
305+
}
306+
}
307+
308+
_, err = a.createRepository(incomingRepo)
309+
if err != nil {
310+
logCtx.Errorf("Error creating repository: %v", err)
311+
}
312+
313+
case event.SpecUpdate:
314+
if !exists {
315+
logCtx.Debug("Received an Update event for a repository that doesn't exist. Creating the incoming repository")
316+
if _, err := a.createRepository(incomingRepo); err != nil {
317+
return fmt.Errorf("could not create incoming repository: %w", err)
318+
}
319+
return nil
320+
}
321+
322+
if !sourceUIDMatch {
323+
logCtx.Debug("Source UID mismatch between the incoming repository and existing repository. Deleting the existing repository")
324+
if err := a.deleteRepository(incomingRepo); err != nil {
325+
return fmt.Errorf("could not delete existing repository prior to creation: %w", err)
326+
}
327+
328+
logCtx.Debug("Creating the incoming repository after deleting the existing repository")
329+
if _, err := a.createRepository(incomingRepo); err != nil {
330+
return fmt.Errorf("could not create incoming repository after deleting existing repository: %w", err)
331+
}
332+
return nil
333+
}
334+
335+
_, err = a.updateRepository(incomingRepo)
336+
if err != nil {
337+
logCtx.Errorf("Error updating repository: %v", err)
338+
}
339+
340+
case event.Delete:
341+
err = a.deleteRepository(incomingRepo)
342+
if err != nil {
343+
logCtx.Errorf("Error deleting repository: %v", err)
344+
}
345+
default:
346+
logCtx.Warnf("Received an unknown event: %s. Protocol mismatch?", ev.Type())
347+
}
348+
349+
return err
350+
}
351+
267352
// processIncomingResourceResyncEvent handles all the resync events that are
268353
// exchanged with the agent/principal restarts
269354
func (a *Agent) processIncomingResourceResyncEvent(ev *event.Event) error {
@@ -457,7 +542,7 @@ func (a *Agent) deleteApplication(app *v1alpha1.Application) error {
457542
return err
458543
}
459544

460-
if a.mode == types.AgentModeManaged && err == nil {
545+
if a.mode == types.AgentModeManaged {
461546
appCache.DeleteApplicationSpec(app.UID, logCtx)
462547
}
463548

@@ -562,3 +647,98 @@ func (a *Agent) deleteAppProject(project *v1alpha1.AppProject) error {
562647
}
563648
return nil
564649
}
650+
651+
// createRepository creates a Repository upon an event in the agent's work queue.
652+
func (a *Agent) createRepository(incoming *corev1.Secret) (*corev1.Secret, error) {
653+
logCtx := log().WithFields(logrus.Fields{
654+
"method": "CreateRepository",
655+
"repo": incoming.Name,
656+
})
657+
658+
// In modes other than "managed", we don't process new repository events
659+
// that are incoming.
660+
if a.mode.IsAutonomous() {
661+
logCtx.Info("Discarding this event, because agent is not in managed mode")
662+
return nil, event.NewEventDiscardedErr("cannot create repository: agent is not in managed mode")
663+
}
664+
665+
// If we receive a new Repository event for a Repository we already manage, it usually
666+
// means that we're out-of-sync from the control plane.
667+
if a.repoManager.IsManaged(incoming.Name) {
668+
logCtx.Trace("Repository is already managed on this agent. Updating the existing Repository")
669+
return a.updateRepository(incoming)
670+
}
671+
672+
logCtx.Infof("Creating a new repository on behalf of an incoming event")
673+
674+
if incoming.Annotations == nil {
675+
incoming.Annotations = make(map[string]string)
676+
}
677+
678+
// Get rid of some fields that we do not want to have on the repository as we start fresh.
679+
delete(incoming.Annotations, "kubectl.kubernetes.io/last-applied-configuration")
680+
681+
created, err := a.repoManager.Create(a.context, incoming)
682+
if apierrors.IsAlreadyExists(err) {
683+
logCtx.Debug("repository already exists")
684+
return created, nil
685+
}
686+
687+
return created, err
688+
}
689+
690+
func (a *Agent) updateRepository(incoming *corev1.Secret) (*corev1.Secret, error) {
691+
incoming.SetNamespace(a.namespace)
692+
logCtx := log().WithFields(logrus.Fields{
693+
"method": "UpdateRepository",
694+
"repo": incoming.Name,
695+
"resourceVersion": incoming.ResourceVersion,
696+
})
697+
698+
if !a.repoManager.IsManaged(incoming.Name) {
699+
logCtx.Trace("Repository is not managed on this agent. Creating the new Repository")
700+
return a.createRepository(incoming)
701+
}
702+
703+
if a.repoManager.IsChangeIgnored(incoming.Name, incoming.ResourceVersion) {
704+
logCtx.Tracef("Discarding this event, because agent has seen this version %s already", incoming.ResourceVersion)
705+
return nil, event.NewEventDiscardedErr("the version %s has already been seen by this agent", incoming.ResourceVersion)
706+
} else {
707+
logCtx.Tracef("New resource version: %s", incoming.ResourceVersion)
708+
}
709+
710+
logCtx.Infof("Updating repository")
711+
712+
return a.repoManager.UpdateManagedRepository(a.context, incoming)
713+
}
714+
715+
func (a *Agent) deleteRepository(repo *corev1.Secret) error {
716+
repo.SetNamespace(a.namespace)
717+
logCtx := log().WithFields(logrus.Fields{
718+
"method": "DeleteRepository",
719+
"repo": repo.Name,
720+
})
721+
722+
if !a.repoManager.IsManaged(repo.Name) {
723+
return fmt.Errorf("repository %s is not managed", repo.Name)
724+
}
725+
726+
logCtx.Infof("Deleting repository")
727+
728+
deletionPropagation := backend.DeletePropagationBackground
729+
err := a.repoManager.Delete(a.context, repo.Name, repo.Namespace, &deletionPropagation)
730+
if err != nil {
731+
if apierrors.IsNotFound(err) {
732+
logCtx.Debug("repository is not found, perhaps it is already deleted")
733+
return nil
734+
}
735+
return err
736+
}
737+
738+
err = a.repoManager.Unmanage(repo.Name)
739+
if err != nil {
740+
log().Warnf("Could not unmanage repository %s: %v", repo.Name, err)
741+
}
742+
743+
return nil
744+
}

0 commit comments

Comments
 (0)