Skip to content

Commit 30f8f3e

Browse files
author
Shawn Hurley
authored
pkg/ansible: Changing the reconciliation loop to use re-queue. (#599)
* Adding annotation to override reconciliation time. * Adding default time as part of controller creation. * Adding option to specify duration as controller option.
1 parent 913cbf7 commit 30f8f3e

File tree

7 files changed

+59
-136
lines changed

7 files changed

+59
-136
lines changed

Gopkg.lock

Lines changed: 0 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

commands/operator-sdk/cmd/up/local.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,7 @@ func upLocalAnsible() {
134134
}
135135

136136
printVersion()
137+
logrus.Infof("watching namespace: %s", namespace)
137138
done := make(chan error)
138139

139140
// start the proxy
@@ -144,7 +145,7 @@ func upLocalAnsible() {
144145
})
145146

146147
// start the operator
147-
go ansibleOperator.Run(done, mgr, namespace)
148+
go ansibleOperator.Run(done, mgr)
148149

149150
// wait for either to finish
150151
err = <-done

pkg/ansible/controller/controller.go

Lines changed: 14 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -35,29 +35,30 @@ import (
3535

3636
// Options - options for your controller
3737
type Options struct {
38-
EventHandlers []events.EventHandler
39-
LoggingLevel events.LogLevel
40-
Runner runner.Runner
41-
Namespace string
42-
GVK schema.GroupVersionKind
43-
// StopChannel is used to deal with the bug:
44-
// https://github.com/kubernetes-sigs/controller-runtime/issues/103
45-
StopChannel <-chan struct{}
38+
EventHandlers []events.EventHandler
39+
LoggingLevel events.LogLevel
40+
Runner runner.Runner
41+
GVK schema.GroupVersionKind
42+
ReconcilePeriod time.Duration
4643
}
4744

4845
// Add - Creates a new ansible operator controller and adds it to the manager
4946
func Add(mgr manager.Manager, options Options) {
50-
logrus.Infof("Watching %s/%v, %s, %s", options.GVK.Group, options.GVK.Version, options.GVK.Kind, options.Namespace)
47+
logrus.Infof("Watching %s/%v, %s", options.GVK.Group, options.GVK.Version, options.GVK.Kind)
5148
if options.EventHandlers == nil {
5249
options.EventHandlers = []events.EventHandler{}
5350
}
5451
eventHandlers := append(options.EventHandlers, events.NewLoggingEventHandler(options.LoggingLevel))
52+
if options.ReconcilePeriod == time.Duration(0) {
53+
options.ReconcilePeriod = time.Minute
54+
}
5555

5656
aor := &AnsibleOperatorReconciler{
57-
Client: mgr.GetClient(),
58-
GVK: options.GVK,
59-
Runner: options.Runner,
60-
EventHandlers: eventHandlers,
57+
Client: mgr.GetClient(),
58+
GVK: options.GVK,
59+
Runner: options.Runner,
60+
EventHandlers: eventHandlers,
61+
ReconcilePeriod: options.ReconcilePeriod,
6162
}
6263

6364
// Register the GVK with the schema
@@ -79,13 +80,4 @@ func Add(mgr manager.Manager, options Options) {
7980
if err := c.Watch(&source.Kind{Type: u}, &crthandler.EnqueueRequestForObject{}); err != nil {
8081
log.Fatal(err)
8182
}
82-
83-
r := NewReconcileLoop(time.Minute*1, options.GVK, mgr.GetClient(), options.Namespace)
84-
r.Stop = options.StopChannel
85-
cs := &source.Channel{Source: r.Source}
86-
cs.InjectStopChannel(options.StopChannel)
87-
if err := c.Watch(cs, &crthandler.EnqueueRequestForObject{}); err != nil {
88-
log.Fatal(err)
89-
}
90-
r.Start()
9183
}

pkg/ansible/controller/reconcile.go

Lines changed: 38 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
"encoding/json"
2020
"errors"
2121
"os"
22+
"time"
2223

2324
"github.com/operator-framework/operator-sdk/pkg/ansible/events"
2425
"github.com/operator-framework/operator-sdk/pkg/ansible/proxy/kubeconfig"
@@ -34,12 +35,20 @@ import (
3435
"sigs.k8s.io/controller-runtime/pkg/reconcile"
3536
)
3637

38+
const (
39+
// ReconcilePeriodAnnotation - annotation used by a user to specify the reconcilation interval for the CR.
40+
// To use create a CR with an annotation "ansible.operator-sdk/reconcile-period: 30s" or some other valid
41+
// Duration. This will override the operators/or controllers reconcile period for that particular CR.
42+
ReconcilePeriodAnnotation = "ansible.operator-sdk/reconcile-period"
43+
)
44+
3745
// AnsibleOperatorReconciler - object to reconcile runner requests
3846
type AnsibleOperatorReconciler struct {
39-
GVK schema.GroupVersionKind
40-
Runner runner.Runner
41-
Client client.Client
42-
EventHandlers []events.EventHandler
47+
GVK schema.GroupVersionKind
48+
Runner runner.Runner
49+
Client client.Client
50+
EventHandlers []events.EventHandler
51+
ReconcilePeriod time.Duration
4352
}
4453

4554
// Reconcile - handle the event.
@@ -53,6 +62,14 @@ func (r *AnsibleOperatorReconciler) Reconcile(request reconcile.Request) (reconc
5362
if err != nil {
5463
return reconcile.Result{}, err
5564
}
65+
reconcileResult := reconcile.Result{RequeueAfter: r.ReconcilePeriod}
66+
if ds, ok := u.GetAnnotations()[ReconcilePeriodAnnotation]; ok {
67+
duration, err := time.ParseDuration(ds)
68+
if err != nil {
69+
return reconcileResult, err
70+
}
71+
reconcileResult.RequeueAfter = duration
72+
}
5673

5774
deleted := u.GetDeletionTimestamp() != nil
5875
finalizer, finalizerExists := r.Runner.GetFinalizer()
@@ -63,11 +80,11 @@ func (r *AnsibleOperatorReconciler) Reconcile(request reconcile.Request) (reconc
6380
finalizers := append(pendingFinalizers, finalizer)
6481
u.SetFinalizers(finalizers)
6582
err := r.Client.Update(context.TODO(), u)
66-
return reconcile.Result{}, err
83+
return reconcileResult, err
6784
}
6885
if !contains(pendingFinalizers, finalizer) && deleted {
6986
logrus.Info("Resource is terminated, skipping reconcilation")
70-
return reconcile.Result{}, nil
87+
return reconcileResult, nil
7188
}
7289

7390
spec := u.Object["spec"]
@@ -77,9 +94,10 @@ func (r *AnsibleOperatorReconciler) Reconcile(request reconcile.Request) (reconc
7794
u.Object["spec"] = map[string]interface{}{}
7895
err = r.Client.Update(context.TODO(), u)
7996
if err != nil {
80-
return reconcile.Result{}, err
97+
return reconcileResult, err
8198
}
82-
return reconcile.Result{Requeue: true}, nil
99+
reconcileResult.Requeue = true
100+
return reconcileResult, nil
83101
}
84102
status := u.Object["status"]
85103
_, ok = status.(map[string]interface{})
@@ -88,9 +106,10 @@ func (r *AnsibleOperatorReconciler) Reconcile(request reconcile.Request) (reconc
88106
u.Object["status"] = map[string]interface{}{}
89107
err = r.Client.Update(context.TODO(), u)
90108
if err != nil {
91-
return reconcile.Result{}, err
109+
return reconcileResult, err
92110
}
93-
return reconcile.Result{Requeue: true}, nil
111+
reconcileResult.Requeue = true
112+
return reconcileResult, nil
94113
}
95114

96115
// If status is an empty map we can assume CR was just created
@@ -101,9 +120,10 @@ func (r *AnsibleOperatorReconciler) Reconcile(request reconcile.Request) (reconc
101120
}
102121
err = r.Client.Update(context.TODO(), u)
103122
if err != nil {
104-
return reconcile.Result{}, err
123+
return reconcileResult, err
105124
}
106-
return reconcile.Result{Requeue: true}, nil
125+
reconcileResult.Requeue = true
126+
return reconcileResult, nil
107127
}
108128

109129
ownerRef := metav1.OwnerReference{
@@ -115,12 +135,12 @@ func (r *AnsibleOperatorReconciler) Reconcile(request reconcile.Request) (reconc
115135

116136
kc, err := kubeconfig.Create(ownerRef, "http://localhost:8888", u.GetNamespace())
117137
if err != nil {
118-
return reconcile.Result{}, err
138+
return reconcileResult, err
119139
}
120140
defer os.Remove(kc.Name())
121141
eventChan, err := r.Runner.Run(u, kc.Name())
122142
if err != nil {
123-
return reconcile.Result{}, err
143+
return reconcileResult, err
124144
}
125145

126146
// iterate events from ansible, looking for the final one
@@ -144,7 +164,7 @@ func (r *AnsibleOperatorReconciler) Reconcile(request reconcile.Request) (reconc
144164
if statusEvent.Event == "" {
145165
err := errors.New("did not receive playbook_on_stats event")
146166
logrus.Error(err.Error())
147-
return reconcile.Result{}, err
167+
return reconcileResult, err
148168
}
149169

150170
// We only want to update the CustomResource once, so we'll track changes and do it at the end
@@ -186,9 +206,10 @@ func (r *AnsibleOperatorReconciler) Reconcile(request reconcile.Request) (reconc
186206
err = r.Client.Update(context.TODO(), u)
187207
}
188208
if !runSuccessful {
189-
return reconcile.Result{Requeue: true}, err
209+
reconcileResult.Requeue = true
210+
return reconcileResult, err
190211
}
191-
return reconcile.Result{}, err
212+
return reconcileResult, err
192213
}
193214

194215
func contains(l []string, s string) bool {

pkg/ansible/controller/source.go

Lines changed: 0 additions & 82 deletions
This file was deleted.

pkg/ansible/operator/operator.go

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -30,10 +30,7 @@ import (
3030
// Run - A blocking function which starts a controller-runtime manager
3131
// It starts an Operator by reading in the values in `./watches.yaml`, adds a controller
3232
// to the manager, and finally running the manager.
33-
func Run(done chan error, mgr manager.Manager, namespace string) {
34-
if namespace == "" {
35-
namespace = "default"
36-
}
33+
func Run(done chan error, mgr manager.Manager) {
3734
watches, err := runner.NewFromWatches("./watches.yaml")
3835
if err != nil {
3936
logrus.Error("Failed to get watches")
@@ -45,10 +42,8 @@ func Run(done chan error, mgr manager.Manager, namespace string) {
4542

4643
for gvk, runner := range watches {
4744
controller.Add(mgr, controller.Options{
48-
GVK: gvk,
49-
Namespace: namespace,
50-
Runner: runner,
51-
StopChannel: c,
45+
GVK: gvk,
46+
Runner: runner,
5247
})
5348
}
5449
log.Fatal(mgr.Start(c))

test/ansible-operator/cmd/ansible-operator/main.go

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,6 @@ func main() {
7171
}
7272

7373
func runSDK(done chan error, mgr manager.Manager) {
74-
namespace := "default"
7574
watches, err := runner.NewFromWatches("/opt/ansible/watches.yaml")
7675
if err != nil {
7776
logrus.Error("Failed to get watches")
@@ -83,10 +82,8 @@ func runSDK(done chan error, mgr manager.Manager) {
8382

8483
for gvk, runner := range watches {
8584
controller.Add(mgr, controller.Options{
86-
GVK: gvk,
87-
Namespace: namespace,
88-
Runner: runner,
89-
StopChannel: c,
85+
GVK: gvk,
86+
Runner: runner,
9087
})
9188
}
9289
log.Fatal(mgr.Start(c))

0 commit comments

Comments
 (0)