| 
17 | 17 | package org.springframework.cloud.kubernetes.client.discovery;  | 
18 | 18 | 
 
  | 
19 | 19 | import java.io.StringReader;  | 
 | 20 | +import java.util.Collections;  | 
 | 21 | +import java.util.concurrent.Semaphore;  | 
20 | 22 | 
 
  | 
21 | 23 | import com.github.tomakehurst.wiremock.client.WireMock;  | 
 | 24 | +import com.github.tomakehurst.wiremock.extension.Parameters;  | 
 | 25 | +import com.github.tomakehurst.wiremock.extension.PostServeAction;  | 
 | 26 | +import com.github.tomakehurst.wiremock.extension.ServeEventListener;  | 
22 | 27 | import com.github.tomakehurst.wiremock.junit5.WireMockExtension;  | 
 | 28 | +import com.github.tomakehurst.wiremock.stubbing.ServeEvent;  | 
23 | 29 | import io.kubernetes.client.openapi.ApiClient;  | 
24 | 30 | import io.kubernetes.client.openapi.JSON;  | 
25 | 31 | import io.kubernetes.client.openapi.models.V1Endpoints;  | 
26 | 32 | import io.kubernetes.client.openapi.models.V1EndpointsList;  | 
27 | 33 | import io.kubernetes.client.openapi.models.V1ListMeta;  | 
28 | 34 | import io.kubernetes.client.openapi.models.V1ObjectMeta;  | 
 | 35 | +import io.kubernetes.client.openapi.models.V1Pod;  | 
 | 36 | +import io.kubernetes.client.openapi.models.V1PodList;  | 
29 | 37 | import io.kubernetes.client.openapi.models.V1Service;  | 
30 | 38 | import io.kubernetes.client.openapi.models.V1ServiceList;  | 
31 | 39 | import io.kubernetes.client.util.ClientBuilder;  | 
 | 
