Skip to content

Commit 9bd4634

Browse files
authored
Don't consider retetion leases hold by replicas for remote store back… (#20874)
* Don't consider retention leases hold by replicas for remote store backed indices Signed-off-by: Gaurav Bafna <gbbafna@amazon.com>
1 parent 7c055e3 commit 9bd4634

File tree

2 files changed

+113
-6
lines changed

2 files changed

+113
-6
lines changed

server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteIndexPrimaryRelocationIT.java

Lines changed: 104 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,19 +8,40 @@
88

99
package org.opensearch.remotestore;
1010

11+
import org.opensearch.action.admin.cluster.health.ClusterHealthResponse;
12+
import org.opensearch.action.admin.indices.stats.IndicesStatsResponse;
13+
import org.opensearch.action.admin.indices.stats.ShardStats;
14+
import org.opensearch.cluster.ClusterState;
1115
import org.opensearch.cluster.metadata.IndexMetadata;
16+
import org.opensearch.cluster.node.DiscoveryNode;
17+
import org.opensearch.cluster.routing.ShardRouting;
18+
import org.opensearch.cluster.routing.allocation.command.MoveAllocationCommand;
19+
import org.opensearch.common.Priority;
1220
import org.opensearch.common.settings.Settings;
21+
import org.opensearch.common.unit.TimeValue;
22+
import org.opensearch.index.IndexService;
23+
import org.opensearch.index.seqno.ReplicationTracker;
24+
import org.opensearch.index.seqno.RetentionLease;
25+
import org.opensearch.index.seqno.RetentionLeases;
1326
import org.opensearch.indices.recovery.IndexPrimaryRelocationIT;
1427
import org.opensearch.indices.replication.common.ReplicationType;
28+
import org.opensearch.plugins.Plugin;
29+
import org.opensearch.test.InternalSettingsPlugin;
1530
import org.opensearch.test.OpenSearchIntegTestCase;
1631

1732
import java.nio.file.Path;
18-
19-
import static org.opensearch.remotestore.RemoteStoreBaseIntegTestCase.remoteStoreClusterSettings;
33+
import java.util.Collection;
34+
import java.util.List;
35+
import java.util.stream.Collectors;
2036

