Skip to content
5 changes: 5 additions & 0 deletions docs/changelog/144562.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
area: Search
issues: []
pr: 144562
summary: Adjust the formula for "adaptive replica selection"
type: bug
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.util.FeatureFlag;
import org.elasticsearch.common.util.Maps;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;

Expand All @@ -38,6 +39,8 @@ public final class ResponseCollectorService implements ClusterStateListener {
*/
public static final double ALPHA = 0.3;

public static final FeatureFlag ARS_FORMULA_ADJUSTMENT_FEATURE_FLAG = new FeatureFlag("ars_formula_adjustment");

private final ConcurrentMap<String, NodeStatistics> nodeIdToStats = ConcurrentCollections.newConcurrentMap();

public ResponseCollectorService(ClusterService clusterService) {
Expand Down Expand Up @@ -172,8 +175,12 @@ private double innerRank(long outstandingRequests) {
// defines service time as the inverse of service rate (muBarS).
double muBarSInverse = serviceTime / FACTOR;

// The final formula
return rS - muBarSInverse + Math.pow(qHatS, queueAdjustmentFactor) * muBarSInverse;
double innerRank = Math.pow(qHatS, queueAdjustmentFactor) * muBarSInverse;
// When the feature flag is enabled, the rS - muBarSInverse term is dropped.
if (ARS_FORMULA_ADJUSTMENT_FEATURE_FLAG.isEnabled() == false) {
innerRank += rS - muBarSInverse;
}
return innerRank;
}

public double rank(long outstandingRequests) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

package org.elasticsearch.action.admin.cluster.node.stats;

import org.elasticsearch.node.ResponseCollectorService;
import org.elasticsearch.node.ResponseCollectorService.ComputedNodeStats;
import org.elasticsearch.test.ESTestCase;

Expand All @@ -19,13 +20,26 @@
*/
public class ComputedNodeStatsTests extends ESTestCase {

private static final boolean ARS_ADJUSTMENT = ResponseCollectorService.ARS_FORMULA_ADJUSTMENT_FEATURE_FLAG.isEnabled();

public void testBasicInvariants() {
// When queue size estimate is 0, the rank should equal response time.
ComputedNodeStats stats = createStats(0, 150, 100);
assertThat(stats.rank(0), equalTo(150.0));
if (ARS_ADJUSTMENT) {
// With the adjustment, rank = qHatS^3 * muBarSInverse.
// When queueSize=0 and outstandingRequests=0: qHatS = 1, so rank = muBarSInverse = serviceTime.
ComputedNodeStats stats = createStats(0, 150, 100);
assertThat(stats.rank(0), equalTo(100.0));

stats = createStats(0, 20, 19);
assertThat(stats.rank(0), equalTo(19.0));
} else {
// Without the adjustment, rank = rS - muBarSInverse + qHatS^3 * muBarSInverse.
// When queueSize=0 and outstandingRequests=0: qHatS = 1, so rank = rS.
ComputedNodeStats stats = createStats(0, 150, 100);
assertThat(stats.rank(0), equalTo(150.0));

stats = createStats(0, 20, 19);
assertThat(stats.rank(0), equalTo(20.0));
stats = createStats(0, 20, 19);
assertThat(stats.rank(0), equalTo(20.0));
}
}

public void testParameterScaling() {
Expand All @@ -38,14 +52,16 @@ public void testParameterScaling() {
second = createStats(1, 200, 200);
assertTrue(first.rank(3) < second.rank(3));

// A larger response time should always result in a larger rank.
first = createStats(2, 150, 100);
second = createStats(2, 200, 100);
assertTrue(first.rank(1) < second.rank(1));
if (ARS_ADJUSTMENT == false) {
// A larger response time should always result in a larger rank (only when the response time term is included).
first = createStats(2, 150, 100);
second = createStats(2, 200, 100);
assertTrue(first.rank(1) < second.rank(1));

first = createStats(2, 150, 150);
second = createStats(2, 200, 150);
assertTrue(first.rank(1) < second.rank(1));
first = createStats(2, 150, 150);
second = createStats(2, 200, 150);
assertTrue(first.rank(1) < second.rank(1));
}

// More queued requests should always result in a larger rank.
first = createStats(2, 150, 100);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,4 +139,21 @@ public void testNodeRemoval() throws Exception {
assertTrue(nodeStats.containsKey("node1"));
assertFalse(nodeStats.containsKey("node2"));
}

public void testArsFormulaAdjustmentFeatureFlag() {
    // Record one sample for node1: queue size 1, 100ms response time, 10ms service time
    // (the durations are passed as milliseconds * 1_000_000).
    final long responseTime = 100 * 1_000_000L;
    final long serviceTime = 10 * 1_000_000L;
    collector.addNodeStatistics("node1", 1, responseTime, serviceTime);

    // Rank node1 with a single outstanding request.
    final double rank = collector.getAllNodeStatistics().get("node1").rank(1);

    // Queue-based term shared by both formulas: qHatS^3 * muBarSInverse,
    // with qHatS = 1 + 1*1 + 1 = 3 and muBarSInverse = 10ms, i.e. 27 * 10 = 270.
    final double expectedRank;
    if (ResponseCollectorService.ARS_FORMULA_ADJUSTMENT_FEATURE_FLAG.isEnabled() == false) {
        // Flag disabled: the response time component (rS - muBarSInverse) is kept,
        // so rank = (100 - 10) + 270 = 360.
        expectedRank = 360.0;
    } else {
        // Flag enabled: the response time component is dropped and only the
        // queue-based term remains, so rank = 270.
        expectedRank = 270.0;
    }
    assertThat(rank, equalTo(expectedRank));
}
}
Loading