69 | 77 |  */  | 
70 | 78 | class KubernetesClientInformerReactiveDiscoveryClientAutoConfigurationApplicationContextTests {  | 
71 | 79 | 
 
  | 
72 |  | -	@RegisterExtension  | 
73 |  | -	static WireMockExtension apiServer =  | 
74 |  | -		WireMockExtension.newInstance()  | 
75 |  | -			.options(options().dynamicPort())  | 
76 |  | -			.build();  | 
 | 80 | +	private static final Parameters GET_ENDPOINTS_PARAMETERS = new Parameters();  | 
77 | 81 | 
 
  | 
78 |  | -	@Configuration  | 
79 |  | -	static class ApiClientConfig {  | 
 | 82 | +	private static final Semaphore GET_ENDPOINTS_SEMAPHORE = new Semaphore(1);  | 
80 | 83 | 
 
  | 
81 |  | -		@Bean  | 
82 |  | -		@Primary  | 
83 |  | -		ApiClient apiClient() throws Exception {  | 
84 |  | -			ApiClient client = new ClientBuilder().setBasePath("http://localhost:" + apiServer.getPort()).build();  | 
85 |  | -			return client;  | 
86 |  | -		}  | 
 | 84 | +	private static final String GET_ENDPOINTS_PARAMETER_NAME = "get-endpoints";  | 
87 | 85 | 
 
  | 
88 |  | -	}  | 
 | 86 | +	@RegisterExtension  | 
 | 87 | +	private static final WireMockExtension API_SERVER =  | 
 | 88 | +		WireMockExtension.newInstance()  | 
 | 89 | +			.options(options().dynamicPort().extensions(new PostServeExtension()))  | 
 | 90 | +			.build();  | 
89 | 91 | 
 
  | 
90 | 92 | 	@BeforeEach  | 
91 |  | -	void beforeAll() {  | 
92 |  | -		apiServer.stubFor(  | 
93 |  | -			WireMock.get(WireMock.urlMatching("^/api/v1/namespaces/default/endpoints.*"))  | 
94 |  | -				.withQueryParam("watch", WireMock.equalTo("false"))  | 
95 |  | -				.willReturn(  | 
96 |  | -					WireMock.aResponse()  | 
97 |  | -						.withStatus(200)  | 
98 |  | -						.withBody(  | 
99 |  | -							JSON.serialize(  | 
100 |  | -								new V1EndpointsList()  | 
101 |  | -									.metadata(new V1ListMeta().resourceVersion("0"))  | 
102 |  | -									.addItemsItem(new V1Endpoints().metadata(new V1ObjectMeta().namespace("default")))  | 
103 |  | -							))));  | 
104 |  | - | 
105 |  | -		WireMock.get(WireMock.urlMatching("^/api/v1/namespaces/default/endpoints.*"))  | 
106 |  | -			.withQueryParam("watch", WireMock.equalTo("true"))  | 
107 |  | -			.willReturn(aResponse().withStatus(200).withBody("{}"));  | 
108 |  | - | 
109 |  | -		apiServer.stubFor(  | 
110 |  | -			WireMock.get(WireMock.urlMatching("^/api/v1/namespaces/default/services.*"))  | 
111 |  | -				.withQueryParam("watch", equalTo("false"))  | 
112 |  | -				.willReturn(  | 
113 |  | -					WireMock.aResponse()  | 
114 |  | -						.withStatus(200)  | 
115 |  | -						.withBody(  | 
116 |  | -							JSON.serialize(  | 
117 |  | -								new V1ServiceList()  | 
118 |  | -									.metadata(new V1ListMeta().resourceVersion("0"))  | 
119 |  | -									.addItemsItem(new V1Service().metadata(new V1ObjectMeta().namespace("default")))  | 
120 |  | -							))));  | 
121 |  | - | 
122 |  | -		apiServer.stubFor(  | 
123 |  | -			WireMock.get(WireMock.urlMatching("^/api/v1/namespaces/default/services.*"))  | 
124 |  | -				.withQueryParam("watch", equalTo("true"))  | 
125 |  | -				.willReturn(aResponse().withStatus(200).withBody("{}")));  | 
 | 93 | +	void beforeAll() throws InterruptedException {  | 
 | 94 | +		mockEndpointsCall();  | 
 | 95 | +		mockServicesCall();  | 
126 | 96 | 	}  | 
127 | 97 | 
 
  | 
128 | 98 | 	private ApplicationContextRunner applicationContextRunner;  | 
@@ -519,4 +489,80 @@ private void setupWithFilteredClassLoader(String name, String... properties) {  | 
519 | 489 | 			.withPropertyValues(properties);  | 
520 | 490 | 	}  | 
521 | 491 | 
 
  | 
 | 492 | +	@Configuration  | 
 | 493 | +	static class ApiClientConfig {  | 
 | 494 | + | 
 | 495 | +		@Bean  | 
 | 496 | +		@Primary  | 
 | 497 | +		ApiClient apiClient() throws Exception {  | 
 | 498 | +			return new ClientBuilder().setBasePath("http://localhost:" + API_SERVER.getPort()).build();  | 
 | 499 | +		}  | 
 | 500 | + | 
 | 501 | +	}  | 
 | 502 | + | 
 | 503 | +	private static void mockEndpointsCall() {  | 
 | 504 | + | 
 | 505 | +		// watch=false, first call to populate watcher cache  | 
 | 506 | +		API_SERVER.stubFor(  | 
 | 507 | +			WireMock.get(WireMock.urlMatching("^/api/v1/namespaces/default/endpoints.*"))  | 
 | 508 | +				.withQueryParam("watch", WireMock.equalTo("false"))  | 
 | 509 | +				.willReturn(  | 
 | 510 | +					WireMock.aResponse()  | 
 | 511 | +						.withStatus(200)  | 
 | 512 | +						.withBody(  | 
 | 513 | +							JSON.serialize(  | 
 | 514 | +								new V1EndpointsList()  | 
 | 515 | +									.metadata(new V1ListMeta().resourceVersion("0"))  | 
 | 516 | +									.addItemsItem(new V1Endpoints().metadata(new V1ObjectMeta().namespace("default")))  | 
 | 517 | +							))));  | 
 | 518 | + | 
 | 519 | +		// watch=true, call to re-sync  | 
 | 520 | +		API_SERVER.stubFor(WireMock.get(WireMock.urlMatching("^/api/v1/namespaces/default/endpoints.*"))  | 
 | 521 | +			.withQueryParam("watch", WireMock.equalTo("true"))  | 
 | 522 | +			.willReturn(aResponse().withStatus(200).withBody("{}")));  | 
 | 523 | +	}  | 
 | 524 | + | 
 | 525 | +	private static void mockServicesCall() throws InterruptedException {  | 
 | 526 | + | 
 | 527 | +		// watch=false, first call to populate watcher cache  | 
 | 528 | +		API_SERVER.stubFor(  | 
 | 529 | +			WireMock.get(WireMock.urlMatching("^/api/v1/namespaces/default/services.*"))  | 
 | 530 | +				.withQueryParam("watch", equalTo("false"))  | 
 | 531 | +				.willReturn(  | 
 | 532 | +					WireMock.aResponse()  | 
 | 533 | +						.withStatus(200)  | 
 | 534 | +						.withBody(  | 
 | 535 | +							JSON.serialize(  | 
 | 536 | +								new V1ServiceList()  | 
 | 537 | +									.metadata(new V1ListMeta().resourceVersion("0"))  | 
 | 538 | +									.addItemsItem(new V1Service().metadata(new V1ObjectMeta().namespace("default")))  | 
 | 539 | +							))));  | 
 | 540 | + | 
 | 541 | +		GET_ENDPOINTS_SEMAPHORE.acquire(1);  | 
 | 542 | +		GET_ENDPOINTS_PARAMETERS.put(GET_ENDPOINTS_PARAMETER_NAME, GET_ENDPOINTS_SEMAPHORE);  | 
 | 543 | +		// watch=true, call to re-sync  | 
 | 544 | +		API_SERVER.stubFor(  | 
 | 545 | +			WireMock.get(WireMock.urlMatching("^/api/v1/namespaces/default/services.*"))  | 
 | 546 | +				.withPostServeAction("PostServeExtension", GET_ENDPOINTS_PARAMETERS)  | 
 | 547 | +				.withQueryParam("watch", equalTo("true"))  | 
 | 548 | +				.willReturn(aResponse().withStatus(200).withBody("{}")));  | 
 | 549 | +	}  | 
 | 550 | + | 
 | 551 | +	private static final class PostServeExtension implements ServeEventListener {  | 
 | 552 | + | 
 | 553 | +		@Override  | 
 | 554 | +		public String getName() {  | 
 | 555 | +			return "PostServeExtension";  | 
 | 556 | +		}  | 
 | 557 | + | 
 | 558 | +		@Override  | 
 | 559 | +		public void afterMatch(ServeEvent serveEvent, Parameters parameters) {  | 
 | 560 | +			Object getEndpointsSemaphore = parameters.get(GET_ENDPOINTS_PARAMETER_NAME);  | 
 | 561 | +			if (getEndpointsSemaphore != null) {  | 
 | 562 | +				Semaphore semaphore = (Semaphore) getEndpointsSemaphore;  | 
 | 563 | +				semaphore.release();  | 
 | 564 | +			}  | 
 | 565 | +		}  | 
 | 566 | +	}  | 
 | 567 | + | 
522 | 568 | }  | 
0 commit comments