Skip to content

Commit c6e83f1

Browse files
authored
add execution policy for workflow job (#4350)
* add execution policy for workflow job Signed-off-by: Min Min <[email protected]> * add start timestamp as a valid parameter Signed-off-by: Min Min <[email protected]> * add job status output for each job Signed-off-by: Min Min <[email protected]> * change function name Signed-off-by: Min Min <[email protected]> * debug Signed-off-by: Min Min <[email protected]> * fix minor bug Signed-off-by: Min Min <[email protected]> * debug logs Signed-off-by: Min Min <[email protected]> * change stage & job execution logic Signed-off-by: Min Min <[email protected]> * fix Signed-off-by: Min Min <[email protected]> * debug Signed-off-by: Min Min <[email protected]> * Revert "debug" This reverts commit be64b513af8de757ece74833789066a8851b1e2f. * Revert "fix" This reverts commit 8c9fbb2a7d3ea5783fe6d8374228a446d83fff60. * Revert "change stage & job execution logic" This reverts commit deaefe38a9fe05d0c725ddcf647d2ce283d958c8. --------- Signed-off-by: Min Min <[email protected]>
1 parent 1000b0d commit c6e83f1

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

42 files changed

+939
-297
lines changed

pkg/microservice/aslan/config/consts.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -581,6 +581,20 @@ const (
581581
JobErrorPolicyRetry JobErrorPolicy = "retry"
582582
)
583583

584+
type JobExecutePolicyType string
585+
586+
const (
587+
JobExecutePolicyTypeSkip JobExecutePolicyType = "skip"
588+
JobExecutePolicyTypeExecute JobExecutePolicyType = "execute"
589+
)
590+
591+
type JobExecutePolicyMatchRule string
592+
593+
const (
594+
JobExecutePolicyMatchRuleAll JobExecutePolicyMatchRule = "all" // AND logic - all rules must match
595+
JobExecutePolicyMatchRuleAny JobExecutePolicyMatchRule = "any" // OR logic - any rule must match
596+
)
597+
584598
const DefaultDeleteDeploymentTimeout = 10 * time.Minute
585599

586600
// Service creation source for openAPI

pkg/microservice/aslan/core/common/repository/models/wokflow_task_v4.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,8 @@ type JobTask struct {
116116
Infrastructure string `bson:"infrastructure" json:"infrastructure"`
117117
VMLabels []string `bson:"vm_labels" json:"vm_labels"`
118118

119-
ErrorPolicy *JobErrorPolicy `bson:"error_policy" yaml:"error_policy" json:"error_policy"`
119+
ErrorPolicy *JobErrorPolicy `bson:"error_policy" yaml:"error_policy" json:"error_policy"`
120+
ExecutePolicy *JobExecutePolicy `bson:"execute_policy" yaml:"execute_policy" json:"execute_policy"`
120121
// ErrorHandler is the user ID who did the error handling
121122
ErrorHandlerUserID string `bson:"error_handler_user_id" yaml:"error_handler_user_id" json:"error_handler_user_id"`
122123
ErrorHandlerUserName string `bson:"error_handler_username" yaml:"error_handler_username" json:"error_handler_username"`

pkg/microservice/aslan/core/common/repository/models/workflow_v4.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -296,6 +296,7 @@ type Job struct {
296296
Spec interface{} `bson:"spec" yaml:"spec" json:"spec"`
297297
RunPolicy config.JobRunPolicy `bson:"run_policy" yaml:"run_policy" json:"run_policy"`
298298
ErrorPolicy *JobErrorPolicy `bson:"error_policy" yaml:"error_policy" json:"error_policy"`
299+
ExecutePolicy *JobExecutePolicy `bson:"execute_policy" yaml:"execute_policy" json:"execute_policy"`
299300
ServiceModules []*WorkflowServiceModule `bson:"service_modules" json:"service_modules"`
300301
}
301302

@@ -305,6 +306,18 @@ type JobErrorPolicy struct {
305306
ApprovalUsers []*User `bson:"approval_users" yaml:"approval_users" json:"approval_users"`
306307
}
307308

309+
type JobExecuteRule struct {
310+
Field string `bson:"field" json:"field"`
311+
Verb string `bson:"verb" json:"verb"`
312+
Value string `bson:"value" json:"value"`
313+
}
314+
315+
type JobExecutePolicy struct {
316+
Type config.JobExecutePolicyType `bson:"type" json:"type" yaml:"type"`
317+
MatchRule config.JobExecutePolicyMatchRule `bson:"match_rule" json:"match_rule" yaml:"match_rule"`
318+
Rules []*JobExecuteRule `bson:"rules" json:"rules" yaml:"rules"`
319+
}
320+
308321
type WorkflowServiceModule struct {
309322
ServiceWithModule `bson:",inline" json:",inline"`
310323
CodeInfo []*types.Repository `bson:"code_info" json:"code_info"`

pkg/microservice/aslan/core/common/service/workflowcontroller/jobcontroller/job.go

Lines changed: 124 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ import (
3232

3333
"github.com/koderover/zadig/v2/pkg/microservice/aslan/config"
3434
commonmodels "github.com/koderover/zadig/v2/pkg/microservice/aslan/core/common/repository/models"
35+
"github.com/koderover/zadig/v2/pkg/tool/log"
3536
workflowtool "github.com/koderover/zadig/v2/pkg/tool/workflow"
3637
"github.com/koderover/zadig/v2/pkg/util"
3738
"github.com/koderover/zadig/v2/pkg/util/rand"
@@ -121,6 +122,29 @@ func initJobCtl(job *commonmodels.JobTask, workflowCtx *commonmodels.WorkflowTas
121122
}
122123

123124
func runJob(ctx context.Context, job *commonmodels.JobTask, workflowCtx *commonmodels.WorkflowTaskCtx, logger *zap.SugaredLogger, ack func()) {
125+
jobCtl := initJobCtl(job, workflowCtx, logger, ack)
126+
defer func(jobInfo *JobCtl) {
127+
if err := recover(); err != nil {
128+
errMsg := fmt.Sprintf("job: %s panic: %v", job.Name, err)
129+
logger.Errorf(errMsg)
130+
debug.PrintStack()
131+
job.Status = config.StatusFailed
132+
job.Error = errMsg
133+
setJobFinalStatusContext(job, workflowCtx)
134+
}
135+
job.EndTime = time.Now().Unix()
136+
logger.Infof("finish job: %s,status: %s", job.Name, job.Status)
137+
setJobFinalStatusContext(job, workflowCtx)
138+
ack()
139+
logger.Infof("updating job info into db...")
140+
err := jobCtl.SaveInfo(ctx)
141+
if err != nil {
142+
logger.Errorf("update job info: %s into db error: %v", err)
143+
}
144+
}(&jobCtl)
145+
146+
setJobStartTimeContext(job, workflowCtx)
147+
124148
// should skip passed job when workflow task be restarted
125149
if job.Status == config.StatusPassed || job.Status == config.StatusSkipped {
126150
return
@@ -154,30 +178,22 @@ func runJob(ctx context.Context, job *commonmodels.JobTask, workflowCtx *commonm
154178
return
155179
}
156180

181+
// Check execute policy before running the job
182+
if !shouldExecuteJob(job) {
183+
logger.Infof("skipping job: %s due to execute policy", job.Name)
184+
job.Status = config.StatusSkipped
185+
job.StartTime = time.Now().Unix()
186+
job.EndTime = time.Now().Unix()
187+
ack()
188+
return
189+
}
190+
157191
job.Status = config.StatusPrepare
158192
job.StartTime = time.Now().Unix()
159193
job.K8sJobName = getJobName(workflowCtx.WorkflowName, workflowCtx.TaskID)
160194
ack()
161195

162196
logger.Infof("start job: %s,status: %s", job.Name, job.Status)
163-
jobCtl := initJobCtl(job, workflowCtx, logger, ack)
164-
defer func(jobInfo *JobCtl) {
165-
if err := recover(); err != nil {
166-
errMsg := fmt.Sprintf("job: %s panic: %v", job.Name, err)
167-
logger.Errorf(errMsg)
168-
debug.PrintStack()
169-
job.Status = config.StatusFailed
170-
job.Error = errMsg
171-
}
172-
job.EndTime = time.Now().Unix()
173-
logger.Infof("finish job: %s,status: %s", job.Name, job.Status)
174-
ack()
175-
logger.Infof("updating job info into db...")
176-
err := jobCtl.SaveInfo(ctx)
177-
if err != nil {
178-
logger.Errorf("update job info: %s into db error: %v", err)
179-
}
180-
}(&jobCtl)
181197

182198
jobCtl.Run(ctx)
183199

@@ -394,3 +410,93 @@ func getMatchedRegistries(image string, registries []*commonmodels.RegistryNames
394410
}
395411
return resp
396412
}
413+
414+
// evaluateExecuteRule evaluates a single execute rule against the global context
415+
func evaluateExecuteRule(rule *commonmodels.JobExecuteRule) bool {
416+
ruleValue := rule.Value
417+
value := rule.Field
418+
419+
log.Infof("value: %s", value)
420+
log.Infof("ruleValue: %s", ruleValue)
421+
422+
switch rule.Verb {
423+
case string(config.ApplicationFilterActionEq):
424+
return value == ruleValue
425+
case string(config.ApplicationFilterActionNe):
426+
return value != ruleValue
427+
case string(config.ApplicationFilterActionBeginsWith):
428+
return strings.HasPrefix(value, ruleValue)
429+
case string(config.ApplicationFilterActionNotBeginsWith):
430+
return !strings.HasPrefix(value, ruleValue)
431+
case string(config.ApplicationFilterActionEndsWith):
432+
return strings.HasSuffix(value, ruleValue)
433+
case string(config.ApplicationFilterActionNotEndsWith):
434+
return !strings.HasSuffix(value, ruleValue)
435+
case string(config.ApplicationFilterActionContains):
436+
return strings.Contains(value, ruleValue)
437+
case string(config.ApplicationFilterActionNotContains):
438+
return !strings.Contains(value, ruleValue)
439+
default:
440+
return false
441+
}
442+
}
443+
444+
// setJobStartTimeContext sets the global context variable for job start time
445+
// Format: .job.<jobKey>.util.startTime
446+
func setJobStartTimeContext(job *commonmodels.JobTask, workflowCtx *commonmodels.WorkflowTaskCtx) {
447+
startTimeStr := fmt.Sprintf("%d", job.StartTime)
448+
contextKey := fmt.Sprintf("{{.job.%s.util.startTime}}", job.Key)
449+
workflowCtx.GlobalContextSet(contextKey, startTimeStr)
450+
}
451+
452+
// setJobStatusContext sets the global context variable for job status
453+
// Format: .job.<jobKey>.status
454+
func setJobFinalStatusContext(job *commonmodels.JobTask, workflowCtx *commonmodels.WorkflowTaskCtx) {
455+
statusStr := string(job.Status)
456+
contextKey := fmt.Sprintf("{{.job.%s.status}}", job.Key)
457+
workflowCtx.GlobalContextSet(contextKey, statusStr)
458+
}
459+
460+
// shouldExecuteJob determines whether a job should be executed based on its execute policy
461+
func shouldExecuteJob(job *commonmodels.JobTask) bool {
462+
if job.ExecutePolicy == nil || len(job.ExecutePolicy.Rules) == 0 {
463+
// No execute policy means the job should run
464+
return true
465+
}
466+
467+
var rulesMatch bool
468+
469+
matchRule := job.ExecutePolicy.MatchRule
470+
if matchRule == "" {
471+
matchRule = config.JobExecutePolicyMatchRuleAll
472+
}
473+
474+
switch matchRule {
475+
case config.JobExecutePolicyMatchRuleAny:
476+
rulesMatch = false
477+
for _, rule := range job.ExecutePolicy.Rules {
478+
if evaluateExecuteRule(rule) {
479+
rulesMatch = true
480+
break
481+
}
482+
}
483+
case config.JobExecutePolicyMatchRuleAll:
484+
fallthrough
485+
default:
486+
rulesMatch = true
487+
for _, rule := range job.ExecutePolicy.Rules {
488+
if !evaluateExecuteRule(rule) {
489+
rulesMatch = false
490+
break
491+
}
492+
}
493+
}
494+
495+
if job.ExecutePolicy.Type == config.JobExecutePolicyTypeSkip {
496+
return !rulesMatch
497+
} else if job.ExecutePolicy.Type == config.JobExecutePolicyTypeExecute {
498+
return rulesMatch
499+
}
500+
501+
return true
502+
}

pkg/microservice/aslan/core/workflow/service/workflow/controller/job/interface.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -64,9 +64,10 @@ type Job interface {
6464
}
6565

6666
type BasicInfo struct {
67-
name string
68-
jobType config.JobType
69-
errorPolicy *commonmodels.JobErrorPolicy
67+
name string
68+
jobType config.JobType
69+
errorPolicy *commonmodels.JobErrorPolicy
70+
executePolicy *commonmodels.JobExecutePolicy
7071

7172
workflow *commonmodels.WorkflowV4
7273
}

pkg/microservice/aslan/core/workflow/service/workflow/controller/job/job_apollo.go

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package job
1919
import (
2020
"context"
2121
"fmt"
22+
"strings"
2223

2324
"github.com/koderover/zadig/v2/pkg/microservice/aslan/config"
2425
"github.com/koderover/zadig/v2/pkg/types"
@@ -44,10 +45,11 @@ func CreateApolloJobController(job *commonmodels.Job, workflow *commonmodels.Wor
4445
}
4546

4647
basicInfo := &BasicInfo{
47-
name: job.Name,
48-
jobType: job.JobType,
49-
errorPolicy: job.ErrorPolicy,
50-
workflow: workflow,
48+
name: job.Name,
49+
jobType: job.JobType,
50+
errorPolicy: job.ErrorPolicy,
51+
executePolicy: job.ExecutePolicy,
52+
workflow: workflow,
5153
}
5254

5355
return ApolloJobController{
@@ -213,8 +215,9 @@ func (j ApolloJobController) ToTask(taskID int64) ([]*commonmodels.JobTask, erro
213215
return list
214216
}(),
215217
},
216-
Timeout: 0,
217-
ErrorPolicy: j.errorPolicy,
218+
Timeout: 0,
219+
ErrorPolicy: j.errorPolicy,
220+
ExecutePolicy: j.executePolicy,
218221
}
219222

220223
return []*commonmodels.JobTask{jobTask}, nil
@@ -229,7 +232,16 @@ func (j ApolloJobController) SetRepoCommitInfo() error {
229232
}
230233

231234
func (j ApolloJobController) GetVariableList(jobName string, getAggregatedVariables, getRuntimeVariables, getPlaceHolderVariables, getServiceSpecificVariables, useUserInputValue bool) ([]*commonmodels.KeyVal, error) {
232-
return make([]*commonmodels.KeyVal, 0), nil
235+
resp := make([]*commonmodels.KeyVal, 0)
236+
if getRuntimeVariables {
237+
resp = append(resp, &commonmodels.KeyVal{
238+
Key: strings.Join([]string{"job", j.name, "status"}, "."),
239+
Value: "",
240+
Type: "string",
241+
IsCredential: false,
242+
})
243+
}
244+
return resp, nil
233245
}
234246

235247
func (j ApolloJobController) GetUsedRepos() ([]*types.Repository, error) {

pkg/microservice/aslan/core/workflow/service/workflow/controller/job/job_approval.go

Lines changed: 21 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package job
1818

1919
import (
2020
"fmt"
21+
"strings"
2122

2223
"github.com/samber/lo"
2324
"k8s.io/apimachinery/pkg/util/sets"
@@ -45,10 +46,11 @@ func CreateApprovalJobController(job *commonmodels.Job, workflow *commonmodels.W
4546
}
4647

4748
basicInfo := &BasicInfo{
48-
name: job.Name,
49-
jobType: job.JobType,
50-
errorPolicy: job.ErrorPolicy,
51-
workflow: workflow,
49+
name: job.Name,
50+
jobType: job.JobType,
51+
errorPolicy: job.ErrorPolicy,
52+
executePolicy: job.ExecutePolicy,
53+
workflow: workflow,
5254
}
5355

5456
return ApprovalJobController{
@@ -168,10 +170,11 @@ func (j ApprovalJobController) ToTask(taskID int64) ([]*commonmodels.JobTask, er
168170
JobInfo: map[string]string{
169171
JobNameKey: j.name,
170172
},
171-
JobType: string(config.JobApproval),
172-
Spec: jobSpec,
173-
Timeout: j.jobSpec.Timeout,
174-
ErrorPolicy: j.errorPolicy,
173+
JobType: string(config.JobApproval),
174+
Spec: jobSpec,
175+
Timeout: j.jobSpec.Timeout,
176+
ErrorPolicy: j.errorPolicy,
177+
ExecutePolicy: j.executePolicy,
175178
}
176179

177180
if j.jobSpec.Source == config.SourceFromJob {
@@ -380,7 +383,16 @@ func (j ApprovalJobController) SetRepoCommitInfo() error {
380383
}
381384

382385
func (j ApprovalJobController) GetVariableList(jobName string, getAggregatedVariables, getRuntimeVariables, getPlaceHolderVariables, getServiceSpecificVariables, useUserInputValue bool) ([]*commonmodels.KeyVal, error) {
383-
return make([]*commonmodels.KeyVal, 0), nil
386+
resp := make([]*commonmodels.KeyVal, 0)
387+
if getRuntimeVariables {
388+
resp = append(resp, &commonmodels.KeyVal{
389+
Key: strings.Join([]string{"job", j.name, "status"}, "."),
390+
Value: "",
391+
Type: "string",
392+
IsCredential: false,
393+
})
394+
}
395+
return resp, nil
384396
}
385397

386398
func (j ApprovalJobController) GetUsedRepos() ([]*types.Repository, error) {

0 commit comments

Comments
 (0)