Skip to content
This repository was archived by the owner on Oct 19, 2024. It is now read-only.

Commit b2994cc

Browse files
author
Alexander Matyushentsev
authored
refactor: move notification controller implementation to github.com/argoproj/notifications-engine (#276)
Signed-off-by: Alexander Matyushentsev <[email protected]>
1 parent 9f89acb commit b2994cc

22 files changed

+369
-1207
lines changed

bot/server.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ import (
88
"net/http"
99
"strings"
1010

11-
"github.com/argoproj/notifications-engine/pkg/controller"
11+
"github.com/argoproj/notifications-engine/pkg/subscriptions"
1212

1313
"github.com/argoproj-labs/argocd-notifications/shared/k8s"
1414

@@ -106,7 +106,7 @@ func (s *server) updateSubscription(service string, recipient string, subscribe
106106
return "", err
107107
}
108108
oldAnnotations := copyStringMap(obj.GetAnnotations())
109-
annotations := controller.Subscriptions(obj.GetAnnotations())
109+
annotations := subscriptions.Annotations(obj.GetAnnotations())
110110
if subscribe {
111111
annotations.Subscribe(opts.Trigger, service, recipient)
112112
} else {
@@ -139,7 +139,7 @@ func (s *server) listSubscriptions(service string, recipient string) (string, er
139139
}
140140
var apps []string
141141
for _, app := range appList.Items {
142-
if controller.Subscriptions(app.GetAnnotations()).Has(service, recipient) {
142+
if subscriptions.Annotations(app.GetAnnotations()).Has(service, recipient) {
143143
apps = append(apps, fmt.Sprintf("%s/%s", app.GetNamespace(), app.GetName()))
144144
}
145145
}
@@ -149,7 +149,7 @@ func (s *server) listSubscriptions(service string, recipient string) (string, er
149149
}
150150
var appProjs []string
151151
for _, appProj := range appProjList.Items {
152-
if controller.Subscriptions(appProj.GetAnnotations()).Has(service, recipient) {
152+
if subscriptions.Annotations(appProj.GetAnnotations()).Has(service, recipient) {
153153
appProjs = append(appProjs, fmt.Sprintf("%s/%s", appProj.GetNamespace(), appProj.GetName()))
154154
}
155155
}

bot/server_test.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ package bot
33
import (
44
"testing"
55

6-
"github.com/argoproj/notifications-engine/pkg/controller"
6+
"github.com/argoproj/notifications-engine/pkg/subscriptions"
77

88
. "github.com/argoproj-labs/argocd-notifications/testing"
99

@@ -26,7 +26,7 @@ func TestListRecipients_NoSubscriptions(t *testing.T) {
2626
func TestListSubscriptions_HasAppSubscription(t *testing.T) {
2727
client := NewFakeClient(
2828
NewApp("foo"),
29-
NewApp("bar", WithAnnotations(map[string]string{controller.SubscribeAnnotationKey("my-trigger", "slack"): "general"})))
29+
NewApp("bar", WithAnnotations(map[string]string{subscriptions.SubscribeAnnotationKey("my-trigger", "slack"): "general"})))
3030
s := NewServer(client, TestNamespace)
3131

3232
response, err := s.listSubscriptions("slack", "general")
@@ -39,7 +39,7 @@ func TestListSubscriptions_HasAppSubscription(t *testing.T) {
3939
func TestListSubscriptions_HasAppProjSubscription(t *testing.T) {
4040
client := NewFakeClient(
4141
NewApp("foo"),
42-
NewProject("bar", WithAnnotations(map[string]string{controller.SubscribeAnnotationKey("my-trigger", "slack"): "general"})))
42+
NewProject("bar", WithAnnotations(map[string]string{subscriptions.SubscribeAnnotationKey("my-trigger", "slack"): "general"})))
4343
s := NewServer(client, TestNamespace)
4444

4545
response, err := s.listSubscriptions("slack", "general")
@@ -51,7 +51,7 @@ func TestListSubscriptions_HasAppProjSubscription(t *testing.T) {
5151

5252
func TestUpdateSubscription_SubscribeToApp(t *testing.T) {
5353
client := NewFakeClient(NewApp("foo", WithAnnotations(map[string]string{
54-
controller.SubscribeAnnotationKey("my-trigger", "slack"): "channel1",
54+
subscriptions.SubscribeAnnotationKey("my-trigger", "slack"): "channel1",
5555
})))
5656

5757
var patches []map[string]interface{}
@@ -64,13 +64,13 @@ func TestUpdateSubscription_SubscribeToApp(t *testing.T) {
6464
assert.Equal(t, "subscription updated", resp)
6565
assert.Len(t, patches, 1)
6666

67-
val, _, _ := unstructured.NestedString(patches[0], "metadata", "annotations", controller.SubscribeAnnotationKey("my-trigger", "slack"))
67+
val, _, _ := unstructured.NestedString(patches[0], "metadata", "annotations", subscriptions.SubscribeAnnotationKey("my-trigger", "slack"))
6868
assert.Equal(t, val, "channel1;channel2")
6969
}
7070

7171
func TestUpdateSubscription_SubscribeToAppTrigger(t *testing.T) {
7272
client := NewFakeClient(NewApp("foo", WithAnnotations(map[string]string{
73-
controller.SubscribeAnnotationKey("my-trigger", "slack"): "channel1",
73+
subscriptions.SubscribeAnnotationKey("my-trigger", "slack"): "channel1",
7474
})))
7575

7676
var patches []map[string]interface{}
@@ -84,7 +84,7 @@ func TestUpdateSubscription_SubscribeToAppTrigger(t *testing.T) {
8484
assert.Len(t, patches, 1)
8585

8686
patch := patches[0]
87-
val, _, _ := unstructured.NestedString(patch, "metadata", "annotations", controller.SubscribeAnnotationKey("on-sync-failed", "slack"))
87+
val, _, _ := unstructured.NestedString(patch, "metadata", "annotations", subscriptions.SubscribeAnnotationKey("on-sync-failed", "slack"))
8888
assert.Equal(t, "channel2", val)
8989
}
9090

bot/slack/verify.go

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ import (
44
"errors"
55
"net/http"
66

7-
"github.com/argoproj-labs/argocd-notifications/shared/settings"
7+
"github.com/argoproj/notifications-engine/pkg/api"
88

99
slackclient "github.com/slack-go/slack"
1010
)
@@ -15,11 +15,15 @@ type HasSigningSecret interface {
1515

1616
type RequestVerifier func(data []byte, header http.Header) (string, error)
1717

18-
func NewVerifier(cfg settings.Config) RequestVerifier {
18+
func NewVerifier(apiFactory api.Factory) RequestVerifier {
1919
return func(data []byte, header http.Header) (string, error) {
2020
signingSecret := ""
2121
serviceName := ""
22-
for name, service := range cfg.API.GetNotificationServices() {
22+
api, err := apiFactory.GetAPI()
23+
if err != nil {
24+
return "", err
25+
}
26+
for name, service := range api.GetNotificationServices() {
2327
if hasSecret, ok := service.(HasSigningSecret); ok {
2428
signingSecret = hasSecret.GetSigningSecret()
2529
serviceName = name

bot/slack/verify_test.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,10 @@ import (
55
"testing"
66
"time"
77

8-
"github.com/argoproj-labs/argocd-notifications/shared/settings"
9-
"github.com/argoproj/notifications-engine/pkg"
10-
"github.com/argoproj/notifications-engine/pkg/services"
8+
"github.com/argoproj/notifications-engine/pkg/mocks"
119

10+
"github.com/argoproj/notifications-engine/pkg/api"
11+
"github.com/argoproj/notifications-engine/pkg/services"
1212
"github.com/stretchr/testify/assert"
1313
)
1414

@@ -32,14 +32,14 @@ func TestNewVerifier_IncorrectConfig(t *testing.T) {
3232

3333
t.Run(k, func(t *testing.T) {
3434

35-
api, err := pkg.NewAPI(pkg.Config{})
35+
api, err := api.NewAPI(api.Config{}, nil)
3636
if !assert.NoError(t, err) {
3737
return
3838
}
3939
for k, v := range testCase.Services {
4040
api.AddNotificationService(k, v)
4141
}
42-
verifier := NewVerifier(settings.Config{API: api})
42+
verifier := NewVerifier(&mocks.FakeFactory{Api: api})
4343

4444
_, err = verifier(nil, nil)
4545

@@ -50,12 +50,12 @@ func TestNewVerifier_IncorrectConfig(t *testing.T) {
5050
}
5151

5252
func TestNewVerifier_IncorrectSignature(t *testing.T) {
53-
api, err := pkg.NewAPI(pkg.Config{})
53+
api, err := api.NewAPI(api.Config{}, nil)
5454
if !assert.NoError(t, err) {
5555
return
5656
}
5757
api.AddNotificationService("slack", services.NewSlackService(services.SlackOptions{SigningSecret: "hello world"}))
58-
verifier := NewVerifier(settings.Config{API: api})
58+
verifier := NewVerifier(&mocks.FakeFactory{Api: api})
5959

6060
now := time.Now()
6161
data := "hello world"

cmd/bot.go

Lines changed: 7 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,7 @@
11
package main
22

33
import (
4-
"context"
5-
"log"
6-
"net/http"
7-
"sync"
4+
"github.com/argoproj/notifications-engine/pkg/api"
85

96
"github.com/spf13/cobra"
107
"k8s.io/client-go/dynamic"
@@ -14,7 +11,6 @@ import (
1411
"github.com/argoproj-labs/argocd-notifications/bot"
1512
"github.com/argoproj-labs/argocd-notifications/bot/slack"
1613
"github.com/argoproj-labs/argocd-notifications/shared/k8s"
17-
"github.com/argoproj-labs/argocd-notifications/shared/legacy"
1814
"github.com/argoproj-labs/argocd-notifications/shared/settings"
1915
)
2016

@@ -46,15 +42,13 @@ func newBotCommand() *cobra.Command {
4642
return err
4743
}
4844
}
49-
cfgSrc := make(chan settings.Config)
50-
if err = settings.WatchConfig(context.Background(), nil, clientset, namespace, func(config settings.Config) error {
51-
cfgSrc <- config
52-
return nil
53-
}, legacy.ApplyLegacyConfig); err != nil {
54-
log.Fatal(err)
55-
}
45+
46+
apiFactory := api.NewFactory(settings.GetFactorySettings(nil),
47+
namespace,
48+
k8s.NewSecretInformer(clientset, namespace), k8s.NewConfigMapInformer(clientset, namespace))
49+
5650
server := bot.NewServer(dynamicClient, namespace)
57-
server.AddAdapter("/slack", slack.NewSlackAdapter(getVerifier(cfgSrc)))
51+
server.AddAdapter("/slack", slack.NewSlackAdapter(slack.NewVerifier(apiFactory)))
5852
return server.Serve(port)
5953
},
6054
}
@@ -63,26 +57,3 @@ func newBotCommand() *cobra.Command {
6357
command.Flags().StringVar(&namespace, "namespace", "", "Namespace which bot handles. Current namespace if empty.")
6458
return &command
6559
}
66-
67-
func getVerifier(cfgSrc chan settings.Config) slack.RequestVerifier {
68-
cfg := <-cfgSrc
69-
verifier := slack.NewVerifier(cfg)
70-
71-
var lock sync.Mutex
72-
73-
go func() {
74-
for next := range cfgSrc {
75-
lock.Lock()
76-
verifier = slack.NewVerifier(next)
77-
lock.Unlock()
78-
}
79-
}()
80-
81-
return func(data []byte, header http.Header) (string, error) {
82-
var currentVerifier slack.RequestVerifier
83-
lock.Lock()
84-
currentVerifier = verifier
85-
lock.Unlock()
86-
return currentVerifier(data, header)
87-
}
88-
}

cmd/controller.go

Lines changed: 7 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,8 @@ import (
1010
"github.com/argoproj-labs/argocd-notifications/controller"
1111
"github.com/argoproj-labs/argocd-notifications/shared/argocd"
1212
"github.com/argoproj-labs/argocd-notifications/shared/k8s"
13-
"github.com/argoproj-labs/argocd-notifications/shared/legacy"
14-
"github.com/argoproj-labs/argocd-notifications/shared/settings"
1513

16-
"github.com/argoproj/notifications-engine/pkg/services"
14+
notificationscontroller "github.com/argoproj/notifications-engine/pkg/controller"
1715
"github.com/prometheus/client_golang/prometheus"
1816
"github.com/prometheus/client_golang/prometheus/promhttp"
1917
log "github.com/sirupsen/logrus"
@@ -84,7 +82,7 @@ func newControllerCommand() *cobra.Command {
8482
}
8583
defer argocdService.Close()
8684

87-
registry := controller.NewMetricsRegistry()
85+
registry := notificationscontroller.NewMetricsRegistry("argocd")
8886
http.Handle("/metrics", promhttp.HandlerFor(prometheus.Gatherers{registry, prometheus.DefaultGatherer}, promhttp.HandlerOpts{}))
8987

9088
go func() {
@@ -93,35 +91,13 @@ func newControllerCommand() *cobra.Command {
9391
log.Infof("serving metrics on port %d", metricsPort)
9492
log.Infof("loading configuration %d", metricsPort)
9593

96-
var cancelPrev context.CancelFunc
97-
err = settings.WatchConfig(context.Background(), argocdService, k8sClient, namespace, func(cfg settings.Config) error {
98-
if cancelPrev != nil {
99-
log.Info("Settings had been updated. Restarting controller...")
100-
cancelPrev()
101-
cancelPrev = nil
102-
}
103-
104-
// add console service that is useful for debugging
105-
cfg.API.AddNotificationService("console", services.NewConsoleService(os.Stdout))
106-
107-
ctrl, err := controller.NewController(dynamicClient, namespace, cfg, appLabelSelector, registry)
108-
if err != nil {
109-
return err
110-
}
111-
ctx, cancel := context.WithCancel(context.Background())
112-
cancelPrev = cancel
113-
114-
err = ctrl.Init(ctx)
115-
if err != nil {
116-
return err
117-
}
118-
119-
go ctrl.Run(ctx, processorsCount)
120-
return nil
121-
}, legacy.ApplyLegacyConfig)
94+
ctrl := controller.NewController(k8sClient, dynamicClient, argocdService, namespace, appLabelSelector, registry)
95+
err = ctrl.Init(context.Background())
12296
if err != nil {
123-
log.Fatal(err)
97+
return err
12498
}
99+
100+
go ctrl.Run(context.Background(), processorsCount)
125101
<-context.Background().Done()
126102
return nil
127103
},

cmd/tools/tools.go

Lines changed: 18 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -1,55 +1,42 @@
11
package tools
22

33
import (
4-
"github.com/argoproj-labs/argocd-notifications/expr"
4+
"log"
5+
56
"github.com/argoproj-labs/argocd-notifications/shared/argocd"
67
"github.com/argoproj-labs/argocd-notifications/shared/k8s"
7-
"github.com/argoproj-labs/argocd-notifications/shared/legacy"
8+
"github.com/argoproj-labs/argocd-notifications/shared/settings"
9+
"k8s.io/client-go/kubernetes"
10+
"k8s.io/client-go/tools/clientcmd"
811

912
"github.com/argoproj/notifications-engine/pkg/cmd"
10-
"github.com/argoproj/notifications-engine/pkg/services"
11-
"github.com/ghodss/yaml"
1213
"github.com/spf13/cobra"
13-
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
1414
)
1515

1616
func NewToolsCommand() *cobra.Command {
1717
var (
1818
argocdRepoServer string
1919
)
2020

21-
toolsCommand := cmd.NewToolsCommand("argocd-notifications", cmd.Config{
22-
Resource: k8s.Applications,
23-
SecretName: k8s.SecretName,
24-
ConfigMapName: k8s.ConfigMapName,
25-
CLIName: "argocd-notifications",
26-
CreateVars: func(obj map[string]interface{}, dest services.Destination, cmdContext cmd.CommandContext) (map[string]interface{}, error) {
27-
k8sClient, _, ns, err := cmdContext.GetK8SClients()
21+
var argocdService argocd.Service
22+
toolsCommand := cmd.NewToolsCommand(
23+
"argocd-notifications",
24+
"argocd-notifications",
25+
k8s.Applications,
26+
settings.GetFactorySettings(argocdService), func(clientConfig clientcmd.ClientConfig) {
27+
k8sCfg, err := clientConfig.ClientConfig()
2828
if err != nil {
29-
return nil, err
29+
log.Fatalf("Failed to parse k8s config: %v", err)
3030
}
31-
argocdService, err := argocd.NewArgoCDService(k8sClient, ns, argocdRepoServer)
31+
ns, _, err := clientConfig.Namespace()
3232
if err != nil {
33-
return nil, err
33+
log.Fatalf("Failed to parse k8s config: %v", err)
3434
}
35-
configMap, err := cmdContext.GetConfigMap()
35+
argocdService, err = argocd.NewArgoCDService(kubernetes.NewForConfigOrDie(k8sCfg), ns, argocdRepoServer)
3636
if err != nil {
37-
return nil, err
38-
}
39-
context := map[string]string{}
40-
if contextYaml, ok := configMap.Data["context"]; ok {
41-
if err := yaml.Unmarshal([]byte(contextYaml), &context); err != nil {
42-
return nil, err
43-
}
37+
log.Fatalf("Failed to initalize Argo CD service: %v", err)
4438
}
45-
46-
return expr.Spawn(&unstructured.Unstructured{Object: obj}, argocdService, map[string]interface{}{
47-
"app": obj,
48-
"context": legacy.InjectLegacyVar(context, dest.Service),
49-
}), nil
50-
},
51-
})
39+
})
5240
toolsCommand.PersistentFlags().StringVar(&argocdRepoServer, "argocd-repo-server", "argocd-repo-server:8081", "Argo CD repo server address")
5341
return toolsCommand
54-
5542
}

codecov.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
ignore:
22
- ./**/mocks/*
3+
- ./hack/**/*
34
coverage:
45
status:
56
patch: off

0 commit comments

Comments
 (0)