Skip to content

Commit 5709ee4

Browse files
committed
Update logic for workflows
The previous implementation of workflows had a race condition caused by the workflow feature's dependency on a ConfigMap that served as a counter. This commit simplifies the logic of the Reconcile loop in two ways: 1. It removes the dependency on an external workflow counter. 2. It introduces a NextAction function, which decides what action the Reconcile loop should perform next based on the current state of the OpenShift cluster. The inputs of this function are the instance that is currently being processed and a workflowLength. Using these two arguments, it tells the Reconcile loop which action to perform: Wait, CreateFirstJob, CreateNextJob, EndTesting, or Failure. This approach should simplify the reconcile logic and move the test-operator towards a unified Reconcile loop.
1 parent cd41d06 commit 5709ee4

File tree

5 files changed

+382
-304
lines changed

5 files changed

+382
-304
lines changed

controllers/ansibletest_controller.go

Lines changed: 60 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@ package controllers
1818

1919
import (
2020
"context"
21+
"errors"
22+
"fmt"
2123
"strconv"
2224
"time"
2325

@@ -37,7 +39,6 @@ import (
3739
corev1 "k8s.io/api/core/v1"
3840
k8s_errors "k8s.io/apimachinery/pkg/api/errors"
3941
ctrl "sigs.k8s.io/controller-runtime"
40-
"sigs.k8s.io/controller-runtime/pkg/client"
4142
"sigs.k8s.io/controller-runtime/pkg/log"
4243
)
4344

@@ -68,9 +69,6 @@ func (r *AnsibleTestReconciler) GetLogger(ctx context.Context) logr.Logger {
6869
func (r *AnsibleTestReconciler) Reconcile(ctx context.Context, req ctrl.Request) (result ctrl.Result, _err error) {
6970
Log := r.GetLogger(ctx)
7071

71-
// How much time should we wait before calling Reconcile loop when there is a failure
72-
requeueAfter := time.Second * 60
73-
7472
// Fetch the ansible instance
7573
instance := &testv1beta1.AnsibleTest{}
7674
err := r.Client.Get(ctx, req.NamespacedName, instance)
@@ -81,11 +79,6 @@ func (r *AnsibleTestReconciler) Reconcile(ctx context.Context, req ctrl.Request)
8179
return ctrl.Result{}, err
8280
}
8381

84-
workflowActive := false
85-
if len(instance.Spec.Workflow) > 0 {
86-
workflowActive = true
87-
}
88-
8982
// Create a helper
9083
helper, err := helper.NewHelper(
9184
instance,
@@ -142,40 +135,62 @@ func (r *AnsibleTestReconciler) Reconcile(ctx context.Context, req ctrl.Request)
142135

143136
}
144137

145-
// Ensure that there is an external counter and read its value
146-
// We use the external counter to keep track of the workflow steps
147-
r.WorkflowStepCounterCreate(ctx, instance, helper)
148-
externalWorkflowCounter := r.WorkflowStepCounterRead(ctx, instance)
149-
if externalWorkflowCounter == -1 {
150-
return ctrl.Result{RequeueAfter: requeueAfter}, nil
151-
}
138+
workflowLength := len(instance.Spec.Workflow)
139+
nextAction, nextWorkflowStep, err := r.NextAction(ctx, instance, workflowLength)
152140

153-
// Each job that is being executed by the test operator has
154-
currentWorkflowStep := 0
155-
runningAnsibleJob := &batchv1.Job{}
156-
runningJobName := r.GetJobName(instance, externalWorkflowCounter-1)
157-
err = r.Client.Get(ctx, client.ObjectKey{Namespace: instance.GetNamespace(), Name: runningJobName}, runningAnsibleJob)
158-
if err != nil && !k8s_errors.IsNotFound(err) {
141+
switch nextAction {
142+
case Failure:
159143
return ctrl.Result{}, err
160-
} else if err == nil {
161-
currentWorkflowStep, _ = strconv.Atoi(runningAnsibleJob.Labels["workflowStep"])
162-
}
163144

164-
if r.CompletedJobExists(ctx, instance, currentWorkflowStep) {
165-
// The job created by the instance was completed. Release the lock
166-
// so that other instances can spawn a job.
167-
instance.Status.Conditions.MarkTrue(condition.DeploymentReadyCondition, condition.DeploymentReadyMessage)
168-
Log.Info("Job completed")
145+
case Wait:
146+
Log.Info(InfoWaitingOnJob)
147+
return ctrl.Result{RequeueAfter: RequeueAfterValue}, nil
148+
149+
case EndTesting:
150+
// All jobs created by the instance were completed. Release the lock
151+
// so that other instances can spawn their jobs.
169152
if lockReleased, err := r.ReleaseLock(ctx, instance); !lockReleased {
170-
return ctrl.Result{}, err
153+
Log.Info(fmt.Sprintf(InfoCanNotReleaseLock, testOperatorLockName))
154+
return ctrl.Result{RequeueAfter: RequeueAfterValue}, err
171155
}
156+
157+
instance.Status.Conditions.MarkTrue(
158+
condition.DeploymentReadyCondition,
159+
condition.DeploymentReadyMessage)
160+
161+
Log.Info(InfoTestingCompleted)
162+
return ctrl.Result{}, nil
163+
164+
case CreateFirstJob:
165+
lockAcquired, err := r.AcquireLock(ctx, instance, helper, false)
166+
if !lockAcquired {
167+
Log.Info(fmt.Sprintf(InfoCanNotAcquireLock, testOperatorLockName))
168+
return ctrl.Result{RequeueAfter: RequeueAfterValue}, err
169+
}
170+
171+
Log.Info(fmt.Sprintf(InfoCreatingFirstPod, nextWorkflowStep))
172+
173+
case CreateNextJob:
174+
// Confirm that we still hold the lock. This is useful to check if for
175+
// example somebody / something deleted the lock and it got claimed by
176+
// another instance. This is considered to be an error state.
177+
lockAcquired, err := r.AcquireLock(ctx, instance, helper, false)
178+
if !lockAcquired {
179+
Log.Error(err, ErrConfirmLockOwnership, testOperatorLockName)
180+
return ctrl.Result{RequeueAfter: RequeueAfterValue}, err
181+
}
182+
183+
Log.Info(fmt.Sprintf(InfoCreatingNextPod, nextWorkflowStep))
184+
185+
default:
186+
return ctrl.Result{}, errors.New(ErrReceivedUnexpectedAction)
172187
}
173188

174189
serviceLabels := map[string]string{
175190
common.AppSelector: ansibletest.ServiceName,
176-
"workflowStep": strconv.Itoa(externalWorkflowCounter),
177-
"instanceName": instance.Name,
178-
"operator": "test-operator",
191+
workflowStepLabel: strconv.Itoa(nextWorkflowStep),
192+
instanceNameLabel: instance.Name,
193+
operatorNameLabel: "test-operator",
179194
}
180195

181196
// Create PersistentVolumeClaim
@@ -194,51 +209,30 @@ func (r *AnsibleTestReconciler) Reconcile(ctx context.Context, req ctrl.Request)
194209
}
195210
// Create PersistentVolumeClaim - end
196211

197-
// If the current job is executing the last workflow step -> do not create another job
198-
if workflowActive && externalWorkflowCounter >= len(instance.Spec.Workflow) {
199-
return ctrl.Result{}, nil
200-
} else if !workflowActive && r.JobExists(ctx, instance, currentWorkflowStep) {
201-
return ctrl.Result{}, nil
202-
}
203-
204-
// We are about to start job that spawns the pod with tests.
205-
// This lock ensures that there is always only one pod running.
206-
lockAcquired, err := r.AcquireLock(ctx, instance, helper, false)
207-
if !lockAcquired {
208-
Log.Info("Can not acquire lock")
209-
requeueAfter := time.Second * 60
210-
return ctrl.Result{RequeueAfter: requeueAfter}, err
211-
}
212-
Log.Info("Lock acquired")
213-
214-
if workflowActive {
215-
r.WorkflowStepCounterIncrease(ctx, instance, helper)
216-
}
217-
218212
instance.Status.Conditions.MarkTrue(condition.ServiceConfigReadyCondition, condition.ServiceConfigReadyMessage)
219213

220214
// Create a new job
221215
mountCerts := r.CheckSecretExists(ctx, instance, "combined-ca-bundle")
222-
jobName := r.GetJobName(instance, externalWorkflowCounter)
223-
envVars, workflowOverrideParams := r.PrepareAnsibleEnv(instance, externalWorkflowCounter)
216+
jobName := r.GetJobName(instance, nextWorkflowStep)
217+
envVars, workflowOverrideParams := r.PrepareAnsibleEnv(instance, nextWorkflowStep)
224218
logsPVCName := r.GetPVCLogsName(instance, 0)
225219
containerImage, err := r.GetContainerImage(ctx, workflowOverrideParams["ContainerImage"], instance)
226-
privileged := r.OverwriteAnsibleWithWorkflow(instance.Spec, "Privileged", "pbool", externalWorkflowCounter).(bool)
220+
privileged := r.OverwriteAnsibleWithWorkflow(instance.Spec, "Privileged", "pbool", nextWorkflowStep).(bool)
227221
if err != nil {
228222
return ctrl.Result{}, err
229223
}
230224

231-
if externalWorkflowCounter < len(instance.Spec.Workflow) {
232-
if instance.Spec.Workflow[externalWorkflowCounter].NodeSelector != nil {
233-
instance.Spec.NodeSelector = *instance.Spec.Workflow[externalWorkflowCounter].NodeSelector
225+
if nextWorkflowStep < len(instance.Spec.Workflow) {
226+
if instance.Spec.Workflow[nextWorkflowStep].NodeSelector != nil {
227+
instance.Spec.NodeSelector = *instance.Spec.Workflow[nextWorkflowStep].NodeSelector
234228
}
235229

236-
if instance.Spec.Workflow[externalWorkflowCounter].Tolerations != nil {
237-
instance.Spec.Tolerations = *instance.Spec.Workflow[externalWorkflowCounter].Tolerations
230+
if instance.Spec.Workflow[nextWorkflowStep].Tolerations != nil {
231+
instance.Spec.Tolerations = *instance.Spec.Workflow[nextWorkflowStep].Tolerations
238232
}
239233

240-
if instance.Spec.Workflow[externalWorkflowCounter].SELinuxLevel != nil {
241-
instance.Spec.SELinuxLevel = *instance.Spec.Workflow[externalWorkflowCounter].SELinuxLevel
234+
if instance.Spec.Workflow[nextWorkflowStep].SELinuxLevel != nil {
235+
instance.Spec.SELinuxLevel = *instance.Spec.Workflow[nextWorkflowStep].SELinuxLevel
242236
}
243237
}
244238

@@ -260,7 +254,7 @@ func (r *AnsibleTestReconciler) Reconcile(ctx context.Context, req ctrl.Request)
260254
mountCerts,
261255
envVars,
262256
workflowOverrideParams,
263-
externalWorkflowCounter,
257+
nextWorkflowStep,
264258
containerImage,
265259
privileged,
266260
)
@@ -297,7 +291,6 @@ func (r *AnsibleTestReconciler) Reconcile(ctx context.Context, req ctrl.Request)
297291
return ctrlResult, nil
298292
}
299293
// Create a new job - end
300-
301294
Log.Info("Reconciled Service successfully")
302295
return ctrl.Result{}, nil
303296
}

0 commit comments

Comments
 (0)