Skip to content

Commit cb6bbeb

Browse files
committed
set infra config to retriggered ci
1 parent 38e220d commit cb6bbeb

File tree

6 files changed

+201
-26
lines changed

6 files changed

+201
-26
lines changed

pkg/build/trigger/HandlerService.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -683,6 +683,10 @@ func (impl *HandlerServiceImpl) triggerCiPipeline(trigger *types.CiTriggerReques
683683
return 0, err
684684
}
685685
savedCiWf, workflowRequest = trigger.RetriggerCiWorkflow, trigger.RetriggerWorkflowRequest
686+
if trigger.RetriggerCiWorkflow != nil {
687+
workflowRequest.ReferenceCiWorkflowId = trigger.RetriggerCiWorkflow.ReferenceCiWorkflowId
688+
}
689+
workflowRequest.IsReTrigger = true
686690
} else {
687691
variableSnapshot, savedCiWf, workflowRequest, err = impl.StartCiWorkflowAndPrepareWfRequest(trigger)
688692
if err != nil {

pkg/executor/WorkflowService.go

Lines changed: 137 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ import (
3131
repository2 "github.com/devtron-labs/devtron/pkg/cluster/environment/repository"
3232
"github.com/devtron-labs/devtron/pkg/config/read"
3333
v1 "github.com/devtron-labs/devtron/pkg/infraConfig/bean/v1"
34+
infraConfigAudit "github.com/devtron-labs/devtron/pkg/infraConfig/service/audit"
3435
k8s2 "github.com/devtron-labs/devtron/pkg/k8s"
3536
"github.com/devtron-labs/devtron/pkg/pipeline"
3637
bean3 "github.com/devtron-labs/devtron/pkg/pipeline/bean"
@@ -42,6 +43,7 @@ import (
4243
"github.com/devtron-labs/devtron/pkg/workflow/trigger/audit/hook"
4344
"go.uber.org/zap"
4445
v12 "k8s.io/api/core/v1"
46+
"k8s.io/apimachinery/pkg/api/resource"
4547
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
4648
"k8s.io/client-go/rest"
4749
"net/http"
@@ -63,19 +65,20 @@ type WorkflowService interface {
6365
}
6466

6567
type WorkflowServiceImpl struct {
66-
Logger *zap.SugaredLogger
67-
config *rest.Config
68-
ciCdConfig *types.CiCdConfig
69-
configMapService read.ConfigReadService
70-
envRepository repository2.EnvironmentRepository
71-
globalCMCSService pipeline.GlobalCMCSService
72-
argoWorkflowExecutor executors.ArgoWorkflowExecutor
73-
systemWorkflowExecutor executors.SystemWorkflowExecutor
74-
k8sCommonService k8s2.K8sCommonService
75-
infraProvider infraProviders.InfraProvider
76-
ucid ucid.Service
77-
k8sUtil *k8s.K8sServiceImpl
78-
triggerAuditHook hook.TriggerAuditHook
68+
Logger *zap.SugaredLogger
69+
config *rest.Config
70+
ciCdConfig *types.CiCdConfig
71+
configMapService read.ConfigReadService
72+
envRepository repository2.EnvironmentRepository
73+
globalCMCSService pipeline.GlobalCMCSService
74+
argoWorkflowExecutor executors.ArgoWorkflowExecutor
75+
systemWorkflowExecutor executors.SystemWorkflowExecutor
76+
k8sCommonService k8s2.K8sCommonService
77+
infraProvider infraProviders.InfraProvider
78+
ucid ucid.Service
79+
k8sUtil *k8s.K8sServiceImpl
80+
triggerAuditHook hook.TriggerAuditHook
81+
infraConfigAuditService infraConfigAudit.InfraConfigAuditService
7982
}
8083

8184
// TODO: Move to bean
@@ -92,20 +95,22 @@ func NewWorkflowServiceImpl(Logger *zap.SugaredLogger,
9295
ucid ucid.Service,
9396
k8sUtil *k8s.K8sServiceImpl,
9497
triggerAuditHook hook.TriggerAuditHook,
98+
infraConfigAuditService infraConfigAudit.InfraConfigAuditService,
9599
) (*WorkflowServiceImpl, error) {
96100
commonWorkflowService := &WorkflowServiceImpl{
97-
Logger: Logger,
98-
ciCdConfig: ciCdConfig,
99-
configMapService: configMapService,
100-
envRepository: envRepository,
101-
globalCMCSService: globalCMCSService,
102-
argoWorkflowExecutor: argoWorkflowExecutor,
103-
k8sUtil: k8sUtil,
104-
systemWorkflowExecutor: systemWorkflowExecutor,
105-
k8sCommonService: k8sCommonService,
106-
infraProvider: infraProvider,
107-
ucid: ucid,
108-
triggerAuditHook: triggerAuditHook,
101+
Logger: Logger,
102+
ciCdConfig: ciCdConfig,
103+
configMapService: configMapService,
104+
envRepository: envRepository,
105+
globalCMCSService: globalCMCSService,
106+
argoWorkflowExecutor: argoWorkflowExecutor,
107+
k8sUtil: k8sUtil,
108+
systemWorkflowExecutor: systemWorkflowExecutor,
109+
k8sCommonService: k8sCommonService,
110+
infraProvider: infraProvider,
111+
ucid: ucid,
112+
triggerAuditHook: triggerAuditHook,
113+
infraConfigAuditService: infraConfigAuditService,
109114
}
110115
restConfig, err := k8sUtil.GetK8sInClusterRestConfig()
111116
if err != nil {
@@ -255,6 +260,17 @@ func (impl *WorkflowServiceImpl) createWorkflowTemplateAndAuditTrigger(workflowR
255260
return bean3.WorkflowTemplate{}, err
256261
}
257262
workflowTemplate.DevtronInstanceUID = devtronUCID
263+
264+
if workflowRequest.IsCiTypeWorkflowRequest() && workflowRequest.IsCiRetriggerType() {
265+
// here we need to update the workflow template with cpu request and limit, memory limit and request and Build timeout (in oss this is applicable on all ci builds i.e. applied globally)
266+
err = impl.updateWorkflowTemplateWithInfraConfigFromHistory(workflowRequest, &workflowTemplate)
267+
if err != nil {
268+
impl.Logger.Errorw("error occurred while updating workflow template with infra config from history", "err", err)
269+
return bean3.WorkflowTemplate{}, err
270+
}
271+
272+
}
273+
258274
return workflowTemplate, nil
259275
}
260276

@@ -507,3 +523,99 @@ func (impl *WorkflowServiceImpl) getWfClient(environment *repository2.Environmen
507523
}
508524
return wfClient, nil
509525
}
526+
527+
// updateWorkflowTemplateWithInfraConfigFromHistory updates the workflow template with CPU, memory limits/requests and timeout
528+
// from the infra_config_trigger_history table based on previous workflow ID.
529+
func (impl *WorkflowServiceImpl) updateWorkflowTemplateWithInfraConfigFromHistory(workflowRequest *types.WorkflowRequest, workflowTemplate *bean3.WorkflowTemplate) error {
530+
// Skip if no previous workflow ID is available or if this is not a CI/Job workflow
531+
if workflowRequest.ReferenceCiWorkflowId == 0 {
532+
impl.Logger.Debugw("skipping infra config history update", "referenceWorkflowId", workflowRequest.ReferenceCiWorkflowId, "workflowType", workflowRequest.Type)
533+
return nil
534+
}
535+
536+
// Get infra config from history based on previous workflow ID
537+
historicalInfraConfig, err := impl.infraConfigAuditService.GetInfraConfigByWorkflowId(workflowRequest.ReferenceCiWorkflowId, bean.CI_WORKFLOW_TYPE.String())
538+
if err != nil {
539+
impl.Logger.Warnw("could not retrieve infra config from history, using current config", "referenceWorkflowId", workflowRequest.ReferenceCiWorkflowId, "err", err)
540+
return nil // Don't fail the workflow, just use current config
541+
}
542+
543+
if historicalInfraConfig == nil {
544+
impl.Logger.Debugw("no historical infra config found, using current config", "referenceWorkflowId", workflowRequest.ReferenceCiWorkflowId)
545+
return nil
546+
}
547+
548+
impl.Logger.Infow("applying historical infra config to workflow template", "referenceWorkflowId", workflowRequest.ReferenceCiWorkflowId, "historicalConfig", historicalInfraConfig)
549+
550+
// apply historical infra configurations to a workflow template
551+
impl.applyInfraConfigToWorkflowTemplate(workflowRequest, workflowTemplate, historicalInfraConfig)
552+
553+
return nil
554+
}
555+
556+
// applyInfraConfigToWorkflowTemplate applies the historical infra configuration to the workflow template.
557+
// This function handles the core OSS functionality and can be extended in enterprise for additional fields.
558+
func (impl *WorkflowServiceImpl) applyInfraConfigToWorkflowTemplate(workflowRequest *types.WorkflowRequest, workflowTemplate *bean3.WorkflowTemplate, infraConfig *v1.InfraConfig) {
559+
// Apply timeout configuration
560+
if infraConfig.GetCiDefaultTimeout() > 0 {
561+
timeout := infraConfig.GetCiTimeoutInt()
562+
workflowTemplate.SetActiveDeadlineSeconds(timeout)
563+
impl.Logger.Debugw("applied historical timeout to workflow template", "timeout", timeout, "workflowId", workflowRequest.WorkflowId)
564+
}
565+
566+
// Apply CPU and memory resource configurations to the main container
567+
if len(workflowTemplate.Containers) > 0 {
568+
container := &workflowTemplate.Containers[0]
569+
570+
// Initialize resources if not present
571+
if container.Resources.Limits == nil {
572+
container.Resources.Limits = make(v12.ResourceList)
573+
}
574+
if container.Resources.Requests == nil {
575+
container.Resources.Requests = make(v12.ResourceList)
576+
}
577+
578+
// Apply CPU limits and requests
579+
if infraConfig.GetCiLimitCpu() != "" {
580+
if cpuLimit, err := resource.ParseQuantity(infraConfig.GetCiLimitCpu()); err == nil {
581+
container.Resources.Limits[v12.ResourceCPU] = cpuLimit
582+
impl.Logger.Debugw("applied historical CPU limit to workflow template", "cpuLimit", infraConfig.GetCiLimitCpu(), "workflowId", workflowRequest.WorkflowId)
583+
} else {
584+
impl.Logger.Warnw("failed to parse CPU limit from historical config", "cpuLimit", infraConfig.GetCiLimitCpu(), "err", err)
585+
}
586+
}
587+
if infraConfig.GetCiReqCpu() != "" {
588+
if cpuRequest, err := resource.ParseQuantity(infraConfig.GetCiReqCpu()); err == nil {
589+
container.Resources.Requests[v12.ResourceCPU] = cpuRequest
590+
impl.Logger.Debugw("applied historical CPU request to workflow template", "cpuRequest", infraConfig.GetCiReqCpu(), "workflowId", workflowRequest.WorkflowId)
591+
} else {
592+
impl.Logger.Warnw("failed to parse CPU request from historical config", "cpuRequest", infraConfig.GetCiReqCpu(), "err", err)
593+
}
594+
}
595+
596+
// Apply memory limits and requests
597+
if infraConfig.GetCiLimitMem() != "" {
598+
if memoryLimit, err := resource.ParseQuantity(infraConfig.GetCiLimitMem()); err == nil {
599+
container.Resources.Limits[v12.ResourceMemory] = memoryLimit
600+
impl.Logger.Debugw("applied historical memory limit to workflow template", "memoryLimit", infraConfig.GetCiLimitMem(), "workflowId", workflowRequest.WorkflowId)
601+
} else {
602+
impl.Logger.Warnw("failed to parse memory limit from historical config", "memoryLimit", infraConfig.GetCiLimitMem(), "err", err)
603+
}
604+
}
605+
if infraConfig.GetCiReqMem() != "" {
606+
if memoryRequest, err := resource.ParseQuantity(infraConfig.GetCiReqMem()); err == nil {
607+
container.Resources.Requests[v12.ResourceMemory] = memoryRequest
608+
impl.Logger.Debugw("applied historical memory request to workflow template", "memoryRequest", infraConfig.GetCiReqMem(), "workflowId", workflowRequest.WorkflowId)
609+
} else {
610+
impl.Logger.Warnw("failed to parse memory request from historical config", "memoryRequest", infraConfig.GetCiReqMem(), "err", err)
611+
}
612+
}
613+
}
614+
615+
impl.applyEnterpriseInfraConfigToWorkflowTemplate(workflowRequest, workflowTemplate, infraConfig)
616+
}
617+
618+
// applyEnterpriseInfraConfigToWorkflowTemplate is a placeholder for enterprise-specific infra config application.
619+
func (impl *WorkflowServiceImpl) applyEnterpriseInfraConfigToWorkflowTemplate(workflowRequest *types.WorkflowRequest, workflowTemplate *bean3.WorkflowTemplate, infraConfig *v1.InfraConfig) {
620+
impl.Logger.Debugw("enterprise infra config application (no-op in OSS)", "workflowId", workflowRequest.WorkflowId)
621+
}

pkg/infraConfig/repository/audit/infraConfigAuditRepository.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424

2525
type InfraConfigAuditRepository interface {
2626
SaveInfraConfigHistorySnapshot(tx *pg.Tx, infraConfigTriggerHistories []*InfraConfigTriggerHistory) error
27+
GetInfraConfigHistoryByWorkflowId(workflowId int, workflowType WorkflowType) ([]*InfraConfigTriggerHistory, error)
2728
}
2829

2930
type InfraConfigAuditRepositoryImpl struct {
@@ -83,3 +84,15 @@ func (impl *InfraConfigAuditRepositoryImpl) SaveInfraConfigHistorySnapshot(tx *p
8384
}
8485
return nil
8586
}
87+
88+
func (impl *InfraConfigAuditRepositoryImpl) GetInfraConfigHistoryByWorkflowId(workflowId int, workflowType WorkflowType) ([]*InfraConfigTriggerHistory, error) {
89+
var infraConfigHistories []*InfraConfigTriggerHistory
90+
err := impl.dbConnection.Model(&infraConfigHistories).
91+
Where("workflow_id = ?", workflowId).
92+
Where("workflow_type = ?", workflowType).
93+
Select()
94+
if err != nil {
95+
return nil, err
96+
}
97+
return infraConfigHistories, nil
98+
}

pkg/infraConfig/service/audit/infraConfigAudit.go

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,12 @@ import (
2323
"github.com/devtron-labs/devtron/pkg/sql"
2424
"github.com/go-pg/pg"
2525
"go.uber.org/zap"
26+
"strconv"
2627
)
2728

2829
type InfraConfigAuditService interface {
2930
SaveCiInfraConfigHistorySnapshot(tx *pg.Tx, workflowId int, triggeredBy int32, infraConfigs map[string]*infraBean.InfraConfig) error
31+
GetInfraConfigByWorkflowId(workflowId int, workflowType string) (*infraBean.InfraConfig, error)
3032
sql.TransactionWrapper
3133
}
3234

@@ -71,3 +73,36 @@ func (impl *InfraConfigAuditServiceImpl) SaveCiInfraConfigHistorySnapshot(tx *pg
7173
}
7274
return nil
7375
}
76+
77+
func (impl *InfraConfigAuditServiceImpl) GetInfraConfigByWorkflowId(workflowId int, workflowType string) (*infraBean.InfraConfig, error) {
78+
workflowTypeEnum := audit.WorkflowType(workflowType)
79+
infraConfigHistories, err := impl.infraConfigAuditRepository.GetInfraConfigHistoryByWorkflowId(workflowId, workflowTypeEnum)
80+
if err != nil {
81+
impl.logger.Errorw("failed to get infra config history by workflow id", "error", err, "workflowId", workflowId, "workflowType", workflowType)
82+
return nil, err
83+
}
84+
85+
// Create InfraConfig from history records
86+
infraConfig := &infraBean.InfraConfig{}
87+
for _, history := range infraConfigHistories {
88+
switch history.Key {
89+
case infraBean.CPULimitKey:
90+
infraConfig.CiLimitCpu = history.ValueString
91+
case infraBean.CPURequestKey:
92+
infraConfig.CiReqCpu = history.ValueString
93+
case infraBean.MemoryLimitKey:
94+
infraConfig.CiLimitMem = history.ValueString
95+
case infraBean.MemoryRequestKey:
96+
infraConfig.CiReqMem = history.ValueString
97+
case infraBean.TimeOutKey:
98+
// Convert string back to float64
99+
if timeout, parseErr := strconv.ParseFloat(history.ValueString, 64); parseErr == nil {
100+
infraConfig.CiDefaultTimeout = timeout
101+
} else {
102+
impl.logger.Warnw("failed to parse timeout value", "valueString", history.ValueString, "parseErr", parseErr)
103+
}
104+
}
105+
}
106+
107+
return infraConfig, nil
108+
}

pkg/pipeline/types/Workflow.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,8 @@ type WorkflowRequest struct {
119119
IsExtRun bool `json:"isExtRun"`
120120
ImageRetryCount int `json:"imageRetryCount"`
121121
ImageRetryInterval int `json:"imageRetryInterval"`
122+
IsReTrigger bool `json:"isReTrigger"`
123+
ReferenceCiWorkflowId int `json:"referenceCiWorkflowId"` // data filled when retriggering a ci workflow
122124
// Data from CD Workflow service
123125
WorkflowRunnerId int `json:"workflowRunnerId"`
124126
CdPipelineId int `json:"cdPipelineId"`
@@ -164,6 +166,15 @@ func (workflowRequest *WorkflowRequest) IsCdStageTypePost() bool {
164166
return workflowRequest.StageType == POST
165167
}
166168

169+
func (workflowRequest *WorkflowRequest) IsCiTypeWorkflowRequest() bool {
170+
// pipelineId in workflowRequest refers to CiPipelineId, only filled for ci type workflowRequest
171+
return workflowRequest.PipelineId > 0
172+
}
173+
174+
func (workflowRequest *WorkflowRequest) IsCiRetriggerType() bool {
175+
return workflowRequest.IsReTrigger
176+
}
177+
167178
func (workflowRequest *WorkflowRequest) updateExternalRunMetadata() {
168179
pipeline := workflowRequest.Pipeline
169180
env := workflowRequest.Env

wire_gen.go

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)