Skip to content

Commit 0f4a2f1

Browse files
committed
Fix flaky testWritesRejectedDueToTimeLagBreach
The test assumed that a fixed number of index+refresh iterations during the failure phase would always accumulate enough time lag to trigger backpressure rejection. If the test machine is slow, the moving average of the upload time can be inflated, which raises the dynamic threshold (avgUploadTime * 10) beyond what the time lag can reach in only 3 iterations. Replace the fixed-iteration assertThrows with an assertBusy loop that keeps indexing until the backpressure rejection occurs or the 30-second timeout is reached.
Signed-off-by: Andrew Ross <andrross@amazon.com>
1 parent 316c070 commit 0f4a2f1

File tree

1 file changed

+17
-14
lines changed

1 file changed

+17
-14
lines changed

server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBackpressureAndResiliencyIT.java

Lines changed: 17 additions & 14 deletions
Original file line number | Diff line number | Diff line change
@@ -34,6 +34,7 @@
3434
import java.util.List;
3535
import java.util.Locale;
3636
import java.util.concurrent.TimeUnit;
37+
import java.util.concurrent.atomic.AtomicReference;
3738
import java.util.stream.Collectors;
3839

3940
import static org.opensearch.index.remote.RemoteStorePressureSettings.MIN_CONSECUTIVE_FAILURES_LIMIT;
@@ -44,28 +45,23 @@ public class RemoteStoreBackpressureAndResiliencyIT extends AbstractRemoteStoreM
4445
public void testWritesRejectedDueToConsecutiveFailureBreach() throws Exception {
4546
// Here the doc size of the request remains same throughout the test. After initial indexing, all remote store interactions
4647
// fail leading to consecutive failure limit getting exceeded and leading to rejections.
47-
validateBackpressure(ByteSizeUnit.KB.toIntBytes(1), 10, ByteSizeUnit.KB.toIntBytes(1), 15, "failure_streak_count");
48+
validateBackpressure(ByteSizeUnit.KB.toIntBytes(1), 10, ByteSizeUnit.KB.toIntBytes(1), "failure_streak_count");
4849
}
4950

5051
public void testWritesRejectedDueToBytesLagBreach() throws Exception {
5152
// Initially indexing happens with doc size of 2 bytes, then all remote store interactions start failing. Now, the
5253
// indexing happens with doc size of 1KB leading to bytes lag limit getting exceeded and leading to rejections.
53-
validateBackpressure(ByteSizeUnit.BYTES.toIntBytes(2), 30, ByteSizeUnit.KB.toIntBytes(1), 15, "bytes_lag");
54+
validateBackpressure(ByteSizeUnit.BYTES.toIntBytes(2), 30, ByteSizeUnit.KB.toIntBytes(1), "bytes_lag");
5455
}
5556

5657
public void testWritesRejectedDueToTimeLagBreach() throws Exception {
5758
// Initially indexing happens with doc size of 1KB, then all remote store interactions start failing. Now, the
5859
// indexing happens with doc size of 1 byte leading to time lag limit getting exceeded and leading to rejections.
59-
validateBackpressure(ByteSizeUnit.KB.toIntBytes(1), 20, ByteSizeUnit.BYTES.toIntBytes(1), 3, "time_lag");
60+
validateBackpressure(ByteSizeUnit.KB.toIntBytes(1), 20, ByteSizeUnit.BYTES.toIntBytes(1), "time_lag");
6061
}
6162

62-
private void validateBackpressure(
63-
int initialDocSize,
64-
int initialDocsToIndex,
65-
int onFailureDocSize,
66-
int onFailureDocsToIndex,
67-
String breachMode
68-
) throws Exception {
63+
private void validateBackpressure(int initialDocSize, int initialDocsToIndex, int onFailureDocSize, String breachMode)
64+
throws Exception {
6965
Path location = randomRepoPath().toAbsolutePath();
7066
String dataNodeName = setup(location, 0d, "metadata", Long.MAX_VALUE);
7167

@@ -92,10 +88,17 @@ private void validateBackpressure(
9288

9389
jsonString = generateString(onFailureDocSize);
9490
BytesReference onFailureSource = new BytesArray(jsonString);
95-
OpenSearchRejectedExecutionException ex = assertThrows(
96-
OpenSearchRejectedExecutionException.class,
97-
() -> indexDocAndRefresh(onFailureSource, onFailureDocsToIndex)
98-
);
91+
AtomicReference<OpenSearchRejectedExecutionException> exRef = new AtomicReference<>();
92+
assertBusy(() -> {
93+
try {
94+
indexDocAndRefresh(onFailureSource, 1);
95+
} catch (OpenSearchRejectedExecutionException e) {
96+
exRef.set(e);
97+
return;
98+
}
99+
fail("Expected OpenSearchRejectedExecutionException to be thrown");
100+
}, 30, TimeUnit.SECONDS);
101+
OpenSearchRejectedExecutionException ex = exRef.get();
99102
assertTrue(ex.getMessage().contains("rejected execution on primary shard"));
100103
assertTrue(ex.getMessage().contains(breachMode));
101104

0 commit comments

Comments
 (0)