Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -57,32 +57,34 @@ public KubernetesClientServicesListSupplier(Environment environment,

@Override
public Flux<List<ServiceInstance>> get() {
List<ServiceInstance> result = new ArrayList<>();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we had our code as :

public Flux<List<ServiceInstance>> get() { 

      List<ServiceInstance> result = ....
      return Flux.just(result); 

}

and it changes to :

public Flux<List<ServiceInstance>> get() { 

      return Flux.defer(() -> {
                 List<ServiceInstance> result = ....
                 return Flux.just(result); 
      })
}

That is the actual fix.

String serviceName = getServiceId();
LOG.debug(() -> "serviceID : " + serviceName);

if (discoveryProperties.allNamespaces()) {
LOG.debug(() -> "discovering services in all namespaces");
List<V1Service> services = services(null, serviceName);
services.forEach(service -> addMappedService(mapper, result, service));
}
else if (!discoveryProperties.namespaces().isEmpty()) {
List<String> selectiveNamespaces = discoveryProperties.namespaces().stream().sorted().toList();
LOG.debug(() -> "discovering services in selective namespaces : " + selectiveNamespaces);
selectiveNamespaces.forEach(selectiveNamespace -> {
List<V1Service> services = services(selectiveNamespace, serviceName);
return Flux.defer(() -> {
List<ServiceInstance> result = new ArrayList<>();
String serviceName = getServiceId();
LOG.debug(() -> "serviceID : " + serviceName);

if (discoveryProperties.allNamespaces()) {
LOG.debug(() -> "discovering services in all namespaces");
List<V1Service> services = services(null, serviceName);
services.forEach(service -> addMappedService(mapper, result, service));
});
}
else {
String namespace = getApplicationNamespace(null, "loadbalancer-service", kubernetesNamespaceProvider);
LOG.debug(() -> "discovering services in namespace : " + namespace);
List<V1Service> services = services(namespace, serviceName);
services.forEach(service -> addMappedService(mapper, result, service));
}
}
else if (!discoveryProperties.namespaces().isEmpty()) {
List<String> selectiveNamespaces = discoveryProperties.namespaces().stream().sorted().toList();
LOG.debug(() -> "discovering services in selective namespaces : " + selectiveNamespaces);
selectiveNamespaces.forEach(selectiveNamespace -> {
List<V1Service> services = services(selectiveNamespace, serviceName);
services.forEach(service -> addMappedService(mapper, result, service));
});
}
else {
String namespace = getApplicationNamespace(null, "loadbalancer-service", kubernetesNamespaceProvider);
LOG.debug(() -> "discovering services in namespace : " + namespace);
List<V1Service> services = services(namespace, serviceName);
services.forEach(service -> addMappedService(mapper, result, service));
}

LOG.debug(() -> "found services : " + result);
return Flux.just(result);
LOG.debug(() -> "found services : " + result);
return Flux.just(result);
});
}

private void addMappedService(KubernetesServiceInstanceMapper<V1Service> mapper, List<ServiceInstance> services,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,51 +57,53 @@ public class Fabric8ServicesListSupplier extends KubernetesServicesListSupplier<

@Override
public Flux<List<ServiceInstance>> get() {
List<ServiceInstance> result = new ArrayList<>();
String serviceName = getServiceId();
LOG.debug(() -> "serviceID : " + serviceName);
return Flux.defer(() -> {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same fix as above, nothing changes besides that

List<ServiceInstance> result = new ArrayList<>();
String serviceName = getServiceId();
LOG.debug(() -> "serviceID : " + serviceName);

if (discoveryProperties.allNamespaces()) {
LOG.debug(() -> "discovering services in all namespaces");
List<Service> services = kubernetesClient.services()
.inAnyNamespace()
.withField("metadata.name", serviceName)
.list()
.getItems();
services.forEach(service -> addMappedService(mapper, result, service));
}
else if (!discoveryProperties.namespaces().isEmpty()) {
List<String> selectiveNamespaces = discoveryProperties.namespaces().stream().sorted().toList();
LOG.debug(() -> "discovering services in selective namespaces : " + selectiveNamespaces);
selectiveNamespaces.forEach(selectiveNamespace -> {
Service service = kubernetesClient.services()
.inNamespace(selectiveNamespace)
.withName(serviceName)
.get();
if (discoveryProperties.allNamespaces()) {
LOG.debug(() -> "discovering services in all namespaces");
List<Service> services = kubernetesClient.services()
.inAnyNamespace()
.withField("metadata.name", serviceName)
.list()
.getItems();
services.forEach(service -> addMappedService(mapper, result, service));
}
else if (!discoveryProperties.namespaces().isEmpty()) {
List<String> selectiveNamespaces = discoveryProperties.namespaces().stream().sorted().toList();
LOG.debug(() -> "discovering services in selective namespaces : " + selectiveNamespaces);
selectiveNamespaces.forEach(selectiveNamespace -> {
Service service = kubernetesClient.services()
.inNamespace(selectiveNamespace)
.withName(serviceName)
.get();
if (service != null) {
addMappedService(mapper, result, service);
}
else {
LOG.debug(() -> "did not find service with name : " + serviceName + " in namespace : "
+ selectiveNamespace);
}
});
}
else {
String namespace = Fabric8Utils.getApplicationNamespace(kubernetesClient, null, "loadbalancer-service",
namespaceProvider);
LOG.debug(() -> "discovering services in namespace : " + namespace);
Service service = kubernetesClient.services().inNamespace(namespace).withName(serviceName).get();
if (service != null) {
addMappedService(mapper, result, service);
}
else {
LOG.debug(() -> "did not find service with name : " + serviceName + " in namespace : "
+ selectiveNamespace);
LOG.debug(() -> "did not find service with name : " + serviceName + " in namespace : " + namespace);
}
});
}
else {
String namespace = Fabric8Utils.getApplicationNamespace(kubernetesClient, null, "loadbalancer-service",
namespaceProvider);
LOG.debug(() -> "discovering services in namespace : " + namespace);
Service service = kubernetesClient.services().inNamespace(namespace).withName(serviceName).get();
if (service != null) {
addMappedService(mapper, result, service);
}
else {
LOG.debug(() -> "did not find service with name : " + serviceName + " in namespace : " + namespace);
}
}

LOG.debug(() -> "found services : " + result);
return Flux.just(result);
LOG.debug(() -> "found services : " + result);
return Flux.just(result);
});
}

private void addMappedService(KubernetesServiceInstanceMapper<Service> mapper, List<ServiceInstance> services,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,6 @@
import io.fabric8.kubernetes.api.model.ServicePortBuilder;
import io.fabric8.kubernetes.api.model.ServiceSpecBuilder;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.test.context.TestConfiguration;
import org.springframework.cloud.client.loadbalancer.LoadBalanced;
import org.springframework.context.annotation.Bean;
import org.springframework.web.reactive.function.client.WebClient;

/**
* @author wind57
*/
Expand Down Expand Up @@ -63,24 +56,4 @@ public static Endpoints endpoints(int port, String host, String namespace) {
.build();
}

@TestConfiguration
public static class LoadBalancerConfiguration {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not removed, but moved to individual classes


@Bean
@LoadBalanced
WebClient.Builder client() {
return WebClient.builder();
}

}

@SpringBootApplication
public static class Configuration {

public static void main(String[] args) {
SpringApplication.run(Configuration.class);
}

}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright 2013-present the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.cloud.kubernetes.fabric8.loadbalancer.it.mode;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class App {

public static void main(String[] args) {
SpringApplication.run(App.class);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Copyright 2013-present the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.cloud.kubernetes.fabric8.loadbalancer.it.mode;

import org.springframework.boot.test.context.TestConfiguration;
import org.springframework.cloud.client.loadbalancer.LoadBalanced;
import org.springframework.context.annotation.Bean;
import org.springframework.web.reactive.function.client.WebClient;

@TestConfiguration
public class LoadBalancerConfiguration {

@Bean
@LoadBalanced
WebClient.Builder client() {
return WebClient.builder();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,12 @@
import org.mockito.MockedStatic;
import org.mockito.Mockito;

import org.springframework.beans.factory.ObjectProvider;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.cloud.kubernetes.commons.loadbalancer.KubernetesServiceInstanceMapper;
import org.springframework.cloud.kubernetes.fabric8.loadbalancer.it.Util;
import org.springframework.cloud.kubernetes.fabric8.loadbalancer.it.mode.App;
import org.springframework.cloud.kubernetes.fabric8.loadbalancer.it.mode.LoadBalancerConfiguration;
import org.springframework.cloud.loadbalancer.core.CachingServiceInstanceListSupplier;
import org.springframework.cloud.loadbalancer.core.DiscoveryClientServiceInstanceListSupplier;
import org.springframework.cloud.loadbalancer.core.ServiceInstanceListSupplier;
Expand All @@ -43,16 +44,14 @@
import org.springframework.web.reactive.function.client.WebClient;

import static com.github.tomakehurst.wiremock.core.WireMockConfiguration.options;
import static org.springframework.cloud.kubernetes.fabric8.loadbalancer.it.Util.Configuration;
import static org.springframework.cloud.kubernetes.fabric8.loadbalancer.it.Util.LoadBalancerConfiguration;

/**
* @author wind57
*/
@SpringBootTest(
properties = { "spring.cloud.kubernetes.loadbalancer.mode=POD", "spring.main.cloud-platform=KUBERNETES",
"spring.cloud.kubernetes.discovery.all-namespaces=true" },
classes = { LoadBalancerConfiguration.class, Configuration.class })
classes = { LoadBalancerConfiguration.class, App.class })
class AllNamespacesTest {

private static final String SERVICE_A_URL = "http://service-a";
Expand All @@ -69,14 +68,15 @@ class AllNamespacesTest {

private static WireMockServer serviceBMockServer;

@SuppressWarnings("rawtypes")
private static final MockedStatic<KubernetesServiceInstanceMapper> MOCKED_STATIC = Mockito
.mockStatic(KubernetesServiceInstanceMapper.class);

@Autowired
private WebClient.Builder builder;

@Autowired
private ObjectProvider<LoadBalancerClientFactory> loadBalancerClientFactory;
private LoadBalancerClientFactory loadBalancerClientFactory;

@BeforeAll
static void beforeAll() {
Expand Down Expand Up @@ -177,7 +177,6 @@ void test() {
Assertions.assertThat(serviceBResult).isEqualTo("service-b-reached");

CachingServiceInstanceListSupplier supplier = (CachingServiceInstanceListSupplier) loadBalancerClientFactory
.getIfAvailable()
.getProvider("service-a", ServiceInstanceListSupplier.class)
.getIfAvailable();
Assertions.assertThat(supplier.getDelegate().getClass())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,12 @@
import org.mockito.MockedStatic;
import org.mockito.Mockito;

import org.springframework.beans.factory.ObjectProvider;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.cloud.kubernetes.commons.loadbalancer.KubernetesServiceInstanceMapper;
import org.springframework.cloud.kubernetes.fabric8.loadbalancer.it.Util;
import org.springframework.cloud.kubernetes.fabric8.loadbalancer.it.mode.App;
import org.springframework.cloud.kubernetes.fabric8.loadbalancer.it.mode.LoadBalancerConfiguration;
import org.springframework.cloud.loadbalancer.core.CachingServiceInstanceListSupplier;
import org.springframework.cloud.loadbalancer.core.DiscoveryClientServiceInstanceListSupplier;
import org.springframework.cloud.loadbalancer.core.ServiceInstanceListSupplier;
Expand All @@ -43,16 +44,14 @@
import org.springframework.web.reactive.function.client.WebClient;

import static com.github.tomakehurst.wiremock.core.WireMockConfiguration.options;
import static org.springframework.cloud.kubernetes.fabric8.loadbalancer.it.Util.Configuration;
import static org.springframework.cloud.kubernetes.fabric8.loadbalancer.it.Util.LoadBalancerConfiguration;

/**
* @author wind57
*/
@SpringBootTest(properties = { "spring.cloud.kubernetes.loadbalancer.mode=POD", "spring.main.cloud-platform=KUBERNETES",
"spring.cloud.kubernetes.discovery.all-namespaces=false", "spring.cloud.kubernetes.discovery.namespaces.[0]=a",
"spring.cloud.kubernetes.discovery.namespaces.[1]=b" },
classes = { LoadBalancerConfiguration.class, Configuration.class })
classes = { LoadBalancerConfiguration.class, App.class })
class SelectiveNamespacesTest {

private static final String MY_SERVICE_URL = "http://my-service";
Expand All @@ -71,14 +70,15 @@ class SelectiveNamespacesTest {

private static WireMockServer serviceCMockServer;

@SuppressWarnings("rawtypes")
private static final MockedStatic<KubernetesServiceInstanceMapper> MOCKED_STATIC = Mockito
.mockStatic(KubernetesServiceInstanceMapper.class);

@Autowired
private WebClient.Builder builder;

@Autowired
private ObjectProvider<LoadBalancerClientFactory> loadBalancerClientFactory;
private LoadBalancerClientFactory loadBalancerClientFactory;

@BeforeAll
static void beforeAll() {
Expand Down Expand Up @@ -213,7 +213,6 @@ void test() {
}

CachingServiceInstanceListSupplier supplier = (CachingServiceInstanceListSupplier) loadBalancerClientFactory
.getIfAvailable()
.getProvider("my-service", ServiceInstanceListSupplier.class)
.getIfAvailable();
Assertions.assertThat(supplier.getDelegate().getClass())
Expand Down
Loading
Loading