diff --git a/source/service.go b/source/service.go index a2a8f8f7fa..0b0e849987 100644 --- a/source/service.go +++ b/source/service.go @@ -80,13 +80,23 @@ type serviceSource struct { nodeInformer coreinformers.NodeInformer serviceTypeFilter *serviceTypes exposeInternalIPv6 bool + excludeUnschedulable bool // process Services with legacy annotations compatibility string } // NewServiceSource creates a new serviceSource with the given config. -func NewServiceSource(ctx context.Context, kubeClient kubernetes.Interface, namespace, annotationFilter, fqdnTemplate string, combineFqdnAnnotation bool, compatibility string, publishInternal, publishHostIP, alwaysPublishNotReadyAddresses bool, serviceTypeFilter []string, ignoreHostnameAnnotation bool, labelSelector labels.Selector, resolveLoadBalancerHostname, listenEndpointEvents bool, exposeInternalIPv6 bool) (Source, error) { +func NewServiceSource( + ctx context.Context, + kubeClient kubernetes.Interface, + namespace, annotationFilter, + fqdnTemplate string, combineFqdnAnnotation bool, + compatibility string, + publishInternal, publishHostIP, alwaysPublishNotReadyAddresses bool, + serviceTypeFilter []string, ignoreHostnameAnnotation bool, labelSelector labels.Selector, + resolveLoadBalancerHostname, listenEndpointEvents, exposeInternalIPv6, excludeUnschedulable bool, +) (Source, error) { tmpl, err := fqdn.ParseTemplate(fqdnTemplate) if err != nil { return nil, err @@ -169,6 +179,7 @@ func NewServiceSource(ctx context.Context, kubeClient kubernetes.Interface, name resolveLoadBalancerHostname: resolveLoadBalancerHostname, listenEndpointEvents: listenEndpointEvents, exposeInternalIPv6: exposeInternalIPv6, + excludeUnschedulable: excludeUnschedulable, }, nil } @@ -717,6 +728,12 @@ func (sc *serviceSource) extractNodePortTargets(svc *v1.Service) (endpoint.Targe } for _, node := range nodes { + fmt.Println("NODE", node.Name) + if node.Spec.Unschedulable && sc.excludeUnschedulable { + fmt.Println("BINGO") + log.Debugf("Skipping node %s because it is unschedulable", node.Name) + continue + } for _, address := range node.Status.Addresses { switch address.Type { case v1.NodeExternalIP: diff --git a/source/service_fqdn_test.go b/source/service_fqdn_test.go index d277ef9d34..4d7233f1da 100644 --- a/source/service_fqdn_test.go +++ b/source/service_fqdn_test.go @@ -609,6 +609,7 @@ func TestServiceSourceFqdnTemplatingExamples(t *testing.T) { false, false, true, + true, ) require.NoError(t, err) diff --git a/source/service_test.go b/source/service_test.go index 07df803bd5..c74b8efbb8 100644 --- a/source/service_test.go +++ b/source/service_test.go @@ -91,6 +91,7 @@ func (suite *ServiceSuite) SetupTest() { false, false, false, + true, ) suite.NoError(err, "should initialize service source") } @@ -174,6 +175,7 @@ func testServiceSourceNewServiceSource(t *testing.T) { false, false, false, + true, ) if ti.expectError { @@ -1158,6 +1160,7 @@ func testServiceSourceEndpoints(t *testing.T) { tc.resolveLoadBalancerHostname, false, false, + true, ) require.NoError(t, err) @@ -1374,6 +1377,7 @@ func testMultipleServicesEndpoints(t *testing.T) { false, false, false, + true, ) require.NoError(t, err) @@ -1679,6 +1683,7 @@ func TestClusterIpServices(t *testing.T) { false, false, false, + true, ) require.NoError(t, err) @@ -1742,6 +1747,52 @@ func TestServiceSourceNodePortServices(t *testing.T) { ObjectMeta: metav1.ObjectMeta{ Name: "node1", }, + Spec: v1.NodeSpec{ + Unschedulable: true, + }, + Status: v1.NodeStatus{ + Addresses: []v1.NodeAddress{ + {Type: v1.NodeExternalIP, Address: "54.10.11.1"}, + {Type: v1.NodeInternalIP, Address: "10.0.1.1"}, + {Type: v1.NodeExternalIP, Address: "2001:DB8::1"}, + {Type: v1.NodeInternalIP, Address: "2001:DB8::2"}, + }, + }, + }, { + ObjectMeta: metav1.ObjectMeta{ + Name: "node2", + }, + Status: v1.NodeStatus{ + Addresses: []v1.NodeAddress{ + {Type: v1.NodeExternalIP, Address: "54.10.11.2"}, + {Type: v1.NodeInternalIP, Address: "10.0.1.2"}, + {Type: v1.NodeExternalIP, Address: "2001:DB8::3"}, + {Type: v1.NodeInternalIP, Address: "2001:DB8::4"}, + }, + }, + }}, + }, + { + title: "TBD", + svcNamespace: "testing", + svcName: "foo", + svcType: v1.ServiceTypeNodePort, + svcTrafficPolicy: v1.ServiceExternalTrafficPolicyTypeCluster, + annotations: map[string]string{ + hostnameAnnotationKey: "foo.example.org.", + }, + expected: []*endpoint.Endpoint{ + {DNSName: "_foo._tcp.foo.example.org", Targets: endpoint.Targets{"0 50 30192 foo.example.org"}, RecordType: endpoint.RecordTypeSRV}, + {DNSName: "foo.example.org", Targets: endpoint.Targets{"54.10.11.2"}, RecordType: endpoint.RecordTypeA}, + {DNSName: "foo.example.org", Targets: endpoint.Targets{"2001:DB8::3"}, RecordType: endpoint.RecordTypeAAAA}, + }, + nodes: []*v1.Node{{ + ObjectMeta: metav1.ObjectMeta{ + Name: "node1", + }, + Spec: v1.NodeSpec{ + Unschedulable: false, + }, Status: v1.NodeStatus{ Addresses: []v1.NodeAddress{ {Type: v1.NodeExternalIP, Address: "54.10.11.1"}, @@ -2456,6 +2507,7 @@ func TestServiceSourceNodePortServices(t *testing.T) { false, false, tc.exposeInternalIPv6, + true, ) require.NoError(t, err) @@ -3268,6 +3320,7 @@ func TestHeadlessServices(t *testing.T) { false, false, tc.exposeInternalIPv6, + true, ) require.NoError(t, err) @@ -3638,6 +3691,7 @@ func TestMultipleHeadlessServicesPointingToPodsOnTheSameNode(t *testing.T) { false, false, false, + true, ) require.NoError(t, err) assert.NotNil(t, src) @@ -4096,6 +4150,7 @@ func TestHeadlessServicesHostIP(t *testing.T) { false, false, false, + true, ) require.NoError(t, err) @@ -4306,6 +4361,7 @@ func TestExternalServices(t *testing.T) { false, false, false, + true, ) require.NoError(t, err) @@ -4368,6 +4424,7 @@ func BenchmarkServiceEndpoints(b *testing.B) { false, false, false, + true, ) require.NoError(b, err) @@ -4467,6 +4524,7 @@ func TestNewServiceSourceInformersEnabled(t *testing.T) { false, false, false, + true, ) require.NoError(t, err) svcSrc, ok := svc.(*serviceSource) @@ -4498,6 +4556,7 @@ func TestNewServiceSourceWithServiceTypeFilters_Unsupported(t *testing.T) { false, false, false, + true, ) require.Errorf(t, err, "unsupported service type filter: \"UnknownType\". Supported types are: [\"ClusterIP\" \"NodePort\" \"LoadBalancer\" \"ExternalName\"]") require.Nil(t, svc, "ServiceSource should be nil when an unsupported service type is provided") @@ -4677,6 +4736,7 @@ func TestEndpointSlicesIndexer(t *testing.T) { false, false, false, + true, ) require.NoError(t, err) ss, ok := src.(*serviceSource) @@ -4857,3 +4917,7 @@ func TestServiceSource_AddEventHandler(t *testing.T) { }) } } + +func TestServiceSource_Nodes(t *testing.T) { + +} diff --git a/source/store.go b/source/store.go index c77e89c3b4..cf81c7f13d 100644 --- a/source/store.go +++ b/source/store.go @@ -429,7 +429,11 @@ func buildServiceSource(ctx context.Context, p ClientGenerator, cfg *Config) (So if err != nil { return nil, err } - return NewServiceSource(ctx, client, cfg.Namespace, cfg.AnnotationFilter, cfg.FQDNTemplate, cfg.CombineFQDNAndAnnotation, cfg.Compatibility, cfg.PublishInternal, cfg.PublishHostIP, cfg.AlwaysPublishNotReadyAddresses, cfg.ServiceTypeFilter, cfg.IgnoreHostnameAnnotation, cfg.LabelFilter, cfg.ResolveLoadBalancerHostname, cfg.ListenEndpointEvents, cfg.ExposeInternalIPv6) + return NewServiceSource( + ctx, client, cfg.Namespace, cfg.AnnotationFilter, cfg.FQDNTemplate, cfg.CombineFQDNAndAnnotation, + cfg.Compatibility, cfg.PublishInternal, cfg.PublishHostIP, cfg.AlwaysPublishNotReadyAddresses, + cfg.ServiceTypeFilter, cfg.IgnoreHostnameAnnotation, cfg.LabelFilter, cfg.ResolveLoadBalancerHostname, + cfg.ListenEndpointEvents, cfg.ExposeInternalIPv6, cfg.ExcludeUnschedulable) } // buildIngressSource creates an Ingress source for exposing Kubernetes ingresses as DNS records.