Skip to content

Commit 79739fd

Browse files
fix: Mitigate the bug where items are re-added constantly to the workqueue. argoproj#1193 (argoproj#1243)
This will prevent argo from hanging for up to 16 minutes at a time while processing a rollout.

Signed-off-by: Mark Robinson <[email protected]>
1 parent d9d1237 commit 79739fd

File tree

12 files changed

+72
-34
lines changed

12 files changed

+72
-34
lines changed

analysis/controller_test.go

Lines changed: 3 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -6,6 +6,8 @@ import (
66
"testing"
77
"time"
88

9+
"github.com/argoproj/argo-rollouts/utils/queue"
10+
911
log "github.com/sirupsen/logrus"
1012
"github.com/stretchr/testify/assert"
1113
"github.com/undefinedlabs/go-mpatch"
@@ -87,7 +89,7 @@ func (f *fixture) newController(resync resyncFunc) (*Controller, informers.Share
8789
i := informers.NewSharedInformerFactory(f.client, resync())
8890
k8sI := kubeinformers.NewSharedInformerFactory(f.kubeclient, resync())
8991

90-
analysisRunWorkqueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "AnalysisRuns")
92+
analysisRunWorkqueue := workqueue.NewNamedRateLimitingQueue(queue.DefaultArgoRolloutsRateLimiter(), "AnalysisRuns")
9193

9294
metricsServer := metrics.NewMetricsServer(metrics.ServerConfig{
9395
Addr: "localhost:8080",

controller/controller.go

Lines changed: 7 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -4,6 +4,8 @@ import (
44
"fmt"
55
"time"
66

7+
"github.com/argoproj/argo-rollouts/utils/queue"
8+
79
"github.com/pkg/errors"
810
smiclientset "github.com/servicemeshinterface/smi-sdk-go/pkg/gen/client/split/clientset/versioned"
911
log "github.com/sirupsen/logrus"
@@ -130,11 +132,11 @@ func NewManager(
130132
K8SRequestProvider: k8sRequestProvider,
131133
})
132134

133-
rolloutWorkqueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Rollouts")
134-
experimentWorkqueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Experiments")
135-
analysisRunWorkqueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "AnalysisRuns")
136-
serviceWorkqueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Services")
137-
ingressWorkqueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Ingresses")
135+
rolloutWorkqueue := workqueue.NewNamedRateLimitingQueue(queue.DefaultArgoRolloutsRateLimiter(), "Rollouts")
136+
experimentWorkqueue := workqueue.NewNamedRateLimitingQueue(queue.DefaultArgoRolloutsRateLimiter(), "Experiments")
137+
analysisRunWorkqueue := workqueue.NewNamedRateLimitingQueue(queue.DefaultArgoRolloutsRateLimiter(), "AnalysisRuns")
138+
serviceWorkqueue := workqueue.NewNamedRateLimitingQueue(queue.DefaultArgoRolloutsRateLimiter(), "Services")
139+
ingressWorkqueue := workqueue.NewNamedRateLimitingQueue(queue.DefaultArgoRolloutsRateLimiter(), "Ingresses")
138140

139141
refResolver := rollout.NewInformerBasedWorkloadRefResolver(namespace, dynamicclientset, discoveryClient, rolloutWorkqueue, rolloutsInformer.Informer())
140142

experiments/controller_test.go

Lines changed: 6 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -8,8 +8,11 @@ import (
88
"testing"
99
"time"
1010

11-
"github.com/stretchr/testify/assert"
1211
"github.com/undefinedlabs/go-mpatch"
12+
13+
"github.com/argoproj/argo-rollouts/utils/queue"
14+
15+
"github.com/stretchr/testify/assert"
1316
appsv1 "k8s.io/api/apps/v1"
1417
corev1 "k8s.io/api/core/v1"
1518
"k8s.io/apimachinery/pkg/api/equality"
@@ -316,8 +319,8 @@ func (f *fixture) newController(resync resyncFunc) (*Controller, informers.Share
316319
i := informers.NewSharedInformerFactory(f.client, resync())
317320
k8sI := kubeinformers.NewSharedInformerFactory(f.kubeclient, resync())
318321

319-
rolloutWorkqueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Rollouts")
320-
experimentWorkqueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Experiments")
322+
rolloutWorkqueue := workqueue.NewNamedRateLimitingQueue(queue.DefaultArgoRolloutsRateLimiter(), "Rollouts")
323+
experimentWorkqueue := workqueue.NewNamedRateLimitingQueue(queue.DefaultArgoRolloutsRateLimiter(), "Experiments")
321324

322325
metricsServer := metrics.NewMetricsServer(metrics.ServerConfig{
323326
Addr: "localhost:8080",

ingress/ingress_test.go

Lines changed: 4 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -4,6 +4,8 @@ import (
44
"sync"
55
"testing"
66

7+
"github.com/argoproj/argo-rollouts/utils/queue"
8+
79
"github.com/stretchr/testify/assert"
810
extensionsv1beta1 "k8s.io/api/extensions/v1beta1"
911
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -63,8 +65,8 @@ func newFakeIngressController(ing *extensionsv1beta1.Ingress, rollout *v1alpha1.
6365
i := informers.NewSharedInformerFactory(client, 0)
6466
k8sI := kubeinformers.NewSharedInformerFactory(kubeclient, 0)
6567

66-
rolloutWorkqueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Rollouts")
67-
ingressWorkqueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Ingresses")
68+
rolloutWorkqueue := workqueue.NewNamedRateLimitingQueue(queue.DefaultArgoRolloutsRateLimiter(), "Rollouts")
69+
ingressWorkqueue := workqueue.NewNamedRateLimitingQueue(queue.DefaultArgoRolloutsRateLimiter(), "Ingresses")
6870

6971
c := NewController(ControllerConfig{
7072
Client: kubeclient,

pkg/kubectl-argo-rollouts/viewcontroller/viewcontroller.go

Lines changed: 3 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -5,6 +5,8 @@ import (
55
"reflect"
66
"time"
77

8+
"github.com/argoproj/argo-rollouts/utils/queue"
9+
810
log "github.com/sirupsen/logrus"
911
"k8s.io/apimachinery/pkg/labels"
1012
"k8s.io/apimachinery/pkg/util/wait"
@@ -98,7 +100,7 @@ func newViewController(namespace string, name string, kubeClient kubernetes.Inte
98100
rolloutLister: rolloutsInformerFactory.Argoproj().V1alpha1().Rollouts().Lister().Rollouts(namespace),
99101
experimentLister: rolloutsInformerFactory.Argoproj().V1alpha1().Experiments().Lister().Experiments(namespace),
100102
analysisRunLister: rolloutsInformerFactory.Argoproj().V1alpha1().AnalysisRuns().Lister().AnalysisRuns(namespace),
101-
workqueue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
103+
workqueue: workqueue.NewRateLimitingQueue(queue.DefaultArgoRolloutsRateLimiter()),
102104
}
103105

104106
controller.cacheSyncs = append(controller.cacheSyncs,

rollout/controller.go

Lines changed: 4 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -377,6 +377,7 @@ func (c *Controller) syncHandler(key string) error {
377377
resolveErr := c.refResolver.Resolve(r)
378378
roCtx, err := c.newRolloutContext(r)
379379
if err != nil {
380+
logCtx.Errorf("newRolloutContext err %v", err)
380381
return err
381382
}
382383
if resolveErr != nil {
@@ -388,6 +389,9 @@ func (c *Controller) syncHandler(key string) error {
388389
if roCtx.newRollout != nil {
389390
c.writeBackToInformer(roCtx.newRollout)
390391
}
392+
if err != nil {
393+
logCtx.Errorf("roCtx.reconcile err %v", err)
394+
}
391395
return err
392396
}
393397

rollout/controller_test.go

Lines changed: 5 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -10,6 +10,8 @@ import (
1010
"testing"
1111
"time"
1212

13+
"github.com/argoproj/argo-rollouts/utils/queue"
14+
1315
"github.com/ghodss/yaml"
1416
log "github.com/sirupsen/logrus"
1517
"github.com/stretchr/testify/assert"
@@ -487,9 +489,9 @@ func (f *fixture) newController(resync resyncFunc) (*Controller, informers.Share
487489
istioVirtualServiceInformer := dynamicInformerFactory.ForResource(istioutil.GetIstioVirtualServiceGVR()).Informer()
488490
istioDestinationRuleInformer := dynamicInformerFactory.ForResource(istioutil.GetIstioDestinationRuleGVR()).Informer()
489491

490-
rolloutWorkqueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Rollouts")
491-
serviceWorkqueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Services")
492-
ingressWorkqueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Ingresses")
492+
rolloutWorkqueue := workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(time.Millisecond, 10*time.Second), "Rollouts")
493+
serviceWorkqueue := workqueue.NewNamedRateLimitingQueue(queue.DefaultArgoRolloutsRateLimiter(), "Services")
494+
ingressWorkqueue := workqueue.NewNamedRateLimitingQueue(queue.DefaultArgoRolloutsRateLimiter(), "Ingresses")
493495

494496
metricsServer := metrics.NewMetricsServer(metrics.ServerConfig{
495497
Addr: "localhost:8080",

rollout/trafficrouting/istio/controller.go

Lines changed: 3 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -5,6 +5,8 @@ import (
55
"fmt"
66
"time"
77

8+
"github.com/argoproj/argo-rollouts/utils/queue"
9+
810
log "github.com/sirupsen/logrus"
911
k8serrors "k8s.io/apimachinery/pkg/api/errors"
1012
"k8s.io/apimachinery/pkg/api/meta"
@@ -58,7 +60,7 @@ type IstioController struct {
5860
func NewIstioController(cfg IstioControllerConfig) *IstioController {
5961
c := IstioController{
6062
IstioControllerConfig: cfg,
61-
destinationRuleWorkqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "DestinationRules"),
63+
destinationRuleWorkqueue: workqueue.NewNamedRateLimitingQueue(queue.DefaultArgoRolloutsRateLimiter(), "DestinationRules"),
6264
VirtualServiceLister: dynamiclister.New(cfg.VirtualServiceInformer.GetIndexer(), istioutil.GetIstioVirtualServiceGVR()),
6365
DestinationRuleLister: dynamiclister.New(cfg.DestinationRuleInformer.GetIndexer(), istioutil.GetIstioDestinationRuleGVR()),
6466
}

service/service_test.go

Lines changed: 4 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -3,6 +3,8 @@ package service
33
import (
44
"testing"
55

6+
"github.com/argoproj/argo-rollouts/utils/queue"
7+
68
"github.com/stretchr/testify/assert"
79
corev1 "k8s.io/api/core/v1"
810
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -65,8 +67,8 @@ func newFakeServiceController(svc *corev1.Service, rollout *v1alpha1.Rollout) (*
6567
i := informers.NewSharedInformerFactory(client, 0)
6668
k8sI := kubeinformers.NewSharedInformerFactory(kubeclient, 0)
6769

68-
rolloutWorkqueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Rollouts")
69-
serviceWorkqueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Services")
70+
rolloutWorkqueue := workqueue.NewNamedRateLimitingQueue(queue.DefaultArgoRolloutsRateLimiter(), "Rollouts")
71+
serviceWorkqueue := workqueue.NewNamedRateLimitingQueue(queue.DefaultArgoRolloutsRateLimiter(), "Services")
7072
metricsServer := metrics.NewMetricsServer(metrics.ServerConfig{
7173
Addr: "localhost:8080",
7274
K8SRequestProvider: &metrics.K8sRequestsCountProvider{},

utils/controller/controller.go

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -160,6 +160,8 @@ func processNextWorkItem(workqueue workqueue.RateLimitingInterface, objType stri
160160
// Put the item back on
161161
// the workqueue to handle any transient errors.
162162
workqueue.AddRateLimited(key)
163+
164+
logCtx.Infof("%s syncHandler queue retries: %v : key \"%v\"", objType, workqueue.NumRequeues(key), key)
163165
return err
164166
}
165167
// Finally, if no error occurs we Forget this item so it does not

0 commit comments

Comments
 (0)