@@ -17,20 +17,36 @@ limitations under the License.
17
17
package rest
18
18
19
19
import (
20
+ "fmt"
21
+ "time"
22
+
20
23
flowcontrolv1alpha1 "k8s.io/api/flowcontrol/v1alpha1"
24
+ "k8s.io/apimachinery/pkg/api/equality"
25
+ apierrors "k8s.io/apimachinery/pkg/api/errors"
26
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
27
+ "k8s.io/apimachinery/pkg/util/wait"
28
+ flowcontrolbootstrap "k8s.io/apiserver/pkg/apis/flowcontrol/bootstrap"
21
29
"k8s.io/apiserver/pkg/registry/generic"
22
30
"k8s.io/apiserver/pkg/registry/rest"
23
31
genericapiserver "k8s.io/apiserver/pkg/server"
24
32
serverstorage "k8s.io/apiserver/pkg/server/storage"
33
+ flowcontrolclient "k8s.io/client-go/kubernetes/typed/flowcontrol/v1alpha1"
34
+ "k8s.io/klog"
25
35
"k8s.io/kubernetes/pkg/api/legacyscheme"
26
36
"k8s.io/kubernetes/pkg/apis/flowcontrol"
37
+ flowcontrolapisv1alpha1 "k8s.io/kubernetes/pkg/apis/flowcontrol/v1alpha1"
27
38
flowschemastore "k8s.io/kubernetes/pkg/registry/flowcontrol/flowschema/storage"
28
39
prioritylevelconfigurationstore "k8s.io/kubernetes/pkg/registry/flowcontrol/prioritylevelconfiguration/storage"
29
40
)
30
41
31
- // RESTStorageProvider implements
42
+ var _ genericapiserver.PostStartHookProvider = RESTStorageProvider {}
43
+
44
+ // RESTStorageProvider is a provider of REST storage
32
45
type RESTStorageProvider struct {}
33
46
47
+ // PostStartHookName is the name of the post-start-hook provided by flow-control storage
48
+ const PostStartHookName = "apiserver/bootstrap-system-flowcontrol-configuration"
49
+
34
50
// NewRESTStorage creates a new rest storage for flow-control api models.
35
51
func (p RESTStorageProvider ) NewRESTStorage (apiResourceConfigSource serverstorage.APIResourceConfigSource , restOptionsGetter generic.RESTOptionsGetter ) (genericapiserver.APIGroupInfo , bool , error ) {
36
52
apiGroupInfo := genericapiserver .NewDefaultAPIGroupInfo (flowcontrol .GroupName , legacyscheme .Scheme , legacyscheme .ParameterCodec , legacyscheme .Codecs )
@@ -71,3 +87,166 @@ func (p RESTStorageProvider) v1alpha1Storage(apiResourceConfigSource serverstora
71
87
func (p RESTStorageProvider ) GroupName () string {
72
88
return flowcontrol .GroupName
73
89
}
90
+
91
+ func (p RESTStorageProvider ) PostStartHook () (string , genericapiserver.PostStartHookFunc , error ) {
92
+ return PostStartHookName , func (hookContext genericapiserver.PostStartHookContext ) error {
93
+ flowcontrolClientSet := flowcontrolclient .NewForConfigOrDie (hookContext .LoopbackClientConfig )
94
+ go func () {
95
+ const retryCreatingSuggestedSettingsInterval = time .Second
96
+ _ = wait .PollImmediateUntil (
97
+ retryCreatingSuggestedSettingsInterval ,
98
+ func () (bool , error ) {
99
+ shouldEnsureSuggested , err := lastMandatoryExists (flowcontrolClientSet )
100
+ if err != nil {
101
+ klog .Errorf ("failed getting exempt flow-schema, will retry later: %v" , err )
102
+ return false , nil
103
+ }
104
+ if ! shouldEnsureSuggested {
105
+ return true , nil
106
+ }
107
+ err = ensure (
108
+ flowcontrolClientSet ,
109
+ flowcontrolbootstrap .SuggestedFlowSchemas ,
110
+ flowcontrolbootstrap .SuggestedPriorityLevelConfigurations )
111
+ if err != nil {
112
+ klog .Errorf ("failed ensuring suggested settings, will retry later: %v" , err )
113
+ return false , nil
114
+ }
115
+ return true , nil
116
+ },
117
+ hookContext .StopCh )
118
+ const retryCreatingMandatorySettingsInterval = time .Minute
119
+ _ = wait .PollImmediateUntil (
120
+ retryCreatingMandatorySettingsInterval ,
121
+ func () (bool , error ) {
122
+ if err := upgrade (
123
+ flowcontrolClientSet ,
124
+ flowcontrolbootstrap .MandatoryFlowSchemas ,
125
+ // Note: the "exempt" priority-level is supposed tobe the last item in the pre-defined
126
+ // list, so that a crash in the midst of the first kube-apiserver startup does not prevent
127
+ // the full initial set of objects from being created.
128
+ flowcontrolbootstrap .MandatoryPriorityLevelConfigurations ,
129
+ ); err != nil {
130
+ klog .Errorf ("failed creating mandatory flowcontrol settings: %v" , err )
131
+ return false , nil
132
+ }
133
+ return false , nil // always retry
134
+ },
135
+ hookContext .StopCh )
136
+ }()
137
+ return nil
138
+ }, nil
139
+
140
+ }
141
+
142
+ // Returns false if there's a "exempt" priority-level existing in the cluster, otherwise returns a true
143
+ // if the "exempt" priority-level is not found.
144
+ func lastMandatoryExists (flowcontrolClientSet flowcontrolclient.FlowcontrolV1alpha1Interface ) (bool , error ) {
145
+ if _ , err := flowcontrolClientSet .PriorityLevelConfigurations ().Get (flowcontrol .PriorityLevelConfigurationNameExempt , metav1.GetOptions {}); err != nil {
146
+ if apierrors .IsNotFound (err ) {
147
+ return true , nil
148
+ }
149
+ return false , err
150
+ }
151
+ return false , nil
152
+ }
153
+
154
+ func ensure (flowcontrolClientSet flowcontrolclient.FlowcontrolV1alpha1Interface , flowSchemas []* flowcontrolv1alpha1.FlowSchema , priorityLevels []* flowcontrolv1alpha1.PriorityLevelConfiguration ) error {
155
+ for _ , flowSchema := range flowSchemas {
156
+ _ , err := flowcontrolClientSet .FlowSchemas ().Create (flowSchema )
157
+ if apierrors .IsAlreadyExists (err ) {
158
+ klog .V (3 ).Infof ("system preset FlowSchema %s already exists, skipping creating" , flowSchema .Name )
159
+ continue
160
+ }
161
+ if err != nil {
162
+ return fmt .Errorf ("cannot create FlowSchema %s due to %v" , flowSchema .Name , err )
163
+ }
164
+ klog .V (3 ).Infof ("created system preset FlowSchema %s" , flowSchema .Name )
165
+ }
166
+ for _ , priorityLevelConfiguration := range priorityLevels {
167
+ _ , err := flowcontrolClientSet .PriorityLevelConfigurations ().Create (priorityLevelConfiguration )
168
+ if apierrors .IsAlreadyExists (err ) {
169
+ klog .V (3 ).Infof ("system preset PriorityLevelConfiguration %s already exists, skipping creating" , priorityLevelConfiguration .Name )
170
+ continue
171
+ }
172
+ if err != nil {
173
+ return fmt .Errorf ("cannot create PriorityLevelConfiguration %s due to %v" , priorityLevelConfiguration .Name , err )
174
+ }
175
+ klog .V (3 ).Infof ("created system preset PriorityLevelConfiguration %s" , priorityLevelConfiguration .Name )
176
+ }
177
+ return nil
178
+ }
179
+
180
+ func upgrade (flowcontrolClientSet flowcontrolclient.FlowcontrolV1alpha1Interface , flowSchemas []* flowcontrolv1alpha1.FlowSchema , priorityLevels []* flowcontrolv1alpha1.PriorityLevelConfiguration ) error {
181
+ for _ , expectedFlowSchema := range flowSchemas {
182
+ actualFlowSchema , err := flowcontrolClientSet .FlowSchemas ().Get (expectedFlowSchema .Name , metav1.GetOptions {})
183
+ if err == nil {
184
+ // TODO(yue9944882): extract existing version from label and compare
185
+ // TODO(yue9944882): create w/ version string attached
186
+ identical , err := flowSchemaHasWrongSpec (expectedFlowSchema , actualFlowSchema )
187
+ if err != nil {
188
+ return fmt .Errorf ("failed checking if mandatory FlowSchema %s is up-to-date due to %v, will retry later" , expectedFlowSchema .Name , err )
189
+ }
190
+ if ! identical {
191
+ if _ , err := flowcontrolClientSet .FlowSchemas ().Update (expectedFlowSchema ); err != nil {
192
+ return fmt .Errorf ("failed upgrading mandatory FlowSchema %s due to %v, will retry later" , expectedFlowSchema .Name , err )
193
+ }
194
+ }
195
+ continue
196
+ }
197
+ if ! apierrors .IsNotFound (err ) {
198
+ return fmt .Errorf ("failed getting FlowSchema %s due to %v, will retry later" , expectedFlowSchema .Name , err )
199
+ }
200
+ _ , err = flowcontrolClientSet .FlowSchemas ().Create (expectedFlowSchema )
201
+ if apierrors .IsAlreadyExists (err ) {
202
+ klog .V (3 ).Infof ("system preset FlowSchema %s already exists, skipping creating" , expectedFlowSchema .Name )
203
+ continue
204
+ }
205
+ if err != nil {
206
+ return fmt .Errorf ("cannot create FlowSchema %s due to %v" , expectedFlowSchema .Name , err )
207
+ }
208
+ klog .V (3 ).Infof ("created system preset FlowSchema %s" , expectedFlowSchema .Name )
209
+ }
210
+ for _ , expectedPriorityLevelConfiguration := range priorityLevels {
211
+ actualPriorityLevelConfiguration , err := flowcontrolClientSet .PriorityLevelConfigurations ().Get (expectedPriorityLevelConfiguration .Name , metav1.GetOptions {})
212
+ if err == nil {
213
+ // TODO(yue9944882): extract existing version from label and compare
214
+ // TODO(yue9944882): create w/ version string attached
215
+ identical , err := priorityLevelHasWrongSpec (expectedPriorityLevelConfiguration , actualPriorityLevelConfiguration )
216
+ if err != nil {
217
+ return fmt .Errorf ("failed checking if mandatory PriorityLevelConfiguration %s is up-to-date due to %v, will retry later" , expectedPriorityLevelConfiguration .Name , err )
218
+ }
219
+ if ! identical {
220
+ if _ , err := flowcontrolClientSet .PriorityLevelConfigurations ().Update (expectedPriorityLevelConfiguration ); err != nil {
221
+ return fmt .Errorf ("failed upgrading mandatory PriorityLevelConfiguration %s due to %v, will retry later" , expectedPriorityLevelConfiguration .Name , err )
222
+ }
223
+ }
224
+ continue
225
+ }
226
+ if ! apierrors .IsNotFound (err ) {
227
+ return fmt .Errorf ("failed getting PriorityLevelConfiguration %s due to %v, will retry later" , expectedPriorityLevelConfiguration .Name , err )
228
+ }
229
+ _ , err = flowcontrolClientSet .PriorityLevelConfigurations ().Create (expectedPriorityLevelConfiguration )
230
+ if apierrors .IsAlreadyExists (err ) {
231
+ klog .V (3 ).Infof ("system preset PriorityLevelConfiguration %s already exists, skipping creating" , expectedPriorityLevelConfiguration .Name )
232
+ continue
233
+ }
234
+ if err != nil {
235
+ return fmt .Errorf ("cannot create PriorityLevelConfiguration %s due to %v" , expectedPriorityLevelConfiguration .Name , err )
236
+ }
237
+ klog .V (3 ).Infof ("created system preset PriorityLevelConfiguration %s" , expectedPriorityLevelConfiguration .Name )
238
+ }
239
+ return nil
240
+ }
241
+
242
+ func flowSchemaHasWrongSpec (expected , actual * flowcontrolv1alpha1.FlowSchema ) (bool , error ) {
243
+ copiedExpectedFlowSchema := expected .DeepCopy ()
244
+ flowcontrolapisv1alpha1 .SetObjectDefaults_FlowSchema (copiedExpectedFlowSchema )
245
+ return ! equality .Semantic .DeepEqual (copiedExpectedFlowSchema .Spec , actual .Spec ), nil
246
+ }
247
+
248
+ func priorityLevelHasWrongSpec (expected , actual * flowcontrolv1alpha1.PriorityLevelConfiguration ) (bool , error ) {
249
+ copiedExpectedPriorityLevel := expected .DeepCopy ()
250
+ flowcontrolapisv1alpha1 .SetObjectDefaults_PriorityLevelConfiguration (copiedExpectedPriorityLevel )
251
+ return ! equality .Semantic .DeepEqual (copiedExpectedPriorityLevel .Spec , actual .Spec ), nil
252
+ }
0 commit comments