From 3da4c3bb4473fcde72da01058b4be6875a160be4 Mon Sep 17 00:00:00 2001 From: Jens Schulze Date: Wed, 23 Apr 2025 13:11:59 +0200 Subject: [PATCH 1/3] add request specific policies --- .../java/commercetools/TimeoutTest.java | 45 +++++++++ .../api/defaultconfig/ApiRootBuilder.java | 8 ++ .../vrap/rmf/base/client/ClientBuilder.java | 19 ++++ .../rmf/base/client/http/PolicyBuilder.java | 12 +-- .../base/client/http/PolicyMiddleware.java | 22 +++++ .../client/http/RequestPolicyBuilder.java | 95 +++++++++++++++++++ .../http/RequestPolicyMiddlewareImpl.java | 51 ++++++++++ 7 files changed, 244 insertions(+), 8 deletions(-) create mode 100644 rmf/rmf-java-base/src/main/java/io/vrap/rmf/base/client/http/RequestPolicyBuilder.java create mode 100644 rmf/rmf-java-base/src/main/java/io/vrap/rmf/base/client/http/RequestPolicyMiddlewareImpl.java diff --git a/commercetools/commercetools-sdk-java-api/src/integrationTest/java/commercetools/TimeoutTest.java b/commercetools/commercetools-sdk-java-api/src/integrationTest/java/commercetools/TimeoutTest.java index 86996387f3b..6d2fa89e021 100644 --- a/commercetools/commercetools-sdk-java-api/src/integrationTest/java/commercetools/TimeoutTest.java +++ b/commercetools/commercetools-sdk-java-api/src/integrationTest/java/commercetools/TimeoutTest.java @@ -9,8 +9,10 @@ import com.commercetools.api.defaultconfig.ApiRootBuilder; import com.commercetools.api.defaultconfig.ServiceRegion; import com.commercetools.api.models.category.Category; +import com.commercetools.api.models.product.ProductProjectionPagedSearchResponse; import commercetools.utils.CommercetoolsTestUtils; +import io.vrap.rmf.base.client.ApiHttpMethod; import io.vrap.rmf.base.client.ApiHttpResponse; import io.vrap.rmf.base.client.http.HttpStatusCode; import io.vrap.rmf.base.client.oauth2.ClientCredentials; @@ -93,4 +95,47 @@ public void timeoutWithRetryTimeout() { .getBody(); }); } + + @Test + public void requestPolicies() { + String projectKey = CommercetoolsTestUtils.getProjectKey(); + + ProjectApiRoot b = ApiRootBuilder.of() + .defaultClient(ClientCredentials.of() + .withClientId(CommercetoolsTestUtils.getClientId()) + .withClientSecret(CommercetoolsTestUtils.getClientSecret()) + .build(), + ServiceRegion.GCP_EUROPE_WEST1) + .withTelemetryMiddleware((request, next) -> next.apply(request).thenApply((response) -> { + try { + Thread.sleep(2000); // ensure timeout + } + catch (InterruptedException e) { + throw new RuntimeException(e); + } + return response; + })) + .withRequestPolicies(policies -> policies + .withRequestMatching(apiHttpRequest -> apiHttpRequest.getMethod().equals(ApiHttpMethod.POST), + policyBuilder -> policyBuilder.withTimeout(Duration.ofSeconds(10))) + .withRequestMatching(apiHttpRequest -> apiHttpRequest.getMethod().equals(ApiHttpMethod.GET), + policyBuilder -> policyBuilder.withTimeout(Duration.ofSeconds(1)))) + .build(projectKey); + + Assertions.assertThatExceptionOfType(TimeoutExceededException.class).isThrownBy(() -> { + Category category = b.categories() + .withId("fdbaf4ea-fbc9-4fea-bac4-1d7e6c1995b3") + .get() + .executeBlocking() + .getBody(); + }); + + ProductProjectionPagedSearchResponse searchResponse = b.productProjections() + .search() + .post() + .executeBlocking() + .getBody(); + + Assertions.assertThat(searchResponse).isNotNull(); + } } diff --git a/commercetools/commercetools-sdk-java-api/src/main/java/com/commercetools/api/defaultconfig/ApiRootBuilder.java b/commercetools/commercetools-sdk-java-api/src/main/java/com/commercetools/api/defaultconfig/ApiRootBuilder.java index c1896008d7f..286ea2779dc 100644 --- a/commercetools/commercetools-sdk-java-api/src/main/java/com/commercetools/api/defaultconfig/ApiRootBuilder.java +++ b/commercetools/commercetools-sdk-java-api/src/main/java/com/commercetools/api/defaultconfig/ApiRootBuilder.java @@ -321,6 +321,14 @@ public ApiRootBuilder withPolicies(final PolicyBuilder policyBuilder) { return with(clientBuilder -> clientBuilder.withPolicies(policyBuilder)); } + public ApiRootBuilder withRequestPolicies(final Function fn) { + return with(clientBuilder -> clientBuilder.withRequestPolicies(fn)); + } + + public ApiRootBuilder withRequestPolicies(final RequestPolicyBuilder policyBuilder) { + return with(clientBuilder -> clientBuilder.withRequestPolicies(policyBuilder)); + } + public ApiRootBuilder withPolicyMiddleware(final PolicyMiddleware policyMiddleware) { return with(clientBuilder -> clientBuilder.withPolicyMiddleware(policyMiddleware)); } diff --git a/rmf/rmf-java-base/src/main/java/io/vrap/rmf/base/client/ClientBuilder.java b/rmf/rmf-java-base/src/main/java/io/vrap/rmf/base/client/ClientBuilder.java index e03eaf7a942..9771560ab52 100644 --- a/rmf/rmf-java-base/src/main/java/io/vrap/rmf/base/client/ClientBuilder.java +++ b/rmf/rmf-java-base/src/main/java/io/vrap/rmf/base/client/ClientBuilder.java @@ -41,6 +41,7 @@ public class ClientBuilder implements Builder { private Supplier oAuthMiddleware; private Supplier retryMiddleware; private PolicyBuilder policyBuilder; + private RequestPolicyBuilder requestPolicyBuilder; private Supplier policyMiddleware; private Supplier queueMiddleware; private Supplier correlationIdMiddleware; @@ -1275,6 +1276,24 @@ public ClientBuilder withPolicies(Function fn) { return this; } + /** + * add middleware for safe handling of failed requests + * @param requestPolicyBuilder the policy builder + * @return ClientBuilder instance + */ + public ClientBuilder withRequestPolicies(RequestPolicyBuilder requestPolicyBuilder) { + this.requestPolicyBuilder = requestPolicyBuilder; + this.policyMiddleware = requestPolicyBuilder::build; + return this; + } + + public ClientBuilder withRequestPolicies(Function fn) { + this.requestPolicyBuilder = fn + .apply(Optional.ofNullable(requestPolicyBuilder).orElse(RequestPolicyBuilder.of())); + this.policyMiddleware = requestPolicyBuilder::build; + return this; + } + /** * add middleware for safe handling of failed requests * @param policyMiddleware {@link PolicyMiddleware} to be used diff --git a/rmf/rmf-java-base/src/main/java/io/vrap/rmf/base/client/http/PolicyBuilder.java b/rmf/rmf-java-base/src/main/java/io/vrap/rmf/base/client/http/PolicyBuilder.java index c1b2c2b77e4..43e8dcb0705 100644 --- a/rmf/rmf-java-base/src/main/java/io/vrap/rmf/base/client/http/PolicyBuilder.java +++ b/rmf/rmf-java-base/src/main/java/io/vrap/rmf/base/client/http/PolicyBuilder.java @@ -10,9 +10,6 @@ import io.vrap.rmf.base.client.*; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import dev.failsafe.*; import dev.failsafe.spi.Scheduler; @@ -47,11 +44,6 @@ */ public class PolicyBuilder { - static final String loggerName = ClientBuilder.COMMERCETOOLS + ".retry"; - - private static final InternalLogger logger = InternalLogger.getLogger(loggerName); - private static final Logger classLogger = LoggerFactory.getLogger(PolicyMiddleware.class); - private final List>> policies; private final Scheduler scheduler; @@ -61,6 +53,10 @@ public PolicyBuilder() { this.scheduler = Scheduler.DEFAULT; } + List>> getPolicies() { + return policies; + } + public PolicyBuilder(List>> policies) { this.policies = policies; this.scheduler = Scheduler.DEFAULT; diff --git a/rmf/rmf-java-base/src/main/java/io/vrap/rmf/base/client/http/PolicyMiddleware.java b/rmf/rmf-java-base/src/main/java/io/vrap/rmf/base/client/http/PolicyMiddleware.java index 6a171864cfd..d3a0936fc28 100644 --- a/rmf/rmf-java-base/src/main/java/io/vrap/rmf/base/client/http/PolicyMiddleware.java +++ b/rmf/rmf-java-base/src/main/java/io/vrap/rmf/base/client/http/PolicyMiddleware.java @@ -2,9 +2,12 @@ package io.vrap.rmf.base.client.http; import java.util.List; +import java.util.Map; import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledExecutorService; +import java.util.function.Predicate; +import io.vrap.rmf.base.client.ApiHttpRequest; import io.vrap.rmf.base.client.ApiHttpResponse; import dev.failsafe.Policy; @@ -28,4 +31,23 @@ public static PolicyMiddleware of(final ExecutorService scheduler, final List>> policies) { return new PolicyMiddlewareImpl(Scheduler.of(scheduler), policies); } + + public static PolicyMiddleware of( + final List, List>>>> policies, + final Scheduler scheduler) { + return new RequestPolicyMiddlewareImpl(scheduler, policies); + } + + public static PolicyMiddleware of( + final List, List>>>> policies, + final ScheduledExecutorService scheduler) { + return new RequestPolicyMiddlewareImpl(Scheduler.of(scheduler), policies); + } + + public static PolicyMiddleware of( + final List, List>>>> policies, + final ExecutorService scheduler) { + return new RequestPolicyMiddlewareImpl(Scheduler.of(scheduler), policies); + } + } diff --git a/rmf/rmf-java-base/src/main/java/io/vrap/rmf/base/client/http/RequestPolicyBuilder.java b/rmf/rmf-java-base/src/main/java/io/vrap/rmf/base/client/http/RequestPolicyBuilder.java new file mode 100644 index 00000000000..29b8685195c --- /dev/null +++ b/rmf/rmf-java-base/src/main/java/io/vrap/rmf/base/client/http/RequestPolicyBuilder.java @@ -0,0 +1,95 @@ + +package io.vrap.rmf.base.client.http; + +import java.time.Duration; +import java.util.*; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ScheduledExecutorService; +import java.util.function.Function; +import java.util.function.Predicate; + +import io.vrap.rmf.base.client.ApiHttpRequest; +import io.vrap.rmf.base.client.ApiHttpResponse; + +import dev.failsafe.Policy; +import dev.failsafe.spi.Scheduler; + +/** + *

PolicyBuilder

+ * + *

The PolicyBuilder allows the combination of different policies for failing requests.

+ * + *

The order of policies matters. For example applying a {@link #withTimeout(Duration) timeout} before + * {@link #withRetry(RetryPolicyBuilder)} retry} will time out across all requests whereas applying a timeout after the retry count + * against every single request even the retried ones.

+ * + *

Retry

+ * + *

Retrying on HTTP status codes

+ * + * {@include.example io.vrap.rmf.base.client.http.PolicyMiddlewareTest#retryConfigurationStatusCodes()} + * + *

Retrying specific exceptions

+ * + * {@include.example io.vrap.rmf.base.client.http.PolicyMiddlewareTest#retryConfigurationExceptions()} + * + *

Timeout

+ * + * {@include.example io.vrap.rmf.base.client.http.PolicyMiddlewareTest#timeoutConfiguration()} + * + *

Bulkhead

+ * + *

Implementation of a Queue to limit the number of concurrent requests handled by the client

+ * + * {@include.example io.vrap.rmf.base.client.http.PolicyMiddlewareTest#queueConfiguration()} + */ +public class RequestPolicyBuilder { + + private final Map, List>>> policies; + + private final Scheduler scheduler; + + public RequestPolicyBuilder() { + this.policies = new LinkedHashMap<>(); + this.scheduler = Scheduler.DEFAULT; + } + + public RequestPolicyBuilder(final Map, List>>> policies) { + this.policies = policies; + this.scheduler = Scheduler.DEFAULT; + } + + public RequestPolicyBuilder(final Scheduler scheduler, + final Map, List>>> policies) { + this.policies = policies; + this.scheduler = scheduler; + } + + public RequestPolicyBuilder withScheduler(final ScheduledExecutorService scheduler) { + return new RequestPolicyBuilder(Scheduler.of(scheduler), policies); + } + + public RequestPolicyBuilder withScheduler(final ExecutorService scheduler) { + return new RequestPolicyBuilder(Scheduler.of(scheduler), policies); + } + + public RequestPolicyBuilder withScheduler(final Scheduler scheduler) { + return new RequestPolicyBuilder(scheduler, policies); + } + + public RequestPolicyBuilder withRequestMatching(final Predicate predicate, + Function fn) { + Map, List>>> policiesCopy = new LinkedHashMap<>( + policies); + policiesCopy.put(predicate, fn.apply(PolicyBuilder.of()).getPolicies()); + return new RequestPolicyBuilder(scheduler, policiesCopy); + } + + public PolicyMiddleware build() { + return PolicyMiddleware.of(new ArrayList<>(policies.entrySet()), scheduler); + } + + public static RequestPolicyBuilder of() { + return new RequestPolicyBuilder(); + } +} diff --git a/rmf/rmf-java-base/src/main/java/io/vrap/rmf/base/client/http/RequestPolicyMiddlewareImpl.java b/rmf/rmf-java-base/src/main/java/io/vrap/rmf/base/client/http/RequestPolicyMiddlewareImpl.java new file mode 100644 index 00000000000..43e597e79d2 --- /dev/null +++ b/rmf/rmf-java-base/src/main/java/io/vrap/rmf/base/client/http/RequestPolicyMiddlewareImpl.java @@ -0,0 +1,51 @@ + +package io.vrap.rmf.base.client.http; + +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.function.Function; +import java.util.function.Predicate; +import java.util.stream.Collectors; + +import io.vrap.rmf.base.client.ApiHttpRequest; +import io.vrap.rmf.base.client.ApiHttpResponse; + +import dev.failsafe.Failsafe; +import dev.failsafe.FailsafeExecutor; +import dev.failsafe.Policy; +import dev.failsafe.spi.Scheduler; + +/** + * Implementation for a failsafe requests handling + */ +public class RequestPolicyMiddlewareImpl implements AutoCloseable, PolicyMiddleware { + + private final List, FailsafeExecutor>>> executors; + + RequestPolicyMiddlewareImpl(final Scheduler scheduler, + final List, List>>>> policies) { + this.executors = policies.stream() + .map(entry -> Map.entry(entry.getKey(), Failsafe.with(entry.getValue()).with(scheduler))) + .collect(Collectors.toList()); + } + + @Override + public CompletableFuture> invoke(final ApiHttpRequest request, + final Function>> next) { + + Optional>> failsafeExecutor = executors.stream() + .filter(entry -> entry.getKey().test(request)) + .findFirst() + .map(Map.Entry::getValue); + if (failsafeExecutor.isPresent()) { + return failsafeExecutor.get().getStageAsync(() -> next.apply(request)); + } + return next.apply(request); + } + + @Override + public void close() { + } +} From 8ede97888d5e82aa8ed9a8f2997634fa5b3de201 Mon Sep 17 00:00:00 2001 From: Jens Schulze Date: Fri, 2 May 2025 06:46:45 +0200 Subject: [PATCH 2/3] update docs --- .../utils/CommercetoolsTestUtils.java | 4 +- .../src/test/java/example/ExamplesTest.java | 21 ++- .../src/test/java/example/MigrationV2.java | 5 +- .../rmf/base/client/http/PolicyBuilder.java | 2 + .../client/http/RequestPolicyBuilder.java | 24 ++- .../http/RequestPolicyMiddlewareTest.java | 162 ++++++++++++++++++ 6 files changed, 195 insertions(+), 23 deletions(-) create mode 100644 rmf/rmf-java-base/src/test/java/io/vrap/rmf/base/client/http/RequestPolicyMiddlewareTest.java diff --git a/commercetools/commercetools-sdk-java-api/src/integrationTest/java/commercetools/utils/CommercetoolsTestUtils.java b/commercetools/commercetools-sdk-java-api/src/integrationTest/java/commercetools/utils/CommercetoolsTestUtils.java index 7fc34db6106..2e8d9bec906 100644 --- a/commercetools/commercetools-sdk-java-api/src/integrationTest/java/commercetools/utils/CommercetoolsTestUtils.java +++ b/commercetools/commercetools-sdk-java-api/src/integrationTest/java/commercetools/utils/CommercetoolsTestUtils.java @@ -21,8 +21,8 @@ public class CommercetoolsTestUtils { static { ApiRootBuilder builder = ApiRootBuilder.ofEnvironmentVariables() .addConcurrentModificationMiddleware() - .withPolicies( - policyBuilder -> policyBuilder.withRetry(b -> b.maxRetries(5).statusCodes(singletonList(503)))) + .withRequestPolicies(policyBuilder -> policyBuilder.withAllOtherRequests( + request -> request.withRetry(b -> b.maxRetries(5).statusCodes(singletonList(503))))) .withErrorMiddleware(ErrorMiddleware.ExceptionMode.UNWRAP_COMPLETION_EXCEPTION); projectApiRoot = builder.buildProjectRoot(); } diff --git a/commercetools/internal-docs/src/test/java/example/ExamplesTest.java b/commercetools/internal-docs/src/test/java/example/ExamplesTest.java index e46709efe5c..625c0e4804b 100644 --- a/commercetools/internal-docs/src/test/java/example/ExamplesTest.java +++ b/commercetools/internal-docs/src/test/java/example/ExamplesTest.java @@ -52,8 +52,6 @@ import org.junit.jupiter.api.Test; import org.slf4j.event.Level; -import dev.failsafe.Failsafe; -import dev.failsafe.FailsafeExecutor; import reactor.netty.http.HttpProtocol; public class ExamplesTest { @@ -121,18 +119,14 @@ public void customUrls() { } public void timeoutMiddleware() { - dev.failsafe.Timeout> timeout = dev.failsafe.Timeout - .> builder(Duration.ofSeconds(10)) - .build(); - FailsafeExecutor> failsafeExecutor = Failsafe.with(timeout); - ProjectApiRoot apiRoot = ApiRootBuilder.of() .defaultClient(ClientCredentials.of() .withClientId("your-client-id") .withClientSecret("your-client-secret") .build(), ServiceRegion.GCP_EUROPE_WEST1) - .addMiddleware((request, next) -> failsafeExecutor.getStageAsync(() -> next.apply(request))) + .withRequestPolicies( + policies -> policies.withAllOtherRequests(request -> request.withTimeout(Duration.ofSeconds(10)))) .build("my-project"); } @@ -401,8 +395,12 @@ public void retry() { ProjectApiRoot apiRoot = ApiRootBuilder.of() .defaultClient(ClientCredentials.of().withClientId("clientId").withClientSecret("clientSecret").build(), ServiceRegion.GCP_EUROPE_WEST1) - .withPolicies(policies -> policies.withRetry(builder -> builder.maxRetries(5) - .statusCodes(Arrays.asList(BAD_GATEWAY_502, SERVICE_UNAVAILABLE_503, GATEWAY_TIMEOUT_504)))) + .withRequestPolicies( + policies -> policies + .withAllOtherRequests( + request -> request.withRetry(builder -> builder.maxRetries(5) + .statusCodes(Arrays.asList(BAD_GATEWAY_502, SERVICE_UNAVAILABLE_503, + GATEWAY_TIMEOUT_504))))) .build("my-project"); } @@ -590,7 +588,8 @@ public void httpConcurrentLimitation() { public void queueConcurrentLimitation() { ApiRootBuilder.of() // ... - .withPolicies(policies -> policies.withBulkhead(64, Duration.ofSeconds(10))) + .withRequestPolicies(policies -> policies + .withAllOtherRequests(request -> request.withBulkhead(64, Duration.ofSeconds(10)))) .build(); } diff --git a/commercetools/internal-docs/src/test/java/example/MigrationV2.java b/commercetools/internal-docs/src/test/java/example/MigrationV2.java index c92270c1e78..7d717b80660 100644 --- a/commercetools/internal-docs/src/test/java/example/MigrationV2.java +++ b/commercetools/internal-docs/src/test/java/example/MigrationV2.java @@ -47,8 +47,9 @@ public void retry() { .defaultClient(ServiceRegion.GCP_EUROPE_WEST1.getApiUrl(), ClientCredentials.of().withClientId("clientId").withClientSecret("clientSecret").build(), ServiceRegion.GCP_EUROPE_WEST1.getOAuthTokenUrl()) - .withPolicies(policies -> policies.withRetry(builder -> builder - .statusCodes(Arrays.asList(BAD_GATEWAY_502, SERVICE_UNAVAILABLE_503, GATEWAY_TIMEOUT_504)))) + .withRequestPolicies(policies -> policies + .withAllOtherRequests(request -> request.withRetry(builder -> builder.statusCodes( + Arrays.asList(BAD_GATEWAY_502, SERVICE_UNAVAILABLE_503, GATEWAY_TIMEOUT_504))))) .build("projectKey"); final CategoryPagedQueryResponse body = projectClient.categories().get().executeBlocking().getBody(); diff --git a/rmf/rmf-java-base/src/main/java/io/vrap/rmf/base/client/http/PolicyBuilder.java b/rmf/rmf-java-base/src/main/java/io/vrap/rmf/base/client/http/PolicyBuilder.java index 43e8dcb0705..b2145e803e9 100644 --- a/rmf/rmf-java-base/src/main/java/io/vrap/rmf/base/client/http/PolicyBuilder.java +++ b/rmf/rmf-java-base/src/main/java/io/vrap/rmf/base/client/http/PolicyBuilder.java @@ -18,6 +18,8 @@ * *

The PolicyBuilder allows the combination of different policies for failing requests.

* + *

In case you need different policies based on the request use the {@link RequestPolicyBuilder} instead

+ * *

The order of policies matters. For example applying a {@link #withTimeout(Duration) timeout} before * {@link #withRetry(RetryPolicyBuilder)} retry} will time out across all requests whereas applying a timeout after the retry count * against every single request even the retried ones.

diff --git a/rmf/rmf-java-base/src/main/java/io/vrap/rmf/base/client/http/RequestPolicyBuilder.java b/rmf/rmf-java-base/src/main/java/io/vrap/rmf/base/client/http/RequestPolicyBuilder.java index 29b8685195c..61635ce8b94 100644 --- a/rmf/rmf-java-base/src/main/java/io/vrap/rmf/base/client/http/RequestPolicyBuilder.java +++ b/rmf/rmf-java-base/src/main/java/io/vrap/rmf/base/client/http/RequestPolicyBuilder.java @@ -15,33 +15,34 @@ import dev.failsafe.spi.Scheduler; /** - *

PolicyBuilder

+ *

RequestPolicyBuilder

* - *

The PolicyBuilder allows the combination of different policies for failing requests.

+ *

The RequestPolicyBuilder allows the combination of different policies for failing requests and apply them to matching + * requests.

* - *

The order of policies matters. For example applying a {@link #withTimeout(Duration) timeout} before - * {@link #withRetry(RetryPolicyBuilder)} retry} will time out across all requests whereas applying a timeout after the retry count + *

The order of policies matters. For example applying a {@link PolicyBuilder#withTimeout(Duration) timeout} before + * {@link PolicyBuilder#withRetry(RetryPolicyBuilder)} retry} will time out across all requests whereas applying a timeout after the retry count * against every single request even the retried ones.

* *

Retry

* *

Retrying on HTTP status codes

* - * {@include.example io.vrap.rmf.base.client.http.PolicyMiddlewareTest#retryConfigurationStatusCodes()} + * {@include.example io.vrap.rmf.base.client.http.RequestPolicyMiddlewareTest#retryConfigurationStatusCodes()} * *

Retrying specific exceptions

* - * {@include.example io.vrap.rmf.base.client.http.PolicyMiddlewareTest#retryConfigurationExceptions()} + * {@include.example io.vrap.rmf.base.client.http.RequestPolicyMiddlewareTest#retryConfigurationExceptions()} * *

Timeout

* - * {@include.example io.vrap.rmf.base.client.http.PolicyMiddlewareTest#timeoutConfiguration()} + * {@include.example io.vrap.rmf.base.client.http.RequestPolicyMiddlewareTest#timeoutConfiguration()} * *

Bulkhead

* *

Implementation of a Queue to limit the number of concurrent requests handled by the client

* - * {@include.example io.vrap.rmf.base.client.http.PolicyMiddlewareTest#queueConfiguration()} + * {@include.example io.vrap.rmf.base.client.http.RequestPolicyMiddlewareTest#queueConfiguration()} */ public class RequestPolicyBuilder { @@ -85,6 +86,13 @@ public RequestPolicyBuilder withRequestMatching(final Predicate return new RequestPolicyBuilder(scheduler, policiesCopy); } + public RequestPolicyBuilder withAllOtherRequests(Function fn) { + Map, List>>> policiesCopy = new LinkedHashMap<>( + policies); + policiesCopy.put(apiHttpRequest -> true, fn.apply(PolicyBuilder.of()).getPolicies()); + return new RequestPolicyBuilder(scheduler, policiesCopy); + } + public PolicyMiddleware build() { return PolicyMiddleware.of(new ArrayList<>(policies.entrySet()), scheduler); } diff --git a/rmf/rmf-java-base/src/test/java/io/vrap/rmf/base/client/http/RequestPolicyMiddlewareTest.java b/rmf/rmf-java-base/src/test/java/io/vrap/rmf/base/client/http/RequestPolicyMiddlewareTest.java new file mode 100644 index 00000000000..1f28102312d --- /dev/null +++ b/rmf/rmf-java-base/src/test/java/io/vrap/rmf/base/client/http/RequestPolicyMiddlewareTest.java @@ -0,0 +1,162 @@ + +package io.vrap.rmf.base.client.http; + +import static io.vrap.rmf.base.client.utils.ClientUtils.blockingWait; +import static java.util.Collections.singletonList; + +import java.net.URI; +import java.nio.charset.StandardCharsets; +import java.time.Duration; +import java.util.Arrays; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.atomic.AtomicInteger; + +import io.vrap.rmf.base.client.*; +import io.vrap.rmf.base.client.utils.json.JsonException; + +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.Test; + +import dev.failsafe.TimeoutBuilder; + +public class RequestPolicyMiddlewareTest { + + @Test + public void testWithStatusCode() { + PolicyMiddleware middleware = RequestPolicyBuilder.of() + .withRequestMatching(request -> request.getMethod() == ApiHttpMethod.GET, + request -> request.withRetry(builder -> builder.maxRetries(1))) + .build(); + + final ApiHttpRequest request = new ApiHttpRequest().withMethod(ApiHttpMethod.GET); + AtomicInteger count = new AtomicInteger(); + + final ApiHttpResponse apiHttpResponse = blockingWait(middleware.invoke(request, request1 -> { + count.getAndIncrement(); + return CompletableFuture.completedFuture(new ApiHttpResponse<>(503, null, null)); + }), Duration.ofSeconds(1)); + Assertions.assertThat(apiHttpResponse.getStatusCode()).isEqualTo(503); + Assertions.assertThat(count.get()).isEqualTo(2); + } + + @Test + public void testWithCoveredException() { + PolicyMiddleware middleware = RequestPolicyBuilder.of() + .withRequestMatching(request -> request.getMethod() == ApiHttpMethod.GET, + request -> request.withRetry(builder -> builder.maxRetries(1))) + .build(); + + final URI uri = URI.create("https://www.commercetools.com/"); + + final ApiHttpRequest request = new ApiHttpRequest(ApiHttpMethod.GET, uri, null, null); + AtomicInteger count = new AtomicInteger(); + + Assertions.assertThatExceptionOfType(ApiHttpException.class).isThrownBy(() -> { + blockingWait(middleware.invoke(request, request1 -> { + count.getAndIncrement(); + return CompletableFuture.supplyAsync(() -> { + throw new CompletionException(new ApiHttpException(503, null, null, null, null, request)); + }); + }), Duration.ofSeconds(1)); + }).matches(e -> e.getStatusCode() == 503); + Assertions.assertThat(count.get()).isEqualTo(2); + } + + @Test + public void testWithCoveredExceptionResponse() { + PolicyMiddleware middleware = RequestPolicyBuilder.of() + .withRequestMatching(request -> request.getMethod() == ApiHttpMethod.GET, + request -> request.withRetry(builder -> builder.maxRetries(1))) + .build(); + + final URI uri = URI.create("https://www.commercetools.com/"); + + final ApiHttpRequest request = new ApiHttpRequest(ApiHttpMethod.GET, uri, null, null); + AtomicInteger count = new AtomicInteger(); + final ApiHttpResponse response = new ApiHttpResponse<>(503, new ApiHttpHeaders(), + "{\"hello\": \"world\"}".getBytes(StandardCharsets.UTF_8), "Oops!"); + Assertions.assertThatExceptionOfType(ApiHttpException.class).isThrownBy(() -> { + blockingWait(middleware.invoke(request, request1 -> { + count.getAndIncrement(); + return CompletableFuture.supplyAsync(() -> { + throw new CompletionException(new ApiHttpException(503, null, null, null, response, request)); + }); + }), Duration.ofSeconds(1)); + }).matches(e -> e.getStatusCode() == 503); + Assertions.assertThat(count.get()).isEqualTo(2); + } + + @Test + public void testUncoveredException() { + PolicyMiddleware middleware = RequestPolicyBuilder.of() + .withRequestMatching(request -> request.getMethod() == ApiHttpMethod.GET, + request -> request.withRetry(builder -> builder.maxRetries(1))) + .build(); + + final ApiHttpRequest request = new ApiHttpRequest(); + AtomicInteger count = new AtomicInteger(); + + Assertions.assertThatExceptionOfType(ApiHttpException.class).isThrownBy(() -> { + blockingWait(middleware.invoke(request, request1 -> { + count.getAndIncrement(); + return CompletableFuture.supplyAsync(() -> { + throw new CompletionException(new ApiHttpException(504, null, null, null, null, request)); + }); + }), Duration.ofSeconds(1)); + }).matches(e -> e.getStatusCode() == 504); + Assertions.assertThat(count.get()).isEqualTo(1); + } + + @Test + public void testRetrySuccess() { + PolicyMiddleware middleware = RequestPolicyBuilder.of() + .withRequestMatching(request -> request.getMethod() == ApiHttpMethod.GET, + request -> request.withRetry(builder -> builder.maxRetries(1))) + .build(); + + final ApiHttpRequest request = new ApiHttpRequest(); + AtomicInteger count = new AtomicInteger(); + + final ApiHttpResponse apiHttpResponse = blockingWait(middleware.invoke(request, request1 -> { + count.getAndIncrement(); + return CompletableFuture.completedFuture(new ApiHttpResponse<>(200, null, null)); + }), Duration.ofSeconds(1)); + Assertions.assertThat(apiHttpResponse.getStatusCode()).isEqualTo(200); + Assertions.assertThat(count.get()).isEqualTo(1); + } + + public void retryConfigurationStatusCodes() { + final ApiHttpClient build = ClientBuilder.of() + // ... + .withRequestPolicies(policyBuilder -> policyBuilder + .withAllOtherRequests(request -> request.withRetry(retry -> retry.maxRetries(3) + .statusCodes(Arrays.asList(HttpStatusCode.SERVICE_UNAVAILABLE_503, + HttpStatusCode.INTERNAL_SERVER_ERROR_500))))) + .build(); + } + + public void retryConfigurationExceptions() { + final ApiHttpClient build = ClientBuilder.of() + // ... + .withRequestPolicies(policyBuilder -> policyBuilder.withAllOtherRequests(request -> request + .withRetry(retry -> retry.maxRetries(3).failures(singletonList(JsonException.class))))) + .build(); + } + + public void queueConfiguration() { + final ApiHttpClient build = ClientBuilder.of() + // ... + .withRequestPolicies(policyBuilder -> policyBuilder + .withAllOtherRequests(request -> request.withBulkhead(64, Duration.ofSeconds(10)))) + .build(); + } + + public void timeoutConfiguration() { + final ApiHttpClient build = ClientBuilder.of() + // ... + .withRequestPolicies(policyBuilder -> policyBuilder.withAllOtherRequests( + request -> request.withTimeout(Duration.ofSeconds(10), TimeoutBuilder::withInterrupt))) + .build(); + } +} From 83f6d625431457fb8a5c11f67e7adec818eea3c7 Mon Sep 17 00:00:00 2001 From: Jens Schulze Date: Fri, 2 May 2025 06:49:39 +0200 Subject: [PATCH 3/3] update test --- .../src/integrationTest/java/commercetools/TimeoutTest.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/commercetools/commercetools-sdk-java-api/src/integrationTest/java/commercetools/TimeoutTest.java b/commercetools/commercetools-sdk-java-api/src/integrationTest/java/commercetools/TimeoutTest.java index 6d2fa89e021..eabac6f2e44 100644 --- a/commercetools/commercetools-sdk-java-api/src/integrationTest/java/commercetools/TimeoutTest.java +++ b/commercetools/commercetools-sdk-java-api/src/integrationTest/java/commercetools/TimeoutTest.java @@ -119,7 +119,8 @@ public void requestPolicies() { .withRequestMatching(apiHttpRequest -> apiHttpRequest.getMethod().equals(ApiHttpMethod.POST), policyBuilder -> policyBuilder.withTimeout(Duration.ofSeconds(10))) .withRequestMatching(apiHttpRequest -> apiHttpRequest.getMethod().equals(ApiHttpMethod.GET), - policyBuilder -> policyBuilder.withTimeout(Duration.ofSeconds(1)))) + policyBuilder -> policyBuilder.withTimeout(Duration.ofSeconds(1))) + .withAllOtherRequests(policyBuilder -> policyBuilder.withTimeout(Duration.ofSeconds(60)))) .build(projectKey); Assertions.assertThatExceptionOfType(TimeoutExceededException.class).isThrownBy(() -> {