Skip to content

Commit fe8ad90

Browse files
committed
bootstrap flow-control objects
typo wrap bootstrap-creation-flow w/ wait.PollUtil go wait
1 parent 141909f commit fe8ad90

File tree

7 files changed

+685
-2
lines changed

7 files changed

+685
-2
lines changed

pkg/registry/flowcontrol/rest/BUILD

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
load("@io_bazel_rules_go//go:def.bzl", "go_library")
1+
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
22

33
go_library(
44
name = "go_default_library",
@@ -11,10 +11,16 @@ go_library(
1111
"//pkg/registry/flowcontrol/flowschema/storage:go_default_library",
1212
"//pkg/registry/flowcontrol/prioritylevelconfiguration/storage:go_default_library",
1313
"//staging/src/k8s.io/api/flowcontrol/v1alpha1:go_default_library",
14+
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
15+
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
16+
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
17+
"//staging/src/k8s.io/apiserver/pkg/apis/flowcontrol/bootstrap:go_default_library",
1418
"//staging/src/k8s.io/apiserver/pkg/registry/generic:go_default_library",
1519
"//staging/src/k8s.io/apiserver/pkg/registry/rest:go_default_library",
1620
"//staging/src/k8s.io/apiserver/pkg/server:go_default_library",
1721
"//staging/src/k8s.io/apiserver/pkg/server/storage:go_default_library",
22+
"//staging/src/k8s.io/client-go/kubernetes/typed/flowcontrol/v1alpha1:go_default_library",
23+
"//vendor/k8s.io/klog:go_default_library",
1824
],
1925
)
2026

@@ -31,3 +37,15 @@ filegroup(
3137
tags = ["automanaged"],
3238
visibility = ["//visibility:public"],
3339
)
40+
41+
go_test(
42+
name = "go_default_test",
43+
srcs = ["storage_flowcontrol_test.go"],
44+
embed = [":go_default_library"],
45+
deps = [
46+
"//staging/src/k8s.io/api/flowcontrol/v1alpha1:go_default_library",
47+
"//staging/src/k8s.io/apiserver/pkg/apis/flowcontrol/bootstrap:go_default_library",
48+
"//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library",
49+
"//vendor/github.com/stretchr/testify/assert:go_default_library",
50+
],
51+
)

pkg/registry/flowcontrol/rest/storage_flowcontrol.go

Lines changed: 141 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,20 +17,34 @@ limitations under the License.
1717
package rest
1818

1919
import (
20+
"fmt"
21+
"time"
22+
2023
flowcontrolv1alpha1 "k8s.io/api/flowcontrol/v1alpha1"
24+
apierrors "k8s.io/apimachinery/pkg/api/errors"
25+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
26+
"k8s.io/apimachinery/pkg/util/wait"
27+
flowcontrolbootstrap "k8s.io/apiserver/pkg/apis/flowcontrol/bootstrap"
2128
"k8s.io/apiserver/pkg/registry/generic"
2229
"k8s.io/apiserver/pkg/registry/rest"
2330
genericapiserver "k8s.io/apiserver/pkg/server"
2431
serverstorage "k8s.io/apiserver/pkg/server/storage"
32+
flowcontrolclient "k8s.io/client-go/kubernetes/typed/flowcontrol/v1alpha1"
33+
"k8s.io/klog"
2534
"k8s.io/kubernetes/pkg/api/legacyscheme"
2635
"k8s.io/kubernetes/pkg/apis/flowcontrol"
2736
flowschemastore "k8s.io/kubernetes/pkg/registry/flowcontrol/flowschema/storage"
2837
prioritylevelconfigurationstore "k8s.io/kubernetes/pkg/registry/flowcontrol/prioritylevelconfiguration/storage"
2938
)
3039

31-
// RESTStorageProvider implements
40+
var _ genericapiserver.PostStartHookProvider = RESTStorageProvider{}
41+
42+
// RESTStorageProvider is a provider of REST storage
3243
type RESTStorageProvider struct{}
3344

