Skip to content

Commit d1e7c63

Browse files
authored
Fix 1158 (#1160)
1 parent 93ec00c commit d1e7c63

File tree

12 files changed

+1147
-531
lines changed

12 files changed

+1147
-531
lines changed

docs/src/main/asciidoc/discovery-client.adoc

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -131,8 +131,23 @@ Spring Cloud Kubernetes can also watch the Kubernetes service catalog for change
131131
milliseconds (by default it is `30000`). The heartbeat event will contain the target references (and their namespaces of the addresses of all endpoints
132132
(for the exact details of what will get returned you can take a look inside `KubernetesCatalogWatch`). This is an implementation detail, and listeners of the heartbeat event
133133
should not rely on the details. Instead, they should see if there are differences between two subsequent heartbeats via `equals` method. We will take care to return a correct implementation that adheres to the equals contract.
134-
The endpoints will be queried in either all namespaces (enabled via `spring.cloud.kubernetes.discovery.all-namespaces=true`), or
135-
we will use: xref:property-source-config.adoc#namespace-resolution[Namespace Resolution].
134+
The endpoints will be queried in either :
135+
136+
- all namespaces (enabled via `spring.cloud.kubernetes.discovery.all-namespaces=true`)
137+
138+
- specific namespaces (enabled via `spring.cloud.kubernetes.discovery.namespaces`), for example:
139+
140+
```
141+
spring:
142+
cloud:
143+
kubernetes:
144+
discovery:
145+
namespaces:
146+
- namespace-a
147+
- namespace-b
148+
```
149+
150+
- we will use: xref:property-source-config.adoc#namespace-resolution[Namespace Resolution] if the above two paths are not taken.
136151

137152
In order to enable this functionality you need to add
138153
`@EnableScheduling` on a configuration class in your application.

spring-cloud-kubernetes-fabric8-config/src/main/java/org/springframework/cloud/kubernetes/fabric8/config/Fabric8ConfigContext.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,6 @@
2727
*
2828
* @author wind57
2929
*/
30-
final record Fabric8ConfigContext(KubernetesClient client, NormalizedSource normalizedSource, String namespace,
30+
record Fabric8ConfigContext(KubernetesClient client, NormalizedSource normalizedSource, String namespace,
3131
Environment environment) {
3232
}

spring-cloud-kubernetes-fabric8-discovery/src/main/java/org/springframework/cloud/kubernetes/fabric8/discovery/Fabric8EndpointSliceV1CatalogWatch.java

Lines changed: 18 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package org.springframework.cloud.kubernetes.fabric8.discovery;
1818

19+
import java.util.ArrayList;
1920
import java.util.List;
2021
import java.util.function.Function;
2122
import java.util.stream.Stream;
@@ -44,23 +45,25 @@ final class Fabric8EndpointSliceV1CatalogWatch
4445
public List<EndpointNameAndNamespace> apply(Fabric8CatalogWatchContext context) {
4546
// take only pods that have endpoints
4647
List<EndpointSlice> endpointSlices;
48+
KubernetesClient client = context.kubernetesClient();
49+
4750
if (context.properties().allNamespaces()) {
4851
LOG.debug(() -> "discovering endpoints in all namespaces");
49-
50-
// can't use try with resources here as it will close the client
51-
KubernetesClient client = context.kubernetesClient();
5252
endpointSlices = client.discovery().v1().endpointSlices().inAnyNamespace()
5353
.withLabels(context.properties().serviceLabels()).list().getItems();
5454
}
55+
else if (!context.properties().namespaces().isEmpty()) {
56+
LOG.debug(() -> "discovering endpoints in " + context.properties().namespaces());
57+
List<EndpointSlice> inner = new ArrayList<>(context.properties().namespaces().size());
58+
context.properties().namespaces()
59+
.forEach(namespace -> inner.addAll(endpointSlices(context, namespace, client)));
60+
endpointSlices = inner;
61+
}
5562
else {
5663
String namespace = Fabric8Utils.getApplicationNamespace(context.kubernetesClient(), null, "catalog-watcher",
5764
context.namespaceProvider());
58-
LOG.debug(() -> "fabric8 catalog watcher will use namespace : " + namespace);
59-
60-
// can't use try with resources here as it will close the client
61-
KubernetesClient client = context.kubernetesClient();
62-
endpointSlices = client.discovery().v1().endpointSlices().inNamespace(namespace)
63-
.withLabels(context.properties().serviceLabels()).list().getItems();
65+
LOG.debug(() -> "discovering endpoints in namespace : " + namespace);
66+
endpointSlices = endpointSlices(context, namespace, client);
6467
}
6568

6669
Stream<ObjectReference> references = endpointSlices.stream().map(EndpointSlice::getEndpoints)
@@ -70,4 +73,10 @@ public List<EndpointNameAndNamespace> apply(Fabric8CatalogWatchContext context)
7073

7174
}
7275

76+
private List<EndpointSlice> endpointSlices(Fabric8CatalogWatchContext context, String namespace,
77+
KubernetesClient client) {
78+
return client.discovery().v1().endpointSlices().inNamespace(namespace)
79+
.withLabels(context.properties().serviceLabels()).list().getItems();
80+
}
81+
7382
}

spring-cloud-kubernetes-fabric8-discovery/src/main/java/org/springframework/cloud/kubernetes/fabric8/discovery/Fabric8EndpointsCatalogWatch.java

Lines changed: 16 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package org.springframework.cloud.kubernetes.fabric8.discovery;
1818

19+
import java.util.ArrayList;
1920
import java.util.List;
2021
import java.util.Objects;
2122
import java.util.function.Function;
@@ -44,25 +45,25 @@ final class Fabric8EndpointsCatalogWatch
4445

4546
@Override
4647
public List<EndpointNameAndNamespace> apply(Fabric8CatalogWatchContext context) {
47-
// take only pods that have endpoints
4848
List<Endpoints> endpoints;
49+
KubernetesClient client = context.kubernetesClient();
50+
4951
if (context.properties().allNamespaces()) {
5052
LOG.debug(() -> "discovering endpoints in all namespaces");
51-
52-
// can't use try with resources here as it will close the client
53-
KubernetesClient client = context.kubernetesClient();
5453
endpoints = client.endpoints().inAnyNamespace().withLabels(context.properties().serviceLabels()).list()
5554
.getItems();
5655
}
56+
else if (!context.properties().namespaces().isEmpty()) {
57+
LOG.debug(() -> "discovering endpoints in " + context.properties().namespaces());
58+
List<Endpoints> inner = new ArrayList<>(context.properties().namespaces().size());
59+
context.properties().namespaces().forEach(namespace -> inner.addAll(endpoints(context, namespace, client)));
60+
endpoints = inner;
61+
}
5762
else {
5863
String namespace = Fabric8Utils.getApplicationNamespace(context.kubernetesClient(), null, "catalog-watcher",
5964
context.namespaceProvider());
60-
LOG.debug(() -> "fabric8 catalog watcher will use namespace : " + namespace);
61-
62-
// can't use try with resources here as it will close the client
63-
KubernetesClient client = context.kubernetesClient();
64-
endpoints = client.endpoints().inNamespace(namespace).withLabels(context.properties().serviceLabels())
65-
.list().getItems();
65+
LOG.debug(() -> "discovering endpoints in namespace : " + namespace);
66+
endpoints = endpoints(context, namespace, client);
6667
}
6768

6869
/**
@@ -82,4 +83,9 @@ public List<EndpointNameAndNamespace> apply(Fabric8CatalogWatchContext context)
8283
return Fabric8CatalogWatchContext.state(references);
8384
}
8485

86+
private List<Endpoints> endpoints(Fabric8CatalogWatchContext context, String namespace, KubernetesClient client) {
87+
return client.endpoints().inNamespace(namespace).withLabels(context.properties().serviceLabels()).list()
88+
.getItems();
89+
}
90+
8591
}

spring-cloud-kubernetes-fabric8-discovery/src/main/java/org/springframework/cloud/kubernetes/fabric8/discovery/KubernetesCatalogWatch.java

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,12 @@ public void catalogServicesWatch() {
8484

8585
@PostConstruct
8686
void postConstruct() {
87+
stateGenerator = stateGenerator();
88+
}
89+
90+
Function<Fabric8CatalogWatchContext, List<EndpointNameAndNamespace>> stateGenerator() {
91+
92+
Function<Fabric8CatalogWatchContext, List<EndpointNameAndNamespace>> localStateGenerator;
8793

8894
if (context.properties().useEndpointSlices()) {
8995
// can't use try with resources here as it will close the client
@@ -99,14 +105,16 @@ void postConstruct() {
99105
throw new IllegalArgumentException("EndpointSlices are not supported on the cluster");
100106
}
101107
else {
102-
stateGenerator = new Fabric8EndpointSliceV1CatalogWatch();
108+
localStateGenerator = new Fabric8EndpointSliceV1CatalogWatch();
103109
}
104110
}
105111
else {
106-
stateGenerator = new Fabric8EndpointsCatalogWatch();
112+
localStateGenerator = new Fabric8EndpointsCatalogWatch();
107113
}
108114

109-
LOG.debug(() -> "stateGenerator is of type: " + stateGenerator.getClass().getSimpleName());
115+
LOG.debug(() -> "stateGenerator is of type: " + localStateGenerator.getClass().getSimpleName());
116+
117+
return localStateGenerator;
110118
}
111119

112120
}

0 commit comments

Comments
 (0)