@@ -19,6 +19,7 @@ package master
19
19
import (
20
20
"fmt"
21
21
"net"
22
+ "net/http"
22
23
"time"
23
24
24
25
corev1 "k8s.io/api/core/v1"
@@ -31,6 +32,7 @@ import (
31
32
genericapiserver "k8s.io/apiserver/pkg/server"
32
33
utilfeature "k8s.io/apiserver/pkg/util/feature"
33
34
corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
35
+ "k8s.io/client-go/rest"
34
36
"k8s.io/klog"
35
37
"k8s.io/kubernetes/pkg/features"
36
38
"k8s.io/kubernetes/pkg/master/reconcilers"
@@ -51,6 +53,7 @@ type Controller struct {
51
53
ServiceClient corev1client.ServicesGetter
52
54
NamespaceClient corev1client.NamespacesGetter
53
55
EventClient corev1client.EventsGetter
56
+ healthClient rest.Interface
54
57
55
58
ServiceClusterIPRegistry rangeallocation.RangeRegistry
56
59
ServiceClusterIPInterval time.Duration
@@ -80,7 +83,7 @@ type Controller struct {
80
83
}
81
84
82
85
// NewBootstrapController returns a controller for watching the core capabilities of the master
83
- func (c * completedConfig ) NewBootstrapController (legacyRESTStorage corerest.LegacyRESTStorage , serviceClient corev1client.ServicesGetter , nsClient corev1client.NamespacesGetter , eventClient corev1client.EventsGetter ) * Controller {
86
+ func (c * completedConfig ) NewBootstrapController (legacyRESTStorage corerest.LegacyRESTStorage , serviceClient corev1client.ServicesGetter , nsClient corev1client.NamespacesGetter , eventClient corev1client.EventsGetter , healthClient rest. Interface ) * Controller {
84
87
_ , publicServicePort , err := c .GenericConfig .SecureServing .HostPort ()
85
88
if err != nil {
86
89
klog .Fatalf ("failed to get listener address: %v" , err )
@@ -95,6 +98,7 @@ func (c *completedConfig) NewBootstrapController(legacyRESTStorage corerest.Lega
95
98
ServiceClient : serviceClient ,
96
99
NamespaceClient : nsClient ,
97
100
EventClient : eventClient ,
101
+ healthClient : healthClient ,
98
102
99
103
EndpointReconciler : c .ExtraConfig .EndpointReconcilerConfig .Reconciler ,
100
104
EndpointInterval : c .ExtraConfig .EndpointReconcilerConfig .Interval ,
@@ -138,6 +142,12 @@ func (c *Controller) Start() {
138
142
return
139
143
}
140
144
145
+ // Reconcile during first run removing itself until server is ready.
146
+ endpointPorts := createEndpointPortSpec (c .PublicServicePort , "https" , c .ExtraEndpointPorts )
147
+ if err := c .EndpointReconciler .RemoveEndpoints (kubernetesServiceName , c .PublicIP , endpointPorts ); err != nil {
148
+ klog .Errorf ("Unable to remove old endpoints from kubernetes service: %v" , err )
149
+ }
150
+
141
151
repairClusterIPs := servicecontroller .NewRepair (c .ServiceClusterIPInterval , c .ServiceClient , c .EventClient , & c .ServiceClusterIPRange , c .ServiceClusterIPRegistry )
142
152
repairNodePorts := portallocatorcontroller .NewRepair (c .ServiceNodePortInterval , c .ServiceClient , c .EventClient , c .ServiceNodePortRange , c .ServiceNodePortRegistry )
143
153
@@ -150,10 +160,6 @@ func (c *Controller) Start() {
150
160
// If we fail to repair node ports apiserver is useless. We should restart and retry.
151
161
klog .Fatalf ("Unable to perform initial service nodePort check: %v" , err )
152
162
}
153
- // Service definition is reconciled during first run to correct port and type per expectations.
154
- if err := c .UpdateKubernetesService (true ); err != nil {
155
- klog .Errorf ("Unable to perform initial Kubernetes service initialization: %v" , err )
156
- }
157
163
158
164
c .runner = async .NewRunner (c .RunKubernetesNamespaces , c .RunKubernetesService , repairClusterIPs .RunUntil , repairNodePorts .RunUntil )
159
165
c .runner .Start ()
@@ -168,7 +174,8 @@ func (c *Controller) Stop() {
168
174
go func () {
169
175
defer close (finishedReconciling )
170
176
klog .Infof ("Shutting down kubernetes service endpoint reconciler" )
171
- if err := c .EndpointReconciler .StopReconciling (kubernetesServiceName , c .PublicIP , endpointPorts ); err != nil {
177
+ c .EndpointReconciler .StopReconciling ()
178
+ if err := c .EndpointReconciler .RemoveEndpoints (kubernetesServiceName , c .PublicIP , endpointPorts ); err != nil {
172
179
klog .Error (err )
173
180
}
174
181
}()
@@ -178,7 +185,7 @@ func (c *Controller) Stop() {
178
185
// done
179
186
case <- time .After (2 * c .EndpointInterval ):
180
187
// don't block server shutdown forever if we can't reach etcd to remove ourselves
181
- klog .Warning ("StopReconciling () timed out" )
188
+ klog .Warning ("RemoveEndpoints () timed out" )
182
189
}
183
190
}
184
191
@@ -196,7 +203,14 @@ func (c *Controller) RunKubernetesNamespaces(ch chan struct{}) {
196
203
197
204
// RunKubernetesService periodically updates the kubernetes service
198
205
func (c * Controller ) RunKubernetesService (ch chan struct {}) {
199
- wait .Until (func () {
206
+ // wait until process is ready
207
+ wait .PollImmediateUntil (100 * time .Millisecond , func () (bool , error ) {
208
+ var code int
209
+ c .healthClient .Get ().AbsPath ("/healthz" ).Do ().StatusCode (& code )
210
+ return code == http .StatusOK , nil
211
+ }, ch )
212
+
213
+ wait .NonSlidingUntil (func () {
200
214
// Service definition is not reconciled after first
201
215
// run, ports and type will be corrected only during
202
216
// start.
0 commit comments