45+
// PostStartHookName is the name of the post-start-hook provided by flow-control storage
46+
const PostStartHookName = "apiserver/bootstrap-system-flowcontrol-configuration"
47+
3448
// NewRESTStorage creates a new rest storage for flow-control api models.
3549
func (p RESTStorageProvider) NewRESTStorage(apiResourceConfigSource serverstorage.APIResourceConfigSource, restOptionsGetter generic.RESTOptionsGetter) (genericapiserver.APIGroupInfo, bool, error) {
3650
apiGroupInfo := genericapiserver.NewDefaultAPIGroupInfo(flowcontrol.GroupName, legacyscheme.Scheme, legacyscheme.ParameterCodec, legacyscheme.Codecs)
@@ -71,3 +85,129 @@ func (p RESTStorageProvider) v1alpha1Storage(apiResourceConfigSource serverstora
7185
func (p RESTStorageProvider) GroupName() string {
7286
return flowcontrol.GroupName
7387
}
88+
89+
func (p RESTStorageProvider) PostStartHook() (string, genericapiserver.PostStartHookFunc, error) {
90+
return PostStartHookName, func(hookContext genericapiserver.PostStartHookContext) error {
91+
flowcontrolClientSet := flowcontrolclient.NewForConfigOrDie(hookContext.LoopbackClientConfig)
92+
go func() {
93+
const retryCreatingSuggestedSettingsInterval = time.Second
94+
_ = wait.PollImmediateUntil(
95+
retryCreatingSuggestedSettingsInterval,
96+
func() (bool, error) {
97+
shouldEnsureSuggested, err := shouldEnsureAllPredefined(flowcontrolClientSet)
98+
if err != nil {
99+
klog.Errorf("failed getting exempt flow-schema, will retry later: %v", err)
100+
return false, nil
101+
}
102+
if shouldEnsureSuggested {
103+
err := ensure(
104+
flowcontrolClientSet,
105+
flowcontrolbootstrap.SuggestedFlowSchemas,
106+
flowcontrolbootstrap.SuggestedPriorityLevelConfigurations)
107+
if err != nil {
108+
klog.Errorf("failed ensuring suggested settings, will retry later: %v", err)
109+
return false, nil
110+
}
111+
}
112+
return true, nil
113+
},
114+
hookContext.StopCh)
115+
const retryCreatingMandatorySettingsInterval = time.Minute
116+
_ = wait.PollImmediateUntil(
117+
retryCreatingMandatorySettingsInterval,
118+
func() (bool, error) {
119+
if err := upgrade(
120+
flowcontrolClientSet,
121+
flowcontrolbootstrap.MandatoryFlowSchemas,
122+
// Note: the "exempt" priority-level is supposed tobe the last item in the pre-defined
123+
// list, so that a crash in the midst of the first kube-apiserver startup does not prevent
124+
// the full initial set of objects from being created.
125+
flowcontrolbootstrap.MandatoryPriorityLevelConfigurations,
126+
); err != nil {
127+
klog.Errorf("failed creating default flowcontrol settings: %v", err)
128+
return false, nil
129+
}
130+
return false, nil // always retry
131+
},
132+
hookContext.StopCh)
133+
}()
134+
return nil
135+
}, nil
136+
137+
}
138+
139+
// Returns false if there's a "exempt" priority-level existing in the cluster, otherwise returns a true
140+
// if the "exempt" priority-level is not found.
141+
func shouldEnsureAllPredefined(flowcontrolClientSet flowcontrolclient.FlowcontrolV1alpha1Interface) (bool, error) {
142+
if _, err := flowcontrolClientSet.PriorityLevelConfigurations().Get(flowcontrol.PriorityLevelConfigurationNameExempt, metav1.GetOptions{}); err != nil {
143+
if apierrors.IsNotFound(err) {
144+
return true, nil
145+
}
146+
return false, err
147+
}
148+
return false, nil
149+
}
150+
151+
func ensure(flowcontrolClientSet flowcontrolclient.FlowcontrolV1alpha1Interface, flowSchemas []*flowcontrolv1alpha1.FlowSchema, priorityLevels []*flowcontrolv1alpha1.PriorityLevelConfiguration) error {
152+
for _, flowSchema := range flowSchemas {
153+
_, err := flowcontrolClientSet.FlowSchemas().Create(flowSchema)
154+
if apierrors.IsAlreadyExists(err) {
155+
klog.V(3).Infof("system preset FlowSchema %s already exists, skipping creating", flowSchema.Name)
156+
continue
157+
}
158+
if err != nil {
159+
return fmt.Errorf("cannot create FlowSchema %s due to %v", flowSchema.Name, err)
160+
}
161+
klog.V(3).Infof("created system preset FlowSchema %s", flowSchema.Name)
162+
}
163+
for _, priorityLevelConfiguration := range priorityLevels {
164+
_, err := flowcontrolClientSet.PriorityLevelConfigurations().Create(priorityLevelConfiguration)
165+
if apierrors.IsAlreadyExists(err) {
166+
klog.V(3).Infof("system preset PriorityLevelConfiguration %s already exists, skipping creating", priorityLevelConfiguration.Name)
167+
continue
168+
}
169+
if err != nil {
170+
return fmt.Errorf("cannot create PriorityLevelConfiguration %s due to %v", priorityLevelConfiguration.Name, err)
171+
}
172+
klog.V(3).Infof("created system preset PriorityLevelConfiguration %s", priorityLevelConfiguration.Name)
173+
}
174+
return nil
175+
}
176+
177+
func upgrade(flowcontrolClientSet flowcontrolclient.FlowcontrolV1alpha1Interface, flowSchemas []*flowcontrolv1alpha1.FlowSchema, priorityLevels []*flowcontrolv1alpha1.PriorityLevelConfiguration) error {
178+
for _, flowSchema := range flowSchemas {
179+
_, err := flowcontrolClientSet.FlowSchemas().Get(flowSchema.Name, metav1.GetOptions{})
180+
if err != nil {
181+
return fmt.Errorf("failed getting FlowSchema %s due to %v, will retry later", flowSchema.Name, err)
182+
}
183+
// TODO(yue9944882): extract existing version from label and compare
184+
// TODO(yue9944882): create w/ version string attached
185+
_, err = flowcontrolClientSet.FlowSchemas().Create(flowSchema)
186+
if apierrors.IsAlreadyExists(err) {
187+
klog.V(3).Infof("system preset FlowSchema %s already exists, skipping creating", flowSchema.Name)
188+
continue
189+
}
190+
if err != nil {
191+
return fmt.Errorf("cannot create FlowSchema %s due to %v", flowSchema.Name, err)
192+
}
193+
klog.V(3).Infof("created system preset FlowSchema %s", flowSchema.Name)
194+
}
195+
for _, priorityLevelConfiguration := range priorityLevels {
196+
_, err := flowcontrolClientSet.FlowSchemas().Get(priorityLevelConfiguration.Name, metav1.GetOptions{})
197+
if err != nil {
198+
return fmt.Errorf("failed getting PriorityLevelConfiguration %s due to %v, will retry later", priorityLevelConfiguration.Name, err)
199+
}
200+
// TODO(yue9944882): extract existing version from label and compare
201+
// TODO(yue9944882): create w/ version string attached
202+
_, err = flowcontrolClientSet.PriorityLevelConfigurations().Create(priorityLevelConfiguration)
203+
if apierrors.IsAlreadyExists(err) {
204+
klog.V(3).Infof("system preset PriorityLevelConfiguration %s already exists, skipping creating", priorityLevelConfiguration.Name)
205+
continue
206+
}
207+
if err != nil {
208+
return fmt.Errorf("cannot create PriorityLevelConfiguration %s due to %v", priorityLevelConfiguration.Name, err)
209+
}
210+
klog.V(3).Infof("created system preset PriorityLevelConfiguration %s", priorityLevelConfiguration.Name)
211+
}
212+
return nil
213+
}
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
/*
2+
Copyright 2019 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package rest
18+
19+
import (
20+
"testing"
21+
22+
"github.com/stretchr/testify/assert"
23+
flowcontrol "k8s.io/api/flowcontrol/v1alpha1"
24+
"k8s.io/apiserver/pkg/apis/flowcontrol/bootstrap"
25+
"k8s.io/client-go/kubernetes/fake"
26+
)
27+
28+
func TestShouldEnsurePredefinedSettings(t *testing.T) {
29+
testCases := []struct {
30+
name string
31+
existingPriorityLevel *flowcontrol.PriorityLevelConfiguration
32+
expected bool
33+
}{
34+
{
35+
name: "should ensure if exempt priority-level is absent",
36+
existingPriorityLevel: nil,
37+
expected: true,
38+
},
39+
{
40+
name: "should not ensure if exempt priority-level is present",
41+
existingPriorityLevel: bootstrap.MandatoryPriorityLevelConfigurationExempt,
42+
expected: false,
43+
},
44+
}
45+
46+
for _, testCase := range testCases {
47+
t.Run(testCase.name, func(t *testing.T) {
48+
c := fake.NewSimpleClientset()
49+
if testCase.existingPriorityLevel != nil {
50+
c.FlowcontrolV1alpha1().PriorityLevelConfigurations().Create(testCase.existingPriorityLevel)
51+
}
52+
should, err := shouldEnsureAllPredefined(c.FlowcontrolV1alpha1())
53+
assert.NoError(t, err)
54+
assert.Equal(t, testCase.expected, should)
55+
})
56+
}
57+
}

staging/src/k8s.io/apiserver/BUILD

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ filegroup(
1515
"//staging/src/k8s.io/apiserver/pkg/apis/config:all-srcs",
1616
"//staging/src/k8s.io/apiserver/pkg/apis/example:all-srcs",
1717
"//staging/src/k8s.io/apiserver/pkg/apis/example2:all-srcs",
18+
"//staging/src/k8s.io/apiserver/pkg/apis/flowcontrol/bootstrap:all-srcs",
1819
"//staging/src/k8s.io/apiserver/pkg/audit:all-srcs",
1920
"//staging/src/k8s.io/apiserver/pkg/authentication/authenticator:all-srcs",
2021
"//staging/src/k8s.io/apiserver/pkg/authentication/authenticatorfactory:all-srcs",
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
load("@io_bazel_rules_go//go:def.bzl", "go_library")
2+
3+
go_library(
4+
name = "go_default_library",
5+
srcs = ["default.go"],
6+
importmap = "k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/apis/flowcontrol/bootstrap",
7+
importpath = "k8s.io/apiserver/pkg/apis/flowcontrol/bootstrap",
8+
visibility = ["//visibility:public"],
9+
deps = [
10+
"//staging/src/k8s.io/api/coordination/v1:go_default_library",
11+
"//staging/src/k8s.io/api/core/v1:go_default_library",
12+
"//staging/src/k8s.io/api/flowcontrol/v1alpha1:go_default_library",
13+
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
14+
"//staging/src/k8s.io/apiserver/pkg/authentication/serviceaccount:go_default_library",
15+
"//staging/src/k8s.io/apiserver/pkg/authentication/user:go_default_library",
16+
],
17+
)
18+
19+
filegroup(
20+
name = "package-srcs",
21+
srcs = glob(["**"]),
22+
tags = ["automanaged"],
23+
visibility = ["//visibility:private"],
24+
)
25+
26+
filegroup(
27+
name = "all-srcs",
28+
srcs = [":package-srcs"],
29+
tags = ["automanaged"],
30+
visibility = ["//visibility:public"],
31+
)

0 commit comments

Comments
 (0)