2137
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
2238
public class RemoteIndexPrimaryRelocationIT extends IndexPrimaryRelocationIT {
2339

40+
@Override
41+
protected Collection<Class<? extends Plugin>> nodePlugins() {
42+
return List.of(InternalSettingsPlugin.class);
43+
}
44+
2445
protected static final String REPOSITORY_NAME = "test-remote-store-repo";
2546

2647
protected Path absolutePath;
@@ -48,4 +69,85 @@ public void testPrimaryRelocationWhileIndexing() throws Exception {
4869
internalCluster().startClusterManagerOnlyNode();
4970
super.testPrimaryRelocationWhileIndexing();
5071
}
72+
73+
public void testRetentionLeaseAfterPrimaryRelocation() throws Exception {
74+
internalCluster().startClusterManagerOnlyNode();
75+
internalCluster().startDataOnlyNode();
76+
final Settings settings = Settings.builder()
77+
.put(indexSettings())
78+
.put(IndexService.RETENTION_LEASE_SYNC_INTERVAL_SETTING.getKey(), "5s")
79+
.build();
80+
createIndex("test", settings);
81+
ensureGreen("test");
82+
83+
// index a doc so the shard is not empty
84+
client().prepareIndex("test").setId("1").setSource("field", "value").get();
85+
refresh("test");
86+
87+
final String dataNodeB = internalCluster().startDataOnlyNode();
88+
89+
// get source node id
90+
ClusterState state = client().admin().cluster().prepareState().get().getState();
91+
ShardRouting primaryShard = state.routingTable().shardRoutingTable("test", 0).primaryShard();
92+
final String sourceNodeId = primaryShard.currentNodeId();
93+
final DiscoveryNode targetNode = state.nodes()
94+
.getDataNodes()
95+
.values()
96+
.stream()
97+
.filter(n -> n.getId().equals(sourceNodeId) == false)
98+
.findFirst()
99+
.orElseThrow();
100+
101+
// relocate primary
102+
client().admin()
103+
.cluster()
104+
.prepareReroute()
105+
.add(new MoveAllocationCommand("test", 0, sourceNodeId, targetNode.getId()))
106+
.execute()
107+
.actionGet();
108+
ClusterHealthResponse clusterHealthResponse = client().admin()
109+
.cluster()
110+
.prepareHealth()
111+
.setTimeout(TimeValue.timeValueSeconds(60))
112+
.setWaitForEvents(Priority.LANGUID)
113+
.setWaitForNoRelocatingShards(true)
114+
.execute()
115+
.actionGet();
116+
assertFalse("timed out waiting for relocation", clusterHealthResponse.isTimedOut());
117+
ensureGreen("test");
118+
119+
// add a replica after relocation
120+
client().admin()
121+
.indices()
122+
.prepareUpdateSettings("test")
123+
.setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1).build())
124+
.get();
125+
ensureGreen("test");
126+
127+
// verify only one retention lease exists and it belongs to the primary
128+
assertBusy(() -> {
129+
final IndicesStatsResponse statsResponse = client().admin().indices().prepareStats("test").get();
130+
for (ShardStats shardStats : statsResponse.getShards()) {
131+
if (shardStats.getShardRouting().primary() == false) {
132+
continue;
133+
}
134+
final RetentionLeases retentionLeases = shardStats.getRetentionLeaseStats().retentionLeases();
135+
final List<RetentionLease> peerRecoveryLeases = retentionLeases.leases()
136+
.stream()
137+
.filter(l -> ReplicationTracker.PEER_RECOVERY_RETENTION_LEASE_SOURCE.equals(l.source()))
138+
.collect(Collectors.toList());
139+
assertEquals(
140+
"expected exactly one peer recovery retention lease but got " + peerRecoveryLeases,
141+
1,
142+
peerRecoveryLeases.size()
143+
);
144+
145+
// the single lease should be for the current primary node
146+
assertEquals(
147+
ReplicationTracker.getPeerRecoveryRetentionLeaseId(shardStats.getShardRouting().currentNodeId()),
148+
peerRecoveryLeases.get(0).id()
149+
);
150+
}
151+
});
152+
}
51153
}

server/src/main/java/org/opensearch/index/seqno/ReplicationTracker.java

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -285,10 +285,15 @@ public synchronized Tuple<Boolean, RetentionLeases> getRetentionLeases(final boo
285285
// the primary calculates the non-expired retention leases and syncs them to replicas
286286
final long currentTimeMillis = currentTimeMillisSupplier.getAsLong();
287287
final long retentionLeaseMillis = indexSettings.getRetentionLeaseMillis();
288-
final Set<String> leaseIdsForCurrentPeers = routingTable.assignedShards()
289-
.stream()
290-
.map(ReplicationTracker::getPeerRecoveryRetentionLeaseId)
291-
.collect(Collectors.toSet());
288+
final Set<String> leaseIdsForCurrentPeers;
289+
if (indexSettings.isRemoteStoreEnabled()) {
290+
leaseIdsForCurrentPeers = Collections.singleton(getPeerRecoveryRetentionLeaseId(routingTable.primaryShard().currentNodeId()));
291+
} else {
292+
leaseIdsForCurrentPeers = routingTable.assignedShards()
293+
.stream()
294+
.map(ReplicationTracker::getPeerRecoveryRetentionLeaseId)
295+
.collect(Collectors.toSet());
296+
}
292297
final boolean allShardsStarted = routingTable.allShardsStarted();
293298
final long minimumReasonableRetainedSeqNo = allShardsStarted ? 0L : getMinimumReasonableRetainedSeqNo();
294299
final Map<Boolean, List<RetentionLease>> partitionByExpiration = retentionLeases.leases()

0 commit comments

Comments
 (0)