Skip to content

Commit e2ef906

Browse files
authored
feat: sync private repository secrets to managed agents (#526)
Signed-off-by: Chetan Banavikalmutt <[email protected]>
1 parent 398a271 commit e2ef906

30 files changed

+4863
-258
lines changed

agent/agent.go

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,13 +27,15 @@ import (
2727
kubeapp "github.com/argoproj-labs/argocd-agent/internal/backend/kubernetes/application"
2828
kubeappproject "github.com/argoproj-labs/argocd-agent/internal/backend/kubernetes/appproject"
2929
kubenamespace "github.com/argoproj-labs/argocd-agent/internal/backend/kubernetes/namespace"
30+
kuberepository "github.com/argoproj-labs/argocd-agent/internal/backend/kubernetes/repository"
3031
"github.com/argoproj-labs/argocd-agent/internal/event"
3132
"github.com/argoproj-labs/argocd-agent/internal/informer"
3233
"github.com/argoproj-labs/argocd-agent/internal/kube"
3334
"github.com/argoproj-labs/argocd-agent/internal/logging"
3435
"github.com/argoproj-labs/argocd-agent/internal/manager"
3536
"github.com/argoproj-labs/argocd-agent/internal/manager/application"
3637
"github.com/argoproj-labs/argocd-agent/internal/manager/appproject"
38+
"github.com/argoproj-labs/argocd-agent/internal/manager/repository"
3739
"github.com/argoproj-labs/argocd-agent/internal/metrics"
3840
"github.com/argoproj-labs/argocd-agent/internal/queue"
3941
"github.com/argoproj-labs/argocd-agent/internal/resources"
@@ -72,6 +74,7 @@ type Agent struct {
7274
remote *client.Remote
7375
appManager *application.ApplicationManager
7476
projectManager *appproject.AppProjectManager
77+
repoManager *repository.RepositoryManager
7578
namespaceManager *kubenamespace.KubernetesBackend
7679
mode types.AgentMode
7780
// queues is a queue of create/update/delete events to send to the principal
@@ -248,6 +251,27 @@ func NewAgent(ctx context.Context, client *kube.KubernetesClient, namespace stri
248251
return nil, err
249252
}
250253

254+
repoInformerOptions := []informer.InformerOption[*corev1.Secret]{
255+
informer.WithListHandler[*corev1.Secret](func(ctx context.Context, opts v1.ListOptions) (runtime.Object, error) {
256+
return client.Clientset.CoreV1().Secrets(a.namespace).List(ctx, opts)
257+
}),
258+
informer.WithWatchHandler[*corev1.Secret](func(ctx context.Context, opts v1.ListOptions) (watch.Interface, error) {
259+
return client.Clientset.CoreV1().Secrets(a.namespace).Watch(ctx, opts)
260+
}),
261+
informer.WithAddHandler[*corev1.Secret](a.handleRepositoryCreation),
262+
informer.WithUpdateHandler[*corev1.Secret](a.handleRepositoryUpdate),
263+
informer.WithDeleteHandler[*corev1.Secret](a.handleRepositoryDeletion),
264+
informer.WithFilters(kuberepository.DefaultFilterChain(a.namespace)),
265+
}
266+
267+
repoInformer, err := informer.NewInformer(ctx, repoInformerOptions...)
268+
if err != nil {
269+
return nil, fmt.Errorf("could not instantiate repository informer: %w", err)
270+
}
271+
272+
repoBackened := kuberepository.NewKubernetesBackend(client.Clientset, a.namespace, repoInformer, true)
273+
a.repoManager = repository.NewManager(repoBackened, a.namespace, true)
274+
251275
nsInformerOpts := []informer.InformerOption[*corev1.Namespace]{
252276
informer.WithListHandler[*corev1.Namespace](func(ctx context.Context, opts v1.ListOptions) (runtime.Object, error) {
253277
return client.Clientset.CoreV1().Namespaces().List(ctx, opts)
@@ -342,6 +366,12 @@ func (a *Agent) Start(ctx context.Context) error {
342366
return fmt.Errorf("failed to sync applications: %w", err)
343367
}
344368

369+
// Wait for the appProject informer to be synced
370+
err = a.projectManager.EnsureSynced(waitForSyncedDuration)
371+
if err != nil {
372+
return fmt.Errorf("failed to sync appProjects: %w", err)
373+
}
374+
345375
// The namespace informer lives in its own go routine
346376
go func() {
347377
if err := a.namespaceManager.StartInformer(a.context); err != nil {
@@ -356,6 +386,20 @@ func (a *Agent) Start(ctx context.Context) error {
356386
}
357387
log().Infof("Namespace informer synced and ready")
358388

389+
// Start the Repository backend in the background
390+
go func() {
391+
if err := a.repoManager.StartBackend(a.context); err != nil {
392+
log().WithError(err).Error("Repository backend has exited non-successfully")
393+
} else {
394+
log().Info("Repository backend has exited")
395+
}
396+
}()
397+
398+
if err = a.repoManager.EnsureSynced(waitForSyncedDuration); err != nil {
399+
return fmt.Errorf("unable to sync Repository informer: %w", err)
400+
}
401+
log().Infof("Repository informer synced and ready")
402+
359403
if a.options.healthzPort > 0 {
360404
// Endpoint to check if the agent is up and running
361405
http.HandleFunc("/healthz", a.healthzHandler)

agent/inbound.go

Lines changed: 180 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import (
2828
"github.com/argoproj-labs/argocd-agent/pkg/types"
2929
"github.com/argoproj/argo-cd/v3/pkg/apis/application/v1alpha1"
3030
"github.com/sirupsen/logrus"
31+
corev1 "k8s.io/api/core/v1"
3132
apierrors "k8s.io/apimachinery/pkg/api/errors"
3233
"k8s.io/client-go/dynamic"
3334
)
@@ -57,6 +58,8 @@ func (a *Agent) processIncomingEvent(ev *event.Event) error {
5758
err = a.processIncomingApplication(ev)
5859
case event.TargetAppProject:
5960
err = a.processIncomingAppProject(ev)
61+
case event.TargetRepository:
62+
err = a.processIncomingRepository(ev)
6063
case event.TargetResource:
6164
err = a.processIncomingResourceRequest(ev)
6265
case event.TargetResourceResync:
@@ -264,6 +267,88 @@ func (a *Agent) processIncomingAppProject(ev *event.Event) error {
264267
return err
265268
}
266269

270+
func (a *Agent) processIncomingRepository(ev *event.Event) error {
271+
logCtx := log().WithFields(logrus.Fields{
272+
"method": "processIncomingEvents",
273+
})
274+
275+
incomingRepo, err := ev.Repository()
276+
if err != nil {
277+
return err
278+
}
279+
280+
var exists, sourceUIDMatch bool
281+
282+
// Source UID annotation is not present for repos on the autonomous agent since it is the source of truth.
283+
if a.mode == types.AgentModeManaged {
284+
exists, sourceUIDMatch, err = a.repoManager.CompareSourceUID(a.context, incomingRepo)
285+
if err != nil {
286+
return fmt.Errorf("failed to compare the source UID of app: %w", err)
287+
}
288+
}
289+
290+
switch ev.Type() {
291+
case event.Create:
292+
if exists {
293+
if sourceUIDMatch {
294+
logCtx.Debug("Received a Create event for an existing repository. Updating the existing repository")
295+
_, err := a.updateRepository(incomingRepo)
296+
if err != nil {
297+
return fmt.Errorf("could not update the existing repository: %w", err)
298+
}
299+
return nil
300+
} else {
301+
logCtx.Debug("Repository already exists with a different source UID. Deleting the existing repository")
302+
if err := a.deleteRepository(incomingRepo); err != nil {
303+
return fmt.Errorf("could not delete existing repository prior to creation: %w", err)
304+
}
305+
}
306+
}
307+
308+
_, err = a.createRepository(incomingRepo)
309+
if err != nil {
310+
logCtx.Errorf("Error creating repository: %v", err)
311+
}
312+
313+
case event.SpecUpdate:
314+
if !exists {
315+
logCtx.Debug("Received an Update event for a repository that doesn't exist. Creating the incoming repository")
316+
if _, err := a.createRepository(incomingRepo); err != nil {
317+
return fmt.Errorf("could not create incoming repository: %w", err)
318+
}
319+
return nil
320+
}
321+
322+
if !sourceUIDMatch {
323+
logCtx.Debug("Source UID mismatch between the incoming repository and existing repository. Deleting the existing repository")
324+
if err := a.deleteRepository(incomingRepo); err != nil {
325+
return fmt.Errorf("could not delete existing repository prior to creation: %w", err)
326+
}
327+
328+
logCtx.Debug("Creating the incoming repository after deleting the existing repository")
329+
if _, err := a.createRepository(incomingRepo); err != nil {
330+
return fmt.Errorf("could not create incoming repository after deleting existing repository: %w", err)
331+
}
332+
return nil
333+
}
334+
335+
_, err = a.updateRepository(incomingRepo)
336+
if err != nil {
337+
logCtx.Errorf("Error updating repository: %v", err)
338+
}
339+
340+
case event.Delete:
341+
err = a.deleteRepository(incomingRepo)
342+
if err != nil {
343+
logCtx.Errorf("Error deleting repository: %v", err)
344+
}
345+
default:
346+
logCtx.Warnf("Received an unknown event: %s. Protocol mismatch?", ev.Type())
347+
}
348+
349+
return err
350+
}
351+
267352
// processIncomingResourceResyncEvent handles all the resync events that are
268353
// exchanged with the agent/principal restarts
269354
func (a *Agent) processIncomingResourceResyncEvent(ev *event.Event) error {
@@ -562,3 +647,98 @@ func (a *Agent) deleteAppProject(project *v1alpha1.AppProject) error {
562647
}
563648
return nil
564649
}
650+
651+
// createRepository creates a Repository upon an event in the agent's work queue.
652+
func (a *Agent) createRepository(incoming *corev1.Secret) (*corev1.Secret, error) {
653+
logCtx := log().WithFields(logrus.Fields{
654+
"method": "CreateRepository",
655+
"repo": incoming.Name,
656+
})
657+
658+
// In modes other than "managed", we don't process new repository events
659+
// that are incoming.
660+
if a.mode.IsAutonomous() {
661+
logCtx.Info("Discarding this event, because agent is not in managed mode")
662+
return nil, event.NewEventDiscardedErr("cannot create repository: agent is not in managed mode")
663+
}
664+
665+
// If we receive a new Repository event for a Repository we already manage, it usually
666+
// means that we're out-of-sync from the control plane.
667+
if a.repoManager.IsManaged(incoming.Name) {
668+
logCtx.Trace("Repository is already managed on this agent. Updating the existing Repository")
669+
return a.updateRepository(incoming)
670+
}
671+
672+
logCtx.Infof("Creating a new repository on behalf of an incoming event")
673+
674+
if incoming.Annotations == nil {
675+
incoming.Annotations = make(map[string]string)
676+
}
677+
678+
// Get rid of some fields that we do not want to have on the repository as we start fresh.
679+
delete(incoming.Annotations, "kubectl.kubernetes.io/last-applied-configuration")
680+
681+
created, err := a.repoManager.Create(a.context, incoming)
682+
if apierrors.IsAlreadyExists(err) {
683+
logCtx.Debug("repository already exists")
684+
return created, nil
685+
}
686+
687+
return created, err
688+
}
689+
690+
func (a *Agent) updateRepository(incoming *corev1.Secret) (*corev1.Secret, error) {
691+
incoming.SetNamespace(a.namespace)
692+
logCtx := log().WithFields(logrus.Fields{
693+
"method": "UpdateRepository",
694+
"repo": incoming.Name,
695+
"resourceVersion": incoming.ResourceVersion,
696+
})
697+
698+
if !a.repoManager.IsManaged(incoming.Name) {
699+
logCtx.Trace("Repository is not managed on this agent. Creating the new Repository")
700+
return a.createRepository(incoming)
701+
}
702+
703+
if a.repoManager.IsChangeIgnored(incoming.Name, incoming.ResourceVersion) {
704+
logCtx.Tracef("Discarding this event, because agent has seen this version %s already", incoming.ResourceVersion)
705+
return nil, event.NewEventDiscardedErr("the version %s has already been seen by this agent", incoming.ResourceVersion)
706+
} else {
707+
logCtx.Tracef("New resource version: %s", incoming.ResourceVersion)
708+
}
709+
710+
logCtx.Infof("Updating repository")
711+
712+
return a.repoManager.UpdateManagedRepository(a.context, incoming)
713+
}
714+
715+
func (a *Agent) deleteRepository(repo *corev1.Secret) error {
716+
repo.SetNamespace(a.namespace)
717+
logCtx := log().WithFields(logrus.Fields{
718+
"method": "DeleteRepository",
719+
"repo": repo.Name,
720+
})
721+
722+
if !a.repoManager.IsManaged(repo.Name) {
723+
return fmt.Errorf("repository %s is not managed", repo.Name)
724+
}
725+
726+
logCtx.Infof("Deleting repository")
727+
728+
deletionPropagation := backend.DeletePropagationBackground
729+
err := a.repoManager.Delete(a.context, repo.Name, repo.Namespace, &deletionPropagation)
730+
if err != nil {
731+
if apierrors.IsNotFound(err) {
732+
logCtx.Debug("repository is not found, perhaps it is already deleted")
733+
return nil
734+
}
735+
return err
736+
}
737+
738+
err = a.repoManager.Unmanage(repo.Name)
739+
if err != nil {
740+
log().Warnf("Could not unmanage repository %s: %v", repo.Name, err)
741+
}
742+
743+
return nil
744+
}

0 commit comments

Comments
 (0)