Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@
import org.springframework.cloud.kubernetes.commons.discovery.KubernetesDiscoveryProperties;
import org.springframework.cloud.kubernetes.fabric8.discovery.KubernetesClientServicesFunction;
import org.springframework.cloud.kubernetes.fabric8.discovery.KubernetesDiscoveryClient;
import org.springframework.util.Assert;

import java.util.Objects;

/**
* Kubernetes implementation of {@link ReactiveDiscoveryClient}. Currently relies on the
Expand All @@ -50,7 +51,7 @@ public String description() {

@Override
public Flux<ServiceInstance> getInstances(String serviceId) {
Assert.notNull(serviceId, "[Assertion failed] - the object argument must not be null");
Objects.requireNonNull(serviceId, "serviceId must not be null");
return Flux.defer(() -> Flux.fromIterable(kubernetesDiscoveryClient.getInstances(serviceId)))
.subscribeOn(Schedulers.boundedElastic());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,11 @@
/**
* @author wind57
*/
@TestPropertySource(properties = { "spring.main.cloud-platform=kubernetes",
"spring.cloud.config.import-check.enabled=false", "spring.cloud.kubernetes.client.namespace=default",
@TestPropertySource(properties = {
"spring.main.cloud-platform=kubernetes",
"spring.cloud.kubernetes.client.namespace=default",
"spring.cloud.kubernetes.discovery.metadata.add-pod-labels=true",
"spring.cloud.kubernetes.discovery.metadata.add-pod-annotations=true",
"logging.level.org.springframework.cloud.kubernetes.fabric8.discovery=debug" })
"spring.cloud.kubernetes.discovery.metadata.add-pod-annotations=true" })
@ExtendWith(OutputCaptureExtension.class)
@SpringBootTest(classes = { Fabric8DiscoveryApp.class, Fabric8DiscoveryBase.TestConfig.class },
webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,19 @@
import org.springframework.cloud.kubernetes.integration.tests.commons.Phase;
import org.springframework.test.context.TestPropertySource;

import static org.springframework.cloud.kubernetes.fabric8.client.discovery.TestAssertions.alterPods;
import static org.springframework.cloud.kubernetes.fabric8.client.discovery.TestAssertions.assertBlockingConfiguration;
import static org.springframework.cloud.kubernetes.fabric8.client.discovery.TestAssertions.assertPodMetadata;

/**
* @author wind57
*/
@TestPropertySource(properties = { "spring.cloud.discovery.reactive.enabled=false",
"logging.level.org.springframework.cloud.client.discovery.health=DEBUG",
"logging.level.org.springframework.cloud.kubernetes.commons.discovery=DEBUG" })
@TestPropertySource(properties = {
"spring.cloud.discovery.reactive.enabled=false",
"spring.cloud.discovery.blocking.enabled=true",
"logging.level.org.springframework.cloud.kubernetes.fabric8.discovery=debug",
"logging.level.org.springframework.cloud.client.discovery.health=debug",
"logging.level.org.springframework.cloud.kubernetes.commons.discovery=debug" })
class Fabric8DiscoveryBlockingIT extends Fabric8DiscoveryBase {

@LocalManagementPort
Expand All @@ -57,18 +61,8 @@ void afterEach() {
}

@Test
void test(CapturedOutput output) throws Exception {

String[] busyboxPods = K3S.execInContainer("sh", "-c", "kubectl get pods -l app=busybox -o=name --no-headers")
.getStdout()
.split("\n");

String podOne = busyboxPods[0].split("/")[1];
String podTwo = busyboxPods[1].split("/")[1];

K3S.execInContainer("sh", "-c", "kubectl label pods " + podOne + " my-label=my-value");
K3S.execInContainer("sh", "-c", "kubectl annotate pods " + podTwo + " my-annotation=my-value");

void test(CapturedOutput output) {
alterPods(K3S);
assertBlockingConfiguration(output, port);
assertPodMetadata(discoveryClient);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,23 +21,29 @@
import org.junit.jupiter.api.Test;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.system.CapturedOutput;
import org.springframework.boot.test.web.server.LocalManagementPort;
import org.springframework.cloud.client.discovery.DiscoveryClient;
import org.springframework.cloud.client.discovery.ReactiveDiscoveryClient;
import org.springframework.cloud.kubernetes.integration.tests.commons.Images;
import org.springframework.cloud.kubernetes.integration.tests.commons.Phase;
import org.springframework.test.context.TestPropertySource;

import static org.springframework.cloud.kubernetes.fabric8.client.discovery.TestAssertions.testReactiveConfiguration;
import static org.springframework.cloud.kubernetes.fabric8.client.discovery.TestAssertions.alterPods;
import static org.springframework.cloud.kubernetes.fabric8.client.discovery.TestAssertions.assertPodMetadata;
import static org.springframework.cloud.kubernetes.fabric8.client.discovery.TestAssertions.assertReactiveConfiguration;

/**
* @author wind57
*/
@TestPropertySource(properties = { "logging.level.org.springframework.cloud.kubernetes.commons.discovery=DEBUG",
"logging.level.org.springframework.cloud.client.discovery.health.reactive=DEBUG",
"logging.level.org.springframework.cloud.kubernetes.fabric8.discovery.reactive=DEBUG",
"logging.level.org.springframework.cloud.kubernetes.fabric8.discovery=DEBUG",
"spring.cloud.discovery.blocking.enabled=false" })
@TestPropertySource(properties = {
"spring.cloud.discovery.reactive.enabled=true",
"spring.cloud.discovery.blocking.enabled=false",
"logging.level.org.springframework.cloud.kubernetes.commons.discovery=debug",
"logging.level.org.springframework.cloud.client.discovery.health.reactive=debug",
"logging.level.org.springframework.cloud.kubernetes.fabric8.discovery.reactive=debug",
"logging.level.org.springframework.cloud.kubernetes.fabric8.discovery=debug" })
class Fabric8DiscoveryReactiveIT extends Fabric8DiscoveryBase {

@LocalManagementPort
Expand All @@ -59,7 +65,9 @@ void afterEach() {

@Test
void test(CapturedOutput output) {
testReactiveConfiguration(discoveryClient, output, port);
alterPods(K3S);
assertReactiveConfiguration(output, port);
assertPodMetadata(discoveryClient);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.springframework.cloud.kubernetes.commons.discovery.DefaultKubernetesServiceInstance;
import org.springframework.http.HttpMethod;
import org.springframework.web.reactive.function.client.WebClient;
import org.testcontainers.k3s.K3sContainer;

import static java.util.AbstractMap.SimpleEntry;
import static java.util.Map.Entry;
Expand All @@ -43,57 +44,52 @@
*/
final class TestAssertions {

private static final String REACTIVE_STATUS = "$.components.reactiveDiscoveryClients.components.['Fabric8 Kubernetes Reactive Discovery Client'].status";
private static final String REACTIVE = "$.components.reactiveDiscoveryClients.components.['Fabric8 Kubernetes Reactive Discovery Client']";

private static final String BLOCKING_STATUS = "$.components.discoveryComposite.components.discoveryClient.status";
private static final String REACTIVE_STATUS = REACTIVE + ".status";

private static final String REACTIVE_SERVICES = REACTIVE + ".details.services";

private static final String BLOCKING = "$.components.discoveryComposite.components.discoveryClient";

private static final String BLOCKING_STATUS = BLOCKING + ".status";

private static final String BLOCKING_SERVICES = BLOCKING + ".details.services";

private static final String DISCOVERY_COMPOSITE_STATUS = "$.components.discoveryComposite.status";

private static final BasicJsonTester BASIC_JSON_TESTER = new BasicJsonTester(TestAssertions.class);

private TestAssertions() {

}

static void assertPodMetadata(DiscoveryClient discoveryClient) {
static void alterPods(K3sContainer container) {
try {
String[] busyboxPods = container.execInContainer("sh", "-c", "kubectl get pods -l app=busybox -o=name --no-headers")
.getStdout()
.split("\n");

String podOne = busyboxPods[0].split("/")[1];
String podTwo = busyboxPods[1].split("/")[1];

container.execInContainer("sh", "-c", "kubectl label pods " + podOne + " my-label=my-value");
container.execInContainer("sh", "-c", "kubectl annotate pods " + podTwo + " my-annotation=my-value");
}
catch (Exception e) {
throw new RuntimeException(e);
}
}

static void assertPodMetadata(DiscoveryClient discoveryClient) {
List<ServiceInstance> serviceInstances = discoveryClient.getInstances("busybox-service");
assertPodMetadata(serviceInstances);
}

// if annotations are empty, we got the other pod, with labels here
DefaultKubernetesServiceInstance withCustomLabel = serviceInstances.stream()
.map(instance -> (DefaultKubernetesServiceInstance) instance)
.filter(x -> x.podMetadata().getOrDefault("annotations", Map.of()).isEmpty())
.toList()
.get(0);
List<Entry<String, String>> podMetadataLabels = withCustomLabel.podMetadata()
.get("labels")
.entrySet()
.stream()
.toList();

assertThat(withCustomLabel.getServiceId()).isEqualTo("busybox-service");
assertThat(withCustomLabel.getInstanceId()).isNotNull();
assertThat(withCustomLabel.getHost()).isNotNull();
assertThat(withCustomLabel.getMetadata())
.isEqualTo(Map.of("k8s_namespace", "default", "type", "ClusterIP", "port.busybox-port", "80"));
assertThat(podMetadataLabels).contains(new SimpleEntry<>("my-label", "my-value"));

// if annotation are present, we got the one with annotations here
DefaultKubernetesServiceInstance withCustomAnnotation = serviceInstances.stream()
.map(instance -> (DefaultKubernetesServiceInstance) instance)
.filter(x -> !x.podMetadata().getOrDefault("annotations", Map.of()).isEmpty())
.toList()
.get(0);
List<Entry<String, String>> podMetadataAnnotations = withCustomAnnotation.podMetadata()
.get("annotations")
.entrySet()
.stream()
.toList();

assertThat(withCustomLabel.getServiceId()).isEqualTo("busybox-service");
assertThat(withCustomLabel.getInstanceId()).isNotNull();
assertThat(withCustomLabel.getHost()).isNotNull();
assertThat(withCustomLabel.getMetadata())
.isEqualTo(Map.of("k8s_namespace", "default", "type", "ClusterIP", "port.busybox-port", "80"));
assertThat(podMetadataAnnotations).contains(new SimpleEntry<>("my-annotation", "my-value"));
static void assertPodMetadata(ReactiveDiscoveryClient discoveryClient) {
List<ServiceInstance> serviceInstances = discoveryClient.getInstances("busybox-service")
.collectList().block();
assertPodMetadata(serviceInstances);
}

static void assertAllServices(DiscoveryClient discoveryClient) {
Expand All @@ -115,12 +111,12 @@ static void assertAllServices(DiscoveryClient discoveryClient) {
}

/**
* Reactive is disabled, only blocking is active. As such,
* KubernetesInformerDiscoveryClientAutoConfiguration::indicatorInitializer will post
* an InstanceRegisteredEvent.
*
* We assert for logs and call '/health' endpoint to see that blocking discovery
* client was initialized.
* <pre>
* Reactive is disabled, only blocking is active. As such,
* We assert for logs and call '/health' endpoint to see that blocking discovery
* client was initialized.
* </pre>
*/
static void assertBlockingConfiguration(CapturedOutput output, int port) {

Expand All @@ -137,71 +133,28 @@ static void assertBlockingConfiguration(CapturedOutput output, int port) {
.block();

assertThat(BASIC_JSON_TESTER.from(healthResult))
.extractingJsonPathStringValue("$.components.discoveryComposite.status")
.extractingJsonPathStringValue(DISCOVERY_COMPOSITE_STATUS)
.isEqualTo("UP");

assertThat(BASIC_JSON_TESTER.from(healthResult)).extractingJsonPathStringValue(BLOCKING_STATUS).isEqualTo("UP");

assertThat(BASIC_JSON_TESTER.from(healthResult))
.extractingJsonPathArrayValue("$.components.discoveryComposite.components.discoveryClient.details.services")
.extractingJsonPathArrayValue(BLOCKING_SERVICES)
.containsExactlyInAnyOrder("kubernetes", "busybox-service");

assertThat(BASIC_JSON_TESTER.from(healthResult)).doesNotHaveJsonPath(REACTIVE_STATUS);

}

/**
* Both blocking and reactive are enabled.
*/
static void testDefaultConfiguration(CapturedOutput output, int port) {

waitForLogStatement(output, "Will publish InstanceRegisteredEvent from blocking implementation");
waitForLogStatement(output, "Will publish InstanceRegisteredEvent from reactive implementation");
waitForLogStatement(output, "publishing InstanceRegisteredEvent");
waitForLogStatement(output, "Discovery Client has been initialized");

WebClient healthClient = builder().baseUrl("http://localhost:" + port + "/actuator/health").build();

String healthResult = healthClient.method(HttpMethod.GET)
.retrieve()
.bodyToMono(String.class)
.retryWhen(retrySpec())
.block();

assertThat(BASIC_JSON_TESTER.from(healthResult))
.extractingJsonPathStringValue("$.components.discoveryComposite.status")
.isEqualTo("UP");

assertThat(BASIC_JSON_TESTER.from(healthResult))
.extractingJsonPathStringValue("$.components.discoveryComposite.components.discoveryClient.status")
.isEqualTo("UP");

assertThat(BASIC_JSON_TESTER.from(healthResult))
.extractingJsonPathArrayValue("$.components.discoveryComposite.components.discoveryClient.details.services")
.containsExactlyInAnyOrder("kubernetes", "busybox-service");

assertThat(BASIC_JSON_TESTER.from(healthResult))
.extractingJsonPathStringValue("$.components.reactiveDiscoveryClients.status")
.isEqualTo("UP");

assertThat(BASIC_JSON_TESTER.from(healthResult)).extractingJsonPathStringValue(
"$.components.reactiveDiscoveryClients.components.['Fabric8 Kubernetes Reactive Discovery Client'].status")
.isEqualTo("UP");

assertThat(BASIC_JSON_TESTER.from(healthResult)).extractingJsonPathArrayValue(
"$.components.reactiveDiscoveryClients.components.['Fabric8 Kubernetes Reactive Discovery Client'].details.services")
.containsExactlyInAnyOrder("kubernetes", "busybox-service");
}

/**
* Reactive is enabled, blocking is disabled. As such,
* KubernetesInformerDiscoveryClientAutoConfiguration::indicatorInitializer will post
* an InstanceRegisteredEvent.
*
* We assert for logs and call '/health' endpoint to see that reactive discovery
* client was initialized.
* <pre>
* Reactive is enabled, only blocking is disabled. As such,
* We assert for logs and call '/health' endpoint to see that blocking discovery
* client was initialized.
* </pre>
*/
static void testReactiveConfiguration(ReactiveDiscoveryClient discoveryClient, CapturedOutput output, int port) {
static void assertReactiveConfiguration(CapturedOutput output, int port) {

waitForLogStatement(output, "Will publish InstanceRegisteredEvent from reactive implementation");
waitForLogStatement(output, "publishing InstanceRegisteredEvent");
Expand All @@ -215,23 +168,14 @@ static void testReactiveConfiguration(ReactiveDiscoveryClient discoveryClient, C
.retryWhen(retrySpec())
.block();

assertThat(BASIC_JSON_TESTER.from(healthResult))
.extractingJsonPathStringValue("$.components.reactiveDiscoveryClients.status")
.isEqualTo("UP");

assertThat(BASIC_JSON_TESTER.from(healthResult)).extractingJsonPathStringValue(REACTIVE_STATUS).isEqualTo("UP");

assertThat(BASIC_JSON_TESTER.from(healthResult)).extractingJsonPathArrayValue(
"$.components.reactiveDiscoveryClients.components.['Fabric8 Kubernetes Reactive Discovery Client'].details.services")
assertThat(BASIC_JSON_TESTER.from(healthResult))
.extractingJsonPathArrayValue(REACTIVE_SERVICES)
.containsExactlyInAnyOrder("kubernetes", "busybox-service");

assertThat(BASIC_JSON_TESTER.from(healthResult)).doesNotHaveJsonPath(BLOCKING_STATUS);

List<String> services = discoveryClient.getServices().toStream().toList();

assertThat(services).contains("busybox-service");
assertThat(services).contains("kubernetes");

}

/**
Expand Down Expand Up @@ -279,6 +223,47 @@ static void filterMatchesBothNamespacesViaThePredicate(DiscoveryClient discovery

}

private static void assertPodMetadata(List<ServiceInstance> serviceInstances) {

// if annotations are empty, we got the other pod, with labels here
DefaultKubernetesServiceInstance withCustomLabel = serviceInstances.stream()
.map(instance -> (DefaultKubernetesServiceInstance) instance)
.filter(x -> x.podMetadata().getOrDefault("annotations", Map.of()).isEmpty())
.toList()
.get(0);
List<Entry<String, String>> podMetadataLabels = withCustomLabel.podMetadata()
.get("labels")
.entrySet()
.stream()
.toList();

assertThat(withCustomLabel.getServiceId()).isEqualTo("busybox-service");
assertThat(withCustomLabel.getInstanceId()).isNotNull();
assertThat(withCustomLabel.getHost()).isNotNull();
assertThat(withCustomLabel.getMetadata())
.isEqualTo(Map.of("k8s_namespace", "default", "type", "ClusterIP", "port.busybox-port", "80"));
assertThat(podMetadataLabels).contains(new SimpleEntry<>("my-label", "my-value"));

// if annotation are present, we got the one with annotations here
DefaultKubernetesServiceInstance withCustomAnnotation = serviceInstances.stream()
.map(instance -> (DefaultKubernetesServiceInstance) instance)
.filter(x -> !x.podMetadata().getOrDefault("annotations", Map.of()).isEmpty())
.toList()
.get(0);
List<Entry<String, String>> podMetadataAnnotations = withCustomAnnotation.podMetadata()
.get("annotations")
.entrySet()
.stream()
.toList();

assertThat(withCustomLabel.getServiceId()).isEqualTo("busybox-service");
assertThat(withCustomLabel.getInstanceId()).isNotNull();
assertThat(withCustomLabel.getHost()).isNotNull();
assertThat(withCustomLabel.getMetadata())
.isEqualTo(Map.of("k8s_namespace", "default", "type", "ClusterIP", "port.busybox-port", "80"));
assertThat(podMetadataAnnotations).contains(new SimpleEntry<>("my-annotation", "my-value"));
}

private static void waitForLogStatement(CapturedOutput output, String message) {
await().pollInterval(Duration.ofSeconds(1))
.atMost(Duration.ofSeconds(30))
Expand Down
Loading