Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import java.util.List;
import java.util.Locale;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;

import static org.opensearch.index.remote.RemoteStorePressureSettings.MIN_CONSECUTIVE_FAILURES_LIMIT;
Expand All @@ -44,28 +45,23 @@ public class RemoteStoreBackpressureAndResiliencyIT extends AbstractRemoteStoreM
public void testWritesRejectedDueToConsecutiveFailureBreach() throws Exception {
// Here the doc size of the request remains same throughout the test. After initial indexing, all remote store interactions
// fail leading to consecutive failure limit getting exceeded and leading to rejections.
validateBackpressure(ByteSizeUnit.KB.toIntBytes(1), 10, ByteSizeUnit.KB.toIntBytes(1), 15, "failure_streak_count");
validateBackpressure(ByteSizeUnit.KB.toIntBytes(1), 10, ByteSizeUnit.KB.toIntBytes(1), "failure_streak_count");
}

public void testWritesRejectedDueToBytesLagBreach() throws Exception {
// Initially indexing happens with doc size of 2 bytes, then all remote store interactions start failing. Now, the
// indexing happens with doc size of 1KB leading to bytes lag limit getting exceeded and leading to rejections.
validateBackpressure(ByteSizeUnit.BYTES.toIntBytes(2), 30, ByteSizeUnit.KB.toIntBytes(1), 15, "bytes_lag");
validateBackpressure(ByteSizeUnit.BYTES.toIntBytes(2), 30, ByteSizeUnit.KB.toIntBytes(1), "bytes_lag");
}

public void testWritesRejectedDueToTimeLagBreach() throws Exception {
// Initially indexing happens with doc size of 1KB, then all remote store interactions start failing. Now, the
// indexing happens with doc size of 1 byte leading to time lag limit getting exceeded and leading to rejections.
validateBackpressure(ByteSizeUnit.KB.toIntBytes(1), 20, ByteSizeUnit.BYTES.toIntBytes(1), 3, "time_lag");
validateBackpressure(ByteSizeUnit.KB.toIntBytes(1), 20, ByteSizeUnit.BYTES.toIntBytes(1), "time_lag");
}

private void validateBackpressure(
int initialDocSize,
int initialDocsToIndex,
int onFailureDocSize,
int onFailureDocsToIndex,
String breachMode
) throws Exception {
private void validateBackpressure(int initialDocSize, int initialDocsToIndex, int onFailureDocSize, String breachMode)
throws Exception {
Path location = randomRepoPath().toAbsolutePath();
String dataNodeName = setup(location, 0d, "metadata", Long.MAX_VALUE);

Expand All @@ -92,10 +88,17 @@ private void validateBackpressure(

jsonString = generateString(onFailureDocSize);
BytesReference onFailureSource = new BytesArray(jsonString);
OpenSearchRejectedExecutionException ex = assertThrows(
OpenSearchRejectedExecutionException.class,
() -> indexDocAndRefresh(onFailureSource, onFailureDocsToIndex)
);
AtomicReference<OpenSearchRejectedExecutionException> exRef = new AtomicReference<>();
assertBusy(() -> {
try {
indexDocAndRefresh(onFailureSource, 1);
} catch (OpenSearchRejectedExecutionException e) {
exRef.set(e);
return;
}
fail("Expected OpenSearchRejectedExecutionException to be thrown");
}, 30, TimeUnit.SECONDS);
OpenSearchRejectedExecutionException ex = exRef.get();
assertTrue(ex.getMessage().contains("rejected execution on primary shard"));
assertTrue(ex.getMessage().contains(breachMode));

Expand Down
Loading