Skip to content

Commit f2bc8e8

Browse files
committed
wip
Signed-off-by: wind57 <[email protected]>
1 parent 11cd7c8 commit f2bc8e8

File tree

3 files changed

+259
-164
lines changed

3 files changed

+259
-164
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,199 @@
1+
package org.springframework.cloud.kubernetes.client.discovery;
2+
3+
import java.util.ArrayList;
4+
import java.util.List;
5+
import java.util.Map;
6+
import java.util.Objects;
7+
import java.util.function.Predicate;
8+
import java.util.function.Supplier;
9+
import java.util.stream.Collectors;
10+
11+
import io.kubernetes.client.informer.SharedInformer;
12+
import io.kubernetes.client.informer.SharedInformerFactory;
13+
import io.kubernetes.client.informer.cache.Lister;
14+
import io.kubernetes.client.openapi.apis.CoreV1Api;
15+
import io.kubernetes.client.openapi.models.V1EndpointAddress;
16+
import io.kubernetes.client.openapi.models.V1EndpointSubset;
17+
import io.kubernetes.client.openapi.models.V1Endpoints;
18+
import io.kubernetes.client.openapi.models.V1Service;
19+
import jakarta.annotation.PostConstruct;
20+
import org.apache.commons.logging.LogFactory;
21+
22+
import org.springframework.cloud.client.ServiceInstance;
23+
import org.springframework.cloud.client.discovery.DiscoveryClient;
24+
import org.springframework.cloud.kubernetes.commons.discovery.KubernetesDiscoveryProperties;
25+
import org.springframework.cloud.kubernetes.commons.discovery.ServiceMetadata;
26+
import org.springframework.cloud.kubernetes.commons.discovery.ServicePortNameAndNumber;
27+
import org.springframework.cloud.kubernetes.commons.discovery.ServicePortSecureResolver;
28+
import org.springframework.core.log.LogAccessor;
29+
30+
import static org.springframework.cloud.kubernetes.client.discovery.KubernetesClientDiscoveryClientUtils.addresses;
31+
import static org.springframework.cloud.kubernetes.client.discovery.KubernetesClientDiscoveryClientUtils.endpointSubsetsPortData;
32+
import static org.springframework.cloud.kubernetes.client.discovery.KubernetesClientDiscoveryClientUtils.matchesServiceLabels;
33+
import static org.springframework.cloud.kubernetes.client.discovery.KubernetesClientDiscoveryClientUtils.postConstruct;
34+
import static org.springframework.cloud.kubernetes.client.discovery.KubernetesClientDiscoveryClientUtils.serviceMetadata;
35+
import static org.springframework.cloud.kubernetes.client.discovery.KubernetesClientInstanceIdHostPodNameSupplier.externalName;
36+
import static org.springframework.cloud.kubernetes.client.discovery.KubernetesClientInstanceIdHostPodNameSupplier.nonExternalName;
37+
import static org.springframework.cloud.kubernetes.client.discovery.KubernetesClientPodLabelsAndAnnotationsSupplier.nonExternalName;
38+
import static org.springframework.cloud.kubernetes.commons.discovery.DiscoveryClientUtils.endpointsPort;
39+
import static org.springframework.cloud.kubernetes.commons.discovery.DiscoveryClientUtils.externalNameServiceInstance;
40+
import static org.springframework.cloud.kubernetes.commons.discovery.DiscoveryClientUtils.serviceInstance;
41+
import static org.springframework.cloud.kubernetes.commons.discovery.DiscoveryClientUtils.serviceInstanceMetadata;
42+
import static org.springframework.cloud.kubernetes.commons.discovery.KubernetesDiscoveryConstants.EXTERNAL_NAME;
43+
44+
abstract class KubernetesClientAbstractInformerDiscoveryClient implements DiscoveryClient {
45+
46+
private static final LogAccessor LOG = new LogAccessor(
47+
LogFactory.getLog(KubernetesClientAbstractInformerDiscoveryClient.class));
48+
49+
private final List<SharedInformerFactory> sharedInformerFactories;
50+
51+
private final List<Lister<V1Service>> serviceListers;
52+
53+
private final List<Lister<V1Endpoints>> endpointsListers;
54+
55+
private final Supplier<Boolean> informersReadyFunc;
56+
57+
private final KubernetesDiscoveryProperties properties;
58+
59+
private final Predicate<V1Service> predicate;
60+
61+
private final ServicePortSecureResolver servicePortSecureResolver;
62+
63+
private final CoreV1Api coreV1Api;
64+
65+
KubernetesClientAbstractInformerDiscoveryClient(List<SharedInformerFactory> sharedInformerFactories,
66+
List<Lister<V1Service>> serviceListers, List<Lister<V1Endpoints>> endpointsListers,
67+
List<SharedInformer<V1Service>> serviceInformers, List<SharedInformer<V1Endpoints>> endpointsInformers,
68+
KubernetesDiscoveryProperties properties, CoreV1Api coreV1Api, Predicate<V1Service> predicate) {
69+
this.sharedInformerFactories = sharedInformerFactories;
70+
this.serviceListers = serviceListers;
71+
this.endpointsListers = endpointsListers;
72+
this.coreV1Api = coreV1Api;
73+
this.properties = properties;
74+
this.predicate = predicate;
75+
76+
servicePortSecureResolver = new ServicePortSecureResolver(properties);
77+
78+
this.informersReadyFunc = () -> {
79+
boolean serviceInformersReady = serviceInformers.isEmpty() || serviceInformers.stream()
80+
.map(SharedInformer::hasSynced)
81+
.reduce(Boolean::logicalAnd)
82+
.orElse(false);
83+
boolean endpointsInformersReady = endpointsInformers.isEmpty() || endpointsInformers.stream()
84+
.map(SharedInformer::hasSynced)
85+
.reduce(Boolean::logicalAnd)
86+
.orElse(false);
87+
return serviceInformersReady && endpointsInformersReady;
88+
};
89+
90+
}
91+
92+
@Override
93+
public List<String> getServices() {
94+
List<String> services = serviceListers.stream()
95+
.flatMap(serviceLister -> serviceLister.list().stream())
96+
.filter(service -> matchesServiceLabels(service, properties))
97+
.filter(predicate)
98+
.map(s -> s.getMetadata().getName())
99+
.distinct()
100+
.toList();
101+
LOG.debug(() -> "will return services : " + services);
102+
return services;
103+
}
104+
105+
@Override
106+
public List<ServiceInstance> getInstances(String serviceId) {
107+
Objects.requireNonNull(serviceId, "serviceId must be provided");
108+
109+
List<V1Service> allServices = serviceListers.stream()
110+
.flatMap(x -> x.list().stream())
111+
.filter(scv -> scv.getMetadata() != null)
112+
.filter(svc -> serviceId.equals(svc.getMetadata().getName()))
113+
.filter(scv -> matchesServiceLabels(scv, properties))
114+
.toList();
115+
116+
List<ServiceInstance> serviceInstances = allServices.stream()
117+
.filter(predicate)
118+
.flatMap(service -> serviceInstances(service, serviceId).stream())
119+
.collect(Collectors.toCollection(ArrayList::new));
120+
121+
if (properties.includeExternalNameServices()) {
122+
LOG.debug(() -> "Searching for 'ExternalName' type of services with serviceId : " + serviceId);
123+
List<V1Service> externalNameServices = allServices.stream()
124+
.filter(s -> s.getSpec() != null)
125+
.filter(s -> EXTERNAL_NAME.equals(s.getSpec().getType()))
126+
.toList();
127+
for (V1Service service : externalNameServices) {
128+
ServiceMetadata serviceMetadata = serviceMetadata(service);
129+
Map<String, String> serviceInstanceMetadata = serviceInstanceMetadata(Map.of(), serviceMetadata,
130+
properties);
131+
132+
KubernetesClientInstanceIdHostPodNameSupplier supplierOne = externalName(service);
133+
ServiceInstance externalNameServiceInstance = externalNameServiceInstance(serviceMetadata, supplierOne,
134+
serviceInstanceMetadata);
135+
serviceInstances.add(externalNameServiceInstance);
136+
}
137+
}
138+
139+
return serviceInstances;
140+
}
141+
142+
public abstract String description();
143+
144+
@Override
145+
public int getOrder() {
146+
return properties.order();
147+
}
148+
149+
@PostConstruct
150+
void localPostConstruct() {
151+
postConstruct(sharedInformerFactories, properties, informersReadyFunc, serviceListers);
152+
}
153+
154+
private List<ServiceInstance> serviceInstances(V1Service service, String serviceId) {
155+
156+
List<ServiceInstance> instances = new ArrayList<>();
157+
158+
List<V1Endpoints> allEndpoints = endpointsListers.stream()
159+
.map(endpointsLister -> endpointsLister.namespace(service.getMetadata().getNamespace()).get(serviceId))
160+
.filter(Objects::nonNull)
161+
.toList();
162+
163+
for (V1Endpoints endpoints : allEndpoints) {
164+
List<V1EndpointSubset> subsets = endpoints.getSubsets();
165+
if (subsets == null || subsets.isEmpty()) {
166+
LOG.debug(() -> "serviceId : " + serviceId + " does not have any subsets");
167+
}
168+
else {
169+
ServiceMetadata serviceMetadata = serviceMetadata(service);
170+
Map<String, Integer> portsData = endpointSubsetsPortData(subsets);
171+
Map<String, String> serviceInstanceMetadata = serviceInstanceMetadata(portsData, serviceMetadata,
172+
properties);
173+
174+
for (V1EndpointSubset endpointSubset : subsets) {
175+
176+
Map<String, Integer> endpointsPortData = endpointSubsetsPortData(List.of(endpointSubset));
177+
ServicePortNameAndNumber portData = endpointsPort(endpointsPortData, serviceMetadata, properties);
178+
179+
List<V1EndpointAddress> addresses = addresses(endpointSubset, properties);
180+
for (V1EndpointAddress endpointAddress : addresses) {
181+
182+
KubernetesClientInstanceIdHostPodNameSupplier supplierOne = nonExternalName(endpointAddress,
183+
service);
184+
KubernetesClientPodLabelsAndAnnotationsSupplier supplierTwo = nonExternalName(coreV1Api,
185+
service.getMetadata().getNamespace());
186+
187+
ServiceInstance serviceInstance = serviceInstance(servicePortSecureResolver, serviceMetadata,
188+
supplierOne, supplierTwo, portData, serviceInstanceMetadata, properties);
189+
instances.add(serviceInstance);
190+
}
191+
}
192+
193+
}
194+
}
195+
196+
return instances;
197+
}
198+
199+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
package org.springframework.cloud.kubernetes.client.discovery;
2+
3+
import java.util.List;
4+
import java.util.function.Predicate;
5+
6+
import io.kubernetes.client.informer.SharedInformer;
7+
import io.kubernetes.client.informer.SharedInformerFactory;
8+
import io.kubernetes.client.informer.cache.Lister;
9+
import io.kubernetes.client.openapi.apis.CoreV1Api;
10+
import io.kubernetes.client.openapi.models.V1Endpoints;
11+
import io.kubernetes.client.openapi.models.V1Service;
12+
13+
import org.springframework.cache.annotation.Cacheable;
14+
import org.springframework.cloud.client.ServiceInstance;
15+
import org.springframework.cloud.kubernetes.commons.discovery.KubernetesDiscoveryProperties;
16+
17+
/**
18+
* @author wind57
19+
*/
20+
class KubernetesClientCacheableInformerDiscoveryClient extends KubernetesClientAbstractInformerDiscoveryClient {
21+
22+
KubernetesClientCacheableInformerDiscoveryClient(List<SharedInformerFactory> sharedInformerFactories,
23+
List<Lister<V1Service>> serviceListers, List<Lister<V1Endpoints>> endpointsListers,
24+
List<SharedInformer<V1Service>> serviceInformers, List<SharedInformer<V1Endpoints>> endpointsInformers,
25+
KubernetesDiscoveryProperties properties, CoreV1Api coreV1Api, Predicate<V1Service> predicate) {
26+
super(sharedInformerFactories, serviceListers, endpointsListers, serviceInformers, endpointsInformers,
27+
properties, coreV1Api, predicate);
28+
}
29+
30+
@Override
31+
@Cacheable("k8s-native-blocking-discovery-services")
32+
public List<String> getServices() {
33+
return super.getServices();
34+
}
35+
36+
@Override
37+
@Cacheable("k8s-native-blocking-discovery-instances")
38+
public List<ServiceInstance> getInstances(String serviceId) {
39+
return super.getInstances(serviceId);
40+
}
41+
42+
@Override
43+
public String description() {
44+
return "Kubernetes Native Discovery Client";
45+
}
46+
47+
@Override
48+
public int getOrder() {
49+
return super.getOrder();
50+
}
51+
}

0 commit comments

Comments
 (0)