Skip to content

Commit c1a66a4

Browse files
authored
Merge pull request kubernetes#89145 from sttts/sttts-apiextensions-discovery-sync
apiextensions: wait for complete discovery endpoint
2 parents 1aa64b2 + 34f5737 commit c1a66a4

File tree

2 files changed

+35
-3
lines changed

2 files changed

+35
-3
lines changed

staging/src/k8s.io/apiextensions-apiserver/pkg/apiserver/apiserver.go

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -207,7 +207,7 @@ func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget)
207207
s.GenericAPIServer.Handler.NonGoRestfulMux.Handle("/apis", crdHandler)
208208
s.GenericAPIServer.Handler.NonGoRestfulMux.HandlePrefix("/apis/", crdHandler)
209209

210-
crdController := NewDiscoveryController(s.Informers.Apiextensions().V1().CustomResourceDefinitions(), versionDiscoveryHandler, groupDiscoveryHandler)
210+
discoveryController := NewDiscoveryController(s.Informers.Apiextensions().V1().CustomResourceDefinitions(), versionDiscoveryHandler, groupDiscoveryHandler)
211211
namingController := status.NewNamingConditionController(s.Informers.Apiextensions().V1().CustomResourceDefinitions(), crdClient.ApiextensionsV1())
212212
nonStructuralSchemaController := nonstructuralschema.NewConditionController(s.Informers.Apiextensions().V1().CustomResourceDefinitions(), crdClient.ApiextensionsV1())
213213
apiApprovalController := apiapproval.NewKubernetesAPIApprovalPolicyConformantConditionController(s.Informers.Apiextensions().V1().CustomResourceDefinitions(), crdClient.ApiextensionsV1())
@@ -231,12 +231,19 @@ func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget)
231231
go openapiController.Run(s.GenericAPIServer.StaticOpenAPISpec, s.GenericAPIServer.OpenAPIVersionedService, context.StopCh)
232232
}
233233

234-
go crdController.Run(context.StopCh)
235234
go namingController.Run(context.StopCh)
236235
go establishingController.Run(context.StopCh)
237236
go nonStructuralSchemaController.Run(5, context.StopCh)
238237
go apiApprovalController.Run(5, context.StopCh)
239238
go finalizingController.Run(5, context.StopCh)
239+
240+
discoverySyncedCh := make(chan struct{})
241+
go discoveryController.Run(context.StopCh, discoverySyncedCh)
242+
select {
243+
case <-context.StopCh:
244+
case <-discoverySyncedCh:
245+
}
246+
240247
return nil
241248
})
242249
// we don't want to report healthy until we can handle all CRDs that have already been registered. Waiting for the informer

staging/src/k8s.io/apiextensions-apiserver/pkg/apiserver/customresource_discovery_controller.go

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -201,7 +201,7 @@ func sortGroupDiscoveryByKubeAwareVersion(gd []metav1.GroupVersionForDiscovery)
201201
})
202202
}
203203

204-
func (c *DiscoveryController) Run(stopCh <-chan struct{}) {
204+
func (c *DiscoveryController) Run(stopCh <-chan struct{}, synchedCh chan<- struct{}) {
205205
defer utilruntime.HandleCrash()
206206
defer c.queue.ShutDown()
207207
defer klog.Infof("Shutting down DiscoveryController")
@@ -213,6 +213,31 @@ func (c *DiscoveryController) Run(stopCh <-chan struct{}) {
213213
return
214214
}
215215

216+
// initially sync all group versions to make sure we serve complete discovery
217+
if err := wait.PollImmediateUntil(time.Second, func() (bool, error) {
218+
crds, err := c.crdLister.List(labels.Everything())
219+
if err != nil {
220+
utilruntime.HandleError(fmt.Errorf("failed to initially list CRDs: %v", err))
221+
return false, nil
222+
}
223+
for _, crd := range crds {
224+
for _, v := range crd.Spec.Versions {
225+
gv := schema.GroupVersion{crd.Spec.Group, v.Name}
226+
if err := c.sync(gv); err != nil {
227+
utilruntime.HandleError(fmt.Errorf("failed to initially sync CRD version %v: %v", gv, err))
228+
return false, nil
229+
}
230+
}
231+
}
232+
return true, nil
233+
}, stopCh); err == wait.ErrWaitTimeout {
234+
utilruntime.HandleError(fmt.Errorf("timed out waiting for discovery endpoint to initialize"))
235+
return
236+
} else if err != nil {
237+
panic(fmt.Errorf("unexpected error: %v", err))
238+
}
239+
close(synchedCh)
240+
216241
// only start one worker thread since its a slow moving API
217242
go wait.Until(c.runWorker, time.Second, stopCh)
218243

0 commit comments

Comments
 (0)