Skip to content

Commit 0bb4701

Browse files
Merge pull request #5990 from devtron-labs/force-abort-fix
fix: Force abort fix
2 parents 92b080b + 4e07b50 commit 0bb4701

File tree

11 files changed

+246
-64
lines changed

11 files changed

+246
-64
lines changed

api/restHandler/app/pipeline/configure/BuildPipelineRestHandler.go

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1562,12 +1562,16 @@ func (handler *PipelineConfigRestHandlerImpl) CancelWorkflow(w http.ResponseWrit
15621562
return
15631563
}
15641564
var forceAbort bool
1565-
forceAbort, err = strconv.ParseBool(queryVars.Get("forceAbort"))
1566-
if err != nil {
1567-
handler.Logger.Errorw("request err, CancelWorkflow", "err", err)
1568-
common.WriteJsonResp(w, err, nil, http.StatusBadRequest)
1569-
return
1565+
forceAbortQueryParam := queryVars.Get("forceAbort")
1566+
if len(forceAbortQueryParam) > 0 {
1567+
forceAbort, err = strconv.ParseBool(forceAbortQueryParam)
1568+
if err != nil {
1569+
handler.Logger.Errorw("request err, CancelWorkflow", "err", err)
1570+
common.WriteJsonResp(w, err, nil, http.StatusBadRequest)
1571+
return
1572+
}
15701573
}
1574+
15711575
handler.Logger.Infow("request payload, CancelWorkflow", "workflowId", workflowId, "pipelineId", pipelineId)
15721576

15731577
ciPipeline, err := handler.ciPipelineRepository.FindById(pipelineId)

api/restHandler/app/pipeline/configure/DeploymentPipelineRestHandler.go

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2071,6 +2071,16 @@ func (handler *PipelineConfigRestHandlerImpl) CancelStage(w http.ResponseWriter,
20712071
common.WriteJsonResp(w, err, nil, http.StatusInternalServerError)
20722072
return
20732073
}
2074+
var forceAbort bool
2075+
forceAbortQueryParam := r.URL.Query().Get("forceAbort")
2076+
if len(forceAbortQueryParam) > 0 {
2077+
forceAbort, err = strconv.ParseBool(forceAbortQueryParam)
2078+
if err != nil {
2079+
handler.Logger.Errorw("request err, CancelWorkflow", "err", err)
2080+
common.WriteJsonResp(w, err, nil, http.StatusBadRequest)
2081+
return
2082+
}
2083+
}
20742084
handler.Logger.Infow("request payload, CancelStage", "pipelineId", pipelineId, "workflowRunnerId", workflowRunnerId)
20752085

20762086
//RBAC
@@ -2082,7 +2092,7 @@ func (handler *PipelineConfigRestHandlerImpl) CancelStage(w http.ResponseWriter,
20822092
}
20832093
//RBAC
20842094

2085-
resp, err := handler.cdHandler.CancelStage(workflowRunnerId, userId)
2095+
resp, err := handler.cdHandler.CancelStage(workflowRunnerId, forceAbort, userId)
20862096
if err != nil {
20872097
handler.Logger.Errorw("service err, CancelStage", "err", err, "pipelineId", pipelineId, "workflowRunnerId", workflowRunnerId)
20882098
if util.IsErrNoRows(err) {

pkg/pipeline/CdHandler.go

Lines changed: 57 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"errors"
2222
"fmt"
2323
"github.com/devtron-labs/devtron/internal/sql/repository/pipelineConfig/adapter/cdWorkflow"
24+
bean2 "github.com/devtron-labs/devtron/pkg/bean"
2425
common2 "github.com/devtron-labs/devtron/pkg/deployment/common"
2526
util2 "github.com/devtron-labs/devtron/pkg/pipeline/util"
2627
"os"
@@ -64,7 +65,7 @@ type CdHandler interface {
6465
FetchCdWorkflowDetails(appId int, environmentId int, pipelineId int, buildId int) (types.WorkflowResponse, error)
6566
DownloadCdWorkflowArtifacts(buildId int) (*os.File, error)
6667
FetchCdPrePostStageStatus(pipelineId int) ([]pipelineBean.CdWorkflowWithArtifact, error)
67-
CancelStage(workflowRunnerId int, userId int32) (int, error)
68+
CancelStage(workflowRunnerId int, forceAbort bool, userId int32) (int, error)
6869
FetchAppWorkflowStatusForTriggerView(appId int) ([]*pipelineConfig.CdWorkflowStatus, error)
6970
FetchAppWorkflowStatusForTriggerViewForEnvironment(request resourceGroup2.ResourceGroupingRequest, token string) ([]*pipelineConfig.CdWorkflowStatus, error)
7071
FetchAppDeploymentStatusForEnvironments(request resourceGroup2.ResourceGroupingRequest, token string) ([]*pipelineConfig.AppDeploymentStatus, error)
@@ -133,16 +134,12 @@ func NewCdHandlerImpl(Logger *zap.SugaredLogger, userService user.UserService,
133134
return cdh
134135
}
135136

136-
func (impl *CdHandlerImpl) CancelStage(workflowRunnerId int, userId int32) (int, error) {
137+
func (impl *CdHandlerImpl) CancelStage(workflowRunnerId int, forceAbort bool, userId int32) (int, error) {
137138
workflowRunner, err := impl.cdWorkflowRepository.FindWorkflowRunnerById(workflowRunnerId)
138139
if err != nil {
139140
impl.Logger.Errorw("err", "err", err)
140141
return 0, err
141142
}
142-
if !(string(v1alpha1.NodePending) == workflowRunner.Status || string(v1alpha1.NodeRunning) == workflowRunner.Status) {
143-
impl.Logger.Info("cannot cancel stage, stage not in progress")
144-
return 0, errors.New("cannot cancel stage, stage not in progress")
145-
}
146143
pipeline, err := impl.pipelineRepository.FindById(workflowRunner.CdWorkflow.PipelineId)
147144
if err != nil {
148145
impl.Logger.Errorw("error while fetching cd pipeline", "err", err)
@@ -175,11 +172,35 @@ func (impl *CdHandlerImpl) CancelStage(workflowRunnerId int, userId int32) (int,
175172
}
176173
}
177174
// Terminate workflow
178-
err = impl.workflowService.TerminateWorkflow(workflowRunner.ExecutorType, workflowRunner.Name, workflowRunner.Namespace, restConfig, isExtCluster, nil)
179-
if err != nil {
175+
cancelWfDtoRequest := &types.CancelWfRequestDto{
176+
ExecutorType: workflowRunner.ExecutorType,
177+
WorkflowName: workflowRunner.Name,
178+
Namespace: workflowRunner.Namespace,
179+
RestConfig: restConfig,
180+
IsExt: isExtCluster,
181+
Environment: nil,
182+
}
183+
err = impl.workflowService.TerminateWorkflow(cancelWfDtoRequest)
184+
if err != nil && forceAbort {
185+
impl.Logger.Errorw("error in terminating workflow, with force abort flag as true", "workflowName", workflowRunner.Name, "err", err)
186+
cancelWfDtoRequest.WorkflowGenerateName = fmt.Sprintf("%d-%s", workflowRunnerId, workflowRunner.Name)
187+
err1 := impl.workflowService.TerminateDanglingWorkflows(cancelWfDtoRequest)
188+
if err1 != nil {
189+
impl.Logger.Errorw("error in terminating dangling workflows", "cancelWfDtoRequest", cancelWfDtoRequest, "err", err)
190+
// ignoring error here in case of force abort, confirmed from product
191+
}
192+
} else if err != nil {
180193
impl.Logger.Error("cannot terminate wf runner", "err", err)
181194
return 0, err
182195
}
196+
if forceAbort {
197+
err = impl.handleForceAbortCaseForCdStage(workflowRunner, forceAbort)
198+
if err != nil {
199+
impl.Logger.Errorw("error in handleForceAbortCaseForCdStage", "forceAbortFlag", forceAbort, "workflowRunner", workflowRunner, "err", err)
200+
return 0, err
201+
}
202+
return workflowRunner.Id, nil
203+
}
183204
if len(workflowRunner.ImagePathReservationIds) > 0 {
184205
err := impl.customTagService.DeactivateImagePathReservationByImageIds(workflowRunner.ImagePathReservationIds)
185206
if err != nil {
@@ -198,6 +219,34 @@ func (impl *CdHandlerImpl) CancelStage(workflowRunnerId int, userId int32) (int,
198219
return workflowRunner.Id, nil
199220
}
200221

222+
func (impl *CdHandlerImpl) updateWorkflowRunnerForForceAbort(workflowRunner *pipelineConfig.CdWorkflowRunner) error {
223+
workflowRunner.Status = executors.WorkflowCancel
224+
workflowRunner.PodStatus = string(bean2.Failed)
225+
workflowRunner.Message = FORCE_ABORT_MESSAGE_AFTER_STARTING_STAGE
226+
err := impl.cdWorkflowRepository.UpdateWorkFlowRunner(workflowRunner)
227+
if err != nil {
228+
impl.Logger.Errorw("error in updating workflow status in cd workflow runner in force abort case", "err", err)
229+
return err
230+
}
231+
return nil
232+
}
233+
234+
func (impl *CdHandlerImpl) handleForceAbortCaseForCdStage(workflowRunner *pipelineConfig.CdWorkflowRunner, forceAbort bool) error {
235+
isWorkflowInNonTerminalStage := workflowRunner.Status == string(v1alpha1.NodePending) || workflowRunner.Status == string(v1alpha1.NodeRunning)
236+
if !isWorkflowInNonTerminalStage {
237+
if forceAbort {
238+
return impl.updateWorkflowRunnerForForceAbort(workflowRunner)
239+
} else {
240+
return &util.ApiError{Code: "200", HttpStatusCode: 400, UserMessage: "cannot cancel stage, stage not in progress"}
241+
}
242+
}
243+
//this arises when someone deletes the workflow in resource browser and wants to force abort a cd stage(pre/post)
244+
if workflowRunner.Status == string(v1alpha1.NodeRunning) && forceAbort {
245+
return impl.updateWorkflowRunnerForForceAbort(workflowRunner)
246+
}
247+
return nil
248+
}
249+
201250
func (impl *CdHandlerImpl) UpdateWorkflow(workflowStatus v1alpha1.WorkflowStatus) (int, string, error) {
202251
wfStatusRs := impl.extractWorkfowStatus(workflowStatus)
203252
workflowName, status, podStatus, message, podName := wfStatusRs.WorkflowName, wfStatusRs.Status, wfStatusRs.PodStatus, wfStatusRs.Message, wfStatusRs.PodName

pkg/pipeline/CiHandler.go

Lines changed: 50 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,7 @@ const Running = "Running"
163163
const Starting = "Starting"
164164
const POD_DELETED_MESSAGE = "pod deleted"
165165
const TERMINATE_MESSAGE = "workflow shutdown with strategy: Terminate"
166-
const ABORT_MESSAGE_AFTER_STARTING_STAGE = "workflow shutdown with strategy: Force Abort"
166+
const FORCE_ABORT_MESSAGE_AFTER_STARTING_STAGE = "workflow shutdown with strategy: Force Abort"
167167

168168
func (impl *CiHandlerImpl) CheckAndReTriggerCI(workflowStatus v1alpha1.WorkflowStatus) error {
169169

@@ -592,20 +592,9 @@ func (impl *CiHandlerImpl) GetBuildHistory(pipelineId int, appId int, offset int
592592
func (impl *CiHandlerImpl) CancelBuild(workflowId int, forceAbort bool) (int, error) {
593593
workflow, err := impl.ciWorkflowRepository.FindById(workflowId)
594594
if err != nil {
595-
impl.Logger.Errorw("err", "err", err)
595+
impl.Logger.Errorw("error in finding ci-workflow by workflow id", "ciWorkflowId", workflowId, "err", err)
596596
return 0, err
597597
}
598-
if !(string(v1alpha1.NodePending) == workflow.Status || string(v1alpha1.NodeRunning) == workflow.Status) {
599-
if forceAbort {
600-
return impl.cancelBuildAfterStartWorkflowStage(workflow)
601-
} else {
602-
return 0, &util.ApiError{Code: "200", HttpStatusCode: 400, UserMessage: "cannot cancel build, build not in progress"}
603-
}
604-
}
605-
//this arises when someone deletes the workflow in resource browser and wants to force abort a ci
606-
if workflow.Status == string(v1alpha1.NodeRunning) && forceAbort {
607-
return impl.cancelBuildAfterStartWorkflowStage(workflow)
608-
}
609598
isExt := workflow.Namespace != DefaultCiWorkflowNamespace
610599
var env *repository3.Environment
611600
var restConfig *rest.Config
@@ -615,15 +604,40 @@ func (impl *CiHandlerImpl) CancelBuild(workflowId int, forceAbort bool) (int, er
615604
return 0, err
616605
}
617606
}
618-
619607
// Terminate workflow
620-
err = impl.workflowService.TerminateWorkflow(workflow.ExecutorType, workflow.Name, workflow.Namespace, restConfig, isExt, env)
621-
if err != nil && strings.Contains(err.Error(), "cannot find workflow") {
608+
cancelWfDtoRequest := &types.CancelWfRequestDto{
609+
ExecutorType: workflow.ExecutorType,
610+
WorkflowName: workflow.Name,
611+
Namespace: workflow.Namespace,
612+
RestConfig: restConfig,
613+
IsExt: isExt,
614+
Environment: env,
615+
}
616+
// Terminate workflow
617+
err = impl.workflowService.TerminateWorkflow(cancelWfDtoRequest)
618+
if err != nil && forceAbort {
619+
impl.Logger.Errorw("error in terminating workflow, with force abort flag flag as true", "workflowName", workflow.Name, "err", err)
620+
621+
cancelWfDtoRequest.WorkflowGenerateName = fmt.Sprintf("%d-%s", workflowId, workflow.Name)
622+
err1 := impl.workflowService.TerminateDanglingWorkflows(cancelWfDtoRequest)
623+
if err1 != nil {
624+
impl.Logger.Errorw("error in terminating dangling workflows", "cancelWfDtoRequest", cancelWfDtoRequest, "err", err)
625+
// ignoring error here in case of force abort, confirmed from product
626+
}
627+
} else if err != nil && strings.Contains(err.Error(), "cannot find workflow") {
622628
return 0, &util.ApiError{Code: "200", HttpStatusCode: http.StatusBadRequest, UserMessage: err.Error()}
623629
} else if err != nil {
624630
impl.Logger.Errorw("cannot terminate wf", "err", err)
625631
return 0, err
626632
}
633+
if forceAbort {
634+
err = impl.handleForceAbortCaseForCi(workflow, forceAbort)
635+
if err != nil {
636+
impl.Logger.Errorw("error in handleForceAbortCaseForCi", "forceAbortFlag", forceAbort, "workflow", workflow, "err", err)
637+
return 0, err
638+
}
639+
return workflow.Id, nil
640+
}
627641

628642
workflow.Status = executors.WorkflowCancel
629643
if workflow.ExecutorType == cdWorkflow.WORKFLOW_EXECUTOR_TYPE_SYSTEM {
@@ -652,16 +666,32 @@ func (impl *CiHandlerImpl) CancelBuild(workflowId int, forceAbort bool) (int, er
652666
return workflow.Id, nil
653667
}
654668

655-
func (impl *CiHandlerImpl) cancelBuildAfterStartWorkflowStage(workflow *pipelineConfig.CiWorkflow) (int, error) {
669+
func (impl *CiHandlerImpl) handleForceAbortCaseForCi(workflow *pipelineConfig.CiWorkflow, forceAbort bool) error {
670+
isWorkflowInNonTerminalStage := workflow.Status == string(v1alpha1.NodePending) || workflow.Status == string(v1alpha1.NodeRunning)
671+
if !isWorkflowInNonTerminalStage {
672+
if forceAbort {
673+
return impl.updateWorkflowForForceAbort(workflow)
674+
} else {
675+
return &util.ApiError{Code: "200", HttpStatusCode: 400, UserMessage: "cannot cancel build, build not in progress"}
676+
}
677+
}
678+
//this arises when someone deletes the workflow in resource browser and wants to force abort a ci
679+
if workflow.Status == string(v1alpha1.NodeRunning) && forceAbort {
680+
return impl.updateWorkflowForForceAbort(workflow)
681+
}
682+
return nil
683+
}
684+
685+
func (impl *CiHandlerImpl) updateWorkflowForForceAbort(workflow *pipelineConfig.CiWorkflow) error {
656686
workflow.Status = executors.WorkflowCancel
657687
workflow.PodStatus = string(bean.Failed)
658-
workflow.Message = ABORT_MESSAGE_AFTER_STARTING_STAGE
688+
workflow.Message = FORCE_ABORT_MESSAGE_AFTER_STARTING_STAGE
659689
err := impl.ciWorkflowRepository.UpdateWorkFlow(workflow)
660690
if err != nil {
661691
impl.Logger.Errorw("error in updating workflow status", "err", err)
662-
return 0, err
692+
return err
663693
}
664-
return workflow.Id, nil
694+
return nil
665695
}
666696

667697
func (impl *CiHandlerImpl) getRestConfig(workflow *pipelineConfig.CiWorkflow) (*rest.Config, error) {

pkg/pipeline/WorkflowService.go

Lines changed: 29 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
"errors"
2323
v1alpha12 "github.com/argoproj/argo-workflows/v3/pkg/client/clientset/versioned/typed/workflow/v1alpha1"
2424
"github.com/argoproj/argo-workflows/v3/workflow/util"
25+
"github.com/devtron-labs/common-lib/utils"
2526
"github.com/devtron-labs/common-lib/utils/k8s"
2627
"github.com/devtron-labs/common-lib/utils/k8s/commonBean"
2728
"github.com/devtron-labs/devtron/api/bean"
@@ -39,6 +40,8 @@ import (
3940
v12 "k8s.io/api/core/v1"
4041
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
4142
"k8s.io/client-go/rest"
43+
"net/http"
44+
"strconv"
4245
"strings"
4346
)
4447

@@ -51,7 +54,8 @@ type WorkflowService interface {
5154
GetWorkflowStatus(executorType cdWorkflow.WorkflowExecutorType, name string, namespace string, restConfig *rest.Config) (*types.WorkflowStatus, error)
5255
// ListAllWorkflows(namespace string) (*v1alpha1.WorkflowList, error)
5356
// UpdateWorkflow(wf *v1alpha1.Workflow) (*v1alpha1.Workflow, error)
54-
TerminateWorkflow(executorType cdWorkflow.WorkflowExecutorType, name string, namespace string, restConfig *rest.Config, isExt bool, environment *repository.Environment) error
57+
TerminateWorkflow(cancelWfDtoRequest *types.CancelWfRequestDto) error
58+
TerminateDanglingWorkflows(cancelWfDtoRequest *types.CancelWfRequestDto) error
5559
}
5660

5761
type WorkflowServiceImpl struct {
@@ -157,12 +161,10 @@ func (impl *WorkflowServiceImpl) createWorkflowTemplate(workflowRequest *types.W
157161
}
158162

159163
workflowMainContainer, err := workflowRequest.GetWorkflowMainContainer(impl.ciCdConfig, infraConfiguration, workflowJson, &workflowTemplate, workflowConfigMaps, workflowSecrets)
160-
161164
if err != nil {
162165
impl.Logger.Errorw("error occurred while getting workflow main container", "err", err)
163166
return bean3.WorkflowTemplate{}, err
164167
}
165-
166168
workflowTemplate.Containers = []v12.Container{workflowMainContainer}
167169
impl.updateBlobStorageConfig(workflowRequest, &workflowTemplate)
168170
if workflowRequest.Type == bean3.CI_WORKFLOW_PIPELINE_TYPE || workflowRequest.Type == bean3.JOB_WORKFLOW_PIPELINE_TYPE {
@@ -352,27 +354,42 @@ func (impl *WorkflowServiceImpl) GetWorkflowStatus(executorType cdWorkflow.Workf
352354
return wfStatus, err
353355
}
354356

355-
func (impl *WorkflowServiceImpl) TerminateWorkflow(executorType cdWorkflow.WorkflowExecutorType, name string, namespace string, restConfig *rest.Config, isExt bool, environment *repository.Environment) error {
356-
impl.Logger.Debugw("terminating wf", "name", name)
357+
func (impl *WorkflowServiceImpl) TerminateWorkflow(cancelWfDtoRequest *types.CancelWfRequestDto) error {
358+
impl.Logger.Debugw("terminating wf", "name", cancelWfDtoRequest.WorkflowName)
357359
var err error
358-
if executorType != "" {
359-
workflowExecutor := impl.getWorkflowExecutor(executorType)
360+
if cancelWfDtoRequest.ExecutorType != "" {
361+
workflowExecutor := impl.getWorkflowExecutor(cancelWfDtoRequest.ExecutorType)
360362
if workflowExecutor == nil {
361363
return errors.New("workflow executor not found")
362364
}
363-
if restConfig == nil {
364-
restConfig = impl.config
365+
if cancelWfDtoRequest.RestConfig == nil {
366+
cancelWfDtoRequest.RestConfig = impl.config
365367
}
366-
err = workflowExecutor.TerminateWorkflow(name, namespace, restConfig)
368+
err = workflowExecutor.TerminateWorkflow(cancelWfDtoRequest.WorkflowName, cancelWfDtoRequest.Namespace, cancelWfDtoRequest.RestConfig)
367369
} else {
368-
wfClient, err := impl.getWfClient(environment, namespace, isExt)
370+
wfClient, err := impl.getWfClient(cancelWfDtoRequest.Environment, cancelWfDtoRequest.Namespace, cancelWfDtoRequest.IsExt)
369371
if err != nil {
370372
return err
371373
}
372-
err = util.TerminateWorkflow(context.Background(), wfClient, name)
374+
err = util.TerminateWorkflow(context.Background(), wfClient, cancelWfDtoRequest.WorkflowName)
375+
}
376+
return err
377+
}
378+
379+
func (impl *WorkflowServiceImpl) TerminateDanglingWorkflows(cancelWfDtoRequest *types.CancelWfRequestDto) error {
380+
impl.Logger.Debugw("terminating dangling wf", "name", cancelWfDtoRequest.WorkflowName)
381+
var err error
382+
workflowExecutor := impl.getWorkflowExecutor(cancelWfDtoRequest.ExecutorType)
383+
if workflowExecutor == nil {
384+
return &utils.ApiError{HttpStatusCode: http.StatusNotFound, Code: strconv.Itoa(http.StatusNotFound), InternalMessage: "workflow executor not found", UserMessage: "workflow executor not found"}
373385
}
386+
if cancelWfDtoRequest.RestConfig == nil {
387+
cancelWfDtoRequest.RestConfig = impl.config
388+
}
389+
err = workflowExecutor.TerminateDanglingWorkflow(cancelWfDtoRequest.WorkflowGenerateName, cancelWfDtoRequest.Namespace, cancelWfDtoRequest.RestConfig)
374390
return err
375391
}
392+
376393
func (impl *WorkflowServiceImpl) getRuntimeEnvClientInstance(environment *repository.Environment) (v1alpha12.WorkflowInterface, error) {
377394
restConfig, err, _ := impl.k8sCommonService.GetRestConfigByClusterId(context.Background(), environment.ClusterId)
378395
if err != nil {

0 commit comments

Comments
 (0)