Skip to content

Commit 66bdaa2

Browse files
Merge pull request openstack-k8s-operators#231 from lpiwowar/bugfix/OSPRH-10386
[OSPRH-10386] Update logic for workflows
2 parents 104d029 + 5709ee4 commit 66bdaa2

File tree

5 files changed

+382
-304
lines changed

5 files changed

+382
-304
lines changed

controllers/ansibletest_controller.go

Lines changed: 60 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@ package controllers
1818

1919
import (
2020
"context"
21+
"errors"
22+
"fmt"
2123
"strconv"
2224
"time"
2325

@@ -37,7 +39,6 @@ import (
3739
corev1 "k8s.io/api/core/v1"
3840
k8s_errors "k8s.io/apimachinery/pkg/api/errors"
3941
ctrl "sigs.k8s.io/controller-runtime"
40-
"sigs.k8s.io/controller-runtime/pkg/client"
4142
"sigs.k8s.io/controller-runtime/pkg/log"
4243
)
4344

@@ -68,9 +69,6 @@ func (r *AnsibleTestReconciler) GetLogger(ctx context.Context) logr.Logger {
6869
func (r *AnsibleTestReconciler) Reconcile(ctx context.Context, req ctrl.Request) (result ctrl.Result, _err error) {
6970
Log := r.GetLogger(ctx)
7071

71-
// How much time should we wait before calling Reconcile loop when there is a failure
72-
requeueAfter := time.Second * 60
73-
7472
// Fetch the ansible instance
7573
instance := &testv1beta1.AnsibleTest{}
7674
err := r.Client.Get(ctx, req.NamespacedName, instance)
@@ -81,11 +79,6 @@ func (r *AnsibleTestReconciler) Reconcile(ctx context.Context, req ctrl.Request)
8179
return ctrl.Result{}, err
8280
}
8381

84-
workflowActive := false
85-
if len(instance.Spec.Workflow) > 0 {
86-
workflowActive = true
87-
}
88-
8982
// Create a helper
9083
helper, err := helper.NewHelper(
9184
instance,
@@ -142,40 +135,62 @@ func (r *AnsibleTestReconciler) Reconcile(ctx context.Context, req ctrl.Request)
142135

143136
}
144137

145-
// Ensure that there is an external counter and read its value
146-
// We use the external counter to keep track of the workflow steps
147-
r.WorkflowStepCounterCreate(ctx, instance, helper)
148-
externalWorkflowCounter := r.WorkflowStepCounterRead(ctx, instance)
149-
if externalWorkflowCounter == -1 {
150-
return ctrl.Result{RequeueAfter: requeueAfter}, nil
151-
}
138+
workflowLength := len(instance.Spec.Workflow)
139+
nextAction, nextWorkflowStep, err := r.NextAction(ctx, instance, workflowLength)
152140

153-
// Each job that is being executed by the test operator has
154-
currentWorkflowStep := 0
155-
runningAnsibleJob := &batchv1.Job{}
156-
runningJobName := r.GetJobName(instance, externalWorkflowCounter-1)
157-
err = r.Client.Get(ctx, client.ObjectKey{Namespace: instance.GetNamespace(), Name: runningJobName}, runningAnsibleJob)
158-
if err != nil && !k8s_errors.IsNotFound(err) {
141+
switch nextAction {
142+
case Failure:
159143
return ctrl.Result{}, err
160-
} else if err == nil {
161-
currentWorkflowStep, _ = strconv.Atoi(runningAnsibleJob.Labels["workflowStep"])
162-
}
163144

164-
if r.CompletedJobExists(ctx, instance, currentWorkflowStep) {
165-
// The job created by the instance was completed. Release the lock
166-
// so that other instances can spawn a job.
167-
instance.Status.Conditions.MarkTrue(condition.DeploymentReadyCondition, condition.DeploymentReadyMessage)
168-
Log.Info("Job completed")
145+
case Wait:
146+
Log.Info(InfoWaitingOnJob)
147+
return ctrl.Result{RequeueAfter: RequeueAfterValue}, nil
148+
149+
case EndTesting:
150+
// All jobs created by the instance were completed. Release the lock
151+
// so that other instances can spawn their jobs.
169152
if lockReleased, err := r.ReleaseLock(ctx, instance); !lockReleased {
170-
return ctrl.Result{}, err
153+
Log.Info(fmt.Sprintf(InfoCanNotReleaseLock, testOperatorLockName))
154+
return ctrl.Result{RequeueAfter: RequeueAfterValue}, err
171155
}
156+
157+
instance.Status.Conditions.MarkTrue(
158+
condition.DeploymentReadyCondition,
159+
condition.DeploymentReadyMessage)
160+
161+
Log.Info(InfoTestingCompleted)
162+
return ctrl.Result{}, nil
163+
164+
case CreateFirstJob:
165+
lockAcquired, err := r.AcquireLock(ctx, instance, helper, false)
166+
if !lockAcquired {
167+
Log.Info(fmt.Sprintf(InfoCanNotAcquireLock, testOperatorLockName))
168+
return ctrl.Result{RequeueAfter: RequeueAfterValue}, err
169+
}
170+
171+
Log.Info(fmt.Sprintf(InfoCreatingFirstPod, nextWorkflowStep))
172+
173+
case CreateNextJob:
174+
// Confirm that we still hold the lock. This is useful to check if for
175+
// example somebody / something deleted the lock and it got claimed by
176+
// another instance. This is considered to be an error state.
177+
lockAcquired, err := r.AcquireLock(ctx, instance, helper, false)
178+
if !lockAcquired {
179+
Log.Error(err, ErrConfirmLockOwnership, testOperatorLockName)
180+
return ctrl.Result{RequeueAfter: RequeueAfterValue}, err
181+
}
182+
183+
Log.Info(fmt.Sprintf(InfoCreatingNextPod, nextWorkflowStep))
184+
185+
default:
186+
return ctrl.Result{}, errors.New(ErrReceivedUnexpectedAction)
172187
}
173188

174189
serviceLabels := map[string]string{
175190
common.AppSelector: ansibletest.ServiceName,
176-
"workflowStep": strconv.Itoa(externalWorkflowCounter),
177-
"instanceName": instance.Name,
178-
"operator": "test-operator",
191+
workflowStepLabel: strconv.Itoa(nextWorkflowStep),
192+
instanceNameLabel: instance.Name,
193+
operatorNameLabel: "test-operator",
179194
}
180195

181196
// Create PersistentVolumeClaim
@@ -194,51 +209,30 @@ func (r *AnsibleTestReconciler) Reconcile(ctx context.Context, req ctrl.Request)
194209
}
195210
// Create PersistentVolumeClaim - end
196211

197-
// If the current job is executing the last workflow step -> do not create another job
198-
if workflowActive && externalWorkflowCounter >= len(instance.Spec.Workflow) {
199-
return ctrl.Result{}, nil
200-
} else if !workflowActive && r.JobExists(ctx, instance, currentWorkflowStep) {
201-
return ctrl.Result{}, nil
202-
}
203-
204-
// We are about to start job that spawns the pod with tests.
205-
// This lock ensures that there is always only one pod running.
206-
lockAcquired, err := r.AcquireLock(ctx, instance, helper, false)
207-
if !lockAcquired {
208-
Log.Info("Can not acquire lock")
209-
requeueAfter := time.Second * 60
210-
return ctrl.Result{RequeueAfter: requeueAfter}, err
211-
}
212-
Log.Info("Lock acquired")
213-
214-
if workflowActive {
215-
r.WorkflowStepCounterIncrease(ctx, instance, helper)
216-
}
217-
218212
instance.Status.Conditions.MarkTrue(condition.ServiceConfigReadyCondition, condition.ServiceConfigReadyMessage)
219213

220214
// Create a new job
221215
mountCerts := r.CheckSecretExists(ctx, instance, "combined-ca-bundle")
222-
jobName := r.GetJobName(instance, externalWorkflowCounter)
223-
envVars, workflowOverrideParams := r.PrepareAnsibleEnv(instance, externalWorkflowCounter)
216+
jobName := r.GetJobName(instance, nextWorkflowStep)
217+
envVars, workflowOverrideParams := r.PrepareAnsibleEnv(instance, nextWorkflowStep)
224218
logsPVCName := r.GetPVCLogsName(instance, 0)
225219
containerImage, err := r.GetContainerImage(ctx, workflowOverrideParams["ContainerImage"], instance)
226-
privileged := r.OverwriteAnsibleWithWorkflow(instance.Spec, "Privileged", "pbool", externalWorkflowCounter).(bool)
220+
privileged := r.OverwriteAnsibleWithWorkflow(instance.Spec, "Privileged", "pbool", nextWorkflowStep).(bool)
227221
if err != nil {
228222
return ctrl.Result{}, err
229223
}
230224

231-
if externalWorkflowCounter < len(instance.Spec.Workflow) {
232-
if instance.Spec.Workflow[externalWorkflowCounter].NodeSelector != nil {
233-
instance.Spec.NodeSelector = *instance.Spec.Workflow[externalWorkflowCounter].NodeSelector
225+
if nextWorkflowStep < len(instance.Spec.Workflow) {
226+
if instance.Spec.Workflow[nextWorkflowStep].NodeSelector != nil {
227+
instance.Spec.NodeSelector = *instance.Spec.Workflow[nextWorkflowStep].NodeSelector
234228
}
235229

236-
if instance.Spec.Workflow[externalWorkflowCounter].Tolerations != nil {
237-
instance.Spec.Tolerations = *instance.Spec.Workflow[externalWorkflowCounter].Tolerations
230+
if instance.Spec.Workflow[nextWorkflowStep].Tolerations != nil {
231+
instance.Spec.Tolerations = *instance.Spec.Workflow[nextWorkflowStep].Tolerations
238232
}
239233

240-
if instance.Spec.Workflow[externalWorkflowCounter].SELinuxLevel != nil {
241-
instance.Spec.SELinuxLevel = *instance.Spec.Workflow[externalWorkflowCounter].SELinuxLevel
234+
if instance.Spec.Workflow[nextWorkflowStep].SELinuxLevel != nil {
235+
instance.Spec.SELinuxLevel = *instance.Spec.Workflow[nextWorkflowStep].SELinuxLevel
242236
}
243237
}
244238

@@ -260,7 +254,7 @@ func (r *AnsibleTestReconciler) Reconcile(ctx context.Context, req ctrl.Request)
260254
mountCerts,
261255
envVars,
262256
workflowOverrideParams,
263-
externalWorkflowCounter,
257+
nextWorkflowStep,
264258
containerImage,
265259
privileged,
266260
)
@@ -297,7 +291,6 @@ func (r *AnsibleTestReconciler) Reconcile(ctx context.Context, req ctrl.Request)
297291
return ctrlResult, nil
298292
}
299293
// Create a new job - end
300-
301294
Log.Info("Reconciled Service successfully")
302295
return ctrl.Result{}, nil
303296
}

0 commit comments

Comments
 (0)