Skip to content

Commit 79fee52

Browse files
authored
Merge pull request kubernetes#125483 from wojtek-t/storage_readiness_hook
Implement resilient watchcache initialization post-start-hook
2 parents 688c1c3 + a5772bd commit 79fee52

File tree

18 files changed

+319
-31
lines changed

18 files changed

+319
-31
lines changed

cmd/kube-apiserver/app/options/options_test.go

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -139,16 +139,17 @@ func TestAddFlags(t *testing.T) {
139139
expected := &ServerRunOptions{
140140
Options: &controlplaneapiserver.Options{
141141
GenericServerRunOptions: &apiserveroptions.ServerRunOptions{
142-
AdvertiseAddress: netutils.ParseIPSloppy("192.168.10.10"),
143-
CorsAllowedOriginList: []string{"10.10.10.100", "10.10.10.200"},
144-
MaxRequestsInFlight: 400,
145-
MaxMutatingRequestsInFlight: 200,
146-
RequestTimeout: time.Duration(2) * time.Minute,
147-
MinRequestTimeout: 1800,
148-
JSONPatchMaxCopyBytes: int64(3 * 1024 * 1024),
149-
MaxRequestBodyBytes: int64(3 * 1024 * 1024),
150-
ComponentGlobalsRegistry: componentGlobalsRegistry,
151-
ComponentName: utilversion.DefaultKubeComponent,
142+
AdvertiseAddress: netutils.ParseIPSloppy("192.168.10.10"),
143+
CorsAllowedOriginList: []string{"10.10.10.100", "10.10.10.200"},
144+
MaxRequestsInFlight: 400,
145+
MaxMutatingRequestsInFlight: 200,
146+
RequestTimeout: time.Duration(2) * time.Minute,
147+
MinRequestTimeout: 1800,
148+
StorageInitializationTimeout: time.Minute,
149+
JSONPatchMaxCopyBytes: int64(3 * 1024 * 1024),
150+
MaxRequestBodyBytes: int64(3 * 1024 * 1024),
151+
ComponentGlobalsRegistry: componentGlobalsRegistry,
152+
ComponentName: utilversion.DefaultKubeComponent,
152153
},
153154
Admission: &kubeoptions.AdmissionOptions{
154155
GenericAdmission: &apiserveroptions.AdmissionOptions{

pkg/controlplane/apiserver/options/options_test.go

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -123,16 +123,17 @@ func TestAddFlags(t *testing.T) {
123123
// This is a snapshot of expected options parsed by args.
124124
expected := &Options{
125125
GenericServerRunOptions: &apiserveroptions.ServerRunOptions{
126-
AdvertiseAddress: netutils.ParseIPSloppy("192.168.10.10"),
127-
CorsAllowedOriginList: []string{"10.10.10.100", "10.10.10.200"},
128-
MaxRequestsInFlight: 400,
129-
MaxMutatingRequestsInFlight: 200,
130-
RequestTimeout: time.Duration(2) * time.Minute,
131-
MinRequestTimeout: 1800,
132-
JSONPatchMaxCopyBytes: int64(3 * 1024 * 1024),
133-
MaxRequestBodyBytes: int64(3 * 1024 * 1024),
134-
ComponentGlobalsRegistry: componentGlobalsRegistry,
135-
ComponentName: utilversion.DefaultKubeComponent,
126+
AdvertiseAddress: netutils.ParseIPSloppy("192.168.10.10"),
127+
CorsAllowedOriginList: []string{"10.10.10.100", "10.10.10.200"},
128+
MaxRequestsInFlight: 400,
129+
MaxMutatingRequestsInFlight: 200,
130+
RequestTimeout: time.Duration(2) * time.Minute,
131+
MinRequestTimeout: 1800,
132+
StorageInitializationTimeout: time.Minute,
133+
JSONPatchMaxCopyBytes: int64(3 * 1024 * 1024),
134+
MaxRequestBodyBytes: int64(3 * 1024 * 1024),
135+
ComponentGlobalsRegistry: componentGlobalsRegistry,
136+
ComponentName: utilversion.DefaultKubeComponent,
136137
},
137138
Admission: &kubeoptions.AdmissionOptions{
138139
GenericAdmission: &apiserveroptions.AdmissionOptions{

pkg/controlplane/apiserver/server.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -245,6 +245,10 @@ func (c completedConfig) New(name string, delegationTarget genericapiserver.Dele
245245
})
246246
}
247247

248+
if utilfeature.DefaultFeatureGate.Enabled(apiserverfeatures.WatchCacheInitializationPostStartHook) {
249+
s.GenericAPIServer.AddPostStartHookOrDie("storage-readiness", s.GenericAPIServer.StorageReadinessHook.Hook)
250+
}
251+
248252
s.GenericAPIServer.AddPostStartHookOrDie("start-legacy-token-tracking-controller", func(hookContext genericapiserver.PostStartHookContext) error {
249253
go legacytokentracking.NewController(client).Run(hookContext.StopCh)
250254
return nil

pkg/features/kube_features.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1275,6 +1275,8 @@ var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureS
12751275

12761276
genericfeatures.WatchBookmark: {Default: true, PreRelease: featuregate.GA, LockToDefault: true},
12771277

1278+
genericfeatures.WatchCacheInitializationPostStartHook: {Default: false, PreRelease: featuregate.Beta},
1279+
12781280
genericfeatures.WatchFromStorageWithoutResourceVersion: {Default: false, PreRelease: featuregate.Beta},
12791281

12801282
genericfeatures.WatchList: {Default: true, PreRelease: featuregate.Beta},

staging/src/k8s.io/apiserver/pkg/features/kube_features.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -290,6 +290,12 @@ const (
290290
// Enables support for watch bookmark events.
291291
WatchBookmark featuregate.Feature = "WatchBookmark"
292292

293+
// owner: @wojtek-t
294+
// beta: v1.31
295+
//
296+
// Enables the post-start hook that checks storage readiness.
297+
WatchCacheInitializationPostStartHook featuregate.Feature = "WatchCacheInitializationPostStartHook"
298+
293299
// owner: @serathius
294300
// beta: 1.30
295301
// Enables watches without resourceVersion to be served from storage.
@@ -408,6 +414,8 @@ var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureS
408414

409415
WatchBookmark: {Default: true, PreRelease: featuregate.GA, LockToDefault: true},
410416

417+
WatchCacheInitializationPostStartHook: {Default: false, PreRelease: featuregate.Beta},
418+
411419
WatchFromStorageWithoutResourceVersion: {Default: false, PreRelease: featuregate.Beta},
412420

413421
InPlacePodVerticalScaling: {Default: false, PreRelease: featuregate.Alpha},

staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -226,6 +226,10 @@ type Store struct {
226226
// storageVersionHash as empty in the discovery document.
227227
StorageVersioner runtime.GroupVersioner
228228

229+
// ReadinessCheckFunc checks if the storage is ready for accepting requests.
230+
// The field is optional; if set, it must be thread-safe.
231+
ReadinessCheckFunc func() error
232+
229233
// DestroyFunc cleans up clients used by the underlying Storage; optional.
230234
// If set, DestroyFunc has to be implemented in thread-safe way and
231235
// be prepared for being called more than once.
@@ -234,6 +238,7 @@ type Store struct {
234238

235239
// Note: the rest.StandardStorage interface aggregates the common REST verbs
236240
var _ rest.StandardStorage = &Store{}
241+
var _ rest.StorageWithReadiness = &Store{}
237242
var _ rest.TableConvertor = &Store{}
238243
var _ GenericStore = &Store{}
239244

@@ -292,6 +297,14 @@ func (e *Store) New() runtime.Object {
292297
return e.NewFunc()
293298
}
294299

300+
// ReadinessCheck checks if the storage is ready for accepting requests by delegating to ReadinessCheckFunc; it returns nil when no ReadinessCheckFunc is set.
301+
func (e *Store) ReadinessCheck() error {
302+
if e.ReadinessCheckFunc != nil {
303+
return e.ReadinessCheckFunc()
304+
}
305+
return nil
306+
}
307+
295308
// Destroy cleans up its resources on shutdown.
296309
func (e *Store) Destroy() {
297310
if e.DestroyFunc != nil {
@@ -1614,6 +1627,9 @@ func (e *Store) CompleteWithOptions(options *generic.StoreOptions) error {
16141627
}
16151628
}
16161629
}
1630+
if e.Storage.Storage != nil {
1631+
e.ReadinessCheckFunc = e.Storage.Storage.ReadinessCheck
1632+
}
16171633

16181634
return nil
16191635
}

staging/src/k8s.io/apiserver/pkg/registry/rest/rest.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,8 @@ import (
5252
// Storage is a generic interface for RESTful storage services.
5353
// Resources which are exported to the RESTful API of apiserver need to implement this interface. It is expected
5454
// that objects may implement any of the below interfaces.
55+
//
56+
// Consider using StorageWithReadiness whenever possible.
5557
type Storage interface {
5658
// New returns an empty object that can be used with Create and Update after request data has been put into it.
5759
// This object must be a pointer type for use with Codec.DecodeInto([]byte, runtime.Object)
@@ -63,6 +65,14 @@ type Storage interface {
6365
Destroy()
6466
}
6567

68+
// StorageWithReadiness extends the Storage interface with a readiness check.
69+
type StorageWithReadiness interface {
70+
Storage
71+
72+
// ReadinessCheck checks storage readiness and reports the outcome as an error (presumably nil when ready — confirm against implementations).
73+
ReadinessCheck() error
74+
}
75+
6676
// Scoper indicates what scope the resource is at. It must be specified.
6777
// It is usually provided automatically based on your strategy.
6878
type Scoper interface {

staging/src/k8s.io/apiserver/pkg/server/config.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -216,6 +216,10 @@ type Config struct {
216216
// twice this value. Note that it is up to the request handlers to ignore or honor this timeout. In seconds.
217217
MinRequestTimeout int
218218

219+
// StorageInitializationTimeout defines the maximum amount of time to wait for storage initialization
220+
// before declaring apiserver ready.
221+
StorageInitializationTimeout time.Duration
222+
219223
// This represents the maximum amount of time it should take for apiserver to complete its startup
220224
// sequence and become healthy. From apiserver's start time to when this amount of time has
221225
// elapsed, /livez will assume that unfinished post-start hooks will complete successfully and
@@ -426,6 +430,7 @@ func NewConfig(codecs serializer.CodecFactory) *Config {
426430
MaxMutatingRequestsInFlight: 200,
427431
RequestTimeout: time.Duration(60) * time.Second,
428432
MinRequestTimeout: 1800,
433+
StorageInitializationTimeout: time.Minute,
429434
LivezGracePeriod: time.Duration(0),
430435
ShutdownDelayDuration: time.Duration(0),
431436
// 1.5MB is the default client request size in bytes
@@ -824,6 +829,7 @@ func (c completedConfig) New(name string, delegationTarget DelegationTarget) (*G
824829
ShutdownSendRetryAfter: c.ShutdownSendRetryAfter,
825830

826831
APIServerID: c.APIServerID,
832+
StorageReadinessHook: NewStorageReadinessHook(c.StorageInitializationTimeout),
827833
StorageVersionManager: c.StorageVersionManager,
828834

829835
EffectiveVersion: c.EffectiveVersion,

staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -233,6 +233,10 @@ type GenericAPIServer struct {
233233
// APIServerID is the ID of this API server
234234
APIServerID string
235235

236+
// StorageReadinessHook implements post-start-hook functionality for checking readiness
237+
// of underlying storage for registered resources.
238+
StorageReadinessHook *StorageReadinessHook
239+
236240
// StorageVersionManager holds the storage versions of the API resources installed by this server.
237241
StorageVersionManager storageversion.Manager
238242

@@ -844,6 +848,7 @@ func (s *GenericAPIServer) InstallLegacyAPIGroup(apiPrefix string, apiGroupInfo
844848
} else {
845849
s.Handler.GoRestfulContainer.Add(legacyRootAPIHandler.WebService())
846850
}
851+
s.registerStorageReadinessCheck("", apiGroupInfo)
847852

848853
return nil
849854
}
@@ -902,10 +907,28 @@ func (s *GenericAPIServer) InstallAPIGroups(apiGroupInfos ...*APIGroupInfo) erro
902907

903908
s.DiscoveryGroupManager.AddGroup(apiGroup)
904909
s.Handler.GoRestfulContainer.Add(discovery.NewAPIGroupHandler(s.Serializer, apiGroup).WebService())
910+
s.registerStorageReadinessCheck(apiGroupInfo.PrioritizedVersions[0].Group, apiGroupInfo)
905911
}
906912
return nil
907913
}
908914

915+
// registerStorageReadinessCheck walks apiGroupInfo.VersionedResourcesStorageMap and registers every storage
916+
// that implements rest.StorageWithReadiness with s.StorageReadinessHook, keyed by group/version/resource; other storages are skipped.
// NOTE(review): assumes s.StorageReadinessHook is non-nil (it is set in completedConfig.New) — confirm for servers built another way.
917+
func (s *GenericAPIServer) registerStorageReadinessCheck(groupName string, apiGroupInfo *APIGroupInfo) {
918+
for version, storageMap := range apiGroupInfo.VersionedResourcesStorageMap {
919+
for resource, storage := range storageMap {
920+
if withReadiness, ok := storage.(rest.StorageWithReadiness); ok {
921+
gvr := metav1.GroupVersionResource{
922+
Group: groupName,
923+
Version: version,
924+
Resource: resource,
925+
}
926+
s.StorageReadinessHook.RegisterStorage(gvr, withReadiness)
927+
}
928+
}
929+
}
930+
}
931+
909932
// InstallAPIGroup exposes the given api group in the API.
910933
// The <apiGroupInfo> passed into this function shouldn't be used elsewhere as the
911934
// underlying storage will be destroyed on this servers shutdown.

staging/src/k8s.io/apiserver/pkg/server/options/server_run_options.go

Lines changed: 20 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -47,16 +47,17 @@ const (
4747
type ServerRunOptions struct {
4848
AdvertiseAddress net.IP
4949

50-
CorsAllowedOriginList []string
51-
HSTSDirectives []string
52-
ExternalHost string
53-
MaxRequestsInFlight int
54-
MaxMutatingRequestsInFlight int
55-
RequestTimeout time.Duration
56-
GoawayChance float64
57-
LivezGracePeriod time.Duration
58-
MinRequestTimeout int
59-
ShutdownDelayDuration time.Duration
50+
CorsAllowedOriginList []string
51+
HSTSDirectives []string
52+
ExternalHost string
53+
MaxRequestsInFlight int
54+
MaxMutatingRequestsInFlight int
55+
RequestTimeout time.Duration
56+
GoawayChance float64
57+
LivezGracePeriod time.Duration
58+
MinRequestTimeout int
59+
StorageInitializationTimeout time.Duration
60+
ShutdownDelayDuration time.Duration
6061
// We intentionally did not add a flag for this option. Users of the
6162
// apiserver library can wire it to a flag.
6263
JSONPatchMaxCopyBytes int64
@@ -116,6 +117,7 @@ func NewServerRunOptionsForComponent(componentName string, componentGlobalsRegis
116117
RequestTimeout: defaults.RequestTimeout,
117118
LivezGracePeriod: defaults.LivezGracePeriod,
118119
MinRequestTimeout: defaults.MinRequestTimeout,
120+
StorageInitializationTimeout: defaults.StorageInitializationTimeout,
119121
ShutdownDelayDuration: defaults.ShutdownDelayDuration,
120122
ShutdownWatchTerminationGracePeriod: defaults.ShutdownWatchTerminationGracePeriod,
121123
JSONPatchMaxCopyBytes: defaults.JSONPatchMaxCopyBytes,
@@ -140,6 +142,7 @@ func (s *ServerRunOptions) ApplyTo(c *server.Config) error {
140142
c.RequestTimeout = s.RequestTimeout
141143
c.GoawayChance = s.GoawayChance
142144
c.MinRequestTimeout = s.MinRequestTimeout
145+
c.StorageInitializationTimeout = s.StorageInitializationTimeout
143146
c.ShutdownDelayDuration = s.ShutdownDelayDuration
144147
c.JSONPatchMaxCopyBytes = s.JSONPatchMaxCopyBytes
145148
c.MaxRequestBodyBytes = s.MaxRequestBodyBytes
@@ -197,6 +200,10 @@ func (s *ServerRunOptions) Validate() []error {
197200
errors = append(errors, fmt.Errorf("--min-request-timeout can not be negative value"))
198201
}
199202

203+
if s.StorageInitializationTimeout < 0 {
204+
errors = append(errors, fmt.Errorf("--storage-initialization-timeout can not be negative value"))
205+
}
206+
200207
if s.ShutdownDelayDuration < 0 {
201208
errors = append(errors, fmt.Errorf("--shutdown-delay-duration can not be negative value"))
202209
}
@@ -350,6 +357,9 @@ func (s *ServerRunOptions) AddUniversalFlags(fs *pflag.FlagSet) {
350357
"handler, which picks a randomized value above this number as the connection timeout, "+
351358
"to spread out load.")
352359

360+
fs.DurationVar(&s.StorageInitializationTimeout, "storage-initialization-timeout", s.StorageInitializationTimeout,
361+
"Maximum amount of time to wait for storage initialization before declaring apiserver ready. Defaults to 1m.")
362+
353363
fs.DurationVar(&s.ShutdownDelayDuration, "shutdown-delay-duration", s.ShutdownDelayDuration, ""+
354364
"Time to delay the termination. During that time the server keeps serving requests normally. The endpoints /healthz and /livez "+
355365
"will return success, but /readyz immediately returns failure. Graceful termination starts after this delay "+

0 commit comments

Comments
 (0)