diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBackpressureAndResiliencyIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBackpressureAndResiliencyIT.java index 2ba7b50786f52..1a815b5fa1818 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBackpressureAndResiliencyIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBackpressureAndResiliencyIT.java @@ -34,6 +34,7 @@ import java.util.List; import java.util.Locale; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; import static org.opensearch.index.remote.RemoteStorePressureSettings.MIN_CONSECUTIVE_FAILURES_LIMIT; @@ -44,28 +45,23 @@ public class RemoteStoreBackpressureAndResiliencyIT extends AbstractRemoteStoreM public void testWritesRejectedDueToConsecutiveFailureBreach() throws Exception { // Here the doc size of the request remains same throughout the test. After initial indexing, all remote store interactions // fail leading to consecutive failure limit getting exceeded and leading to rejections. - validateBackpressure(ByteSizeUnit.KB.toIntBytes(1), 10, ByteSizeUnit.KB.toIntBytes(1), 15, "failure_streak_count"); + validateBackpressure(ByteSizeUnit.KB.toIntBytes(1), 10, ByteSizeUnit.KB.toIntBytes(1), "failure_streak_count"); } public void testWritesRejectedDueToBytesLagBreach() throws Exception { // Initially indexing happens with doc size of 2 bytes, then all remote store interactions start failing. Now, the // indexing happens with doc size of 1KB leading to bytes lag limit getting exceeded and leading to rejections. 
- validateBackpressure(ByteSizeUnit.BYTES.toIntBytes(2), 30, ByteSizeUnit.KB.toIntBytes(1), 15, "bytes_lag"); + validateBackpressure(ByteSizeUnit.BYTES.toIntBytes(2), 30, ByteSizeUnit.KB.toIntBytes(1), "bytes_lag"); } public void testWritesRejectedDueToTimeLagBreach() throws Exception { // Initially indexing happens with doc size of 1KB, then all remote store interactions start failing. Now, the // indexing happens with doc size of 1 byte leading to time lag limit getting exceeded and leading to rejections. - validateBackpressure(ByteSizeUnit.KB.toIntBytes(1), 20, ByteSizeUnit.BYTES.toIntBytes(1), 3, "time_lag"); + validateBackpressure(ByteSizeUnit.KB.toIntBytes(1), 20, ByteSizeUnit.BYTES.toIntBytes(1), "time_lag"); } - private void validateBackpressure( - int initialDocSize, - int initialDocsToIndex, - int onFailureDocSize, - int onFailureDocsToIndex, - String breachMode - ) throws Exception { + private void validateBackpressure(int initialDocSize, int initialDocsToIndex, int onFailureDocSize, String breachMode) + throws Exception { Path location = randomRepoPath().toAbsolutePath(); String dataNodeName = setup(location, 0d, "metadata", Long.MAX_VALUE); @@ -92,10 +88,17 @@ private void validateBackpressure( jsonString = generateString(onFailureDocSize); BytesReference onFailureSource = new BytesArray(jsonString); - OpenSearchRejectedExecutionException ex = assertThrows( - OpenSearchRejectedExecutionException.class, - () -> indexDocAndRefresh(onFailureSource, onFailureDocsToIndex) - ); + AtomicReference<OpenSearchRejectedExecutionException> exRef = new AtomicReference<>(); + assertBusy(() -> { + try { + indexDocAndRefresh(onFailureSource, 1); + } catch (OpenSearchRejectedExecutionException e) { + exRef.set(e); + return; + } + fail("Expected OpenSearchRejectedExecutionException to be thrown"); + }, 30, TimeUnit.SECONDS); + OpenSearchRejectedExecutionException ex = exRef.get(); assertTrue(ex.getMessage().contains("rejected execution on primary shard")); assertTrue(ex.getMessage().contains(breachMode));