From 956628c8e1a7a98832a3f7ecb884f92a236638b5 Mon Sep 17 00:00:00 2001 From: wind57 Date: Sun, 9 Feb 2025 14:12:42 +0200 Subject: [PATCH] started work Signed-off-by: wind57 --- .../KubernetesReactiveDiscoveryClient.java | 5 +- .../discovery/Fabric8DiscoveryBase.java | 8 +- .../discovery/Fabric8DiscoveryBlockingIT.java | 24 +-- .../discovery/Fabric8DiscoveryReactiveIT.java | 22 +- .../client/discovery/TestAssertions.java | 199 ++++++++---------- 5 files changed, 123 insertions(+), 135 deletions(-) diff --git a/spring-cloud-kubernetes-fabric8-discovery/src/main/java/org/springframework/cloud/kubernetes/fabric8/discovery/reactive/KubernetesReactiveDiscoveryClient.java b/spring-cloud-kubernetes-fabric8-discovery/src/main/java/org/springframework/cloud/kubernetes/fabric8/discovery/reactive/KubernetesReactiveDiscoveryClient.java index e86791b75e..eeb92ff640 100644 --- a/spring-cloud-kubernetes-fabric8-discovery/src/main/java/org/springframework/cloud/kubernetes/fabric8/discovery/reactive/KubernetesReactiveDiscoveryClient.java +++ b/spring-cloud-kubernetes-fabric8-discovery/src/main/java/org/springframework/cloud/kubernetes/fabric8/discovery/reactive/KubernetesReactiveDiscoveryClient.java @@ -25,7 +25,8 @@ import org.springframework.cloud.kubernetes.commons.discovery.KubernetesDiscoveryProperties; import org.springframework.cloud.kubernetes.fabric8.discovery.KubernetesClientServicesFunction; import org.springframework.cloud.kubernetes.fabric8.discovery.KubernetesDiscoveryClient; -import org.springframework.util.Assert; + +import java.util.Objects; /** * Kubernetes implementation of {@link ReactiveDiscoveryClient}. Currently relies on the @@ -50,7 +51,7 @@ public String description() { @Override public Flux getInstances(String serviceId) { - Assert.notNull(serviceId, "[Assertion failed] - the object argument must not be null"); + Objects.requireNonNull(serviceId, "serviceId must not be null"); return Flux.defer(() -> Flux.fromIterable(kubernetesDiscoveryClient.getInstances(serviceId))) .subscribeOn(Schedulers.boundedElastic()); } diff --git a/spring-cloud-kubernetes-integration-tests/spring-cloud-kubernetes-fabric8-client-discovery/src/test/java/org/springframework/cloud/kubernetes/fabric8/client/discovery/Fabric8DiscoveryBase.java b/spring-cloud-kubernetes-integration-tests/spring-cloud-kubernetes-fabric8-client-discovery/src/test/java/org/springframework/cloud/kubernetes/fabric8/client/discovery/Fabric8DiscoveryBase.java index 93ccf8ddd4..e213fada23 100644 --- a/spring-cloud-kubernetes-integration-tests/spring-cloud-kubernetes-fabric8-client-discovery/src/test/java/org/springframework/cloud/kubernetes/fabric8/client/discovery/Fabric8DiscoveryBase.java +++ b/spring-cloud-kubernetes-integration-tests/spring-cloud-kubernetes-fabric8-client-discovery/src/test/java/org/springframework/cloud/kubernetes/fabric8/client/discovery/Fabric8DiscoveryBase.java @@ -35,11 +35,11 @@ /** * @author wind57 */ -@TestPropertySource(properties = { "spring.main.cloud-platform=kubernetes", - "spring.cloud.config.import-check.enabled=false", "spring.cloud.kubernetes.client.namespace=default", +@TestPropertySource(properties = { + "spring.main.cloud-platform=kubernetes", + "spring.cloud.kubernetes.client.namespace=default", "spring.cloud.kubernetes.discovery.metadata.add-pod-labels=true", - "spring.cloud.kubernetes.discovery.metadata.add-pod-annotations=true", - "logging.level.org.springframework.cloud.kubernetes.fabric8.discovery=debug" }) + "spring.cloud.kubernetes.discovery.metadata.add-pod-annotations=true" }) @ExtendWith(OutputCaptureExtension.class) @SpringBootTest(classes = { Fabric8DiscoveryApp.class, Fabric8DiscoveryBase.TestConfig.class }, webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT) diff --git a/spring-cloud-kubernetes-integration-tests/spring-cloud-kubernetes-fabric8-client-discovery/src/test/java/org/springframework/cloud/kubernetes/fabric8/client/discovery/Fabric8DiscoveryBlockingIT.java b/spring-cloud-kubernetes-integration-tests/spring-cloud-kubernetes-fabric8-client-discovery/src/test/java/org/springframework/cloud/kubernetes/fabric8/client/discovery/Fabric8DiscoveryBlockingIT.java index 9f60a5e913..45a7236bea 100644 --- a/spring-cloud-kubernetes-integration-tests/spring-cloud-kubernetes-fabric8-client-discovery/src/test/java/org/springframework/cloud/kubernetes/fabric8/client/discovery/Fabric8DiscoveryBlockingIT.java +++ b/spring-cloud-kubernetes-integration-tests/spring-cloud-kubernetes-fabric8-client-discovery/src/test/java/org/springframework/cloud/kubernetes/fabric8/client/discovery/Fabric8DiscoveryBlockingIT.java @@ -28,15 +28,19 @@ import org.springframework.cloud.kubernetes.integration.tests.commons.Phase; import org.springframework.test.context.TestPropertySource; +import static org.springframework.cloud.kubernetes.fabric8.client.discovery.TestAssertions.alterPods; import static org.springframework.cloud.kubernetes.fabric8.client.discovery.TestAssertions.assertBlockingConfiguration; import static org.springframework.cloud.kubernetes.fabric8.client.discovery.TestAssertions.assertPodMetadata; /** * @author wind57 */ -@TestPropertySource(properties = { "spring.cloud.discovery.reactive.enabled=false", - "logging.level.org.springframework.cloud.client.discovery.health=DEBUG", - "logging.level.org.springframework.cloud.kubernetes.commons.discovery=DEBUG" }) +@TestPropertySource(properties = { + "spring.cloud.discovery.reactive.enabled=false", + "spring.cloud.discovery.blocking.enabled=true", + "logging.level.org.springframework.cloud.kubernetes.fabric8.discovery=debug", + "logging.level.org.springframework.cloud.client.discovery.health=debug", + "logging.level.org.springframework.cloud.kubernetes.commons.discovery=debug" }) class Fabric8DiscoveryBlockingIT extends Fabric8DiscoveryBase { @LocalManagementPort @@ -57,18 +61,8 @@ void afterEach() { } @Test - void test(CapturedOutput output) throws Exception { - - String[] busyboxPods = K3S.execInContainer("sh", "-c", "kubectl get pods -l app=busybox -o=name --no-headers") - .getStdout() - .split("\n"); - - String podOne = busyboxPods[0].split("/")[1]; - String podTwo = busyboxPods[1].split("/")[1]; - - K3S.execInContainer("sh", "-c", "kubectl label pods " + podOne + " my-label=my-value"); - K3S.execInContainer("sh", "-c", "kubectl annotate pods " + podTwo + " my-annotation=my-value"); - + void test(CapturedOutput output) { + alterPods(K3S); assertBlockingConfiguration(output, port); assertPodMetadata(discoveryClient); } diff --git a/spring-cloud-kubernetes-integration-tests/spring-cloud-kubernetes-fabric8-client-discovery/src/test/java/org/springframework/cloud/kubernetes/fabric8/client/discovery/Fabric8DiscoveryReactiveIT.java b/spring-cloud-kubernetes-integration-tests/spring-cloud-kubernetes-fabric8-client-discovery/src/test/java/org/springframework/cloud/kubernetes/fabric8/client/discovery/Fabric8DiscoveryReactiveIT.java index 12cf477f20..8ca029127a 100644 --- a/spring-cloud-kubernetes-integration-tests/spring-cloud-kubernetes-fabric8-client-discovery/src/test/java/org/springframework/cloud/kubernetes/fabric8/client/discovery/Fabric8DiscoveryReactiveIT.java +++ b/spring-cloud-kubernetes-integration-tests/spring-cloud-kubernetes-fabric8-client-discovery/src/test/java/org/springframework/cloud/kubernetes/fabric8/client/discovery/Fabric8DiscoveryReactiveIT.java @@ -21,23 +21,29 @@ import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; import org.springframework.boot.test.system.CapturedOutput; import org.springframework.boot.test.web.server.LocalManagementPort; +import org.springframework.cloud.client.discovery.DiscoveryClient; import org.springframework.cloud.client.discovery.ReactiveDiscoveryClient; import org.springframework.cloud.kubernetes.integration.tests.commons.Images; import org.springframework.cloud.kubernetes.integration.tests.commons.Phase; import org.springframework.test.context.TestPropertySource; -import static org.springframework.cloud.kubernetes.fabric8.client.discovery.TestAssertions.testReactiveConfiguration; +import static org.springframework.cloud.kubernetes.fabric8.client.discovery.TestAssertions.alterPods; +import static org.springframework.cloud.kubernetes.fabric8.client.discovery.TestAssertions.assertPodMetadata; +import static org.springframework.cloud.kubernetes.fabric8.client.discovery.TestAssertions.assertReactiveConfiguration; /** * @author wind57 */ -@TestPropertySource(properties = { "logging.level.org.springframework.cloud.kubernetes.commons.discovery=DEBUG", - "logging.level.org.springframework.cloud.client.discovery.health.reactive=DEBUG", - "logging.level.org.springframework.cloud.kubernetes.fabric8.discovery.reactive=DEBUG", - "logging.level.org.springframework.cloud.kubernetes.fabric8.discovery=DEBUG", - "spring.cloud.discovery.blocking.enabled=false" }) +@TestPropertySource(properties = { + "spring.cloud.discovery.reactive.enabled=true", + "spring.cloud.discovery.blocking.enabled=false", + "logging.level.org.springframework.cloud.kubernetes.commons.discovery=debug", + "logging.level.org.springframework.cloud.client.discovery.health.reactive=debug", + "logging.level.org.springframework.cloud.kubernetes.fabric8.discovery.reactive=debug", + "logging.level.org.springframework.cloud.kubernetes.fabric8.discovery=debug" }) class Fabric8DiscoveryReactiveIT extends Fabric8DiscoveryBase { @LocalManagementPort @@ -59,7 +65,9 @@ void afterEach() { @Test void test(CapturedOutput output) { - testReactiveConfiguration(discoveryClient, output, port); + alterPods(K3S); + assertReactiveConfiguration(output, port); + assertPodMetadata(discoveryClient); } } diff --git a/spring-cloud-kubernetes-integration-tests/spring-cloud-kubernetes-fabric8-client-discovery/src/test/java/org/springframework/cloud/kubernetes/fabric8/client/discovery/TestAssertions.java b/spring-cloud-kubernetes-integration-tests/spring-cloud-kubernetes-fabric8-client-discovery/src/test/java/org/springframework/cloud/kubernetes/fabric8/client/discovery/TestAssertions.java index 1fdd8272b9..1cecda0e52 100644 --- a/spring-cloud-kubernetes-integration-tests/spring-cloud-kubernetes-fabric8-client-discovery/src/test/java/org/springframework/cloud/kubernetes/fabric8/client/discovery/TestAssertions.java +++ b/spring-cloud-kubernetes-integration-tests/spring-cloud-kubernetes-fabric8-client-discovery/src/test/java/org/springframework/cloud/kubernetes/fabric8/client/discovery/TestAssertions.java @@ -30,6 +30,7 @@ import org.springframework.cloud.kubernetes.commons.discovery.DefaultKubernetesServiceInstance; import org.springframework.http.HttpMethod; import org.springframework.web.reactive.function.client.WebClient; +import org.testcontainers.k3s.K3sContainer; import static java.util.AbstractMap.SimpleEntry; import static java.util.Map.Entry; @@ -43,9 +44,19 @@ */ final class TestAssertions { - private static final String REACTIVE_STATUS = "$.components.reactiveDiscoveryClients.components.['Fabric8 Kubernetes Reactive Discovery Client'].status"; + private static final String REACTIVE = "$.components.reactiveDiscoveryClients.components.['Fabric8 Kubernetes Reactive Discovery Client']"; - private static final String BLOCKING_STATUS = "$.components.discoveryComposite.components.discoveryClient.status"; + private static final String REACTIVE_STATUS = REACTIVE + ".status"; + + private static final String REACTIVE_SERVICES = REACTIVE + ".details.services"; + + private static final String BLOCKING = "$.components.discoveryComposite.components.discoveryClient"; + + private static final String BLOCKING_STATUS = BLOCKING + ".status"; + + private static final String BLOCKING_SERVICES = BLOCKING + ".details.services"; + + private static final String DISCOVERY_COMPOSITE_STATUS = "$.components.discoveryComposite.status"; private static final BasicJsonTester BASIC_JSON_TESTER = new BasicJsonTester(TestAssertions.class); @@ -53,47 +64,32 @@ private TestAssertions() { } - static void assertPodMetadata(DiscoveryClient discoveryClient) { + static void alterPods(K3sContainer container) { + try { + String[] busyboxPods = container.execInContainer("sh", "-c", "kubectl get pods -l app=busybox -o=name --no-headers") + .getStdout() + .split("\n"); + + String podOne = busyboxPods[0].split("/")[1]; + String podTwo = busyboxPods[1].split("/")[1]; + + container.execInContainer("sh", "-c", "kubectl label pods " + podOne + " my-label=my-value"); + container.execInContainer("sh", "-c", "kubectl annotate pods " + podTwo + " my-annotation=my-value"); + } + catch (Exception e) { + throw new RuntimeException(e); + } + } + static void assertPodMetadata(DiscoveryClient discoveryClient) { List serviceInstances = discoveryClient.getInstances("busybox-service"); + assertPodMetadata(serviceInstances); + } - // if annotations are empty, we got the other pod, with labels here - DefaultKubernetesServiceInstance withCustomLabel = serviceInstances.stream() - .map(instance -> (DefaultKubernetesServiceInstance) instance) - .filter(x -> x.podMetadata().getOrDefault("annotations", Map.of()).isEmpty()) - .toList() - .get(0); - List> podMetadataLabels = withCustomLabel.podMetadata() - .get("labels") - .entrySet() - .stream() - .toList(); - - assertThat(withCustomLabel.getServiceId()).isEqualTo("busybox-service"); - assertThat(withCustomLabel.getInstanceId()).isNotNull(); - assertThat(withCustomLabel.getHost()).isNotNull(); - assertThat(withCustomLabel.getMetadata()) - .isEqualTo(Map.of("k8s_namespace", "default", "type", "ClusterIP", "port.busybox-port", "80")); - assertThat(podMetadataLabels).contains(new SimpleEntry<>("my-label", "my-value")); - - // if annotation are present, we got the one with annotations here - DefaultKubernetesServiceInstance withCustomAnnotation = serviceInstances.stream() - .map(instance -> (DefaultKubernetesServiceInstance) instance) - .filter(x -> !x.podMetadata().getOrDefault("annotations", Map.of()).isEmpty()) - .toList() - .get(0); - List> podMetadataAnnotations = withCustomAnnotation.podMetadata() - .get("annotations") - .entrySet() - .stream() - .toList(); - - assertThat(withCustomLabel.getServiceId()).isEqualTo("busybox-service"); - assertThat(withCustomLabel.getInstanceId()).isNotNull(); - assertThat(withCustomLabel.getHost()).isNotNull(); - assertThat(withCustomLabel.getMetadata()) - .isEqualTo(Map.of("k8s_namespace", "default", "type", "ClusterIP", "port.busybox-port", "80")); - assertThat(podMetadataAnnotations).contains(new SimpleEntry<>("my-annotation", "my-value")); + static void assertPodMetadata(ReactiveDiscoveryClient discoveryClient) { + List serviceInstances = discoveryClient.getInstances("busybox-service") + .collectList().block(); + assertPodMetadata(serviceInstances); } static void assertAllServices(DiscoveryClient discoveryClient) { @@ -115,12 +111,12 @@ static void assertAllServices(DiscoveryClient discoveryClient) { } /** - * Reactive is disabled, only blocking is active. As such, - * KubernetesInformerDiscoveryClientAutoConfiguration::indicatorInitializer will post - * an InstanceRegisteredEvent. * - * We assert for logs and call '/health' endpoint to see that blocking discovery - * client was initialized. + *
+	 *     	Reactive is disabled, only blocking is active. As such,
+	 * 	 	We assert for logs and call '/health' endpoint to see that blocking discovery
+	 * 	 	client was initialized.
+	 * 
*/ static void assertBlockingConfiguration(CapturedOutput output, int port) { @@ -137,13 +133,13 @@ static void assertBlockingConfiguration(CapturedOutput output, int port) { .block(); assertThat(BASIC_JSON_TESTER.from(healthResult)) - .extractingJsonPathStringValue("$.components.discoveryComposite.status") + .extractingJsonPathStringValue(DISCOVERY_COMPOSITE_STATUS) .isEqualTo("UP"); assertThat(BASIC_JSON_TESTER.from(healthResult)).extractingJsonPathStringValue(BLOCKING_STATUS).isEqualTo("UP"); assertThat(BASIC_JSON_TESTER.from(healthResult)) - .extractingJsonPathArrayValue("$.components.discoveryComposite.components.discoveryClient.details.services") + .extractingJsonPathArrayValue(BLOCKING_SERVICES) .containsExactlyInAnyOrder("kubernetes", "busybox-service"); assertThat(BASIC_JSON_TESTER.from(healthResult)).doesNotHaveJsonPath(REACTIVE_STATUS); @@ -151,57 +147,14 @@ static void assertBlockingConfiguration(CapturedOutput output, int port) { } /** - * Both blocking and reactive are enabled. - */ - static void testDefaultConfiguration(CapturedOutput output, int port) { - - waitForLogStatement(output, "Will publish InstanceRegisteredEvent from blocking implementation"); - waitForLogStatement(output, "Will publish InstanceRegisteredEvent from reactive implementation"); - waitForLogStatement(output, "publishing InstanceRegisteredEvent"); - waitForLogStatement(output, "Discovery Client has been initialized"); - - WebClient healthClient = builder().baseUrl("http://localhost:" + port + "/actuator/health").build(); - - String healthResult = healthClient.method(HttpMethod.GET) - .retrieve() - .bodyToMono(String.class) - .retryWhen(retrySpec()) - .block(); - - assertThat(BASIC_JSON_TESTER.from(healthResult)) - .extractingJsonPathStringValue("$.components.discoveryComposite.status") - .isEqualTo("UP"); - - assertThat(BASIC_JSON_TESTER.from(healthResult)) - .extractingJsonPathStringValue("$.components.discoveryComposite.components.discoveryClient.status") - .isEqualTo("UP"); - - assertThat(BASIC_JSON_TESTER.from(healthResult)) - .extractingJsonPathArrayValue("$.components.discoveryComposite.components.discoveryClient.details.services") - .containsExactlyInAnyOrder("kubernetes", "busybox-service"); - - assertThat(BASIC_JSON_TESTER.from(healthResult)) - .extractingJsonPathStringValue("$.components.reactiveDiscoveryClients.status") - .isEqualTo("UP"); - - assertThat(BASIC_JSON_TESTER.from(healthResult)).extractingJsonPathStringValue( - "$.components.reactiveDiscoveryClients.components.['Fabric8 Kubernetes Reactive Discovery Client'].status") - .isEqualTo("UP"); - - assertThat(BASIC_JSON_TESTER.from(healthResult)).extractingJsonPathArrayValue( - "$.components.reactiveDiscoveryClients.components.['Fabric8 Kubernetes Reactive Discovery Client'].details.services") - .containsExactlyInAnyOrder("kubernetes", "busybox-service"); - } - - /** - * Reactive is enabled, blocking is disabled. As such, - * KubernetesInformerDiscoveryClientAutoConfiguration::indicatorInitializer will post - * an InstanceRegisteredEvent. * - * We assert for logs and call '/health' endpoint to see that reactive discovery - * client was initialized. + *
+	 *     	Reactive is enabled, only blocking is disabled. As such,
+	 * 	 	We assert for logs and call '/health' endpoint to see that blocking discovery
+	 * 	 	client was initialized.
+	 * 
*/ - static void testReactiveConfiguration(ReactiveDiscoveryClient discoveryClient, CapturedOutput output, int port) { + static void assertReactiveConfiguration(CapturedOutput output, int port) { waitForLogStatement(output, "Will publish InstanceRegisteredEvent from reactive implementation"); waitForLogStatement(output, "publishing InstanceRegisteredEvent"); @@ -215,23 +168,14 @@ static void testReactiveConfiguration(ReactiveDiscoveryClient discoveryClient, C .retryWhen(retrySpec()) .block(); - assertThat(BASIC_JSON_TESTER.from(healthResult)) - .extractingJsonPathStringValue("$.components.reactiveDiscoveryClients.status") - .isEqualTo("UP"); - assertThat(BASIC_JSON_TESTER.from(healthResult)).extractingJsonPathStringValue(REACTIVE_STATUS).isEqualTo("UP"); - assertThat(BASIC_JSON_TESTER.from(healthResult)).extractingJsonPathArrayValue( - "$.components.reactiveDiscoveryClients.components.['Fabric8 Kubernetes Reactive Discovery Client'].details.services") + assertThat(BASIC_JSON_TESTER.from(healthResult)) + .extractingJsonPathArrayValue(REACTIVE_SERVICES) .containsExactlyInAnyOrder("kubernetes", "busybox-service"); assertThat(BASIC_JSON_TESTER.from(healthResult)).doesNotHaveJsonPath(BLOCKING_STATUS); - List services = discoveryClient.getServices().toStream().toList(); - - assertThat(services).contains("busybox-service"); - assertThat(services).contains("kubernetes"); - } /** @@ -279,6 +223,47 @@ static void filterMatchesBothNamespacesViaThePredicate(DiscoveryClient discovery } + private static void assertPodMetadata(List serviceInstances) { + + // if annotations are empty, we got the other pod, with labels here + DefaultKubernetesServiceInstance withCustomLabel = serviceInstances.stream() + .map(instance -> (DefaultKubernetesServiceInstance) instance) + .filter(x -> x.podMetadata().getOrDefault("annotations", Map.of()).isEmpty()) + .toList() + .get(0); + List> podMetadataLabels = withCustomLabel.podMetadata() + .get("labels") + .entrySet() + .stream() + .toList(); + + assertThat(withCustomLabel.getServiceId()).isEqualTo("busybox-service"); + assertThat(withCustomLabel.getInstanceId()).isNotNull(); + assertThat(withCustomLabel.getHost()).isNotNull(); + assertThat(withCustomLabel.getMetadata()) + .isEqualTo(Map.of("k8s_namespace", "default", "type", "ClusterIP", "port.busybox-port", "80")); + assertThat(podMetadataLabels).contains(new SimpleEntry<>("my-label", "my-value")); + + // if annotation are present, we got the one with annotations here + DefaultKubernetesServiceInstance withCustomAnnotation = serviceInstances.stream() + .map(instance -> (DefaultKubernetesServiceInstance) instance) + .filter(x -> !x.podMetadata().getOrDefault("annotations", Map.of()).isEmpty()) + .toList() + .get(0); + List> podMetadataAnnotations = withCustomAnnotation.podMetadata() + .get("annotations") + .entrySet() + .stream() + .toList(); + + assertThat(withCustomLabel.getServiceId()).isEqualTo("busybox-service"); + assertThat(withCustomLabel.getInstanceId()).isNotNull(); + assertThat(withCustomLabel.getHost()).isNotNull(); + assertThat(withCustomLabel.getMetadata()) + .isEqualTo(Map.of("k8s_namespace", "default", "type", "ClusterIP", "port.busybox-port", "80")); + assertThat(podMetadataAnnotations).contains(new SimpleEntry<>("my-annotation", "my-value")); + } + private static void waitForLogStatement(CapturedOutput output, String message) { await().pollInterval(Duration.ofSeconds(1)) .atMost(Duration.ofSeconds(30))