@@ -75,9 +75,9 @@ const (
 	TolerateUnreadyEndpointsAnnotation = "service.alpha.kubernetes.io/tolerate-unready-endpoints"
 )
 
-// NewEndpointController returns a new *EndpointController.
+// NewEndpointController returns a new *Controller.
 func NewEndpointController(podInformer coreinformers.PodInformer, serviceInformer coreinformers.ServiceInformer,
-	endpointsInformer coreinformers.EndpointsInformer, client clientset.Interface, endpointUpdatesBatchPeriod time.Duration) *EndpointController {
+	endpointsInformer coreinformers.EndpointsInformer, client clientset.Interface, endpointUpdatesBatchPeriod time.Duration) *Controller {
 	broadcaster := record.NewBroadcaster()
 	broadcaster.StartStructuredLogging(0)
 	broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: client.CoreV1().Events("")})
@@ -86,7 +86,7 @@ func NewEndpointController(podInformer coreinformers.PodInformer, serviceInforme
 	if client != nil && client.CoreV1().RESTClient().GetRateLimiter() != nil {
 		ratelimiter.RegisterMetricAndTrackRateLimiterUsage("endpoint_controller", client.CoreV1().RESTClient().GetRateLimiter())
 	}
-	e := &EndpointController{
+	e := &Controller{
 		client:           client,
 		queue:            workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "endpoint"),
 		workerLoopPeriod: time.Second,
@@ -127,8 +127,8 @@ func NewEndpointController(podInformer coreinformers.PodInformer, serviceInforme
 	return e
 }
 
-// EndpointController manages selector-based service endpoints.
-type EndpointController struct {
+// Controller manages selector-based service endpoints.
+type Controller struct {
 	client           clientset.Interface
 	eventBroadcaster record.EventBroadcaster
 	eventRecorder    record.EventRecorder
@@ -177,7 +177,7 @@ type EndpointController struct {
 
 // Run will not return until stopCh is closed. workers determines how many
 // endpoints will be handled in parallel.
-func (e *EndpointController) Run(workers int, stopCh <-chan struct{}) {
+func (e *Controller) Run(workers int, stopCh <-chan struct{}) {
 	defer utilruntime.HandleCrash()
 	defer e.queue.ShutDown()
 
@@ -202,7 +202,7 @@ func (e *EndpointController) Run(workers int, stopCh <-chan struct{}) {
 
 // When a pod is added, figure out what services it will be a member of and
 // enqueue them. obj must have *v1.Pod type.
-func (e *EndpointController) addPod(obj interface{}) {
+func (e *Controller) addPod(obj interface{}) {
 	pod := obj.(*v1.Pod)
 	services, err := e.serviceSelectorCache.GetPodServiceMemberships(e.serviceLister, pod)
 	if err != nil {
@@ -250,7 +250,7 @@ func podToEndpointAddressForService(svc *v1.Service, pod *v1.Pod) (*v1.EndpointA
 // When a pod is updated, figure out what services it used to be a member of
 // and what services it will be a member of, and enqueue the union of these.
 // old and cur must be *v1.Pod types.
-func (e *EndpointController) updatePod(old, cur interface{}) {
+func (e *Controller) updatePod(old, cur interface{}) {
 	services := endpointutil.GetServicesToUpdateOnPodChange(e.serviceLister, e.serviceSelectorCache, old, cur)
 	for key := range services {
 		e.queue.AddAfter(key, e.endpointUpdatesBatchPeriod)
@@ -259,15 +259,15 @@ func (e *EndpointController) updatePod(old, cur interface{}) {
 
 // When a pod is deleted, enqueue the services the pod used to be a member of.
 // obj could be an *v1.Pod, or a DeletionFinalStateUnknown marker item.
-func (e *EndpointController) deletePod(obj interface{}) {
+func (e *Controller) deletePod(obj interface{}) {
 	pod := endpointutil.GetPodFromDeleteAction(obj)
 	if pod != nil {
 		e.addPod(pod)
 	}
 }
 
 // onServiceUpdate updates the Service Selector in the cache and queues the Service for processing.
-func (e *EndpointController) onServiceUpdate(obj interface{}) {
+func (e *Controller) onServiceUpdate(obj interface{}) {
 	key, err := controller.KeyFunc(obj)
 	if err != nil {
 		utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %+v: %v", obj, err))
@@ -279,7 +279,7 @@ func (e *EndpointController) onServiceUpdate(obj interface{}) {
 }
 
 // onServiceDelete removes the Service Selector from the cache and queues the Service for processing.
-func (e *EndpointController) onServiceDelete(obj interface{}) {
+func (e *Controller) onServiceDelete(obj interface{}) {
 	key, err := controller.KeyFunc(obj)
 	if err != nil {
 		utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %+v: %v", obj, err))
@@ -290,7 +290,7 @@ func (e *EndpointController) onServiceDelete(obj interface{}) {
 	e.queue.Add(key)
 }
 
-func (e *EndpointController) onEndpointsDelete(obj interface{}) {
+func (e *Controller) onEndpointsDelete(obj interface{}) {
 	key, err := controller.KeyFunc(obj)
 	if err != nil {
 		utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %+v: %v", obj, err))
@@ -303,12 +303,12 @@ func (e *EndpointController) onEndpointsDelete(obj interface{}) {
 // marks them done. You may run as many of these in parallel as you wish; the
 // workqueue guarantees that they will not end up processing the same service
 // at the same time.
-func (e *EndpointController) worker() {
+func (e *Controller) worker() {
 	for e.processNextWorkItem() {
 	}
 }
 
-func (e *EndpointController) processNextWorkItem() bool {
+func (e *Controller) processNextWorkItem() bool {
 	eKey, quit := e.queue.Get()
 	if quit {
 		return false
@@ -321,7 +321,7 @@ func (e *EndpointController) processNextWorkItem() bool {
 	return true
 }
 
-func (e *EndpointController) handleErr(err error, key interface{}) {
+func (e *Controller) handleErr(err error, key interface{}) {
 	if err == nil {
 		e.queue.Forget(key)
 		return
@@ -343,7 +343,7 @@ func (e *EndpointController) handleErr(err error, key interface{}) {
 	utilruntime.HandleError(err)
 }
 
-func (e *EndpointController) syncService(key string) error {
+func (e *Controller) syncService(key string) error {
 	startTime := time.Now()
 	defer func() {
 		klog.V(4).Infof("Finished syncing service %q endpoints. (%v)", key, time.Since(startTime))
@@ -550,7 +550,7 @@ func (e *EndpointController) syncService(key string) error {
 // do this once on startup, because in steady-state these are detected (but
 // some stragglers could have been left behind if the endpoint controller
 // reboots).
-func (e *EndpointController) checkLeftoverEndpoints() {
+func (e *Controller) checkLeftoverEndpoints() {
 	list, err := e.endpointsLister.List(labels.Everything())
 	if err != nil {
 		utilruntime.HandleError(fmt.Errorf("Unable to list endpoints (%v); orphaned endpoints will not be cleaned up. (They're pretty harmless, but you can restart this component if you want another attempt made.)", err))
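
For reference, a minimal sketch of how a caller could wire up and run the renamed Controller after this change. Only the NewEndpointController signature and Run come from the diff above; the clientset construction, shared informer factory, resync period, worker count, and batch period below are illustrative assumptions, not part of this commit.

// Illustrative only: constructs the endpoint Controller from shared informers
// and runs it until stopCh is closed.
package main

import (
	"time"

	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"

	// assumed import path for the controller package shown in the diff
	endpointcontroller "k8s.io/kubernetes/pkg/controller/endpoint"
)

func main() {
	// Assumes the process runs in-cluster; any rest.Config works here.
	config, err := rest.InClusterConfig()
	if err != nil {
		panic(err)
	}
	client := kubernetes.NewForConfigOrDie(config)

	factory := informers.NewSharedInformerFactory(client, 0)
	stopCh := make(chan struct{})

	// The constructor keeps its old name; only the returned type is now *Controller.
	e := endpointcontroller.NewEndpointController(
		factory.Core().V1().Pods(),
		factory.Core().V1().Services(),
		factory.Core().V1().Endpoints(),
		client,
		1*time.Second, // endpointUpdatesBatchPeriod (illustrative value)
	)

	factory.Start(stopCh)
	e.Run(5, stopCh) // blocks until stopCh is closed; 5 workers process services in parallel
}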