Skip to content

Commit 699beab

Browse files
authored
Merge pull request #34 from hbelmiro/RHOAIENG-4252
<UPSTREAM>: <carry>: Added OwnerReferences to ScheduledWorkflow
2 parents 95cfc15 + 7bb3148 commit 699beab

File tree

5 files changed

+38
-8
lines changed

5 files changed

+38
-8
lines changed

backend/src/apiserver/resource/resource_manager.go

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -967,7 +967,7 @@ func (r *ResourceManager) CreateJob(ctx context.Context, job *model.Job) (*model
967967

968968
// TODO(gkcalat): consider changing the flow. Other resource UUIDs are assigned by their respective stores (DB).
969969
// Convert modelJob into scheduledWorkflow.
970-
scheduledWorkflow, err := tmpl.ScheduledWorkflow(job)
970+
scheduledWorkflow, err := tmpl.ScheduledWorkflow(job, r.getOwnerReferences())
971971
if err != nil {
972972
return nil, util.Wrap(err, "Failed to create a recurring run during scheduled workflow creation")
973973
}
@@ -1012,6 +1012,27 @@ func (r *ResourceManager) CreateJob(ctx context.Context, job *model.Job) (*model
10121012
return r.jobStore.CreateJob(job)
10131013
}
10141014

1015+
func (r *ResourceManager) getOwnerReferences() []v1.OwnerReference {
1016+
ownerName := common.GetStringConfigWithDefault("OWNER_NAME", "")
1017+
ownerAPIVersion := common.GetStringConfigWithDefault("OWNER_API_VERSION", "")
1018+
ownerKind := common.GetStringConfigWithDefault("OWNER_KIND", "")
1019+
ownerUID := types.UID(common.GetStringConfigWithDefault("OWNER_UID", ""))
1020+
1021+
if ownerName == "" || ownerAPIVersion == "" || ownerKind == "" || ownerUID == "" {
1022+
glog.Info("Missing ScheduledWorkflow owner fields. Proceeding without OwnerReferences")
1023+
return []v1.OwnerReference{}
1024+
} else {
1025+
return []v1.OwnerReference{
1026+
{
1027+
APIVersion: ownerAPIVersion,
1028+
Kind: ownerKind,
1029+
Name: ownerName,
1030+
UID: ownerUID,
1031+
},
1032+
}
1033+
}
1034+
}
1035+
10151036
// Enables or disables a recurring run with given id.
10161037
func (r *ResourceManager) ChangeJobMode(ctx context.Context, jobId string, enable bool) error {
10171038
job, err := r.GetJob(jobId)

backend/src/apiserver/template/argo_template.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ type Argo struct {
9494
wf *util.Workflow
9595
}
9696

97-
func (t *Argo) ScheduledWorkflow(modelJob *model.Job) (*scheduledworkflow.ScheduledWorkflow, error) {
97+
func (t *Argo) ScheduledWorkflow(modelJob *model.Job, ownerReferences []metav1.OwnerReference) (*scheduledworkflow.ScheduledWorkflow, error) {
9898
workflow := util.NewWorkflow(t.wf.Workflow.DeepCopy())
9999
// Overwrite namespace from the job object
100100
if modelJob.Namespace != "" {
@@ -137,7 +137,10 @@ func (t *Argo) ScheduledWorkflow(modelJob *model.Job) (*scheduledworkflow.Schedu
137137
APIVersion: "kubeflow.org/v1beta1",
138138
Kind: "ScheduledWorkflow",
139139
},
140-
ObjectMeta: metav1.ObjectMeta{GenerateName: swfGeneratedName},
140+
ObjectMeta: metav1.ObjectMeta{
141+
GenerateName: swfGeneratedName,
142+
OwnerReferences: ownerReferences,
143+
},
141144
Spec: scheduledworkflow.ScheduledWorkflowSpec{
142145
Enabled: modelJob.Enabled,
143146
MaxConcurrency: &modelJob.MaxConcurrency,

backend/src/apiserver/template/template.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,7 @@ type Template interface {
128128
// Get workflow
129129
RunWorkflow(modelRun *model.Run, options RunWorkflowOptions) (util.ExecutionSpec, error)
130130

131-
ScheduledWorkflow(modelJob *model.Job) (*scheduledworkflow.ScheduledWorkflow, error)
131+
ScheduledWorkflow(modelJob *model.Job, ownerReferences []metav1.OwnerReference) (*scheduledworkflow.ScheduledWorkflow, error)
132132
}
133133

134134
type RunWorkflowOptions struct {

backend/src/apiserver/template/template_test.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -202,7 +202,10 @@ func TestScheduledWorkflow(t *testing.T) {
202202
APIVersion: "kubeflow.org/v2beta1",
203203
Kind: "ScheduledWorkflow",
204204
},
205-
ObjectMeta: metav1.ObjectMeta{GenerateName: "name1"},
205+
ObjectMeta: metav1.ObjectMeta{
206+
GenerateName: "name1",
207+
OwnerReferences: []metav1.OwnerReference{},
208+
},
206209
Spec: scheduledworkflow.ScheduledWorkflowSpec{
207210
Enabled: true,
208211
MaxConcurrency: util.Int64Pointer(1),
@@ -221,7 +224,7 @@ func TestScheduledWorkflow(t *testing.T) {
221224
},
222225
}
223226

224-
actualScheduledWorkflow, err := v2Template.ScheduledWorkflow(modelJob)
227+
actualScheduledWorkflow, err := v2Template.ScheduledWorkflow(modelJob, []metav1.OwnerReference{})
225228
assert.Nil(t, err)
226229

227230
// We don't compare this field because it changes with every driver/launcher image release.

backend/src/apiserver/template/v2_template.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ type V2Spec struct {
4141
}
4242

4343
// Converts modelJob to ScheduledWorkflow.
44-
func (t *V2Spec) ScheduledWorkflow(modelJob *model.Job) (*scheduledworkflow.ScheduledWorkflow, error) {
44+
func (t *V2Spec) ScheduledWorkflow(modelJob *model.Job, ownerReferences []metav1.OwnerReference) (*scheduledworkflow.ScheduledWorkflow, error) {
4545
job := &pipelinespec.PipelineJob{}
4646

4747
bytes, err := protojson.Marshal(t.spec)
@@ -108,7 +108,10 @@ func (t *V2Spec) ScheduledWorkflow(modelJob *model.Job) (*scheduledworkflow.Sche
108108
APIVersion: "kubeflow.org/v2beta1",
109109
Kind: "ScheduledWorkflow",
110110
},
111-
ObjectMeta: metav1.ObjectMeta{GenerateName: swfGeneratedName},
111+
ObjectMeta: metav1.ObjectMeta{
112+
GenerateName: swfGeneratedName,
113+
OwnerReferences: ownerReferences,
114+
},
112115
Spec: scheduledworkflow.ScheduledWorkflowSpec{
113116
Enabled: modelJob.Enabled,
114117
MaxConcurrency: &modelJob.MaxConcurrency,

0 commit comments

Comments
 (0)