Skip to content

Commit ce17870

Browse files
authored
Merge pull request #54 from nginxinc/fix-use-service-ports
Fixed the port mapping issue
2 parents 34bd211 + 812fd07 commit ce17870

File tree

6 files changed

+205
-92
lines changed

6 files changed

+205
-92
lines changed

nginx-controller/controller/controller.go

Lines changed: 97 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -26,11 +26,14 @@ import (
2626

2727
"github.com/nginxinc/kubernetes-ingress/nginx-controller/nginx"
2828
"k8s.io/kubernetes/pkg/api"
29+
podutil "k8s.io/kubernetes/pkg/api/pod"
2930
"k8s.io/kubernetes/pkg/apis/extensions"
3031
"k8s.io/kubernetes/pkg/client/cache"
3132
client "k8s.io/kubernetes/pkg/client/unversioned"
3233
"k8s.io/kubernetes/pkg/controller/framework"
34+
"k8s.io/kubernetes/pkg/labels"
3335
"k8s.io/kubernetes/pkg/runtime"
36+
"k8s.io/kubernetes/pkg/util/intstr"
3437
"k8s.io/kubernetes/pkg/watch"
3538
)
3639

@@ -384,13 +387,13 @@ func (lbc *LoadBalancerController) createIngress(ing *extensions.Ingress) nginx.
384387
ingEx.Secrets[secretName] = secret
385388
}
386389

387-
ingEx.Endpoints = make(map[string]*api.Endpoints)
390+
ingEx.Endpoints = make(map[string][]string)
388391
if ing.Spec.Backend != nil {
389392
endps, err := lbc.getEndpointsForIngressBackend(ing.Spec.Backend, ing.Namespace)
390393
if err != nil {
391-
glog.V(3).Infof("Error retrieving endpoints for the services %v: %v", ing.Spec.Backend.ServiceName, err)
394+
glog.Warningf("Error retrieving endpoints for the service %v: %v", ing.Spec.Backend.ServiceName, err)
392395
} else {
393-
ingEx.Endpoints[ing.Spec.Backend.ServiceName] = endps
396+
ingEx.Endpoints[ing.Spec.Backend.ServiceName+ing.Spec.Backend.ServicePort.String()] = endps
394397
}
395398
}
396399

@@ -402,35 +405,113 @@ func (lbc *LoadBalancerController) createIngress(ing *extensions.Ingress) nginx.
402405
for _, path := range rule.HTTP.Paths {
403406
endps, err := lbc.getEndpointsForIngressBackend(&path.Backend, ing.Namespace)
404407
if err != nil {
405-
glog.V(3).Infof("Error retrieving endpoints for the services %v: %v", path.Backend.ServiceName, err)
408+
glog.Warningf("Error retrieving endpoints for the service %v: %v", path.Backend.ServiceName, err)
406409
} else {
407-
ingEx.Endpoints[path.Backend.ServiceName] = endps
410+
ingEx.Endpoints[path.Backend.ServiceName+path.Backend.ServicePort.String()] = endps
408411
}
409-
410412
}
411413
}
412414

413415
return ingEx
414416
}
415417

