Skip to content

Commit 0107961

Browse files
committed
add label of generateName prefix in workfow template and cancel workflow using label selector
1 parent bba878a commit 0107961

File tree

7 files changed

+84
-44
lines changed

7 files changed

+84
-44
lines changed

api/restHandler/app/pipeline/configure/DeploymentPipelineRestHandler.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2071,6 +2071,13 @@ func (handler *PipelineConfigRestHandlerImpl) CancelStage(w http.ResponseWriter,
20712071
common.WriteJsonResp(w, err, nil, http.StatusInternalServerError)
20722072
return
20732073
}
2074+
var forceAbort bool
2075+
forceAbort, err = strconv.ParseBool(r.URL.Query().Get("forceAbort"))
2076+
if err != nil {
2077+
handler.Logger.Errorw("request err, CancelWorkflow", "err", err)
2078+
common.WriteJsonResp(w, err, nil, http.StatusBadRequest)
2079+
return
2080+
}
20742081
handler.Logger.Infow("request payload, CancelStage", "pipelineId", pipelineId, "workflowRunnerId", workflowRunnerId)
20752082

20762083
//RBAC
@@ -2082,7 +2089,7 @@ func (handler *PipelineConfigRestHandlerImpl) CancelStage(w http.ResponseWriter,
20822089
}
20832090
//RBAC
20842091

