Skip to content

Commit 3eac93f

Browse files
authored
Merge pull request #2071 from wind57/fix-1641-issue-with-wiring
Part of #1641 issue with wiring + k8-native discovery cacheable implementation
2 parents a75f090 + 31e2792 commit 3eac93f

15 files changed

+667
-257
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,215 @@
1+
/*
2+
* Copyright 2013-present the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.cloud.kubernetes.client.discovery;
18+
19+
import java.util.ArrayList;
20+
import java.util.List;
21+
import java.util.Map;
22+
import java.util.Objects;
23+
import java.util.function.Predicate;
24+
import java.util.function.Supplier;
25+
import java.util.stream.Collectors;
26+
27+
import io.kubernetes.client.informer.SharedInformer;
28+
import io.kubernetes.client.informer.SharedInformerFactory;
29+
import io.kubernetes.client.informer.cache.Lister;
30+
import io.kubernetes.client.openapi.apis.CoreV1Api;
31+
import io.kubernetes.client.openapi.models.V1EndpointAddress;
32+
import io.kubernetes.client.openapi.models.V1EndpointSubset;
33+
import io.kubernetes.client.openapi.models.V1Endpoints;
34+
import io.kubernetes.client.openapi.models.V1Service;
35+
import jakarta.annotation.PostConstruct;
36+
import org.apache.commons.logging.LogFactory;
37+
38+
import org.springframework.cloud.client.ServiceInstance;
39+
import org.springframework.cloud.client.discovery.DiscoveryClient;
40+
import org.springframework.cloud.kubernetes.commons.discovery.KubernetesDiscoveryProperties;
41+
import org.springframework.cloud.kubernetes.commons.discovery.ServiceMetadata;
42+
import org.springframework.cloud.kubernetes.commons.discovery.ServicePortNameAndNumber;
43+
import org.springframework.cloud.kubernetes.commons.discovery.ServicePortSecureResolver;
44+
import org.springframework.core.log.LogAccessor;
45+
46+
import static org.springframework.cloud.kubernetes.client.discovery.KubernetesClientDiscoveryClientUtils.addresses;
47+
import static org.springframework.cloud.kubernetes.client.discovery.KubernetesClientDiscoveryClientUtils.endpointSubsetsPortData;
48+
import static org.springframework.cloud.kubernetes.client.discovery.KubernetesClientDiscoveryClientUtils.matchesServiceLabels;
49+
import static org.springframework.cloud.kubernetes.client.discovery.KubernetesClientDiscoveryClientUtils.postConstruct;
50+
import static org.springframework.cloud.kubernetes.client.discovery.KubernetesClientDiscoveryClientUtils.serviceMetadata;
51+
import static org.springframework.cloud.kubernetes.client.discovery.KubernetesClientInstanceIdHostPodNameSupplier.externalName;
52+
import static org.springframework.cloud.kubernetes.client.discovery.KubernetesClientInstanceIdHostPodNameSupplier.nonExternalName;
53+
import static org.springframework.cloud.kubernetes.client.discovery.KubernetesClientPodLabelsAndAnnotationsSupplier.nonExternalName;
54+
import static org.springframework.cloud.kubernetes.commons.discovery.DiscoveryClientUtils.endpointsPort;
55+
import static org.springframework.cloud.kubernetes.commons.discovery.DiscoveryClientUtils.externalNameServiceInstance;
56+
import static org.springframework.cloud.kubernetes.commons.discovery.DiscoveryClientUtils.serviceInstance;
57+
import static org.springframework.cloud.kubernetes.commons.discovery.DiscoveryClientUtils.serviceInstanceMetadata;
58+
import static org.springframework.cloud.kubernetes.commons.discovery.KubernetesDiscoveryConstants.EXTERNAL_NAME;
59+
60+
abstract class KubernetesClientBlockingAbstractInformerDiscoveryClient implements DiscoveryClient {
61+
62+
private static final LogAccessor LOG = new LogAccessor(
63+
LogFactory.getLog(KubernetesClientBlockingAbstractInformerDiscoveryClient.class));
64+
65+
private final List<SharedInformerFactory> sharedInformerFactories;
66+
67+
private final List<Lister<V1Service>> serviceListers;
68+
69+
private final List<Lister<V1Endpoints>> endpointsListers;
70+
71+
private final Supplier<Boolean> informersReadyFunc;
72+
73+
private final KubernetesDiscoveryProperties properties;
74+
75+
private final Predicate<V1Service> predicate;
76+
77+
private final ServicePortSecureResolver servicePortSecureResolver;
78+
79+
private final CoreV1Api coreV1Api;
80+
81+
KubernetesClientBlockingAbstractInformerDiscoveryClient(List<SharedInformerFactory> sharedInformerFactories,
82+
List<Lister<V1Service>> serviceListers, List<Lister<V1Endpoints>> endpointsListers,
83+
List<SharedInformer<V1Service>> serviceInformers, List<SharedInformer<V1Endpoints>> endpointsInformers,
84+
KubernetesDiscoveryProperties properties, CoreV1Api coreV1Api, Predicate<V1Service> predicate) {
85+
this.sharedInformerFactories = sharedInformerFactories;
86+
this.serviceListers = serviceListers;
87+
this.endpointsListers = endpointsListers;
88+
this.coreV1Api = coreV1Api;
89+
this.properties = properties;
90+
this.predicate = predicate;
91+
92+
servicePortSecureResolver = new ServicePortSecureResolver(properties);
93+
94+
this.informersReadyFunc = () -> {
95+
boolean serviceInformersReady = serviceInformers.isEmpty() || serviceInformers.stream()
96+
.map(SharedInformer::hasSynced)
97+
.reduce(Boolean::logicalAnd)
98+
.orElse(false);
99+
boolean endpointsInformersReady = endpointsInformers.isEmpty() || endpointsInformers.stream()
100+
.map(SharedInformer::hasSynced)
101+
.reduce(Boolean::logicalAnd)
102+
.orElse(false);
103+
return serviceInformersReady && endpointsInformersReady;
104+
};
105+
106+
}
107+
108+
@Override
109+
public List<String> getServices() {
110+
List<String> services = serviceListers.stream()
111+
.flatMap(serviceLister -> serviceLister.list().stream())
112+
.filter(service -> matchesServiceLabels(service, properties))
113+
.filter(predicate)
114+
.map(s -> s.getMetadata().getName())
115+
.distinct()
116+
.toList();
117+
LOG.debug(() -> "will return services : " + services);
118+
return services;
119+
}
120+
121+
@Override
122+
public List<ServiceInstance> getInstances(String serviceId) {
123+
Objects.requireNonNull(serviceId, "serviceId must be provided");
124+
125+
List<V1Service> allServices = serviceListers.stream()
126+
.flatMap(x -> x.list().stream())
127+
.filter(scv -> scv.getMetadata() != null)
128+
.filter(svc -> serviceId.equals(svc.getMetadata().getName()))
129+
.filter(scv -> matchesServiceLabels(scv, properties))
130+
.toList();
131+
132+
List<ServiceInstance> serviceInstances = allServices.stream()
133+
.filter(predicate)
134+
.flatMap(service -> serviceInstances(service, serviceId).stream())
135+
.collect(Collectors.toCollection(ArrayList::new));
136+
137+
if (properties.includeExternalNameServices()) {
138+
LOG.debug(() -> "Searching for 'ExternalName' type of services with serviceId : " + serviceId);
139+
List<V1Service> externalNameServices = allServices.stream()
140+
.filter(s -> s.getSpec() != null)
141+
.filter(s -> EXTERNAL_NAME.equals(s.getSpec().getType()))
142+
.toList();
143+
for (V1Service service : externalNameServices) {
144+
ServiceMetadata serviceMetadata = serviceMetadata(service);
145+
Map<String, String> serviceInstanceMetadata = serviceInstanceMetadata(Map.of(), serviceMetadata,
146+
properties);
147+
148+
KubernetesClientInstanceIdHostPodNameSupplier supplierOne = externalName(service);
149+
ServiceInstance externalNameServiceInstance = externalNameServiceInstance(serviceMetadata, supplierOne,
150+
serviceInstanceMetadata);
151+
serviceInstances.add(externalNameServiceInstance);
152+
}
153+
}
154+
155+
return serviceInstances;
156+
}
157+
158+
public abstract String description();
159+
160+
@Override
161+
public int getOrder() {
162+
return properties.order();
163+
}
164+
165+
@PostConstruct
166+
void afterPropertiesSet() {
167+
postConstruct(sharedInformerFactories, properties, informersReadyFunc, serviceListers);
168+
}
169+
170+
private List<ServiceInstance> serviceInstances(V1Service service, String serviceId) {
171+
172+
List<ServiceInstance> instances = new ArrayList<>();
173+
174+
List<V1Endpoints> allEndpoints = endpointsListers.stream()
175+
.map(endpointsLister -> endpointsLister.namespace(service.getMetadata().getNamespace()).get(serviceId))
176+
.filter(Objects::nonNull)
177+
.toList();
178+
179+
for (V1Endpoints endpoints : allEndpoints) {
180+
List<V1EndpointSubset> subsets = endpoints.getSubsets();
181+
if (subsets == null || subsets.isEmpty()) {
182+
LOG.debug(() -> "serviceId : " + serviceId + " does not have any subsets");
183+
}
184+
else {
185+
ServiceMetadata serviceMetadata = serviceMetadata(service);
186+
Map<String, Integer> portsData = endpointSubsetsPortData(subsets);
187+
Map<String, String> serviceInstanceMetadata = serviceInstanceMetadata(portsData, serviceMetadata,
188+
properties);
189+
190+
for (V1EndpointSubset endpointSubset : subsets) {
191+
192+
Map<String, Integer> endpointsPortData = endpointSubsetsPortData(List.of(endpointSubset));
193+
ServicePortNameAndNumber portData = endpointsPort(endpointsPortData, serviceMetadata, properties);
194+
195+
List<V1EndpointAddress> addresses = addresses(endpointSubset, properties);
196+
for (V1EndpointAddress endpointAddress : addresses) {
197+
198+
KubernetesClientInstanceIdHostPodNameSupplier supplierOne = nonExternalName(endpointAddress,
199+
service);
200+
KubernetesClientPodLabelsAndAnnotationsSupplier supplierTwo = nonExternalName(coreV1Api,
201+
service.getMetadata().getNamespace());
202+
203+
ServiceInstance serviceInstance = serviceInstance(servicePortSecureResolver, serviceMetadata,
204+
supplierOne, supplierTwo, portData, serviceInstanceMetadata, properties);
205+
instances.add(serviceInstance);
206+
}
207+
}
208+
209+
}
210+
}
211+
212+
return instances;
213+
}
214+
215+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
/*
2+
* Copyright 2013-present the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.cloud.kubernetes.client.discovery;
18+
19+
import java.util.List;
20+
import java.util.function.Predicate;
21+
22+
import io.kubernetes.client.informer.SharedInformer;
23+
import io.kubernetes.client.informer.SharedInformerFactory;
24+
import io.kubernetes.client.informer.cache.Lister;
25+
import io.kubernetes.client.openapi.apis.CoreV1Api;
26+
import io.kubernetes.client.openapi.models.V1Endpoints;
27+
import io.kubernetes.client.openapi.models.V1Service;
28+
29+
import org.springframework.cache.annotation.Cacheable;
30+
import org.springframework.cloud.client.ServiceInstance;
31+
import org.springframework.cloud.kubernetes.commons.discovery.KubernetesDiscoveryProperties;
32+
33+
/**
34+
* @author wind57
35+
*/
36+
class KubernetesClientCacheableInformerDiscoveryClient extends KubernetesClientBlockingAbstractInformerDiscoveryClient {
37+
38+
KubernetesClientCacheableInformerDiscoveryClient(List<SharedInformerFactory> sharedInformerFactories,
39+
List<Lister<V1Service>> serviceListers, List<Lister<V1Endpoints>> endpointsListers,
40+
List<SharedInformer<V1Service>> serviceInformers, List<SharedInformer<V1Endpoints>> endpointsInformers,
41+
KubernetesDiscoveryProperties properties, CoreV1Api coreV1Api, Predicate<V1Service> predicate) {
42+
super(sharedInformerFactories, serviceListers, endpointsListers, serviceInformers, endpointsInformers,
43+
properties, coreV1Api, predicate);
44+
}
45+
46+
@Override
47+
@Cacheable("k8s-native-blocking-discovery-services")
48+
public List<String> getServices() {
49+
return super.getServices();
50+
}
51+
52+
@Override
53+
@Cacheable("k8s-native-blocking-discovery-instances")
54+
public List<ServiceInstance> getInstances(String serviceId) {
55+
return super.getInstances(serviceId);
56+
}
57+
58+
@Override
59+
public String description() {
60+
return "Kubernetes Native Cacheable Discovery Client";
61+
}
62+
63+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
/*
2+
* Copyright 2013-present the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.cloud.kubernetes.client.discovery;
18+
19+
import reactor.core.publisher.Flux;
20+
21+
import org.springframework.cache.annotation.Cacheable;
22+
import org.springframework.cloud.client.ServiceInstance;
23+
24+
class KubernetesClientCacheableInformerReactiveDiscoveryClient
25+
extends KubernetesClientReactiveAbstractInformerDiscoveryClient {
26+
27+
KubernetesClientCacheableInformerReactiveDiscoveryClient(
28+
KubernetesClientInformerDiscoveryClient kubernetesDiscoveryClient) {
29+
super(kubernetesDiscoveryClient);
30+
}
31+
32+
@Override
33+
@Cacheable("k8s-native-reactive-discovery-services")
34+
public Flux<String> getServices() {
35+
return super.getServices();
36+
}
37+
38+
@Override
39+
@Cacheable("k8s-native-reactive-discovery-instances")
40+
public Flux<ServiceInstance> getInstances(String serviceId) {
41+
return super.getInstances(serviceId);
42+
}
43+
44+
@Override
45+
public String description() {
46+
return "Kubernetes Native Cacheable Reactive Discovery Client";
47+
}
48+
49+
}

0 commit comments

Comments
 (0)