1
1
package controller
2
2
3
3
import (
4
+ "encoding/json"
4
5
"fmt"
5
6
"time"
6
7
7
- "github.com/argoproj/argo-rollouts/utils/queue "
8
-
8
+ "github.com/argoproj/notifications-engine/pkg/api "
9
+ "github.com/argoproj/notifications-engine/pkg/controller"
9
10
"github.com/pkg/errors"
10
11
smiclientset "github.com/servicemeshinterface/smi-sdk-go/pkg/gen/client/split/clientset/versioned"
11
12
log "github.com/sirupsen/logrus"
12
13
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
14
+ "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
13
15
"k8s.io/apimachinery/pkg/util/runtime"
14
16
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
15
17
"k8s.io/apimachinery/pkg/util/wait"
@@ -28,11 +30,14 @@ import (
28
30
"github.com/argoproj/argo-rollouts/controller/metrics"
29
31
"github.com/argoproj/argo-rollouts/experiments"
30
32
"github.com/argoproj/argo-rollouts/ingress"
33
+ "github.com/argoproj/argo-rollouts/pkg/apis/rollouts/v1alpha1"
31
34
clientset "github.com/argoproj/argo-rollouts/pkg/client/clientset/versioned"
32
35
rolloutscheme "github.com/argoproj/argo-rollouts/pkg/client/clientset/versioned/scheme"
33
36
informers "github.com/argoproj/argo-rollouts/pkg/client/informers/externalversions/rollouts/v1alpha1"
34
37
"github.com/argoproj/argo-rollouts/rollout"
35
38
"github.com/argoproj/argo-rollouts/service"
39
+ "github.com/argoproj/argo-rollouts/utils/defaults"
40
+ "github.com/argoproj/argo-rollouts/utils/queue"
36
41
"github.com/argoproj/argo-rollouts/utils/record"
37
42
)
38
43
@@ -61,12 +66,13 @@ const (
61
66
62
67
// Manager is the controller implementation for Argo-Rollout resources
63
68
type Manager struct {
64
- metricsServer * metrics.MetricsServer
65
- rolloutController * rollout.Controller
66
- experimentController * experiments.Controller
67
- analysisController * analysis.Controller
68
- serviceController * service.Controller
69
- ingressController * ingress.Controller
69
+ metricsServer * metrics.MetricsServer
70
+ rolloutController * rollout.Controller
71
+ experimentController * experiments.Controller
72
+ analysisController * analysis.Controller
73
+ serviceController * service.Controller
74
+ ingressController * ingress.Controller
75
+ notificationsController controller.NotificationController
70
76
71
77
rolloutSynced cache.InformerSynced
72
78
experimentSynced cache.InformerSynced
@@ -77,6 +83,8 @@ type Manager struct {
77
83
ingressSynced cache.InformerSynced
78
84
jobSynced cache.InformerSynced
79
85
replicasSetSynced cache.InformerSynced
86
+ configMapSynced cache.InformerSynced
87
+ secretSynced cache.InformerSynced
80
88
81
89
rolloutWorkqueue workqueue.RateLimitingInterface
82
90
serviceWorkqueue workqueue.RateLimitingInterface
@@ -110,6 +118,8 @@ func NewManager(
110
118
clusterAnalysisTemplateInformer informers.ClusterAnalysisTemplateInformer ,
111
119
istioVirtualServiceInformer cache.SharedIndexInformer ,
112
120
istioDestinationRuleInformer cache.SharedIndexInformer ,
121
+ configMapInformer coreinformers.ConfigMapInformer ,
122
+ secretInformer coreinformers.SecretInformer ,
113
123
resyncPeriod time.Duration ,
114
124
instanceID string ,
115
125
metricsPort int ,
@@ -139,8 +149,22 @@ func NewManager(
139
149
ingressWorkqueue := workqueue .NewNamedRateLimitingQueue (queue .DefaultArgoRolloutsRateLimiter (), "Ingresses" )
140
150
141
151
refResolver := rollout .NewInformerBasedWorkloadRefResolver (namespace , dynamicclientset , discoveryClient , rolloutWorkqueue , rolloutsInformer .Informer ())
142
-
143
- recorder := record .NewEventRecorder (kubeclientset , metrics .MetricRolloutEventsTotal )
152
+ apiFactory := api .NewFactory (record .NewAPIFactorySettings (), defaults .Namespace (), secretInformer .Informer (), configMapInformer .Informer ())
153
+ recorder := record .NewEventRecorder (kubeclientset , metrics .MetricRolloutEventsTotal , apiFactory )
154
+ notificationsController := controller .NewController (dynamicclientset .Resource (v1alpha1 .RolloutGVR ), rolloutsInformer .Informer (), apiFactory ,
155
+ controller .WithToUnstructured (func (obj metav1.Object ) (* unstructured.Unstructured , error ) {
156
+ data , err := json .Marshal (obj )
157
+ if err != nil {
158
+ return nil , err
159
+ }
160
+ res := & unstructured.Unstructured {}
161
+ err = json .Unmarshal (data , res )
162
+ if err != nil {
163
+ return nil , err
164
+ }
165
+ return res , nil
166
+ }),
167
+ )
144
168
145
169
rolloutController := rollout .NewController (rollout.ControllerConfig {
146
170
Namespace : namespace ,
@@ -229,6 +253,8 @@ func NewManager(
229
253
analysisTemplateSynced : analysisTemplateInformer .Informer ().HasSynced ,
230
254
clusterAnalysisTemplateSynced : clusterAnalysisTemplateInformer .Informer ().HasSynced ,
231
255
replicasSetSynced : replicaSetInformer .Informer ().HasSynced ,
256
+ configMapSynced : configMapInformer .Informer ().HasSynced ,
257
+ secretSynced : secretInformer .Informer ().HasSynced ,
232
258
rolloutWorkqueue : rolloutWorkqueue ,
233
259
experimentWorkqueue : experimentWorkqueue ,
234
260
analysisRunWorkqueue : analysisRunWorkqueue ,
@@ -239,6 +265,7 @@ func NewManager(
239
265
ingressController : ingressController ,
240
266
experimentController : experimentController ,
241
267
analysisController : analysisController ,
268
+ notificationsController : notificationsController ,
242
269
dynamicClientSet : dynamicclientset ,
243
270
refResolver : refResolver ,
244
271
namespace : namespace ,
@@ -260,7 +287,7 @@ func (c *Manager) Run(rolloutThreadiness, serviceThreadiness, ingressThreadiness
260
287
261
288
// Wait for the caches to be synced before starting workers
262
289
log .Info ("Waiting for controller's informer caches to sync" )
263
- if ok := cache .WaitForCacheSync (stopCh , c .serviceSynced , c .ingressSynced , c .jobSynced , c .rolloutSynced , c .experimentSynced , c .analysisRunSynced , c .analysisTemplateSynced , c .replicasSetSynced ); ! ok {
290
+ if ok := cache .WaitForCacheSync (stopCh , c .serviceSynced , c .ingressSynced , c .jobSynced , c .rolloutSynced , c .experimentSynced , c .analysisRunSynced , c .analysisTemplateSynced , c .replicasSetSynced , c . configMapSynced , c . secretSynced ); ! ok {
264
291
return fmt .Errorf ("failed to wait for caches to sync" )
265
292
}
266
293
// only wait for cluster scoped informers to sync if we are running in cluster-wide mode
@@ -277,6 +304,8 @@ func (c *Manager) Run(rolloutThreadiness, serviceThreadiness, ingressThreadiness
277
304
go wait .Until (func () { c .ingressController .Run (ingressThreadiness , stopCh ) }, time .Second , stopCh )
278
305
go wait .Until (func () { c .experimentController .Run (experimentThreadiness , stopCh ) }, time .Second , stopCh )
279
306
go wait .Until (func () { c .analysisController .Run (analysisThreadiness , stopCh ) }, time .Second , stopCh )
307
+ go wait .Until (func () { c .notificationsController .Run (rolloutThreadiness , stopCh ) }, time .Second , stopCh )
308
+
280
309
log .Info ("Started controller" )
281
310
282
311
go func () {
0 commit comments