2085-
resp, err := handler.cdHandler.CancelStage(workflowRunnerId, userId)
2092+
resp, err := handler.cdHandler.CancelStage(workflowRunnerId, forceAbort, userId)
20862093
if err != nil {
20872094
handler.Logger.Errorw("service err, CancelStage", "err", err, "pipelineId", pipelineId, "workflowRunnerId", workflowRunnerId)
20882095
if util.IsErrNoRows(err) {

pkg/pipeline/CdHandler.go

Lines changed: 48 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"errors"
2222
"fmt"
2323
"github.com/devtron-labs/devtron/internal/sql/repository/pipelineConfig/adapter/cdWorkflow"
24+
bean2 "github.com/devtron-labs/devtron/pkg/bean"
2425
common2 "github.com/devtron-labs/devtron/pkg/deployment/common"
2526
util2 "github.com/devtron-labs/devtron/pkg/pipeline/util"
2627
"os"
@@ -64,7 +65,7 @@ type CdHandler interface {
6465
FetchCdWorkflowDetails(appId int, environmentId int, pipelineId int, buildId int) (types.WorkflowResponse, error)
6566
DownloadCdWorkflowArtifacts(buildId int) (*os.File, error)
6667
FetchCdPrePostStageStatus(pipelineId int) ([]pipelineBean.CdWorkflowWithArtifact, error)
67-
CancelStage(workflowRunnerId int, userId int32) (int, error)
68+
CancelStage(workflowRunnerId int, forceAbort bool, userId int32) (int, error)
6869
FetchAppWorkflowStatusForTriggerView(appId int) ([]*pipelineConfig.CdWorkflowStatus, error)
6970
FetchAppWorkflowStatusForTriggerViewForEnvironment(request resourceGroup2.ResourceGroupingRequest, token string) ([]*pipelineConfig.CdWorkflowStatus, error)
7071
FetchAppDeploymentStatusForEnvironments(request resourceGroup2.ResourceGroupingRequest, token string) ([]*pipelineConfig.AppDeploymentStatus, error)
@@ -133,16 +134,12 @@ func NewCdHandlerImpl(Logger *zap.SugaredLogger, userService user.UserService,
133134
return cdh
134135
}
135136

136-
func (impl *CdHandlerImpl) CancelStage(workflowRunnerId int, userId int32) (int, error) {
137+
func (impl *CdHandlerImpl) CancelStage(workflowRunnerId int, forceAbort bool, userId int32) (int, error) {
137138
workflowRunner, err := impl.cdWorkflowRepository.FindWorkflowRunnerById(workflowRunnerId)
138139
if err != nil {
139140
impl.Logger.Errorw("err", "err", err)
140141
return 0, err
141142
}
142-
if !(string(v1alpha1.NodePending) == workflowRunner.Status || string(v1alpha1.NodeRunning) == workflowRunner.Status) {
143-
impl.Logger.Info("cannot cancel stage, stage not in progress")
144-
return 0, errors.New("cannot cancel stage, stage not in progress")
145-
}
146143
pipeline, err := impl.pipelineRepository.FindById(workflowRunner.CdWorkflow.PipelineId)
147144
if err != nil {
148145
impl.Logger.Errorw("error while fetching cd pipeline", "err", err)
@@ -184,10 +181,26 @@ func (impl *CdHandlerImpl) CancelStage(workflowRunnerId int, userId int32) (int,
184181
Environment: nil,
185182
}
186183
err = impl.workflowService.TerminateWorkflow(cancelWfDtoRequest)
187-
if err != nil {
184+
if err != nil && forceAbort {
185+
impl.Logger.Errorw("error in terminating workflow, with force abort flag as true", "workflowName", workflowRunner.Name, "err", err)
186+
cancelWfDtoRequest.WorkflowGenerateName = fmt.Sprintf("%d-%s", workflowRunnerId, workflowRunner.Name)
187+
err1 := impl.workflowService.TerminateDanglingWorkflows(cancelWfDtoRequest)
188+
if err1 != nil {
189+
impl.Logger.Errorw("error in terminating dangling workflows", "cancelWfDtoRequest", cancelWfDtoRequest, "err", err)
190+
// ignoring error here in case of force abort, confirmed from product
191+
}
192+
} else if err != nil {
188193
impl.Logger.Error("cannot terminate wf runner", "err", err)
189194
return 0, err
190195
}
196+
if forceAbort {
197+
err = impl.handleForceAbortCaseForCdStage(workflowRunner, forceAbort)
198+
if err != nil {
199+
impl.Logger.Errorw("error in handleForceAbortCaseForCdStage", "forceAbortFlag", forceAbort, "workflowRunner", workflowRunner, "err", err)
200+
return 0, err
201+
}
202+
return workflowRunner.Id, nil
203+
}
191204
if len(workflowRunner.ImagePathReservationIds) > 0 {
192205
err := impl.customTagService.DeactivateImagePathReservationByImageIds(workflowRunner.ImagePathReservationIds)
193206
if err != nil {
@@ -206,6 +219,34 @@ func (impl *CdHandlerImpl) CancelStage(workflowRunnerId int, userId int32) (int,
206219
return workflowRunner.Id, nil
207220
}
208221

222+
func (impl *CdHandlerImpl) updateWorkflowRunnerForForceAbort(workflowRunner *pipelineConfig.CdWorkflowRunner) error {
223+
workflowRunner.Status = executors.WorkflowCancel
224+
workflowRunner.PodStatus = string(bean2.Failed)
225+
workflowRunner.Message = FORCE_ABORT_MESSAGE_AFTER_STARTING_STAGE
226+
err := impl.cdWorkflowRepository.UpdateWorkFlowRunner(workflowRunner)
227+
if err != nil {
228+
impl.Logger.Errorw("error in updating workflow status in cd workflow runner in force abort case", "err", err)
229+
return err
230+
}
231+
return nil
232+
}
233+
234+
func (impl *CdHandlerImpl) handleForceAbortCaseForCdStage(workflowRunner *pipelineConfig.CdWorkflowRunner, forceAbort bool) error {
235+
isWorkflowInNonTerminalStage := workflowRunner.Status == string(v1alpha1.NodePending) || workflowRunner.Status == string(v1alpha1.NodeRunning)
236+
if !isWorkflowInNonTerminalStage {
237+
if forceAbort {
238+
return impl.updateWorkflowRunnerForForceAbort(workflowRunner)
239+
} else {
240+
return &util.ApiError{Code: "200", HttpStatusCode: 400, UserMessage: "cannot cancel stage, stage not in progress"}
241+
}
242+
}
243+
//this arises when someone deletes the workflow in resource browser and wants to force abort a cd stage(pre/post)
244+
if workflowRunner.Status == string(v1alpha1.NodeRunning) && forceAbort {
245+
return impl.updateWorkflowRunnerForForceAbort(workflowRunner)
246+
}
247+
return nil
248+
}
249+
209250
func (impl *CdHandlerImpl) UpdateWorkflow(workflowStatus v1alpha1.WorkflowStatus) (int, string, error) {
210251
wfStatusRs := impl.extractWorkfowStatus(workflowStatus)
211252
workflowName, status, podStatus, message, podName := wfStatusRs.WorkflowName, wfStatusRs.Status, wfStatusRs.PodStatus, wfStatusRs.Message, wfStatusRs.PodName

pkg/pipeline/CiHandler.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,7 @@ const Running = "Running"
163163
const Starting = "Starting"
164164
const POD_DELETED_MESSAGE = "pod deleted"
165165
const TERMINATE_MESSAGE = "workflow shutdown with strategy: Terminate"
166-
const ABORT_MESSAGE_AFTER_STARTING_STAGE = "workflow shutdown with strategy: Force Abort"
166+
const FORCE_ABORT_MESSAGE_AFTER_STARTING_STAGE = "workflow shutdown with strategy: Force Abort"
167167

168168
func (impl *CiHandlerImpl) CheckAndReTriggerCI(workflowStatus v1alpha1.WorkflowStatus) error {
169169

@@ -618,7 +618,7 @@ func (impl *CiHandlerImpl) CancelBuild(workflowId int, forceAbort bool) (int, er
618618
if err != nil && forceAbort {
619619
impl.Logger.Errorw("error in terminating workflow, with force abort flag flag as true", "workflowName", workflow.Name, "err", err)
620620

621-
cancelWfDtoRequest.WorkflowGenerateName = fmt.Sprintf("%d-%s-", workflowId, workflow.Name)
621+
cancelWfDtoRequest.WorkflowGenerateName = fmt.Sprintf("%d-%s", workflowId, workflow.Name)
622622
err1 := impl.workflowService.TerminateDanglingWorkflows(cancelWfDtoRequest)
623623
if err1 != nil {
624624
impl.Logger.Errorw("error in terminating dangling workflows", "cancelWfDtoRequest", cancelWfDtoRequest, "err", err)
@@ -631,9 +631,9 @@ func (impl *CiHandlerImpl) CancelBuild(workflowId int, forceAbort bool) (int, er
631631
return 0, err
632632
}
633633
if forceAbort {
634-
err = impl.handleForceAbortCase(workflow, forceAbort)
634+
err = impl.handleForceAbortCaseForCi(workflow, forceAbort)
635635
if err != nil {
636-
impl.Logger.Errorw("error in handleForceAbortCase", "forceAbortFlag", forceAbort, "workflow", workflow, "err", err)
636+
impl.Logger.Errorw("error in handleForceAbortCaseForCi", "forceAbortFlag", forceAbort, "workflow", workflow, "err", err)
637637
return 0, err
638638
}
639639
return workflow.Id, nil
@@ -666,7 +666,7 @@ func (impl *CiHandlerImpl) CancelBuild(workflowId int, forceAbort bool) (int, er
666666
return workflow.Id, nil
667667
}
668668

669-
func (impl *CiHandlerImpl) handleForceAbortCase(workflow *pipelineConfig.CiWorkflow, forceAbort bool) error {
669+
func (impl *CiHandlerImpl) handleForceAbortCaseForCi(workflow *pipelineConfig.CiWorkflow, forceAbort bool) error {
670670
isWorkflowInNonTerminalStage := workflow.Status == string(v1alpha1.NodePending) || workflow.Status == string(v1alpha1.NodeRunning)
671671
if !isWorkflowInNonTerminalStage {
672672
if forceAbort {
@@ -685,7 +685,7 @@ func (impl *CiHandlerImpl) handleForceAbortCase(workflow *pipelineConfig.CiWorkf
685685
func (impl *CiHandlerImpl) updateWorkflowForForceAbort(workflow *pipelineConfig.CiWorkflow) error {
686686
workflow.Status = executors.WorkflowCancel
687687
workflow.PodStatus = string(bean.Failed)
688-
workflow.Message = ABORT_MESSAGE_AFTER_STARTING_STAGE
688+
workflow.Message = FORCE_ABORT_MESSAGE_AFTER_STARTING_STAGE
689689
err := impl.ciWorkflowRepository.UpdateWorkFlow(workflow)
690690
if err != nil {
691691
impl.Logger.Errorw("error in updating workflow status", "err", err)

pkg/pipeline/WorkflowService.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -158,12 +158,12 @@ func (impl *WorkflowServiceImpl) createWorkflowTemplate(workflowRequest *types.W
158158
}
159159

160160
workflowMainContainer, err := workflowRequest.GetWorkflowMainContainer(impl.ciCdConfig, infraConfiguration, workflowJson, &workflowTemplate, workflowConfigMaps, workflowSecrets)
161-
162161
if err != nil {
163162
impl.Logger.Errorw("error occurred while getting workflow main container", "err", err)
164163
return bean3.WorkflowTemplate{}, err
165164
}
166-
165+
// if anyone wants to add extra labels in workflow template then leverage below func.
166+
workflowRequest.AddExtraLabelsInWorkflowTemplate()
167167
workflowTemplate.Containers = []v12.Container{workflowMainContainer}
168168
impl.updateBlobStorageConfig(workflowRequest, &workflowTemplate)
169169
if workflowRequest.Type == bean3.CI_WORKFLOW_PIPELINE_TYPE || workflowRequest.Type == bean3.JOB_WORKFLOW_PIPELINE_TYPE {

pkg/pipeline/executors/ArgoWorkflowExecutor.go

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -97,22 +97,16 @@ func (impl *ArgoWorkflowExecutorImpl) TerminateDanglingWorkflow(workflowGenerate
9797
impl.logger.Errorw("cannot build wf client", "workflowGenerateName", workflowGenerateName, "err", err)
9898
return err
9999
}
100-
wfList, err := wfClient.List(context.Background(), v1.ListOptions{})
100+
jobSelectorLabel := fmt.Sprintf("%s=%s", types.WorkflowGenerateNamePrefix, workflowGenerateName)
101+
wfList, err := wfClient.List(context.Background(), v1.ListOptions{LabelSelector: jobSelectorLabel})
101102
if err != nil {
102103
impl.logger.Errorw("error in fetching list of workflows", "namespace", namespace, "err", err)
103104
return err
104105
}
105-
var wfToDelete v1alpha1.Workflow
106106
for _, wf := range wfList.Items {
107-
if wf.GenerateName == workflowGenerateName {
108-
wfToDelete = wf
109-
break
110-
}
111-
}
112-
if len(wfToDelete.Name) > 0 {
113-
err = util.TerminateWorkflow(context.Background(), wfClient, wfToDelete.Name)
107+
err = util.TerminateWorkflow(context.Background(), wfClient, wf.Name)
114108
if err != nil {
115-
impl.logger.Errorw("error in terminating argo executor workflow", "name", wfToDelete.Name, "err", err)
109+
impl.logger.Errorw("error in terminating argo executor workflow", "name", wf.Name, "err", err)
116110
return err
117111
}
118112
}

pkg/pipeline/executors/SystemWorkflowExecutor.go

Lines changed: 5 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -120,25 +120,19 @@ func (impl *SystemWorkflowExecutorImpl) TerminateDanglingWorkflow(workflowGenera
120120
impl.logger.Errorw("error occurred while creating k8s client", "workflowGenerateName", workflowGenerateName, "namespace", namespace, "err", err)
121121
return err
122122
}
123-
jobList, err := clientset.BatchV1().Jobs(namespace).List(context.Background(), v12.ListOptions{})
123+
jobSelectorLabel := fmt.Sprintf("%s=%s", types2.WorkflowGenerateNamePrefix, workflowGenerateName)
124+
jobList, err := clientset.BatchV1().Jobs(namespace).List(context.Background(), v12.ListOptions{LabelSelector: jobSelectorLabel})
124125
if err != nil {
125126
impl.logger.Errorw("error occurred while fetching jobs list for terminating dangling workflows", "namespace", namespace, "err", err)
126127
return err
127128
}
128-
var jobToDelete v1.Job
129129
for _, job := range jobList.Items {
130-
if job.ObjectMeta.GenerateName == workflowGenerateName {
131-
jobToDelete = job
132-
break
133-
}
134-
}
135-
if len(jobToDelete.Name) > 0 {
136-
err = clientset.BatchV1().Jobs(namespace).Delete(context.Background(), jobToDelete.Name, v12.DeleteOptions{})
130+
err = clientset.BatchV1().Jobs(namespace).Delete(context.Background(), job.Name, v12.DeleteOptions{})
137131
if err != nil {
138132
if errors.IsNotFound(err) {
139-
err = fmt.Errorf("cannot find job workflow %s", jobToDelete.Name)
133+
err = fmt.Errorf("cannot find job workflow %s", job.Name)
140134
}
141-
impl.logger.Errorw("error occurred while deleting workflow", "workflowName", jobToDelete.Name, "namespace", namespace, "err", err)
135+
impl.logger.Errorw("error occurred while deleting workflow", "workflowName", job.Name, "namespace", namespace, "err", err)
142136
return err
143137
}
144138
}

pkg/pipeline/types/Workflow.go

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,10 @@ type WorkflowRequest struct {
151151
HostUrl string `json:"hostUrl"`
152152
}
153153

154+
func (workflowRequest *WorkflowRequest) AddExtraLabelsInWorkflowTemplate() {
155+
workflowRequest.AppLabels[WorkflowGenerateNamePrefix] = workflowRequest.WorkflowNamePrefix
156+
}
157+
154158
func (workflowRequest *WorkflowRequest) updateExternalRunMetadata() {
155159
pipeline := workflowRequest.Pipeline
156160
env := workflowRequest.Env
@@ -589,13 +593,13 @@ func updateContainerEnvs(isCM bool, workflowMainContainer *v1.Container, configS
589593
}
590594
}
591595

592-
const PRE = "PRE"
593-
594-
const POST = "POST"
595-
596-
const CI_NODE_PVC_ALL_ENV = "devtron.ai/ci-pvc-all"
597-
598-
const CI_NODE_PVC_PIPELINE_PREFIX = "devtron.ai/ci-pvc"
596+
const (
597+
PRE = "PRE"
598+
POST = "POST"
599+
CI_NODE_PVC_ALL_ENV = "devtron.ai/ci-pvc-all"
600+
CI_NODE_PVC_PIPELINE_PREFIX = "devtron.ai/ci-pvc"
601+
WorkflowGenerateNamePrefix = "devtron.ai/generate-name-prefix"
602+
)
599603

600604
type CiArtifactDTO struct {
601605
Id int `json:"id"`

0 commit comments

Comments
 (0)