Skip to content

Commit ab85e6e

Browse files
hbelmiromprahl
authored andcommitted
Revert "UPSTREAM: 11469: fix(backend): Synced ScheduledWorkflow CRs on apiserver startup"
This reverts commit 8ab8dae. (cherry picked from commit cf1df65)
1 parent 88be75a commit ab85e6e

File tree

6 files changed

+7
-145
lines changed

6 files changed

+7
-145
lines changed

backend/src/apiserver/client/scheduled_workflow_fake.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -66,9 +66,9 @@ func (c *FakeScheduledWorkflowClient) Get(ctx context.Context, name string, opti
6666
return nil, k8errors.NewNotFound(k8schema.ParseGroupResource("scheduledworkflows.kubeflow.org"), name)
6767
}
6868

69-
func (c *FakeScheduledWorkflowClient) Update(_ context.Context, scheduledWorkflow *v1beta1.ScheduledWorkflow) (*v1beta1.ScheduledWorkflow, error) {
70-
c.scheduledWorkflows[scheduledWorkflow.Name] = scheduledWorkflow
71-
return scheduledWorkflow, nil
69+
func (c *FakeScheduledWorkflowClient) Update(context.Context, *v1beta1.ScheduledWorkflow) (*v1beta1.ScheduledWorkflow, error) {
70+
glog.Error("This fake method is not yet implemented.")
71+
return nil, nil
7272
}
7373

7474
func (c *FakeScheduledWorkflowClient) DeleteCollection(ctx context.Context, options *v1.DeleteOptions, listOptions v1.ListOptions) error {

backend/src/apiserver/list/list.go

Lines changed: 3 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@ import (
2222
"encoding/base64"
2323
"encoding/json"
2424
"fmt"
25-
"math"
2625
"reflect"
2726
"strings"
2827

@@ -98,13 +97,6 @@ type Options struct {
9897
*token
9998
}
10099

101-
func EmptyOptions() *Options {
102-
return &Options{
103-
math.MaxInt32,
104-
&token{},
105-
}
106-
}
107-
108100
// Matches returns trues if the sorting and filtering criteria in o matches that
109101
// of the one supplied in opts.
110102
func (o *Options) Matches(opts *Options) bool {
@@ -221,14 +213,9 @@ func (o *Options) AddSortingToSelect(sqlBuilder sq.SelectBuilder) sq.SelectBuild
221213
if o.IsDesc {
222214
order = "DESC"
223215
}
224-
225-
if o.SortByFieldName != "" {
226-
sqlBuilder = sqlBuilder.OrderBy(fmt.Sprintf("%v %v", o.SortByFieldPrefix+o.SortByFieldName, order))
227-
}
228-
229-
if o.KeyFieldName != "" {
230-
sqlBuilder = sqlBuilder.OrderBy(fmt.Sprintf("%v %v", o.KeyFieldPrefix+o.KeyFieldName, order))
231-
}
216+
sqlBuilder = sqlBuilder.
217+
OrderBy(fmt.Sprintf("%v %v", o.SortByFieldPrefix+o.SortByFieldName, order)).
218+
OrderBy(fmt.Sprintf("%v %v", o.KeyFieldPrefix+o.KeyFieldName, order))
232219

233220
return sqlBuilder
234221
}

backend/src/apiserver/list/list_test.go

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,6 @@
1515
package list
1616

1717
import (
18-
"fmt"
19-
"math"
2018
"reflect"
2119
"strings"
2220
"testing"
@@ -647,11 +645,6 @@ func TestAddPaginationAndFilterToSelect(t *testing.T) {
647645
wantSQL: "SELECT * FROM MyTable ORDER BY SortField DESC, KeyField DESC LIMIT 124",
648646
wantArgs: nil,
649647
},
650-
{
651-
in: EmptyOptions(),
652-
wantSQL: fmt.Sprintf("SELECT * FROM MyTable LIMIT %d", math.MaxInt32+1),
653-
wantArgs: nil,
654-
},
655648
{
656649
in: &Options{
657650
PageSize: 123,

backend/src/apiserver/main.go

Lines changed: 1 addition & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@ import (
2828
"os"
2929
"strconv"
3030
"strings"
31-
"sync"
3231
"time"
3332

3433
"github.com/kubeflow/pipelines/backend/src/apiserver/client"
@@ -138,25 +137,10 @@ func main() {
138137
}
139138
log.SetLevel(level)
140139

141-
backgroundCtx, backgroundCancel := context.WithCancel(context.Background())
142-
defer backgroundCancel()
143-
wg := sync.WaitGroup{}
144-
wg.Add(1)
145-
go reconcileSwfCrs(resourceManager, backgroundCtx, &wg)
146140
go startRpcServer(resourceManager, tlsConfig)
147-
// This is blocking
148141
startHttpProxy(resourceManager, tlsConfig)
149-
backgroundCancel()
150-
clientManager.Close()
151-
wg.Wait()
152-
}
153142

154-
func reconcileSwfCrs(resourceManager *resource.ResourceManager, ctx context.Context, wg *sync.WaitGroup) {
155-
defer wg.Done()
156-
err := resourceManager.ReconcileSwfCrs(ctx)
157-
if err != nil {
158-
log.Errorf("Could not reconcile the ScheduledWorkflow Kubernetes resources: %v", err)
159-
}
143+
clientManager.Close()
160144
}
161145

162146
// A custom http request header matcher to pass on the user identity

backend/src/apiserver/resource/resource_manager.go

Lines changed: 0 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,8 @@ import (
1818
"context"
1919
"encoding/json"
2020
"fmt"
21-
scheduledworkflow "github.com/kubeflow/pipelines/backend/src/crd/pkg/apis/scheduledworkflow/v1beta1"
2221
"io"
2322
"net"
24-
"reflect"
2523
"strconv"
2624
"time"
2725

@@ -584,77 +582,6 @@ func (r *ResourceManager) CreateRun(ctx context.Context, run *model.Run) (*model
584582
return newRun, nil
585583
}
586584

587-
// ReconcileSwfCrs reconciles the ScheduledWorkflow CRs based on existing jobs.
588-
func (r *ResourceManager) ReconcileSwfCrs(ctx context.Context) error {
589-
filterContext := &model.FilterContext{
590-
ReferenceKey: &model.ReferenceKey{Type: model.NamespaceResourceType, ID: common.GetPodNamespace()},
591-
}
592-
593-
opts := list.EmptyOptions()
594-
595-
jobs, _, _, err := r.jobStore.ListJobs(filterContext, opts)
596-
597-
if err != nil {
598-
return util.Wrap(err, "Failed to reconcile ScheduledWorkflow Kubernetes resources")
599-
}
600-
601-
for i := range jobs {
602-
select {
603-
case <-ctx.Done():
604-
return nil
605-
default:
606-
}
607-
608-
tmpl, _, err := r.fetchTemplateFromPipelineSpec(&jobs[i].PipelineSpec)
609-
if err != nil {
610-
return failedToReconcileSwfCrsError(err)
611-
}
612-
613-
newScheduledWorkflow, err := tmpl.ScheduledWorkflow(jobs[i], r.getOwnerReferences())
614-
if err != nil {
615-
return failedToReconcileSwfCrsError(err)
616-
}
617-
618-
for {
619-
currentScheduledWorkflow, err := r.getScheduledWorkflowClient(jobs[i].Namespace).Get(ctx, jobs[i].K8SName, v1.GetOptions{})
620-
if err != nil {
621-
if util.IsNotFound(err) {
622-
break
623-
}
624-
return failedToReconcileSwfCrsError(err)
625-
}
626-
627-
if !reflect.DeepEqual(currentScheduledWorkflow.Spec, newScheduledWorkflow.Spec) {
628-
currentScheduledWorkflow.Spec = newScheduledWorkflow.Spec
629-
err = r.updateSwfCrSpec(ctx, jobs[i].Namespace, currentScheduledWorkflow)
630-
if err != nil {
631-
if apierrors.IsConflict(errors.Unwrap(err)) {
632-
continue
633-
} else if util.IsNotFound(errors.Cause(err)) {
634-
break
635-
}
636-
return failedToReconcileSwfCrsError(err)
637-
}
638-
}
639-
break
640-
}
641-
}
642-
643-
return nil
644-
}
645-
646-
func failedToReconcileSwfCrsError(err error) error {
647-
return util.Wrap(err, "Failed to reconcile ScheduledWorkflow Kubernetes resources")
648-
}
649-
650-
func (r *ResourceManager) updateSwfCrSpec(ctx context.Context, k8sNamespace string, scheduledWorkflow *scheduledworkflow.ScheduledWorkflow) error {
651-
_, err := r.getScheduledWorkflowClient(k8sNamespace).Update(ctx, scheduledWorkflow)
652-
if err != nil {
653-
return util.Wrap(err, "Failed to update ScheduledWorkflow")
654-
}
655-
return nil
656-
}
657-
658585
// Fetches a run with a given id.
659586
func (r *ResourceManager) GetRun(runId string) (*model.Run, error) {
660587
run, err := r.runStore.GetRun(runId)

backend/src/apiserver/resource/resource_manager_test.go

Lines changed: 0 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -3200,35 +3200,6 @@ func TestReportScheduledWorkflowResource_Success_withRuntimeParamsV2(t *testing.
32003200
assert.Equal(t, expectedJob.ToV1(), actualJob.ToV1())
32013201
}
32023202

3203-
func TestReconcileSwfCrs(t *testing.T) {
3204-
store, manager, job := initWithJobV2(t)
3205-
defer store.Close()
3206-
3207-
fetchedJob, err := manager.GetJob(job.UUID)
3208-
require.Nil(t, err)
3209-
require.NotNil(t, fetchedJob)
3210-
3211-
swfClient := store.SwfClient().ScheduledWorkflow("ns1")
3212-
3213-
options := v1.GetOptions{}
3214-
ctx := context.Background()
3215-
3216-
swf, err := swfClient.Get(ctx, "job-", options)
3217-
require.Nil(t, err)
3218-
3219-
// emulates an invalid/outdated spec
3220-
swf.Spec.Workflow.Spec = nil
3221-
swf, err = swfClient.Update(ctx, swf)
3222-
require.Nil(t, swf.Spec.Workflow.Spec)
3223-
3224-
err = manager.ReconcileSwfCrs(ctx)
3225-
require.Nil(t, err)
3226-
3227-
swf, err = swfClient.Get(ctx, "job-", options)
3228-
require.Nil(t, err)
3229-
require.NotNil(t, swf.Spec.Workflow.Spec)
3230-
}
3231-
32323203
func TestReportScheduledWorkflowResource_Error(t *testing.T) {
32333204
store := NewFakeClientManagerOrFatal(util.NewFakeTimeForEpoch())
32343205
defer store.Close()

0 commit comments

Comments
 (0)