Skip to content

Commit d9ec34e

Browse files
Merge pull request #124 from hbelmiro/cherry-pick-11469
UPSTREAM: 11469: fix(backend): Synced ScheduledWorkflow CRs on apiserver startup
2 parents f1b7367 + 8ab8dae commit d9ec34e

File tree

6 files changed

+145
-7
lines changed

6 files changed

+145
-7
lines changed

backend/src/apiserver/client/scheduled_workflow_fake.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -66,9 +66,9 @@ func (c *FakeScheduledWorkflowClient) Get(ctx context.Context, name string, opti
6666
return nil, k8errors.NewNotFound(k8schema.ParseGroupResource("scheduledworkflows.kubeflow.org"), name)
6767
}
6868

69-
func (c *FakeScheduledWorkflowClient) Update(context.Context, *v1beta1.ScheduledWorkflow) (*v1beta1.ScheduledWorkflow, error) {
70-
glog.Error("This fake method is not yet implemented.")
71-
return nil, nil
69+
func (c *FakeScheduledWorkflowClient) Update(_ context.Context, scheduledWorkflow *v1beta1.ScheduledWorkflow) (*v1beta1.ScheduledWorkflow, error) {
70+
c.scheduledWorkflows[scheduledWorkflow.Name] = scheduledWorkflow
71+
return scheduledWorkflow, nil
7272
}
7373

7474
func (c *FakeScheduledWorkflowClient) DeleteCollection(ctx context.Context, options *v1.DeleteOptions, listOptions v1.ListOptions) error {

backend/src/apiserver/list/list.go

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
"encoding/base64"
2323
"encoding/json"
2424
"fmt"
25+
"math"
2526
"reflect"
2627
"strings"
2728

@@ -97,6 +98,13 @@ type Options struct {
9798
*token
9899
}
99100

101+
func EmptyOptions() *Options {
102+
return &Options{
103+
math.MaxInt32,
104+
&token{},
105+
}
106+
}
107+
100108
// Matches returns true if the sorting and filtering criteria in o match that
101109
// of the one supplied in opts.
102110
func (o *Options) Matches(opts *Options) bool {
@@ -213,9 +221,14 @@ func (o *Options) AddSortingToSelect(sqlBuilder sq.SelectBuilder) sq.SelectBuild
213221
if o.IsDesc {
214222
order = "DESC"
215223
}
216-
sqlBuilder = sqlBuilder.
217-
OrderBy(fmt.Sprintf("%v %v", o.SortByFieldPrefix+o.SortByFieldName, order)).
218-
OrderBy(fmt.Sprintf("%v %v", o.KeyFieldPrefix+o.KeyFieldName, order))
224+
225+
if o.SortByFieldName != "" {
226+
sqlBuilder = sqlBuilder.OrderBy(fmt.Sprintf("%v %v", o.SortByFieldPrefix+o.SortByFieldName, order))
227+
}
228+
229+
if o.KeyFieldName != "" {
230+
sqlBuilder = sqlBuilder.OrderBy(fmt.Sprintf("%v %v", o.KeyFieldPrefix+o.KeyFieldName, order))
231+
}
219232

220233
return sqlBuilder
221234
}

backend/src/apiserver/list/list_test.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
package list
1616

1717
import (
18+
"fmt"
19+
"math"
1820
"reflect"
1921
"strings"
2022
"testing"
@@ -645,6 +647,11 @@ func TestAddPaginationAndFilterToSelect(t *testing.T) {
645647
wantSQL: "SELECT * FROM MyTable ORDER BY SortField DESC, KeyField DESC LIMIT 124",
646648
wantArgs: nil,
647649
},
650+
{
651+
in: EmptyOptions(),
652+
wantSQL: fmt.Sprintf("SELECT * FROM MyTable LIMIT %d", math.MaxInt32+1),
653+
wantArgs: nil,
654+
},
648655
{
649656
in: &Options{
650657
PageSize: 123,

backend/src/apiserver/main.go

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import (
2828
"os"
2929
"strconv"
3030
"strings"
31+
"sync"
3132
"time"
3233

3334
"github.com/kubeflow/pipelines/backend/src/apiserver/client"
@@ -137,10 +138,25 @@ func main() {
137138
}
138139
log.SetLevel(level)
139140

141+
backgroundCtx, backgroundCancel := context.WithCancel(context.Background())
142+
defer backgroundCancel()
143+
wg := sync.WaitGroup{}
144+
wg.Add(1)
145+
go reconcileSwfCrs(resourceManager, backgroundCtx, &wg)
140146
go startRpcServer(resourceManager, tlsConfig)
147+
// This is blocking
141148
startHttpProxy(resourceManager, tlsConfig)
142-
149+
backgroundCancel()
143150
clientManager.Close()
151+
wg.Wait()
152+
}
153+
154+
func reconcileSwfCrs(resourceManager *resource.ResourceManager, ctx context.Context, wg *sync.WaitGroup) {
155+
defer wg.Done()
156+
err := resourceManager.ReconcileSwfCrs(ctx)
157+
if err != nil {
158+
log.Errorf("Could not reconcile the ScheduledWorkflow Kubernetes resources: %v", err)
159+
}
144160
}
145161

146162
// A custom http request header matcher to pass on the user identity

backend/src/apiserver/resource/resource_manager.go

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,10 @@ import (
1818
"context"
1919
"encoding/json"
2020
"fmt"
21+
scheduledworkflow "github.com/kubeflow/pipelines/backend/src/crd/pkg/apis/scheduledworkflow/v1beta1"
2122
"io"
2223
"net"
24+
"reflect"
2325
"strconv"
2426
"time"
2527

@@ -582,6 +584,77 @@ func (r *ResourceManager) CreateRun(ctx context.Context, run *model.Run) (*model
582584
return newRun, nil
583585
}
584586

587+
// ReconcileSwfCrs reconciles the ScheduledWorkflow CRs based on existing jobs.
588+
func (r *ResourceManager) ReconcileSwfCrs(ctx context.Context) error {
589+
filterContext := &model.FilterContext{
590+
ReferenceKey: &model.ReferenceKey{Type: model.NamespaceResourceType, ID: common.GetPodNamespace()},
591+
}
592+
593+
opts := list.EmptyOptions()
594+
595+
jobs, _, _, err := r.jobStore.ListJobs(filterContext, opts)
596+
597+
if err != nil {
598+
return util.Wrap(err, "Failed to reconcile ScheduledWorkflow Kubernetes resources")
599+
}
600+
601+
for i := range jobs {
602+
select {
603+
case <-ctx.Done():
604+
return nil
605+
default:
606+
}
607+
608+
tmpl, _, err := r.fetchTemplateFromPipelineSpec(&jobs[i].PipelineSpec)
609+
if err != nil {
610+
return failedToReconcileSwfCrsError(err)
611+
}
612+
613+
newScheduledWorkflow, err := tmpl.ScheduledWorkflow(jobs[i], r.getOwnerReferences())
614+
if err != nil {
615+
return failedToReconcileSwfCrsError(err)
616+
}
617+
618+
for {
619+
currentScheduledWorkflow, err := r.getScheduledWorkflowClient(jobs[i].Namespace).Get(ctx, jobs[i].K8SName, v1.GetOptions{})
620+
if err != nil {
621+
if util.IsNotFound(err) {
622+
break
623+
}
624+
return failedToReconcileSwfCrsError(err)
625+
}
626+
627+
if !reflect.DeepEqual(currentScheduledWorkflow.Spec, newScheduledWorkflow.Spec) {
628+
currentScheduledWorkflow.Spec = newScheduledWorkflow.Spec
629+
err = r.updateSwfCrSpec(ctx, jobs[i].Namespace, currentScheduledWorkflow)
630+
if err != nil {
631+
if apierrors.IsConflict(errors.Unwrap(err)) {
632+
continue
633+
} else if util.IsNotFound(errors.Cause(err)) {
634+
break
635+
}
636+
return failedToReconcileSwfCrsError(err)
637+
}
638+
}
639+
break
640+
}
641+
}
642+
643+
return nil
644+
}
645+
646+
func failedToReconcileSwfCrsError(err error) error {
647+
return util.Wrap(err, "Failed to reconcile ScheduledWorkflow Kubernetes resources")
648+
}
649+
650+
func (r *ResourceManager) updateSwfCrSpec(ctx context.Context, k8sNamespace string, scheduledWorkflow *scheduledworkflow.ScheduledWorkflow) error {
651+
_, err := r.getScheduledWorkflowClient(k8sNamespace).Update(ctx, scheduledWorkflow)
652+
if err != nil {
653+
return util.Wrap(err, "Failed to update ScheduledWorkflow")
654+
}
655+
return nil
656+
}
657+
585658
// Fetches a run with a given id.
586659
func (r *ResourceManager) GetRun(runId string) (*model.Run, error) {
587660
run, err := r.runStore.GetRun(runId)

backend/src/apiserver/resource/resource_manager_test.go

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3200,6 +3200,35 @@ func TestReportScheduledWorkflowResource_Success_withRuntimeParamsV2(t *testing.
32003200
assert.Equal(t, expectedJob.ToV1(), actualJob.ToV1())
32013201
}
32023202

3203+
func TestReconcileSwfCrs(t *testing.T) {
3204+
store, manager, job := initWithJobV2(t)
3205+
defer store.Close()
3206+
3207+
fetchedJob, err := manager.GetJob(job.UUID)
3208+
require.Nil(t, err)
3209+
require.NotNil(t, fetchedJob)
3210+
3211+
swfClient := store.SwfClient().ScheduledWorkflow("ns1")
3212+
3213+
options := v1.GetOptions{}
3214+
ctx := context.Background()
3215+
3216+
swf, err := swfClient.Get(ctx, "job-", options)
3217+
require.Nil(t, err)
3218+
3219+
// emulates an invalid/outdated spec
3220+
swf.Spec.Workflow.Spec = nil
3221+
swf, err = swfClient.Update(ctx, swf)
3222+
require.Nil(t, swf.Spec.Workflow.Spec)
3223+
3224+
err = manager.ReconcileSwfCrs(ctx)
3225+
require.Nil(t, err)
3226+
3227+
swf, err = swfClient.Get(ctx, "job-", options)
3228+
require.Nil(t, err)
3229+
require.NotNil(t, swf.Spec.Workflow.Spec)
3230+
}
3231+
32033232
func TestReportScheduledWorkflowResource_Error(t *testing.T) {
32043233
store := NewFakeClientManagerOrFatal(util.NewFakeTimeForEpoch())
32053234
defer store.Close()

0 commit comments

Comments
 (0)