416-
func (lbc *LoadBalancerController) getEndpointsForIngressBackend(backend *extensions.IngressBackend, namespace string) (*api.Endpoints, error) {
418+
func (lbc *LoadBalancerController) getEndpointsForIngressBackend(backend *extensions.IngressBackend, namespace string) ([]string, error) {
419+
svc, err := lbc.getServiceForIngressBackend(backend, namespace)
420+
if err != nil {
421+
glog.V(3).Infof("Error getting service %v: %v", backend.ServiceName, err)
422+
return nil, err
423+
}
424+
425+
endps, err := lbc.endpLister.GetServiceEndpoints(svc)
426+
if err != nil {
427+
glog.V(3).Infof("Error getting endpoints for service %s from the cache: %v", svc.Name, err)
428+
return nil, err
429+
}
430+
431+
result, err := lbc.getEndpointsForPort(endps, backend.ServicePort, svc)
432+
if err != nil {
433+
glog.V(3).Infof("Error getting endpoints for service %s port %v: %v", svc.Name, backend.ServicePort, err)
434+
return nil, err
435+
}
436+
return result, nil
437+
}
438+
439+
func (lbc *LoadBalancerController) getEndpointsForPort(endps api.Endpoints, ingSvcPort intstr.IntOrString, svc *api.Service) ([]string, error) {
440+
var targetPort int
441+
var err error
442+
found := false
443+
444+
for _, port := range svc.Spec.Ports {
445+
if (ingSvcPort.Type == intstr.Int && port.Port == ingSvcPort.IntValue()) || (ingSvcPort.Type == intstr.String && port.Name == ingSvcPort.String()) {
446+
targetPort, err = lbc.getTargetPort(&port, svc)
447+
if err != nil {
448+
return nil, fmt.Errorf("Error determining target port for port %v in Ingress: %v", ingSvcPort, err)
449+
}
450+
found = true
451+
break
452+
}
453+
}
454+
455+
if !found {
456+
return nil, fmt.Errorf("No port %v in service %s", ingSvcPort, svc.Name)
457+
}
458+
459+
for _, subset := range endps.Subsets {
460+
for _, port := range subset.Ports {
461+
if port.Port == targetPort {
462+
var endpoints []string
463+
for _, address := range subset.Addresses {
464+
endpoint := fmt.Sprintf("%v:%v", address.IP, port.Port)
465+
endpoints = append(endpoints, endpoint)
466+
}
467+
return endpoints, nil
468+
}
469+
}
470+
}
471+
472+
return nil, fmt.Errorf("No endpoints for target port %v in service %s", targetPort, svc.Name)
473+
}
474+
475+
func (lbc *LoadBalancerController) getTargetPort(svcPort *api.ServicePort, svc *api.Service) (int, error) {
476+
if (svcPort.TargetPort == intstr.IntOrString{}) {
477+
return svcPort.Port, nil
478+
}
479+
480+
if svcPort.TargetPort.Type == intstr.Int {
481+
return svcPort.TargetPort.IntValue(), nil
482+
}
483+
484+
pods, err := lbc.client.Pods(svc.Namespace).List(api.ListOptions{LabelSelector: labels.Set(svc.Spec.Selector).AsSelector()})
485+
if err != nil {
486+
return 0, fmt.Errorf("Error getting pod information: %v", err)
487+
}
488+
489+
if len(pods.Items) == 0 {
490+
return 0, fmt.Errorf("No pods of service %s", svc.Name)
491+
}
492+
493+
pod := &pods.Items[0]
494+
495+
portNum, err := podutil.FindPort(pod, svcPort)
496+
if err != nil {
497+
return 0, fmt.Errorf("Error finding named port %v in pod %s: %v", svcPort, pod.Name, err)
498+
}
499+
500+
return portNum, nil
501+
}
502+
503+
func (lbc *LoadBalancerController) getServiceForIngressBackend(backend *extensions.IngressBackend, namespace string) (*api.Service, error) {
417504
svcKey := namespace + "/" + backend.ServiceName
418505
svcObj, svcExists, err := lbc.svcLister.Store.GetByKey(svcKey)
419506
if err != nil {
420-
glog.V(3).Infof("error getting service %v from the cache: %v", svcKey, err)
421507
return nil, err
422508
}
509+
423510
if svcExists {
424-
svc := svcObj.(*api.Service)
425-
endps, err := lbc.endpLister.GetServiceEndpoints(svc)
426-
if err != nil {
427-
glog.V(3).Infof("error getting endpoints for service %v from the cache: %v", svc, err)
428-
return nil, err
429-
}
430-
return &endps, nil
511+
return svcObj.(*api.Service), nil
431512
}
432-
return nil, fmt.Errorf("service %s doesn't exists", svcKey)
433513

514+
return nil, fmt.Errorf("service %s doesn't exists", svcKey)
434515
}
435516

436517
func parseNginxConfigMaps(nginxConfigMaps string) (string, string, error) {

nginx-controller/nginx/configurator.go

Lines changed: 6 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -203,9 +203,13 @@ func createLocation(path string, upstream Upstream, cfg *Config, websocket bool)
203203
func (cnf *Configurator) createUpstream(ingEx *IngressEx, name string, backend *extensions.IngressBackend, namespace string) Upstream {
204204
ups := NewUpstreamWithDefaultServer(name)
205205

206-
endps, exists := ingEx.Endpoints[backend.ServiceName]
206+
endps, exists := ingEx.Endpoints[backend.ServiceName+backend.ServicePort.String()]
207207
if exists {
208-
upsServers := endpointsToUpstreamServers(*endps, backend.ServicePort.IntValue())
208+
var upsServers []UpstreamServer
209+
for _, endp := range endps {
210+
addressport := strings.Split(endp, ":")
211+
upsServers = append(upsServers, UpstreamServer{addressport[0], addressport[1]})
212+
}
209213
if len(upsServers) > 0 {
210214
ups.UpstreamServers = upsServers
211215
}
@@ -221,23 +225,6 @@ func pathOrDefault(path string) string {
221225
return path
222226
}
223227

224-
func endpointsToUpstreamServers(endps api.Endpoints, servicePort int) []UpstreamServer {
225-
var upsServers []UpstreamServer
226-
for _, subset := range endps.Subsets {
227-
for _, port := range subset.Ports {
228-
if port.Port == servicePort {
229-
for _, address := range subset.Addresses {
230-
ups := UpstreamServer{Address: address.IP, Port: fmt.Sprintf("%v", servicePort)}
231-
upsServers = append(upsServers, ups)
232-
}
233-
break
234-
}
235-
}
236-
}
237-
238-
return upsServers
239-
}
240-
241228
func getNameForUpstream(ing *extensions.Ingress, host string, service string) string {
242229
return fmt.Sprintf("%v-%v-%v-%v", ing.Namespace, ing.Name, host, service)
243230
}

nginx-controller/nginx/ingress.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,5 +8,5 @@ import "k8s.io/kubernetes/pkg/apis/extensions"
88
type IngressEx struct {
99
Ingress *extensions.Ingress
1010
Secrets map[string]*api.Secret
11-
Endpoints map[string]*api.Endpoints
11+
Endpoints map[string][]string
1212
}

nginx-plus-controller/controller/controller.go

Lines changed: 96 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -26,11 +26,14 @@ import (
2626

2727
"github.com/nginxinc/kubernetes-ingress/nginx-plus-controller/nginx"
2828
"k8s.io/kubernetes/pkg/api"
29+
podutil "k8s.io/kubernetes/pkg/api/pod"
2930
"k8s.io/kubernetes/pkg/apis/extensions"
3031
"k8s.io/kubernetes/pkg/client/cache"
3132
client "k8s.io/kubernetes/pkg/client/unversioned"
3233
"k8s.io/kubernetes/pkg/controller/framework"
34+
"k8s.io/kubernetes/pkg/labels"
3335
"k8s.io/kubernetes/pkg/runtime"
36+
"k8s.io/kubernetes/pkg/util/intstr"
3437
"k8s.io/kubernetes/pkg/watch"
3538
)
3639

@@ -384,13 +387,13 @@ func (lbc *LoadBalancerController) createIngress(ing *extensions.Ingress) nginx.
384387
ingEx.Secrets[secretName] = secret
385388
}
386389

387-
ingEx.Endpoints = make(map[string]*api.Endpoints)
390+
ingEx.Endpoints = make(map[string][]string)
388391
if ing.Spec.Backend != nil {
389392
endps, err := lbc.getEndpointsForIngressBackend(ing.Spec.Backend, ing.Namespace)
390393
if err != nil {
391-
glog.V(3).Infof("Error retrieving endpoints for the services %v: %v", ing.Spec.Backend.ServiceName, err)
394+
glog.Warningf("Error retrieving endpoints for the service %v: %v", ing.Spec.Backend.ServiceName, err)
392395
} else {
393-
ingEx.Endpoints[ing.Spec.Backend.ServiceName] = endps
396+
ingEx.Endpoints[ing.Spec.Backend.ServiceName+ing.Spec.Backend.ServicePort.String()] = endps
394397
}
395398
}
396399

@@ -402,32 +405,110 @@ func (lbc *LoadBalancerController) createIngress(ing *extensions.Ingress) nginx.
402405
for _, path := range rule.HTTP.Paths {
403406
endps, err := lbc.getEndpointsForIngressBackend(&path.Backend, ing.Namespace)
404407
if err != nil {
405-
glog.V(3).Infof("Error retrieving endpoints for the services %v: %v", path.Backend.ServiceName, err)
408+
glog.Warningf("Error retrieving endpoints for the service %v: %v", path.Backend.ServiceName, err)
406409
} else {
407-
ingEx.Endpoints[path.Backend.ServiceName] = endps
410+
ingEx.Endpoints[path.Backend.ServiceName+path.Backend.ServicePort.String()] = endps
408411
}
409-
410412
}
411413
}
412414

413415
return ingEx
414416
}
415417

416-
func (lbc *LoadBalancerController) getEndpointsForIngressBackend(backend *extensions.IngressBackend, namespace string) (*api.Endpoints, error) {
418+
func (lbc *LoadBalancerController) getEndpointsForIngressBackend(backend *extensions.IngressBackend, namespace string) ([]string, error) {
419+
svc, err := lbc.getServiceForIngressBackend(backend, namespace)
420+
if err != nil {
421+
glog.V(3).Infof("Error getting service %v: %v", backend.ServiceName, err)
422+
return nil, err
423+
}
424+
425+
endps, err := lbc.endpLister.GetServiceEndpoints(svc)
426+
if err != nil {
427+
glog.V(3).Infof("Error getting endpoints for service %s from the cache: %v", svc.Name, err)
428+
return nil, err
429+
}
430+
431+
result, err := lbc.getEndpointsForPort(endps, backend.ServicePort, svc)
432+
if err != nil {
433+
glog.V(3).Infof("Error getting endpoints for service %s port %v: %v", svc.Name, backend.ServicePort, err)
434+
return nil, err
435+
}
436+
return result, nil
437+
}
438+
439+
func (lbc *LoadBalancerController) getEndpointsForPort(endps api.Endpoints, ingSvcPort intstr.IntOrString, svc *api.Service) ([]string, error) {
440+
var targetPort int
441+
var err error
442+
found := false
443+
444+
for _, port := range svc.Spec.Ports {
445+
if (ingSvcPort.Type == intstr.Int && port.Port == ingSvcPort.IntValue()) || (ingSvcPort.Type == intstr.String && port.Name == ingSvcPort.String()) {
446+
targetPort, err = lbc.getTargetPort(&port, svc)
447+
if err != nil {
448+
return nil, fmt.Errorf("Error determining target port for port %v in Ingress: %v", ingSvcPort, err)
449+
}
450+
found = true
451+
break
452+
}
453+
}
454+
455+
if !found {
456+
return nil, fmt.Errorf("No port %v in service %s", ingSvcPort, svc.Name)
457+
}
458+
459+
for _, subset := range endps.Subsets {
460+
for _, port := range subset.Ports {
461+
if port.Port == targetPort {
462+
var endpoints []string
463+
for _, address := range subset.Addresses {
464+
endpoint := fmt.Sprintf("%v:%v", address.IP, port.Port)
465+
endpoints = append(endpoints, endpoint)
466+
}
467+
return endpoints, nil
468+
}
469+
}
470+
}
471+
472+
return nil, fmt.Errorf("No endpoints for target port %v in service %s", targetPort, svc.Name)
473+
}
474+
475+
func (lbc *LoadBalancerController) getTargetPort(svcPort *api.ServicePort, svc *api.Service) (int, error) {
476+
if (svcPort.TargetPort == intstr.IntOrString{}) {
477+
return svcPort.Port, nil
478+
}
479+
480+
if svcPort.TargetPort.Type == intstr.Int {
481+
return svcPort.TargetPort.IntValue(), nil
482+
}
483+
484+
pods, err := lbc.client.Pods(svc.Namespace).List(api.ListOptions{LabelSelector: labels.Set(svc.Spec.Selector).AsSelector()})
485+
if err != nil {
486+
return 0, fmt.Errorf("Error getting pod information: %v", err)
487+
}
488+
489+
if len(pods.Items) == 0 {
490+
return 0, fmt.Errorf("No pods of service %s", svc.Name)
491+
}
492+
493+
pod := &pods.Items[0]
494+
495+
portNum, err := podutil.FindPort(pod, svcPort)
496+
if err != nil {
497+
return 0, fmt.Errorf("Error finding named port %v in pod %s: %v", svcPort, pod.Name, err)
498+
}
499+
500+
return portNum, nil
501+
}
502+
503+
func (lbc *LoadBalancerController) getServiceForIngressBackend(backend *extensions.IngressBackend, namespace string) (*api.Service, error) {
417504
svcKey := namespace + "/" + backend.ServiceName
418505
svcObj, svcExists, err := lbc.svcLister.Store.GetByKey(svcKey)
419506
if err != nil {
420-
glog.V(3).Infof("error getting service %v from the cache: %v", svcKey, err)
421507
return nil, err
422508
}
509+
423510
if svcExists {
424-
svc := svcObj.(*api.Service)
425-
endps, err := lbc.endpLister.GetServiceEndpoints(svc)
426-
if err != nil {
427-
glog.V(3).Infof("error getting endpoints for service %v from the cache: %v", svc, err)
428-
return nil, err
429-
}
430-
return &endps, nil
511+
return svcObj.(*api.Service), nil
431512
}
432513

433514
return nil, fmt.Errorf("service %s doesn't exists", svcKey)

0 commit comments

Comments
 (0)