| 
 | 1 | +/*  | 
 | 2 | + * Copyright 2013-present the original author or authors.  | 
 | 3 | + *  | 
 | 4 | + * Licensed under the Apache License, Version 2.0 (the "License");  | 
 | 5 | + * you may not use this file except in compliance with the License.  | 
 | 6 | + * You may obtain a copy of the License at  | 
 | 7 | + *  | 
 | 8 | + *      https://www.apache.org/licenses/LICENSE-2.0  | 
 | 9 | + *  | 
 | 10 | + * Unless required by applicable law or agreed to in writing, software  | 
 | 11 | + * distributed under the License is distributed on an "AS IS" BASIS,  | 
 | 12 | + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  | 
 | 13 | + * See the License for the specific language governing permissions and  | 
 | 14 | + * limitations under the License.  | 
 | 15 | + */  | 
 | 16 | + | 
 | 17 | +package org.springframework.cloud.kubernetes.client.discovery;  | 
 | 18 | + | 
 | 19 | +import java.util.ArrayList;  | 
 | 20 | +import java.util.List;  | 
 | 21 | +import java.util.Map;  | 
 | 22 | +import java.util.Objects;  | 
 | 23 | +import java.util.function.Predicate;  | 
 | 24 | +import java.util.function.Supplier;  | 
 | 25 | +import java.util.stream.Collectors;  | 
 | 26 | + | 
 | 27 | +import io.kubernetes.client.informer.SharedInformer;  | 
 | 28 | +import io.kubernetes.client.informer.SharedInformerFactory;  | 
 | 29 | +import io.kubernetes.client.informer.cache.Lister;  | 
 | 30 | +import io.kubernetes.client.openapi.apis.CoreV1Api;  | 
 | 31 | +import io.kubernetes.client.openapi.models.V1EndpointAddress;  | 
 | 32 | +import io.kubernetes.client.openapi.models.V1EndpointSubset;  | 
 | 33 | +import io.kubernetes.client.openapi.models.V1Endpoints;  | 
 | 34 | +import io.kubernetes.client.openapi.models.V1Service;  | 
 | 35 | +import jakarta.annotation.PostConstruct;  | 
 | 36 | +import org.apache.commons.logging.LogFactory;  | 
 | 37 | + | 
 | 38 | +import org.springframework.cloud.client.ServiceInstance;  | 
 | 39 | +import org.springframework.cloud.client.discovery.DiscoveryClient;  | 
 | 40 | +import org.springframework.cloud.kubernetes.commons.discovery.KubernetesDiscoveryProperties;  | 
 | 41 | +import org.springframework.cloud.kubernetes.commons.discovery.ServiceMetadata;  | 
 | 42 | +import org.springframework.cloud.kubernetes.commons.discovery.ServicePortNameAndNumber;  | 
 | 43 | +import org.springframework.cloud.kubernetes.commons.discovery.ServicePortSecureResolver;  | 
 | 44 | +import org.springframework.core.log.LogAccessor;  | 
 | 45 | + | 
 | 46 | +import static org.springframework.cloud.kubernetes.client.discovery.KubernetesClientDiscoveryClientUtils.addresses;  | 
 | 47 | +import static org.springframework.cloud.kubernetes.client.discovery.KubernetesClientDiscoveryClientUtils.endpointSubsetsPortData;  | 
 | 48 | +import static org.springframework.cloud.kubernetes.client.discovery.KubernetesClientDiscoveryClientUtils.matchesServiceLabels;  | 
 | 49 | +import static org.springframework.cloud.kubernetes.client.discovery.KubernetesClientDiscoveryClientUtils.postConstruct;  | 
 | 50 | +import static org.springframework.cloud.kubernetes.client.discovery.KubernetesClientDiscoveryClientUtils.serviceMetadata;  | 
 | 51 | +import static org.springframework.cloud.kubernetes.client.discovery.KubernetesClientInstanceIdHostPodNameSupplier.externalName;  | 
 | 52 | +import static org.springframework.cloud.kubernetes.client.discovery.KubernetesClientInstanceIdHostPodNameSupplier.nonExternalName;  | 
 | 53 | +import static org.springframework.cloud.kubernetes.client.discovery.KubernetesClientPodLabelsAndAnnotationsSupplier.nonExternalName;  | 
 | 54 | +import static org.springframework.cloud.kubernetes.commons.discovery.DiscoveryClientUtils.endpointsPort;  | 
 | 55 | +import static org.springframework.cloud.kubernetes.commons.discovery.DiscoveryClientUtils.externalNameServiceInstance;  | 
 | 56 | +import static org.springframework.cloud.kubernetes.commons.discovery.DiscoveryClientUtils.serviceInstance;  | 
 | 57 | +import static org.springframework.cloud.kubernetes.commons.discovery.DiscoveryClientUtils.serviceInstanceMetadata;  | 
 | 58 | +import static org.springframework.cloud.kubernetes.commons.discovery.KubernetesDiscoveryConstants.EXTERNAL_NAME;  | 
 | 59 | + | 
 | 60 | +abstract class KubernetesClientBlockingAbstractInformerDiscoveryClient implements DiscoveryClient {  | 
 | 61 | + | 
 | 62 | +	private static final LogAccessor LOG = new LogAccessor(  | 
 | 63 | +			LogFactory.getLog(KubernetesClientBlockingAbstractInformerDiscoveryClient.class));  | 
 | 64 | + | 
 | 65 | +	private final List<SharedInformerFactory> sharedInformerFactories;  | 
 | 66 | + | 
 | 67 | +	private final List<Lister<V1Service>> serviceListers;  | 
 | 68 | + | 
 | 69 | +	private final List<Lister<V1Endpoints>> endpointsListers;  | 
 | 70 | + | 
 | 71 | +	private final Supplier<Boolean> informersReadyFunc;  | 
 | 72 | + | 
 | 73 | +	private final KubernetesDiscoveryProperties properties;  | 
 | 74 | + | 
 | 75 | +	private final Predicate<V1Service> predicate;  | 
 | 76 | + | 
 | 77 | +	private final ServicePortSecureResolver servicePortSecureResolver;  | 
 | 78 | + | 
 | 79 | +	private final CoreV1Api coreV1Api;  | 
 | 80 | + | 
 | 81 | +	KubernetesClientBlockingAbstractInformerDiscoveryClient(List<SharedInformerFactory> sharedInformerFactories,  | 
 | 82 | +			List<Lister<V1Service>> serviceListers, List<Lister<V1Endpoints>> endpointsListers,  | 
 | 83 | +			List<SharedInformer<V1Service>> serviceInformers, List<SharedInformer<V1Endpoints>> endpointsInformers,  | 
 | 84 | +			KubernetesDiscoveryProperties properties, CoreV1Api coreV1Api, Predicate<V1Service> predicate) {  | 
 | 85 | +		this.sharedInformerFactories = sharedInformerFactories;  | 
 | 86 | +		this.serviceListers = serviceListers;  | 
 | 87 | +		this.endpointsListers = endpointsListers;  | 
 | 88 | +		this.coreV1Api = coreV1Api;  | 
 | 89 | +		this.properties = properties;  | 
 | 90 | +		this.predicate = predicate;  | 
 | 91 | + | 
 | 92 | +		servicePortSecureResolver = new ServicePortSecureResolver(properties);  | 
 | 93 | + | 
 | 94 | +		this.informersReadyFunc = () -> {  | 
 | 95 | +			boolean serviceInformersReady = serviceInformers.isEmpty() || serviceInformers.stream()  | 
 | 96 | +				.map(SharedInformer::hasSynced)  | 
 | 97 | +				.reduce(Boolean::logicalAnd)  | 
 | 98 | +				.orElse(false);  | 
 | 99 | +			boolean endpointsInformersReady = endpointsInformers.isEmpty() || endpointsInformers.stream()  | 
 | 100 | +				.map(SharedInformer::hasSynced)  | 
 | 101 | +				.reduce(Boolean::logicalAnd)  | 
 | 102 | +				.orElse(false);  | 
 | 103 | +			return serviceInformersReady && endpointsInformersReady;  | 
 | 104 | +		};  | 
 | 105 | + | 
 | 106 | +	}  | 
 | 107 | + | 
 | 108 | +	@Override  | 
 | 109 | +	public List<String> getServices() {  | 
 | 110 | +		List<String> services = serviceListers.stream()  | 
 | 111 | +			.flatMap(serviceLister -> serviceLister.list().stream())  | 
 | 112 | +			.filter(service -> matchesServiceLabels(service, properties))  | 
 | 113 | +			.filter(predicate)  | 
 | 114 | +			.map(s -> s.getMetadata().getName())  | 
 | 115 | +			.distinct()  | 
 | 116 | +			.toList();  | 
 | 117 | +		LOG.debug(() -> "will return services : " + services);  | 
 | 118 | +		return services;  | 
 | 119 | +	}  | 
 | 120 | + | 
 | 121 | +	@Override  | 
 | 122 | +	public List<ServiceInstance> getInstances(String serviceId) {  | 
 | 123 | +		Objects.requireNonNull(serviceId, "serviceId must be provided");  | 
 | 124 | + | 
 | 125 | +		List<V1Service> allServices = serviceListers.stream()  | 
 | 126 | +			.flatMap(x -> x.list().stream())  | 
 | 127 | +			.filter(scv -> scv.getMetadata() != null)  | 
 | 128 | +			.filter(svc -> serviceId.equals(svc.getMetadata().getName()))  | 
 | 129 | +			.filter(scv -> matchesServiceLabels(scv, properties))  | 
 | 130 | +			.toList();  | 
 | 131 | + | 
 | 132 | +		List<ServiceInstance> serviceInstances = allServices.stream()  | 
 | 133 | +			.filter(predicate)  | 
 | 134 | +			.flatMap(service -> serviceInstances(service, serviceId).stream())  | 
 | 135 | +			.collect(Collectors.toCollection(ArrayList::new));  | 
 | 136 | + | 
 | 137 | +		if (properties.includeExternalNameServices()) {  | 
 | 138 | +			LOG.debug(() -> "Searching for 'ExternalName' type of services with serviceId : " + serviceId);  | 
 | 139 | +			List<V1Service> externalNameServices = allServices.stream()  | 
 | 140 | +				.filter(s -> s.getSpec() != null)  | 
 | 141 | +				.filter(s -> EXTERNAL_NAME.equals(s.getSpec().getType()))  | 
 | 142 | +				.toList();  | 
 | 143 | +			for (V1Service service : externalNameServices) {  | 
 | 144 | +				ServiceMetadata serviceMetadata = serviceMetadata(service);  | 
 | 145 | +				Map<String, String> serviceInstanceMetadata = serviceInstanceMetadata(Map.of(), serviceMetadata,  | 
 | 146 | +						properties);  | 
 | 147 | + | 
 | 148 | +				KubernetesClientInstanceIdHostPodNameSupplier supplierOne = externalName(service);  | 
 | 149 | +				ServiceInstance externalNameServiceInstance = externalNameServiceInstance(serviceMetadata, supplierOne,  | 
 | 150 | +						serviceInstanceMetadata);  | 
 | 151 | +				serviceInstances.add(externalNameServiceInstance);  | 
 | 152 | +			}  | 
 | 153 | +		}  | 
 | 154 | + | 
 | 155 | +		return serviceInstances;  | 
 | 156 | +	}  | 
 | 157 | + | 
 | 158 | +	public abstract String description();  | 
 | 159 | + | 
 | 160 | +	@Override  | 
 | 161 | +	public int getOrder() {  | 
 | 162 | +		return properties.order();  | 
 | 163 | +	}  | 
 | 164 | + | 
 | 165 | +	@PostConstruct  | 
 | 166 | +	void afterPropertiesSet() {  | 
 | 167 | +		postConstruct(sharedInformerFactories, properties, informersReadyFunc, serviceListers);  | 
 | 168 | +	}  | 
 | 169 | + | 
 | 170 | +	private List<ServiceInstance> serviceInstances(V1Service service, String serviceId) {  | 
 | 171 | + | 
 | 172 | +		List<ServiceInstance> instances = new ArrayList<>();  | 
 | 173 | + | 
 | 174 | +		List<V1Endpoints> allEndpoints = endpointsListers.stream()  | 
 | 175 | +			.map(endpointsLister -> endpointsLister.namespace(service.getMetadata().getNamespace()).get(serviceId))  | 
 | 176 | +			.filter(Objects::nonNull)  | 
 | 177 | +			.toList();  | 
 | 178 | + | 
 | 179 | +		for (V1Endpoints endpoints : allEndpoints) {  | 
 | 180 | +			List<V1EndpointSubset> subsets = endpoints.getSubsets();  | 
 | 181 | +			if (subsets == null || subsets.isEmpty()) {  | 
 | 182 | +				LOG.debug(() -> "serviceId : " + serviceId + " does not have any subsets");  | 
 | 183 | +			}  | 
 | 184 | +			else {  | 
 | 185 | +				ServiceMetadata serviceMetadata = serviceMetadata(service);  | 
 | 186 | +				Map<String, Integer> portsData = endpointSubsetsPortData(subsets);  | 
 | 187 | +				Map<String, String> serviceInstanceMetadata = serviceInstanceMetadata(portsData, serviceMetadata,  | 
 | 188 | +						properties);  | 
 | 189 | + | 
 | 190 | +				for (V1EndpointSubset endpointSubset : subsets) {  | 
 | 191 | + | 
 | 192 | +					Map<String, Integer> endpointsPortData = endpointSubsetsPortData(List.of(endpointSubset));  | 
 | 193 | +					ServicePortNameAndNumber portData = endpointsPort(endpointsPortData, serviceMetadata, properties);  | 
 | 194 | + | 
 | 195 | +					List<V1EndpointAddress> addresses = addresses(endpointSubset, properties);  | 
 | 196 | +					for (V1EndpointAddress endpointAddress : addresses) {  | 
 | 197 | + | 
 | 198 | +						KubernetesClientInstanceIdHostPodNameSupplier supplierOne = nonExternalName(endpointAddress,  | 
 | 199 | +								service);  | 
 | 200 | +						KubernetesClientPodLabelsAndAnnotationsSupplier supplierTwo = nonExternalName(coreV1Api,  | 
 | 201 | +								service.getMetadata().getNamespace());  | 
 | 202 | + | 
 | 203 | +						ServiceInstance serviceInstance = serviceInstance(servicePortSecureResolver, serviceMetadata,  | 
 | 204 | +								supplierOne, supplierTwo, portData, serviceInstanceMetadata, properties);  | 
 | 205 | +						instances.add(serviceInstance);  | 
 | 206 | +					}  | 
 | 207 | +				}  | 
 | 208 | + | 
 | 209 | +			}  | 
 | 210 | +		}  | 
 | 211 | + | 
 | 212 | +		return instances;  | 
 | 213 | +	}  | 
 | 214 | + | 
 | 215 | +}  | 
0 commit comments