diff --git a/spring-cloud-kubernetes-client-discovery/src/main/java/org/springframework/cloud/kubernetes/client/discovery/KubernetesClientBlockingAbstractInformerDiscoveryClient.java b/spring-cloud-kubernetes-client-discovery/src/main/java/org/springframework/cloud/kubernetes/client/discovery/KubernetesClientBlockingAbstractInformerDiscoveryClient.java new file mode 100644 index 000000000..a63272f49 --- /dev/null +++ b/spring-cloud-kubernetes-client-discovery/src/main/java/org/springframework/cloud/kubernetes/client/discovery/KubernetesClientBlockingAbstractInformerDiscoveryClient.java @@ -0,0 +1,215 @@ +/* + * Copyright 2013-present the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.kubernetes.client.discovery; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.function.Predicate; +import java.util.function.Supplier; +import java.util.stream.Collectors; + +import io.kubernetes.client.informer.SharedInformer; +import io.kubernetes.client.informer.SharedInformerFactory; +import io.kubernetes.client.informer.cache.Lister; +import io.kubernetes.client.openapi.apis.CoreV1Api; +import io.kubernetes.client.openapi.models.V1EndpointAddress; +import io.kubernetes.client.openapi.models.V1EndpointSubset; +import io.kubernetes.client.openapi.models.V1Endpoints; +import io.kubernetes.client.openapi.models.V1Service; +import jakarta.annotation.PostConstruct; +import org.apache.commons.logging.LogFactory; + +import org.springframework.cloud.client.ServiceInstance; +import org.springframework.cloud.client.discovery.DiscoveryClient; +import org.springframework.cloud.kubernetes.commons.discovery.KubernetesDiscoveryProperties; +import org.springframework.cloud.kubernetes.commons.discovery.ServiceMetadata; +import org.springframework.cloud.kubernetes.commons.discovery.ServicePortNameAndNumber; +import org.springframework.cloud.kubernetes.commons.discovery.ServicePortSecureResolver; +import org.springframework.core.log.LogAccessor; + +import static org.springframework.cloud.kubernetes.client.discovery.KubernetesClientDiscoveryClientUtils.addresses; +import static org.springframework.cloud.kubernetes.client.discovery.KubernetesClientDiscoveryClientUtils.endpointSubsetsPortData; +import static org.springframework.cloud.kubernetes.client.discovery.KubernetesClientDiscoveryClientUtils.matchesServiceLabels; +import static org.springframework.cloud.kubernetes.client.discovery.KubernetesClientDiscoveryClientUtils.postConstruct; +import static org.springframework.cloud.kubernetes.client.discovery.KubernetesClientDiscoveryClientUtils.serviceMetadata; +import static org.springframework.cloud.kubernetes.client.discovery.KubernetesClientInstanceIdHostPodNameSupplier.externalName; +import static org.springframework.cloud.kubernetes.client.discovery.KubernetesClientInstanceIdHostPodNameSupplier.nonExternalName; +import static org.springframework.cloud.kubernetes.client.discovery.KubernetesClientPodLabelsAndAnnotationsSupplier.nonExternalName; +import static org.springframework.cloud.kubernetes.commons.discovery.DiscoveryClientUtils.endpointsPort; +import static org.springframework.cloud.kubernetes.commons.discovery.DiscoveryClientUtils.externalNameServiceInstance; +import static org.springframework.cloud.kubernetes.commons.discovery.DiscoveryClientUtils.serviceInstance; +import static org.springframework.cloud.kubernetes.commons.discovery.DiscoveryClientUtils.serviceInstanceMetadata; +import static org.springframework.cloud.kubernetes.commons.discovery.KubernetesDiscoveryConstants.EXTERNAL_NAME; + +abstract class KubernetesClientBlockingAbstractInformerDiscoveryClient implements DiscoveryClient { + + private static final LogAccessor LOG = new LogAccessor( + LogFactory.getLog(KubernetesClientBlockingAbstractInformerDiscoveryClient.class)); + + private final List sharedInformerFactories; + + private final List> serviceListers; + + private final List> endpointsListers; + + private final Supplier informersReadyFunc; + + private final KubernetesDiscoveryProperties properties; + + private final Predicate predicate; + + private final ServicePortSecureResolver servicePortSecureResolver; + + private final CoreV1Api coreV1Api; + + KubernetesClientBlockingAbstractInformerDiscoveryClient(List sharedInformerFactories, + List> serviceListers, List> endpointsListers, + List> serviceInformers, List> endpointsInformers, + KubernetesDiscoveryProperties properties, CoreV1Api coreV1Api, Predicate predicate) { + this.sharedInformerFactories = sharedInformerFactories; + this.serviceListers = serviceListers; + this.endpointsListers = endpointsListers; + this.coreV1Api = coreV1Api; + this.properties = properties; + this.predicate = predicate; + + servicePortSecureResolver = new ServicePortSecureResolver(properties); + + this.informersReadyFunc = () -> { + boolean serviceInformersReady = serviceInformers.isEmpty() || serviceInformers.stream() + .map(SharedInformer::hasSynced) + .reduce(Boolean::logicalAnd) + .orElse(false); + boolean endpointsInformersReady = endpointsInformers.isEmpty() || endpointsInformers.stream() + .map(SharedInformer::hasSynced) + .reduce(Boolean::logicalAnd) + .orElse(false); + return serviceInformersReady && endpointsInformersReady; + }; + + } + + @Override + public List getServices() { + List services = serviceListers.stream() + .flatMap(serviceLister -> serviceLister.list().stream()) + .filter(service -> matchesServiceLabels(service, properties)) + .filter(predicate) + .map(s -> s.getMetadata().getName()) + .distinct() + .toList(); + LOG.debug(() -> "will return services : " + services); + return services; + } + + @Override + public List getInstances(String serviceId) { + Objects.requireNonNull(serviceId, "serviceId must be provided"); + + List allServices = serviceListers.stream() + .flatMap(x -> x.list().stream()) + .filter(scv -> scv.getMetadata() != null) + .filter(svc -> serviceId.equals(svc.getMetadata().getName())) + .filter(scv -> matchesServiceLabels(scv, properties)) + .toList(); + + List serviceInstances = allServices.stream() + .filter(predicate) + .flatMap(service -> serviceInstances(service, serviceId).stream()) + .collect(Collectors.toCollection(ArrayList::new)); + + if (properties.includeExternalNameServices()) { + LOG.debug(() -> "Searching for 'ExternalName' type of services with serviceId : " + serviceId); + List externalNameServices = allServices.stream() + .filter(s -> s.getSpec() != null) + .filter(s -> EXTERNAL_NAME.equals(s.getSpec().getType())) + .toList(); + for (V1Service service : externalNameServices) { + ServiceMetadata serviceMetadata = serviceMetadata(service); + Map serviceInstanceMetadata = serviceInstanceMetadata(Map.of(), serviceMetadata, + properties); + + KubernetesClientInstanceIdHostPodNameSupplier supplierOne = externalName(service); + ServiceInstance externalNameServiceInstance = externalNameServiceInstance(serviceMetadata, supplierOne, + serviceInstanceMetadata); + serviceInstances.add(externalNameServiceInstance); + } + } + + return serviceInstances; + } + + public abstract String description(); + + @Override + public int getOrder() { + return properties.order(); + } + + @PostConstruct + void afterPropertiesSet() { + postConstruct(sharedInformerFactories, properties, informersReadyFunc, serviceListers); + } + + private List serviceInstances(V1Service service, String serviceId) { + + List instances = new ArrayList<>(); + + List allEndpoints = endpointsListers.stream() + .map(endpointsLister -> endpointsLister.namespace(service.getMetadata().getNamespace()).get(serviceId)) + .filter(Objects::nonNull) + .toList(); + + for (V1Endpoints endpoints : allEndpoints) { + List subsets = endpoints.getSubsets(); + if (subsets == null || subsets.isEmpty()) { + LOG.debug(() -> "serviceId : " + serviceId + " does not have any subsets"); + } + else { + ServiceMetadata serviceMetadata = serviceMetadata(service); + Map portsData = endpointSubsetsPortData(subsets); + Map serviceInstanceMetadata = serviceInstanceMetadata(portsData, serviceMetadata, + properties); + + for (V1EndpointSubset endpointSubset : subsets) { + + Map endpointsPortData = endpointSubsetsPortData(List.of(endpointSubset)); + ServicePortNameAndNumber portData = endpointsPort(endpointsPortData, serviceMetadata, properties); + + List addresses = addresses(endpointSubset, properties); + for (V1EndpointAddress endpointAddress : addresses) { + + KubernetesClientInstanceIdHostPodNameSupplier supplierOne = nonExternalName(endpointAddress, + service); + KubernetesClientPodLabelsAndAnnotationsSupplier supplierTwo = nonExternalName(coreV1Api, + service.getMetadata().getNamespace()); + + ServiceInstance serviceInstance = serviceInstance(servicePortSecureResolver, serviceMetadata, + supplierOne, supplierTwo, portData, serviceInstanceMetadata, properties); + instances.add(serviceInstance); + } + } + + } + } + + return instances; + } + +} diff --git a/spring-cloud-kubernetes-client-discovery/src/main/java/org/springframework/cloud/kubernetes/client/discovery/KubernetesClientCacheableInformerDiscoveryClient.java b/spring-cloud-kubernetes-client-discovery/src/main/java/org/springframework/cloud/kubernetes/client/discovery/KubernetesClientCacheableInformerDiscoveryClient.java new file mode 100644 index 000000000..28550a645 --- /dev/null +++ b/spring-cloud-kubernetes-client-discovery/src/main/java/org/springframework/cloud/kubernetes/client/discovery/KubernetesClientCacheableInformerDiscoveryClient.java @@ -0,0 +1,63 @@ +/* + * Copyright 2013-present the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.kubernetes.client.discovery; + +import java.util.List; +import java.util.function.Predicate; + +import io.kubernetes.client.informer.SharedInformer; +import io.kubernetes.client.informer.SharedInformerFactory; +import io.kubernetes.client.informer.cache.Lister; +import io.kubernetes.client.openapi.apis.CoreV1Api; +import io.kubernetes.client.openapi.models.V1Endpoints; +import io.kubernetes.client.openapi.models.V1Service; + +import org.springframework.cache.annotation.Cacheable; +import org.springframework.cloud.client.ServiceInstance; +import org.springframework.cloud.kubernetes.commons.discovery.KubernetesDiscoveryProperties; + +/** + * @author wind57 + */ +class KubernetesClientCacheableInformerDiscoveryClient extends KubernetesClientBlockingAbstractInformerDiscoveryClient { + + KubernetesClientCacheableInformerDiscoveryClient(List sharedInformerFactories, + List> serviceListers, List> endpointsListers, + List> serviceInformers, List> endpointsInformers, + KubernetesDiscoveryProperties properties, CoreV1Api coreV1Api, Predicate predicate) { + super(sharedInformerFactories, serviceListers, endpointsListers, serviceInformers, endpointsInformers, + properties, coreV1Api, predicate); + } + + @Override + @Cacheable("k8s-native-blocking-discovery-services") + public List getServices() { + return super.getServices(); + } + + @Override + @Cacheable("k8s-native-blocking-discovery-instances") + public List getInstances(String serviceId) { + return super.getInstances(serviceId); + } + + @Override + public String description() { + return "Kubernetes Native Cacheable Discovery Client"; + } + +} diff --git a/spring-cloud-kubernetes-client-discovery/src/main/java/org/springframework/cloud/kubernetes/client/discovery/KubernetesClientCacheableInformerReactiveDiscoveryClient.java b/spring-cloud-kubernetes-client-discovery/src/main/java/org/springframework/cloud/kubernetes/client/discovery/KubernetesClientCacheableInformerReactiveDiscoveryClient.java new file mode 100644 index 000000000..c4a8249d3 --- /dev/null +++ b/spring-cloud-kubernetes-client-discovery/src/main/java/org/springframework/cloud/kubernetes/client/discovery/KubernetesClientCacheableInformerReactiveDiscoveryClient.java @@ -0,0 +1,49 @@ +/* + * Copyright 2013-present the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.kubernetes.client.discovery; + +import reactor.core.publisher.Flux; + +import org.springframework.cache.annotation.Cacheable; +import org.springframework.cloud.client.ServiceInstance; + +class KubernetesClientCacheableInformerReactiveDiscoveryClient + extends KubernetesClientReactiveAbstractInformerDiscoveryClient { + + KubernetesClientCacheableInformerReactiveDiscoveryClient( + KubernetesClientInformerDiscoveryClient kubernetesDiscoveryClient) { + super(kubernetesDiscoveryClient); + } + + @Override + @Cacheable("k8s-native-reactive-discovery-services") + public Flux getServices() { + return super.getServices(); + } + + @Override + @Cacheable("k8s-native-reactive-discovery-instances") + public Flux getInstances(String serviceId) { + return super.getInstances(serviceId); + } + + @Override + public String description() { + return "Kubernetes Native Cacheable Reactive Discovery Client"; + } + +} diff --git a/spring-cloud-kubernetes-client-discovery/src/main/java/org/springframework/cloud/kubernetes/client/discovery/KubernetesClientInformerDiscoveryClient.java b/spring-cloud-kubernetes-client-discovery/src/main/java/org/springframework/cloud/kubernetes/client/discovery/KubernetesClientInformerDiscoveryClient.java index 1663a6e2b..c69ace9fc 100644 --- a/spring-cloud-kubernetes-client-discovery/src/main/java/org/springframework/cloud/kubernetes/client/discovery/KubernetesClientInformerDiscoveryClient.java +++ b/spring-cloud-kubernetes-client-discovery/src/main/java/org/springframework/cloud/kubernetes/client/discovery/KubernetesClientInformerDiscoveryClient.java @@ -16,208 +16,37 @@ package org.springframework.cloud.kubernetes.client.discovery; -import java.util.ArrayList; import java.util.List; -import java.util.Map; -import java.util.Objects; import java.util.function.Predicate; -import java.util.function.Supplier; -import java.util.stream.Collectors; import io.kubernetes.client.informer.SharedInformer; import io.kubernetes.client.informer.SharedInformerFactory; import io.kubernetes.client.informer.cache.Lister; import io.kubernetes.client.openapi.apis.CoreV1Api; -import io.kubernetes.client.openapi.models.V1EndpointAddress; -import io.kubernetes.client.openapi.models.V1EndpointSubset; import io.kubernetes.client.openapi.models.V1Endpoints; import io.kubernetes.client.openapi.models.V1Service; -import jakarta.annotation.PostConstruct; -import org.apache.commons.logging.LogFactory; -import org.springframework.cloud.client.ServiceInstance; -import org.springframework.cloud.client.discovery.DiscoveryClient; import org.springframework.cloud.kubernetes.commons.discovery.KubernetesDiscoveryProperties; -import org.springframework.cloud.kubernetes.commons.discovery.ServiceMetadata; -import org.springframework.cloud.kubernetes.commons.discovery.ServicePortNameAndNumber; -import org.springframework.cloud.kubernetes.commons.discovery.ServicePortSecureResolver; -import org.springframework.core.log.LogAccessor; - -import static org.springframework.cloud.kubernetes.client.discovery.KubernetesClientDiscoveryClientUtils.addresses; -import static org.springframework.cloud.kubernetes.client.discovery.KubernetesClientDiscoveryClientUtils.endpointSubsetsPortData; -import static org.springframework.cloud.kubernetes.client.discovery.KubernetesClientDiscoveryClientUtils.matchesServiceLabels; -import static org.springframework.cloud.kubernetes.client.discovery.KubernetesClientDiscoveryClientUtils.postConstruct; -import static org.springframework.cloud.kubernetes.client.discovery.KubernetesClientDiscoveryClientUtils.serviceMetadata; -import static org.springframework.cloud.kubernetes.client.discovery.KubernetesClientInstanceIdHostPodNameSupplier.externalName; -import static org.springframework.cloud.kubernetes.client.discovery.KubernetesClientInstanceIdHostPodNameSupplier.nonExternalName; -import static org.springframework.cloud.kubernetes.client.discovery.KubernetesClientPodLabelsAndAnnotationsSupplier.nonExternalName; -import static org.springframework.cloud.kubernetes.commons.discovery.DiscoveryClientUtils.endpointsPort; -import static org.springframework.cloud.kubernetes.commons.discovery.DiscoveryClientUtils.externalNameServiceInstance; -import static org.springframework.cloud.kubernetes.commons.discovery.DiscoveryClientUtils.serviceInstance; -import static org.springframework.cloud.kubernetes.commons.discovery.DiscoveryClientUtils.serviceInstanceMetadata; -import static org.springframework.cloud.kubernetes.commons.discovery.KubernetesDiscoveryConstants.EXTERNAL_NAME; /** * @author Min Kim * @author Ryan Baxter * @author Tim Yysewyn */ -class KubernetesClientInformerDiscoveryClient implements DiscoveryClient { - - private static final LogAccessor LOG = new LogAccessor( - LogFactory.getLog(KubernetesClientInformerDiscoveryClient.class)); - - private final List sharedInformerFactories; - - private final List> serviceListers; - - private final List> endpointsListers; - - private final Supplier informersReadyFunc; - - private final KubernetesDiscoveryProperties properties; - - private final Predicate predicate; - - private final ServicePortSecureResolver servicePortSecureResolver; - - private final CoreV1Api coreV1Api; +class KubernetesClientInformerDiscoveryClient extends KubernetesClientBlockingAbstractInformerDiscoveryClient { KubernetesClientInformerDiscoveryClient(List sharedInformerFactories, List> serviceListers, List> endpointsListers, List> serviceInformers, List> endpointsInformers, KubernetesDiscoveryProperties properties, CoreV1Api coreV1Api, Predicate predicate) { - this.sharedInformerFactories = sharedInformerFactories; - this.serviceListers = serviceListers; - this.endpointsListers = endpointsListers; - this.coreV1Api = coreV1Api; - this.properties = properties; - this.predicate = predicate; - - servicePortSecureResolver = new ServicePortSecureResolver(properties); - - this.informersReadyFunc = () -> { - boolean serviceInformersReady = serviceInformers.isEmpty() || serviceInformers.stream() - .map(SharedInformer::hasSynced) - .reduce(Boolean::logicalAnd) - .orElse(false); - boolean endpointsInformersReady = endpointsInformers.isEmpty() || endpointsInformers.stream() - .map(SharedInformer::hasSynced) - .reduce(Boolean::logicalAnd) - .orElse(false); - return serviceInformersReady && endpointsInformersReady; - }; + super(sharedInformerFactories, serviceListers, endpointsListers, serviceInformers, endpointsInformers, + properties, coreV1Api, predicate); } @Override public String description() { - return "Kubernetes Client Discovery"; - } - - @Override - public List getInstances(String serviceId) { - Objects.requireNonNull(serviceId, "serviceId must be provided"); - - List allServices = serviceListers.stream() - .flatMap(x -> x.list().stream()) - .filter(scv -> scv.getMetadata() != null) - .filter(svc -> serviceId.equals(svc.getMetadata().getName())) - .filter(scv -> matchesServiceLabels(scv, properties)) - .toList(); - - List serviceInstances = allServices.stream() - .filter(predicate) - .flatMap(service -> serviceInstances(service, serviceId).stream()) - .collect(Collectors.toCollection(ArrayList::new)); - - if (properties.includeExternalNameServices()) { - LOG.debug(() -> "Searching for 'ExternalName' type of services with serviceId : " + serviceId); - List externalNameServices = allServices.stream() - .filter(s -> s.getSpec() != null) - .filter(s -> EXTERNAL_NAME.equals(s.getSpec().getType())) - .toList(); - for (V1Service service : externalNameServices) { - ServiceMetadata serviceMetadata = serviceMetadata(service); - Map serviceInstanceMetadata = serviceInstanceMetadata(Map.of(), serviceMetadata, - properties); - - KubernetesClientInstanceIdHostPodNameSupplier supplierOne = externalName(service); - ServiceInstance externalNameServiceInstance = externalNameServiceInstance(serviceMetadata, supplierOne, - serviceInstanceMetadata); - serviceInstances.add(externalNameServiceInstance); - } - } - - return serviceInstances; - } - - private List serviceInstances(V1Service service, String serviceId) { - - List instances = new ArrayList<>(); - - List allEndpoints = endpointsListers.stream() - .map(endpointsLister -> endpointsLister.namespace(service.getMetadata().getNamespace()).get(serviceId)) - .filter(Objects::nonNull) - .toList(); - - for (V1Endpoints endpoints : allEndpoints) { - List subsets = endpoints.getSubsets(); - if (subsets == null || subsets.isEmpty()) { - LOG.debug(() -> "serviceId : " + serviceId + " does not have any subsets"); - } - else { - ServiceMetadata serviceMetadata = serviceMetadata(service); - Map portsData = endpointSubsetsPortData(subsets); - Map serviceInstanceMetadata = serviceInstanceMetadata(portsData, serviceMetadata, - properties); - - for (V1EndpointSubset endpointSubset : subsets) { - - Map endpointsPortData = endpointSubsetsPortData(List.of(endpointSubset)); - ServicePortNameAndNumber portData = endpointsPort(endpointsPortData, serviceMetadata, properties); - - List addresses = addresses(endpointSubset, properties); - for (V1EndpointAddress endpointAddress : addresses) { - - KubernetesClientInstanceIdHostPodNameSupplier supplierOne = nonExternalName(endpointAddress, - service); - KubernetesClientPodLabelsAndAnnotationsSupplier supplierTwo = nonExternalName(coreV1Api, - service.getMetadata().getNamespace()); - - ServiceInstance serviceInstance = serviceInstance(servicePortSecureResolver, serviceMetadata, - supplierOne, supplierTwo, portData, serviceInstanceMetadata, properties); - instances.add(serviceInstance); - } - } - - } - } - - return instances; - } - - @Override - public List getServices() { - List services = serviceListers.stream() - .flatMap(serviceLister -> serviceLister.list().stream()) - .filter(service -> matchesServiceLabels(service, properties)) - .filter(predicate) - .map(s -> s.getMetadata().getName()) - .distinct() - .toList(); - LOG.debug(() -> "will return services : " + services); - return services; - } - - @Override - public int getOrder() { - return properties.order(); - } - - @PostConstruct - void afterPropertiesSet() { - postConstruct(sharedInformerFactories, properties, informersReadyFunc, serviceListers); + return "Kubernetes Native Discovery Client"; } } diff --git a/spring-cloud-kubernetes-client-discovery/src/main/java/org/springframework/cloud/kubernetes/client/discovery/KubernetesClientInformerDiscoveryClientAutoConfiguration.java b/spring-cloud-kubernetes-client-discovery/src/main/java/org/springframework/cloud/kubernetes/client/discovery/KubernetesClientInformerDiscoveryClientAutoConfiguration.java index 4dda10ced..be4ba9533 100644 --- a/spring-cloud-kubernetes-client-discovery/src/main/java/org/springframework/cloud/kubernetes/client/discovery/KubernetesClientInformerDiscoveryClientAutoConfiguration.java +++ b/spring-cloud-kubernetes-client-discovery/src/main/java/org/springframework/cloud/kubernetes/client/discovery/KubernetesClientInformerDiscoveryClientAutoConfiguration.java @@ -37,6 +37,8 @@ import org.springframework.cloud.kubernetes.commons.discovery.KubernetesDiscoveryClientHealthIndicatorInitializer; import org.springframework.cloud.kubernetes.commons.discovery.KubernetesDiscoveryProperties; import org.springframework.cloud.kubernetes.commons.discovery.KubernetesDiscoveryPropertiesAutoConfiguration; +import org.springframework.cloud.kubernetes.commons.discovery.conditionals.ConditionalOnDiscoveryCacheableBlockingDisabled; +import org.springframework.cloud.kubernetes.commons.discovery.conditionals.ConditionalOnDiscoveryCacheableBlockingEnabled; import org.springframework.cloud.kubernetes.commons.discovery.conditionals.ConditionalOnSpringCloudKubernetesBlockingDiscovery; import org.springframework.cloud.kubernetes.commons.discovery.conditionals.ConditionalOnSpringCloudKubernetesBlockingDiscoveryHealthInitializer; import org.springframework.context.ApplicationEventPublisher; @@ -59,6 +61,7 @@ final class KubernetesClientInformerDiscoveryClientAutoConfiguration { @Bean @ConditionalOnMissingBean + @ConditionalOnDiscoveryCacheableBlockingDisabled KubernetesClientInformerDiscoveryClient kubernetesClientInformerDiscoveryClient( List sharedInformerFactories, List> serviceListers, List> endpointsListers, List> serviceInformers, @@ -68,6 +71,18 @@ KubernetesClientInformerDiscoveryClient kubernetesClientInformerDiscoveryClient( serviceInformers, endpointsInformers, properties, coreV1Api, predicate); } + @Bean + @ConditionalOnMissingBean + @ConditionalOnDiscoveryCacheableBlockingEnabled + KubernetesClientCacheableInformerDiscoveryClient kubernetesClientCacheableInformerDiscoveryClient( + List sharedInformerFactories, List> serviceListers, + List> endpointsListers, List> serviceInformers, + List> endpointsInformers, KubernetesDiscoveryProperties properties, + CoreV1Api coreV1Api, Predicate predicate) { + return new KubernetesClientCacheableInformerDiscoveryClient(sharedInformerFactories, serviceListers, + endpointsListers, serviceInformers, endpointsInformers, properties, coreV1Api, predicate); + } + @Bean @ConditionalOnSpringCloudKubernetesBlockingDiscoveryHealthInitializer KubernetesDiscoveryClientHealthIndicatorInitializer indicatorInitializer( diff --git a/spring-cloud-kubernetes-client-discovery/src/main/java/org/springframework/cloud/kubernetes/client/discovery/KubernetesClientInformerReactiveDiscoveryClient.java b/spring-cloud-kubernetes-client-discovery/src/main/java/org/springframework/cloud/kubernetes/client/discovery/KubernetesClientInformerReactiveDiscoveryClient.java index f9a90f713..962b4c03f 100644 --- a/spring-cloud-kubernetes-client-discovery/src/main/java/org/springframework/cloud/kubernetes/client/discovery/KubernetesClientInformerReactiveDiscoveryClient.java +++ b/spring-cloud-kubernetes-client-discovery/src/main/java/org/springframework/cloud/kubernetes/client/discovery/KubernetesClientInformerReactiveDiscoveryClient.java @@ -16,24 +16,15 @@ package org.springframework.cloud.kubernetes.client.discovery; -import java.util.Objects; - -import reactor.core.publisher.Flux; -import reactor.core.scheduler.Schedulers; - -import org.springframework.cloud.client.ServiceInstance; -import org.springframework.cloud.client.discovery.ReactiveDiscoveryClient; - /** * @author Ryan Baxter */ -public final class KubernetesClientInformerReactiveDiscoveryClient implements ReactiveDiscoveryClient { - - private final KubernetesClientInformerDiscoveryClient kubernetesDiscoveryClient; +public final class KubernetesClientInformerReactiveDiscoveryClient + extends KubernetesClientReactiveAbstractInformerDiscoveryClient { public KubernetesClientInformerReactiveDiscoveryClient( KubernetesClientInformerDiscoveryClient kubernetesDiscoveryClient) { - this.kubernetesDiscoveryClient = kubernetesDiscoveryClient; + super(kubernetesDiscoveryClient); } @Override @@ -41,22 +32,4 @@ public String description() { return "Kubernetes Reactive Discovery Client"; } - @Override - public Flux getInstances(String serviceId) { - Objects.requireNonNull(serviceId, "serviceId must be provided"); - return Flux.defer(() -> Flux.fromIterable(kubernetesDiscoveryClient.getInstances(serviceId))) - .subscribeOn(Schedulers.boundedElastic()); - } - - @Override - public Flux getServices() { - return Flux.defer(() -> Flux.fromIterable(kubernetesDiscoveryClient.getServices())) - .subscribeOn(Schedulers.boundedElastic()); - } - - @Override - public int getOrder() { - return kubernetesDiscoveryClient.getOrder(); - } - } diff --git a/spring-cloud-kubernetes-client-discovery/src/main/java/org/springframework/cloud/kubernetes/client/discovery/KubernetesClientInformerReactiveDiscoveryClientAutoConfiguration.java b/spring-cloud-kubernetes-client-discovery/src/main/java/org/springframework/cloud/kubernetes/client/discovery/KubernetesClientInformerReactiveDiscoveryClientAutoConfiguration.java index 70666fca0..531bc6e16 100644 --- a/spring-cloud-kubernetes-client-discovery/src/main/java/org/springframework/cloud/kubernetes/client/discovery/KubernetesClientInformerReactiveDiscoveryClientAutoConfiguration.java +++ b/spring-cloud-kubernetes-client-discovery/src/main/java/org/springframework/cloud/kubernetes/client/discovery/KubernetesClientInformerReactiveDiscoveryClientAutoConfiguration.java @@ -29,6 +29,7 @@ import org.springframework.boot.autoconfigure.AutoConfigureAfter; import org.springframework.boot.autoconfigure.AutoConfigureBefore; +import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.cloud.client.ReactiveCommonsClientAutoConfiguration; import org.springframework.cloud.client.discovery.composite.reactive.ReactiveCompositeDiscoveryClientAutoConfiguration; @@ -39,6 +40,8 @@ import org.springframework.cloud.kubernetes.commons.discovery.KubernetesDiscoveryClientHealthIndicatorInitializer; import org.springframework.cloud.kubernetes.commons.discovery.KubernetesDiscoveryProperties; import org.springframework.cloud.kubernetes.commons.discovery.KubernetesDiscoveryPropertiesAutoConfiguration; +import org.springframework.cloud.kubernetes.commons.discovery.conditionals.ConditionalOnDiscoveryCacheableReactiveDisabled; +import org.springframework.cloud.kubernetes.commons.discovery.conditionals.ConditionalOnDiscoveryCacheableReactiveEnabled; import org.springframework.cloud.kubernetes.commons.discovery.conditionals.ConditionalOnSpringCloudKubernetesReactiveDiscovery; import org.springframework.cloud.kubernetes.commons.discovery.conditionals.ConditionalOnSpringCloudKubernetesReactiveDiscoveryHealthInitializer; import org.springframework.context.ApplicationEventPublisher; @@ -62,26 +65,80 @@ final class KubernetesClientInformerReactiveDiscoveryClientAutoConfiguration { private static final LogAccessor LOG = new LogAccessor( LogFactory.getLog(KubernetesClientInformerReactiveDiscoveryClientAutoConfiguration.class)); - // in case blocking is disabled @Bean @ConditionalOnMissingBean - KubernetesClientInformerDiscoveryClient kubernetesClientInformerDiscoveryClientForReactiveImplementation( + @ConditionalOnDiscoveryCacheableReactiveDisabled + KubernetesClientInformerReactiveDiscoveryClient kubernetesClientReactiveDiscoveryClient( List sharedInformerFactories, List> serviceListers, List> endpointsListers, List> serviceInformers, List> endpointsInformers, KubernetesDiscoveryProperties properties, CoreV1Api coreV1Api, Predicate predicate) { - return new KubernetesClientInformerDiscoveryClient(sharedInformerFactories, serviceListers, endpointsListers, - serviceInformers, endpointsInformers, properties, coreV1Api, predicate); + KubernetesClientInformerDiscoveryClient blockingClient = new KubernetesClientInformerDiscoveryClient( + sharedInformerFactories, serviceListers, endpointsListers, serviceInformers, endpointsInformers, + properties, coreV1Api, predicate); + blockingClient.afterPropertiesSet(); + + return new KubernetesClientInformerReactiveDiscoveryClient(blockingClient); } + @Bean + @ConditionalOnBean(KubernetesClientInformerReactiveDiscoveryClient.class) + @ConditionalOnSpringCloudKubernetesReactiveDiscoveryHealthInitializer + ReactiveDiscoveryClientHealthIndicator kubernetesReactiveDiscoveryClientHealthIndicator( + KubernetesClientInformerReactiveDiscoveryClient reactiveClient, + DiscoveryClientHealthIndicatorProperties properties) { + return new ReactiveDiscoveryClientHealthIndicator(reactiveClient, properties); + } + + // Above two beans are created when cacheable is disabled + @Bean @ConditionalOnMissingBean - KubernetesClientInformerReactiveDiscoveryClient kubernetesClientReactiveDiscoveryClient( - KubernetesClientInformerDiscoveryClient kubernetesClientInformerDiscoveryClient) { - return new KubernetesClientInformerReactiveDiscoveryClient(kubernetesClientInformerDiscoveryClient); + @ConditionalOnDiscoveryCacheableReactiveEnabled + KubernetesClientCacheableInformerReactiveDiscoveryClient kubernetesClientCacheableReactiveDiscoveryClient( + List sharedInformerFactories, List> serviceListers, + List> endpointsListers, List> serviceInformers, + List> endpointsInformers, KubernetesDiscoveryProperties properties, + CoreV1Api coreV1Api, Predicate predicate) { + + KubernetesClientInformerDiscoveryClient blockingClient = new KubernetesClientInformerDiscoveryClient( + sharedInformerFactories, serviceListers, endpointsListers, serviceInformers, endpointsInformers, + properties, coreV1Api, predicate); + blockingClient.afterPropertiesSet(); + + return new KubernetesClientCacheableInformerReactiveDiscoveryClient(blockingClient); + } + + @Bean + @ConditionalOnMissingBean(KubernetesClientInformerReactiveDiscoveryClient.class) + @ConditionalOnSpringCloudKubernetesReactiveDiscoveryHealthInitializer + ReactiveDiscoveryClientHealthIndicator reactiveDiscoveryClientHealthIndicator( + List sharedInformerFactories, List> serviceListers, + List> endpointsListers, List> serviceInformers, + List> endpointsInformers, + KubernetesDiscoveryProperties kubernetesDiscoveryProperties, CoreV1Api coreV1Api, + Predicate predicate, DiscoveryClientHealthIndicatorProperties properties) { + + KubernetesClientInformerDiscoveryClient blockingClient = new KubernetesClientInformerDiscoveryClient( + sharedInformerFactories, serviceListers, endpointsListers, serviceInformers, endpointsInformers, + kubernetesDiscoveryProperties, coreV1Api, predicate); + blockingClient.afterPropertiesSet(); + + KubernetesClientInformerReactiveDiscoveryClient reactiveClient = new KubernetesClientInformerReactiveDiscoveryClient( + blockingClient); + + return new ReactiveDiscoveryClientHealthIndicator(reactiveClient, properties); } + // Above two beans are created when cacheable is enabled. In this case, we can't make + // KubernetesClientInformerDiscoveryClient a @Bean, since blocking discovery might be + // disabled and we do not want to allow wiring of it. + // Nevertheless, we still need an instance of KubernetesClientInformerDiscoveryClient + // in order to create the ReactiveDiscoveryClientHealthIndicator and + // KubernetesClientCacheableInformerReactiveDiscoveryClient. + // As such, we create two of such instances in each bean. + /** * Post an event so that health indicator is initialized. */ @@ -93,15 +150,4 @@ KubernetesDiscoveryClientHealthIndicatorInitializer reactiveIndicatorInitializer return new KubernetesDiscoveryClientHealthIndicatorInitializer(podUtils, applicationEventPublisher); } - /** - * unlike the blocking implementation, we need to register the health indicator. - */ - @Bean - @ConditionalOnSpringCloudKubernetesReactiveDiscoveryHealthInitializer - ReactiveDiscoveryClientHealthIndicator kubernetesReactiveDiscoveryClientHealthIndicator( - KubernetesClientInformerReactiveDiscoveryClient client, - DiscoveryClientHealthIndicatorProperties properties) { - return new ReactiveDiscoveryClientHealthIndicator(client, properties); - } - } diff --git a/spring-cloud-kubernetes-client-discovery/src/main/java/org/springframework/cloud/kubernetes/client/discovery/KubernetesClientReactiveAbstractInformerDiscoveryClient.java b/spring-cloud-kubernetes-client-discovery/src/main/java/org/springframework/cloud/kubernetes/client/discovery/KubernetesClientReactiveAbstractInformerDiscoveryClient.java new file mode 100644 index 000000000..99709cc78 --- /dev/null +++ b/spring-cloud-kubernetes-client-discovery/src/main/java/org/springframework/cloud/kubernetes/client/discovery/KubernetesClientReactiveAbstractInformerDiscoveryClient.java @@ -0,0 +1,59 @@ +/* + * Copyright 2019-present the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.kubernetes.client.discovery; + +import java.util.Objects; + +import reactor.core.publisher.Flux; +import reactor.core.scheduler.Schedulers; + +import org.springframework.cloud.client.ServiceInstance; +import org.springframework.cloud.client.discovery.ReactiveDiscoveryClient; + +/** + * @author wind57 + */ +abstract class KubernetesClientReactiveAbstractInformerDiscoveryClient implements ReactiveDiscoveryClient { + + private final KubernetesClientInformerDiscoveryClient kubernetesDiscoveryClient; + + KubernetesClientReactiveAbstractInformerDiscoveryClient( + KubernetesClientInformerDiscoveryClient kubernetesDiscoveryClient) { + this.kubernetesDiscoveryClient = kubernetesDiscoveryClient; + } + + public abstract String description(); + + @Override + public Flux getInstances(String serviceId) { + Objects.requireNonNull(serviceId, "serviceId must be provided"); + return Flux.defer(() -> Flux.fromIterable(kubernetesDiscoveryClient.getInstances(serviceId))) + .subscribeOn(Schedulers.boundedElastic()); + } + + @Override + public Flux getServices() { + return Flux.defer(() -> Flux.fromIterable(kubernetesDiscoveryClient.getServices())) + .subscribeOn(Schedulers.boundedElastic()); + } + + @Override + public int getOrder() { + return kubernetesDiscoveryClient.getOrder(); + } + +} diff --git a/spring-cloud-kubernetes-client-discovery/src/test/java/org/springframework/cloud/kubernetes/client/discovery/KubernetesClientInformerDiscoveryClientAutoConfigurationApplicationContextTests.java b/spring-cloud-kubernetes-client-discovery/src/test/java/org/springframework/cloud/kubernetes/client/discovery/KubernetesClientInformerDiscoveryClientAutoConfigurationApplicationContextTests.java index 2608fc71e..9ed696910 100644 --- a/spring-cloud-kubernetes-client-discovery/src/test/java/org/springframework/cloud/kubernetes/client/discovery/KubernetesClientInformerDiscoveryClientAutoConfigurationApplicationContextTests.java +++ b/spring-cloud-kubernetes-client-discovery/src/test/java/org/springframework/cloud/kubernetes/client/discovery/KubernetesClientInformerDiscoveryClientAutoConfigurationApplicationContextTests.java @@ -280,9 +280,7 @@ void kubernetesDiscoveryBlockingDisabled() { setup("spring.main.cloud-platform=KUBERNETES", "spring.cloud.config.enabled=false", "spring.cloud.discovery.blocking.enabled=false", "spring.cloud.kubernetes.client.namespace=default"); applicationContextRunner.run(context -> { - assertThat(context).hasSingleBean(KubernetesClientInformerDiscoveryClient.class); - // only the implementation for the reactive - assertThat(context).hasBean("kubernetesClientInformerDiscoveryClientForReactiveImplementation"); + assertThat(context).doesNotHaveBean(KubernetesClientInformerDiscoveryClient.class); assertThat(context).hasSingleBean(KubernetesClientInformerReactiveDiscoveryClient.class); assertThat(context).hasSingleBean(KubernetesDiscoveryClientHealthIndicatorInitializer.class); @@ -301,9 +299,7 @@ void kubernetesDiscoveryBlockingDisabledWithSelectiveNamespaces() { setup("spring.main.cloud-platform=KUBERNETES", "spring.cloud.config.enabled=false", "spring.cloud.discovery.blocking.enabled=false", "spring.cloud.kubernetes.discovery.namespaces=a,b"); applicationContextRunner.run(context -> { - assertThat(context).hasSingleBean(KubernetesClientInformerDiscoveryClient.class); - // only the implementation for the reactive - assertThat(context).hasBean("kubernetesClientInformerDiscoveryClientForReactiveImplementation"); + assertThat(context).doesNotHaveBean(KubernetesClientInformerDiscoveryClient.class); assertThat(context).hasSingleBean(KubernetesClientInformerReactiveDiscoveryClient.class); assertThat(context).hasSingleBean(KubernetesDiscoveryClientHealthIndicatorInitializer.class); @@ -469,6 +465,59 @@ void reactiveDisabledWithSelectiveNamespaces() { }); } + /** + *
+	 *     - no property related to cacheable in the blocking implementation is set, as such:
+	 *     - KubernetesClientInformerDiscoveryClient is present
+	 *     - KubernetesClientCacheableInformerDiscoveryClient is not present
+	 * 
+ */ + @Test + void blockingCacheableDefault() { + setup("spring.main.cloud-platform=KUBERNETES", "spring.cloud.config.enabled=false", + "spring.cloud.kubernetes.client.namespace=default"); + applicationContextRunner.run(context -> { + assertThat(context).hasSingleBean(KubernetesClientInformerDiscoveryClient.class); + assertThat(context).doesNotHaveBean(KubernetesClientCacheableInformerDiscoveryClient.class); + }); + } + + /** + *
+	 *     - cacheable in the blocking implementation = false, as such:
+	 *     - KubernetesClientInformerDiscoveryClient is present
+	 *     - KubernetesClientCacheableInformerDiscoveryClient is not present
+	 * 
+ */ + @Test + void blockingCacheableDisabled() { + setup("spring.main.cloud-platform=KUBERNETES", "spring.cloud.config.enabled=false", + "spring.cloud.kubernetes.discovery.cacheable.blocking.enabled=false", + "spring.cloud.kubernetes.client.namespace=default"); + applicationContextRunner.run(context -> { + assertThat(context).hasSingleBean(KubernetesClientInformerDiscoveryClient.class); + assertThat(context).doesNotHaveBean(KubernetesClientCacheableInformerDiscoveryClient.class); + }); + } + + /** + *
+	 *     - cacheable in the blocking implementation = true, as such:
+	 *     - KubernetesClientInformerDiscoveryClient is not present
+	 *     - KubernetesClientCacheableInformerDiscoveryClient is present
+	 * 
+ */ + @Test + void blockingCacheableEnabled() { + setup("spring.main.cloud-platform=KUBERNETES", "spring.cloud.config.enabled=false", + "spring.cloud.kubernetes.discovery.cacheable.blocking.enabled=true", + "spring.cloud.kubernetes.client.namespace=default"); + applicationContextRunner.run(context -> { + assertThat(context).doesNotHaveBean(KubernetesClientInformerDiscoveryClient.class); + assertThat(context).hasSingleBean(KubernetesClientCacheableInformerDiscoveryClient.class); + }); + } + private void setup(String... properties) { applicationContextRunner = new ApplicationContextRunner() .withConfiguration(AutoConfigurations.of( diff --git a/spring-cloud-kubernetes-client-discovery/src/test/java/org/springframework/cloud/kubernetes/client/discovery/KubernetesClientInformerReactiveDiscoveryClientAutoConfigurationApplicationContextTests.java b/spring-cloud-kubernetes-client-discovery/src/test/java/org/springframework/cloud/kubernetes/client/discovery/KubernetesClientInformerReactiveDiscoveryClientAutoConfigurationApplicationContextTests.java index b416b6db0..3db1ffea7 100644 --- a/spring-cloud-kubernetes-client-discovery/src/test/java/org/springframework/cloud/kubernetes/client/discovery/KubernetesClientInformerReactiveDiscoveryClientAutoConfigurationApplicationContextTests.java +++ b/spring-cloud-kubernetes-client-discovery/src/test/java/org/springframework/cloud/kubernetes/client/discovery/KubernetesClientInformerReactiveDiscoveryClientAutoConfigurationApplicationContextTests.java @@ -289,9 +289,7 @@ void blockingDisabled() { setup("spring.main.cloud-platform=KUBERNETES", "spring.cloud.config.enabled=false", "spring.cloud.discovery.blocking.enabled=false", "spring.cloud.kubernetes.client.namespace=default"); applicationContextRunner.run(context -> { - assertThat(context).hasSingleBean(KubernetesClientInformerDiscoveryClient.class); - // only the implementation for the reactive - assertThat(context).hasBean("kubernetesClientInformerDiscoveryClientForReactiveImplementation"); + assertThat(context).doesNotHaveBean(KubernetesClientInformerDiscoveryClient.class); assertThat(context).hasSingleBean(KubernetesClientInformerReactiveDiscoveryClient.class); // simple from commons and ours @@ -310,9 +308,7 @@ void blockingDisabledWithSelectiveNamespaces() { setup("spring.main.cloud-platform=KUBERNETES", "spring.cloud.config.enabled=false", "spring.cloud.discovery.blocking.enabled=false", "spring.cloud.kubernetes.discovery.namespaces=a,b"); applicationContextRunner.run(context -> { - assertThat(context).hasSingleBean(KubernetesClientInformerDiscoveryClient.class); - // only the implementation for the reactive - assertThat(context).hasBean("kubernetesClientInformerDiscoveryClientForReactiveImplementation"); + assertThat(context).doesNotHaveBean(KubernetesClientInformerDiscoveryClient.class); assertThat(context).hasSingleBean(KubernetesClientInformerReactiveDiscoveryClient.class); // simple from commons and ours @@ -390,6 +386,81 @@ void healthEnabledClassNotPresentWithSelectiveNamespaces() { }); } + /** + *
+	 *     - no property related to cacheable in the reactive implementation is set, as such:
+	 *     - KubernetesClientInformerReactiveDiscoveryClient is present
+	 *     - KubernetesClientCacheableInformerReactiveDiscoveryClient is not present
+	 * 
+ */ + @Test + void reactiveCacheableDefault() { + setup("spring.main.cloud-platform=KUBERNETES", "spring.cloud.config.enabled=false", + "spring.cloud.kubernetes.client.namespace=default"); + applicationContextRunner.run(context -> { + assertThat(context).hasSingleBean(KubernetesClientInformerReactiveDiscoveryClient.class); + assertThat(context).doesNotHaveBean(KubernetesClientCacheableInformerReactiveDiscoveryClient.class); + }); + } + + /** + *
+	 *     - cacheable in the reactive implementation = false, as such:
+	 *     - KubernetesClientInformerReactiveDiscoveryClient is present
+	 *     - KubernetesClientCacheableInformerReactiveDiscoveryClient is not present
+	 * 
+ */ + @Test + void reactiveCacheableDisabled() { + setup("spring.main.cloud-platform=KUBERNETES", "spring.cloud.config.enabled=false", + "spring.cloud.kubernetes.discovery.cacheable.reactive.enabled=false", + "spring.cloud.kubernetes.client.namespace=default"); + applicationContextRunner.run(context -> { + assertThat(context).hasSingleBean(KubernetesClientInformerReactiveDiscoveryClient.class); + assertThat(context).doesNotHaveBean(KubernetesClientCacheableInformerReactiveDiscoveryClient.class); + }); + } + + /** + *
+	 *     - cacheable in the reactive implementation = true, as such:
+	 *     - KubernetesClientInformerReactiveDiscoveryClient is not present
+	 *     - KubernetesClientCacheableInformerReactiveDiscoveryClient is present
+	 *     - Health indicator is disabled
+	 * 
+ */ + @Test + void reactiveCacheableEnabledWithoutHealthIndicator() { + setup("spring.main.cloud-platform=KUBERNETES", "spring.cloud.config.enabled=false", + "spring.cloud.kubernetes.discovery.cacheable.reactive.enabled=true", + "spring.cloud.discovery.client.health-indicator.enabled=false", + "spring.cloud.kubernetes.client.namespace=default"); + applicationContextRunner.run(context -> { + assertThat(context).doesNotHaveBean(KubernetesClientInformerReactiveDiscoveryClient.class); + assertThat(context).hasSingleBean(KubernetesClientCacheableInformerReactiveDiscoveryClient.class); + }); + } + + /** + *
+	 *     - cacheable in the reactive implementation = true, as such:
+	 *     - KubernetesClientInformerReactiveDiscoveryClient is not present
+	 *     - KubernetesClientCacheableInformerReactiveDiscoveryClient is present
+	 *     - Health indicator is enabled
+	 * 
+ */ + @Test + void reactiveCacheableEnabledWithHealthIndicator() { + setup("spring.main.cloud-platform=KUBERNETES", "spring.cloud.config.enabled=false", + "spring.cloud.kubernetes.discovery.cacheable.reactive.enabled=true", + "spring.cloud.discovery.client.health-indicator.enabled=true", + "spring.cloud.kubernetes.client.namespace=default"); + applicationContextRunner.run(context -> { + assertThat(context).doesNotHaveBean(KubernetesClientInformerReactiveDiscoveryClient.class); + assertThat(context).hasSingleBean(KubernetesClientCacheableInformerReactiveDiscoveryClient.class); + }); + } + private void setup(String... properties) { applicationContextRunner = new ApplicationContextRunner() .withConfiguration(AutoConfigurations.of( diff --git a/spring-cloud-kubernetes-client-discovery/src/test/java/org/springframework/cloud/kubernetes/client/discovery/KubernetesClientInformerReactiveDiscoveryClientAutoConfigurationTests.java b/spring-cloud-kubernetes-client-discovery/src/test/java/org/springframework/cloud/kubernetes/client/discovery/KubernetesClientInformerReactiveDiscoveryClientAutoConfigurationTests.java index b54bde292..0b3236005 100644 --- a/spring-cloud-kubernetes-client-discovery/src/test/java/org/springframework/cloud/kubernetes/client/discovery/KubernetesClientInformerReactiveDiscoveryClientAutoConfigurationTests.java +++ b/spring-cloud-kubernetes-client-discovery/src/test/java/org/springframework/cloud/kubernetes/client/discovery/KubernetesClientInformerReactiveDiscoveryClientAutoConfigurationTests.java @@ -88,12 +88,12 @@ ApiClient apiClient() { WireMock.configureFor(wireMockServer.port()); stubFor(get("/api/v1/namespaces/test/endpoints?resourceVersion=0&watch=false") .willReturn(aResponse().withStatus(200) - .withBody(new JSON().serialize(new V1EndpointsListBuilder() + .withBody(JSON.serialize(new V1EndpointsListBuilder() .withMetadata(new V1ListMetaBuilder().withResourceVersion("0").build()) .build())))); stubFor(get("/api/v1/namespaces/test/services?resourceVersion=0&watch=false") .willReturn(aResponse().withStatus(200) - .withBody(new JSON().serialize(new V1ServiceListBuilder() + .withBody(JSON.serialize(new V1ServiceListBuilder() .withMetadata(new V1ListMetaBuilder().withResourceVersion("0").build()) .build())))); return new ClientBuilder().setBasePath(wireMockServer.baseUrl()).build(); diff --git a/spring-cloud-kubernetes-fabric8-discovery/src/main/java/org/springframework/cloud/kubernetes/fabric8/discovery/Fabric8CacheableDiscoveryClient.java b/spring-cloud-kubernetes-fabric8-discovery/src/main/java/org/springframework/cloud/kubernetes/fabric8/discovery/Fabric8CacheableDiscoveryClient.java index b44571631..ccabcb7aa 100644 --- a/spring-cloud-kubernetes-fabric8-discovery/src/main/java/org/springframework/cloud/kubernetes/fabric8/discovery/Fabric8CacheableDiscoveryClient.java +++ b/spring-cloud-kubernetes-fabric8-discovery/src/main/java/org/springframework/cloud/kubernetes/fabric8/discovery/Fabric8CacheableDiscoveryClient.java @@ -62,9 +62,4 @@ public String description() { return "Fabric8 Cacheable Blocking Discovery Client"; } - @Override - public int getOrder() { - return super.getOrder(); - } - } diff --git a/spring-cloud-kubernetes-fabric8-discovery/src/main/java/org/springframework/cloud/kubernetes/fabric8/discovery/Fabric8CacheableReactiveDiscoveryClient.java b/spring-cloud-kubernetes-fabric8-discovery/src/main/java/org/springframework/cloud/kubernetes/fabric8/discovery/Fabric8CacheableReactiveDiscoveryClient.java index aafbc037c..000107e7c 100644 --- a/spring-cloud-kubernetes-fabric8-discovery/src/main/java/org/springframework/cloud/kubernetes/fabric8/discovery/Fabric8CacheableReactiveDiscoveryClient.java +++ b/spring-cloud-kubernetes-fabric8-discovery/src/main/java/org/springframework/cloud/kubernetes/fabric8/discovery/Fabric8CacheableReactiveDiscoveryClient.java @@ -47,9 +47,4 @@ public String description() { return "Fabric8 Cacheable Reactive Discovery Client"; } - @Override - public int getOrder() { - return super.getOrder(); - } - } diff --git a/spring-cloud-kubernetes-fabric8-discovery/src/main/java/org/springframework/cloud/kubernetes/fabric8/discovery/Fabric8ReactiveDiscoveryClientAutoConfiguration.java b/spring-cloud-kubernetes-fabric8-discovery/src/main/java/org/springframework/cloud/kubernetes/fabric8/discovery/Fabric8ReactiveDiscoveryClientAutoConfiguration.java index feb9cb5d3..498b95287 100644 --- a/spring-cloud-kubernetes-fabric8-discovery/src/main/java/org/springframework/cloud/kubernetes/fabric8/discovery/Fabric8ReactiveDiscoveryClientAutoConfiguration.java +++ b/spring-cloud-kubernetes-fabric8-discovery/src/main/java/org/springframework/cloud/kubernetes/fabric8/discovery/Fabric8ReactiveDiscoveryClientAutoConfiguration.java @@ -24,6 +24,7 @@ import org.springframework.boot.autoconfigure.AutoConfigureAfter; import org.springframework.boot.autoconfigure.AutoConfigureBefore; +import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.cloud.client.ConditionalOnDiscoveryHealthIndicatorEnabled; @@ -77,6 +78,16 @@ Fabric8ReactiveDiscoveryClient fabric8ReactiveDiscoveryClient(KubernetesClient c return new Fabric8ReactiveDiscoveryClient(fabric8DiscoveryClient); } + @Bean + @ConditionalOnBean(Fabric8ReactiveDiscoveryClient.class) + @ConditionalOnSpringCloudKubernetesReactiveDiscoveryHealthInitializer + ReactiveDiscoveryClientHealthIndicator kubernetesReactiveDiscoveryClientHealthIndicator( + Fabric8ReactiveDiscoveryClient reactiveClient, DiscoveryClientHealthIndicatorProperties properties) { + return new ReactiveDiscoveryClientHealthIndicator(reactiveClient, properties); + } + + // Above two beans are created when cacheable is disabled + @Bean @ConditionalOnMissingBean @ConditionalOnDiscoveryCacheableReactiveEnabled @@ -84,11 +95,38 @@ Fabric8CacheableReactiveDiscoveryClient fabric8CacheableReactiveDiscoveryClient( KubernetesDiscoveryProperties properties, Predicate predicate, Environment environment) { ServicePortSecureResolver servicePortSecureResolver = new ServicePortSecureResolver(properties); KubernetesNamespaceProvider namespaceProvider = new KubernetesNamespaceProvider(environment); - Fabric8DiscoveryClient fabric8DiscoveryClient = new Fabric8DiscoveryClient(client, properties, + Fabric8DiscoveryClient blockingClient = new Fabric8DiscoveryClient(client, properties, servicePortSecureResolver, namespaceProvider, predicate); - return new Fabric8CacheableReactiveDiscoveryClient(fabric8DiscoveryClient); + return new Fabric8CacheableReactiveDiscoveryClient(blockingClient); } + @Bean + @ConditionalOnMissingBean(Fabric8ReactiveDiscoveryClient.class) + @ConditionalOnSpringCloudKubernetesReactiveDiscoveryHealthInitializer + ReactiveDiscoveryClientHealthIndicator kubernetesReactiveClientHealthIndicator(KubernetesClient client, + KubernetesDiscoveryProperties kubernetesDiscoveryProperties, Predicate predicate, + DiscoveryClientHealthIndicatorProperties properties, Environment environment) { + + KubernetesNamespaceProvider namespaceProvider = new KubernetesNamespaceProvider(environment); + ServicePortSecureResolver servicePortSecureResolver = new ServicePortSecureResolver( + kubernetesDiscoveryProperties); + Fabric8DiscoveryClient fabric8DiscoveryClient = new Fabric8DiscoveryClient(client, + kubernetesDiscoveryProperties, servicePortSecureResolver, namespaceProvider, predicate); + + Fabric8ReactiveDiscoveryClient reactiveClient = new Fabric8ReactiveDiscoveryClient(fabric8DiscoveryClient); + + return new ReactiveDiscoveryClientHealthIndicator(reactiveClient, properties); + } + + // Above two beans are created when cacheable is enabled. In this case, we can't make + // Fabric8DiscoveryClient a @Bean, since blocking discovery might be disabled and we + // do + // not want to allow wiring of it. Nevertheless, we still need an instance of + // Fabric8DiscoveryClient + // in order to create the ReactiveDiscoveryClientHealthIndicator and + // Fabric8CacheableReactiveDiscoveryClient + // As such, we create two of such instances in each bean. + /** * Post an event so that health indicator is initialized. */ @@ -101,11 +139,4 @@ KubernetesDiscoveryClientHealthIndicatorInitializer reactiveIndicatorInitializer return new KubernetesDiscoveryClientHealthIndicatorInitializer(podUtils, applicationEventPublisher); } - @Bean - @ConditionalOnSpringCloudKubernetesReactiveDiscoveryHealthInitializer - ReactiveDiscoveryClientHealthIndicator kubernetesReactiveDiscoveryClientHealthIndicator( - Fabric8ReactiveDiscoveryClient client, DiscoveryClientHealthIndicatorProperties properties) { - return new ReactiveDiscoveryClientHealthIndicator(client, properties); - } - } diff --git a/spring-cloud-kubernetes-fabric8-discovery/src/test/java/org/springframework/cloud/kubernetes/fabric8/discovery/Fabric8ReactiveDiscoveryClientAutoConfigurationApplicationContextTests.java b/spring-cloud-kubernetes-fabric8-discovery/src/test/java/org/springframework/cloud/kubernetes/fabric8/discovery/Fabric8ReactiveDiscoveryClientAutoConfigurationApplicationContextTests.java index a5397fe4c..85544a11f 100644 --- a/spring-cloud-kubernetes-fabric8-discovery/src/test/java/org/springframework/cloud/kubernetes/fabric8/discovery/Fabric8ReactiveDiscoveryClientAutoConfigurationApplicationContextTests.java +++ b/spring-cloud-kubernetes-fabric8-discovery/src/test/java/org/springframework/cloud/kubernetes/fabric8/discovery/Fabric8ReactiveDiscoveryClientAutoConfigurationApplicationContextTests.java @@ -180,10 +180,11 @@ void reactiveCacheableDisabled() { * - cacheable in the reactive implementation = true, as such: * - Fabric8ReactiveDiscoveryClient is not present * - Fabric8CacheableReactiveDiscoveryClient is present + * - Health indicator is disabled * */ @Test - void reactiveCacheableEnabled() { + void reactiveCacheableEnabledWithoutHealthIndicator() { setup("spring.main.cloud-platform=KUBERNETES", "spring.cloud.config.enabled=false", "spring.cloud.kubernetes.discovery.cacheable.reactive.enabled=true", "spring.cloud.discovery.client.health-indicator.enabled=false"); @@ -193,6 +194,25 @@ void reactiveCacheableEnabled() { }); } + /** + *
+	 *     - cacheable in the reactive implementation = true, as such:
+	 *     - Fabric8ReactiveDiscoveryClient is not present
+	 *     - Fabric8CacheableReactiveDiscoveryClient is present
+	 *     - Health indicator is enabled
+	 * 
+ */ + @Test + void reactiveCacheableEnabledWithHealthIndicator() { + setup("spring.main.cloud-platform=KUBERNETES", "spring.cloud.config.enabled=false", + "spring.cloud.kubernetes.discovery.cacheable.reactive.enabled=true", + "spring.cloud.discovery.client.health-indicator.enabled=true"); + applicationContextRunner.run(context -> { + assertThat(context).doesNotHaveBean(Fabric8ReactiveDiscoveryClient.class); + assertThat(context).hasSingleBean(Fabric8CacheableReactiveDiscoveryClient.class); + }); + } + private void setup(String... properties) { applicationContextRunner = new ApplicationContextRunner() .withConfiguration(AutoConfigurations.of(UtilAutoConfiguration.class,