Skip to content

Commit 71f4671

Browse files
authored
Add FileCache Prune Rest API (#19321)
Signed-off-by: tanishq ranjan <[email protected]>
1 parent 5df1f80 commit 71f4671

File tree

13 files changed

+1666
-0
lines changed

13 files changed

+1666
-0
lines changed
Lines changed: 278 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,278 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.action.admin.cluster.filecache;
10+
11+
import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;
12+
13+
import org.opensearch.action.admin.cluster.node.stats.NodeStats;
14+
import org.opensearch.action.admin.cluster.node.stats.NodesStatsRequest;
15+
import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse;
16+
import org.opensearch.action.admin.cluster.snapshots.restore.RestoreSnapshotRequest;
17+
import org.opensearch.action.support.PlainActionFuture;
18+
import org.opensearch.common.settings.Settings;
19+
import org.opensearch.index.IndexModule;
20+
import org.opensearch.index.store.remote.file.CleanerDaemonThreadLeakFilter;
21+
import org.opensearch.index.store.remote.filecache.AggregateFileCacheStats;
22+
import org.opensearch.repositories.fs.FsRepository;
23+
import org.opensearch.snapshots.AbstractSnapshotIntegTestCase;
24+
import org.opensearch.transport.client.Client;
25+
26+
import java.util.concurrent.TimeUnit;
27+
28+
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS;
29+
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS;
30+
import static org.hamcrest.Matchers.equalTo;
31+
import static org.hamcrest.Matchers.greaterThan;
32+
33+
/**
34+
* Integration tests for File Cache Prune API.
35+
* Validates cache pruning with real data in cluster environment.
36+
*
37+
* @opensearch.internal
38+
*/
39+
@ThreadLeakFilters(filters = CleanerDaemonThreadLeakFilter.class)
40+
public class PruneFileCacheIT extends AbstractSnapshotIntegTestCase {
41+
42+
@Override
43+
protected boolean addMockInternalEngine() {
44+
return false;
45+
}
46+
47+
@Override
48+
protected Settings.Builder randomRepositorySettings() {
49+
final Settings.Builder settings = Settings.builder();
50+
settings.put("location", randomRepoPath()).put("compress", randomBoolean());
51+
settings.put(FsRepository.BASE_PATH_SETTING.getKey(), "file_cache_prune_it");
52+
return settings;
53+
}
54+
55+
/**
56+
* Tests file cache pruning with real data on single warm node.
57+
*/
58+
public void testPruneCacheWithRealData() throws Exception {
59+
final String indexName = "test-idx";
60+
final String restoredIndexName = indexName + "-copy";
61+
final String repoName = "test-repo";
62+
final String snapshotName = "test-snap";
63+
final Client client = client();
64+
65+
logger.info("--> Create index with documents on data node");
66+
internalCluster().ensureAtLeastNumDataNodes(1);
67+
createIndexWithDocsAndEnsureGreen(0, 100, indexName);
68+
69+
logger.info("--> Create repository and take snapshot");
70+
createRepositoryWithSettings(null, repoName);
71+
takeSnapshot(client, snapshotName, repoName, indexName);
72+
deleteIndicesAndEnsureGreen(client, indexName);
73+
74+
logger.info("--> Start warm node and restore as searchable snapshot");
75+
internalCluster().ensureAtLeastNumWarmNodes(1);
76+
restoreSnapshotAndEnsureGreen(client, snapshotName, repoName);
77+
assertRemoteSnapshotIndexSettings(client, restoredIndexName);
78+
79+
logger.info("--> Trigger cache population by running multiple queries");
80+
// Run multiple queries to ensure cache is populated
81+
for (int i = 0; i < 3; i++) {
82+
assertDocCount(restoredIndexName, 100L);
83+
}
84+
85+
assertBusy(() -> {
86+
long usage = getFileCacheUsage();
87+
assertTrue("Cache should be populated after index access", usage > 0);
88+
}, 30, TimeUnit.SECONDS);
89+
90+
long usageBefore = getFileCacheUsage();
91+
logger.info("--> File cache usage before prune: {} bytes", usageBefore);
92+
assertTrue("File cache should have data before prune", usageBefore > 0);
93+
94+
PruneFileCacheRequest request = new PruneFileCacheRequest();
95+
PlainActionFuture<PruneFileCacheResponse> future = new PlainActionFuture<>();
96+
client.execute(PruneFileCacheAction.INSTANCE, request, future);
97+
PruneFileCacheResponse response = future.actionGet();
98+
99+
logger.info("--> Prune response: pruned {} bytes from {} nodes", response.getTotalPrunedBytes(), response.getNodes().size());
100+
101+
// Verify response first - this is the key assertion
102+
assertNotNull("Response should not be null", response);
103+
assertEquals("Should have 1 successful node", 1, response.getNodes().size());
104+
assertEquals("Should have no failures", 0, response.failures().size());
105+
assertTrue("Operation should be successful", response.isCompletelySuccessful());
106+
assertTrue("Operation should be acknowledged", response.isAcknowledged());
107+
108+
// The key assertion: pruned bytes should be > 0 (proves API actually worked)
109+
assertTrue("Should have pruned bytes", response.getTotalPrunedBytes() > 0);
110+
111+
// Verify cache usage after prune
112+
long usageAfter = getFileCacheUsage();
113+
logger.info("--> File cache usage after prune: {} bytes", usageAfter);
114+
115+
// Cache should be reduced (might not be zero if files are still referenced)
116+
assertTrue("Cache usage should be reduced after prune", usageAfter <= usageBefore);
117+
118+
// The pruned bytes should roughly match the reduction
119+
long actualReduction = usageBefore - usageAfter;
120+
logger.info("--> Actual cache reduction: {} bytes, reported pruned: {} bytes", actualReduction, response.getTotalPrunedBytes());
121+
122+
assertDocCount(restoredIndexName, 100L);
123+
}
124+
125+
/**
126+
* Tests prune API response structure and metrics validation.
127+
*/
128+
public void testPruneResponseMetrics() throws Exception {
129+
final String indexName = "test-idx";
130+
final String restoredIndexName = indexName + "-copy";
131+
final String repoName = "test-repo";
132+
final String snapshotName = "test-snap";
133+
final Client client = client();
134+
135+
logger.info("--> Setup simple scenario to test API response metrics");
136+
internalCluster().ensureAtLeastNumDataNodes(1);
137+
createIndexWithDocsAndEnsureGreen(0, 100, indexName);
138+
139+
createRepositoryWithSettings(null, repoName);
140+
takeSnapshot(client, snapshotName, repoName, indexName);
141+
deleteIndicesAndEnsureGreen(client, indexName);
142+
143+
internalCluster().ensureAtLeastNumWarmNodes(1);
144+
restoreSnapshotAndEnsureGreen(client, snapshotName, repoName);
145+
assertRemoteSnapshotIndexSettings(client, restoredIndexName);
146+
147+
logger.info("--> Populate cache and measure before state");
148+
assertDocCount(restoredIndexName, 100L);
149+
150+
assertBusy(() -> {
151+
long usage = getFileCacheUsage();
152+
assertTrue("Cache should be populated", usage > 0);
153+
}, 30, TimeUnit.SECONDS);
154+
155+
long usageBefore = getFileCacheUsage();
156+
157+
PruneFileCacheRequest request = new PruneFileCacheRequest();
158+
PlainActionFuture<PruneFileCacheResponse> future = new PlainActionFuture<>();
159+
client.execute(PruneFileCacheAction.INSTANCE, request, future);
160+
PruneFileCacheResponse response = future.actionGet();
161+
162+
assertNotNull("Response should not be null", response);
163+
assertTrue("Should report acknowledged", response.isAcknowledged());
164+
assertEquals("Should target 1 warm node", 1, response.getNodes().size());
165+
assertEquals("Should have 0 failures", 0, response.failures().size());
166+
assertTrue("Should be successful", response.isCompletelySuccessful());
167+
168+
NodePruneFileCacheResponse nodeResponse = response.getNodes().get(0);
169+
assertNotNull("Node response should not be null", nodeResponse);
170+
assertTrue("Node should have cache capacity", nodeResponse.getCacheCapacity() > 0);
171+
assertTrue("Node should report pruned bytes", nodeResponse.getPrunedBytes() >= 0);
172+
173+
long usageAfter = getFileCacheUsage();
174+
long expectedPruned = usageBefore - usageAfter;
175+
assertEquals("Response should match actual cache reduction", expectedPruned, response.getTotalPrunedBytes());
176+
}
177+
178+
/**
179+
* Creates index with documents and ensures cluster health is green.
180+
*/
181+
private void createIndexWithDocsAndEnsureGreen(int numReplicas, int numDocs, String indexName) throws InterruptedException {
182+
createIndex(
183+
indexName,
184+
Settings.builder()
185+
.put(SETTING_NUMBER_OF_REPLICAS, numReplicas)
186+
.put(SETTING_NUMBER_OF_SHARDS, 1)
187+
.put(IndexModule.INDEX_STORE_TYPE_SETTING.getKey(), IndexModule.Type.FS.getSettingsKey())
188+
.build()
189+
);
190+
ensureGreen();
191+
indexRandomDocs(indexName, numDocs);
192+
ensureGreen();
193+
}
194+
195+
/**
196+
* Creates snapshot repository with optional custom settings.
197+
*/
198+
private void createRepositoryWithSettings(Settings.Builder repositorySettings, String repoName) {
199+
if (repositorySettings == null) {
200+
createRepository(repoName, FsRepository.TYPE);
201+
} else {
202+
createRepository(repoName, FsRepository.TYPE, repositorySettings);
203+
}
204+
}
205+
206+
/**
207+
* Creates snapshot and validates success.
208+
*/
209+
private void takeSnapshot(Client client, String snapshotName, String repoName, String... indices) {
210+
final var response = client.admin()
211+
.cluster()
212+
.prepareCreateSnapshot(repoName, snapshotName)
213+
.setWaitForCompletion(true)
214+
.setIndices(indices)
215+
.get();
216+
217+
assertThat(response.getSnapshotInfo().successfulShards(), greaterThan(0));
218+
assertThat(response.getSnapshotInfo().successfulShards(), equalTo(response.getSnapshotInfo().totalShards()));
219+
}
220+
221+
/**
222+
* Deletes indices and ensures cluster health is green.
223+
*/
224+
private void deleteIndicesAndEnsureGreen(Client client, String... indices) {
225+
assertTrue(client.admin().indices().prepareDelete(indices).get().isAcknowledged());
226+
ensureGreen();
227+
}
228+
229+
/**
230+
* Restores snapshot as searchable snapshot and ensures cluster health is green.
231+
*/
232+
private void restoreSnapshotAndEnsureGreen(Client client, String snapshotName, String repoName) {
233+
client.admin()
234+
.cluster()
235+
.prepareRestoreSnapshot(repoName, snapshotName)
236+
.setRenamePattern("(.+)")
237+
.setRenameReplacement("$1-copy")
238+
.setStorageType(RestoreSnapshotRequest.StorageType.REMOTE_SNAPSHOT)
239+
.setWaitForCompletion(true)
240+
.execute()
241+
.actionGet();
242+
ensureGreen();
243+
}
244+
245+
/**
246+
* Validates that indices are configured as remote snapshot type.
247+
*/
248+
private void assertRemoteSnapshotIndexSettings(Client client, String... indexNames) {
249+
var settingsResponse = client.admin().indices().prepareGetSettings(indexNames).execute().actionGet();
250+
251+
assertEquals(indexNames.length, settingsResponse.getIndexToSettings().keySet().size());
252+
253+
for (String indexName : indexNames) {
254+
assertEquals(
255+
IndexModule.Type.REMOTE_SNAPSHOT.getSettingsKey(),
256+
settingsResponse.getSetting(indexName, IndexModule.INDEX_STORE_TYPE_SETTING.getKey())
257+
);
258+
}
259+
}
260+
261+
/**
262+
* Returns total file cache usage across all warm nodes in bytes.
263+
*/
264+
private long getFileCacheUsage() {
265+
NodesStatsResponse response = client().admin().cluster().nodesStats(new NodesStatsRequest().all()).actionGet();
266+
267+
long totalUsage = 0L;
268+
for (NodeStats stats : response.getNodes()) {
269+
if (stats.getNode().isWarmNode()) {
270+
AggregateFileCacheStats fcStats = stats.getFileCacheStats();
271+
if (fcStats != null) {
272+
totalUsage += fcStats.getUsed().getBytes();
273+
}
274+
}
275+
}
276+
return totalUsage;
277+
}
278+
}

server/src/main/java/org/opensearch/action/ActionModule.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,8 @@
4646
import org.opensearch.action.admin.cluster.decommission.awareness.get.TransportGetDecommissionStateAction;
4747
import org.opensearch.action.admin.cluster.decommission.awareness.put.DecommissionAction;
4848
import org.opensearch.action.admin.cluster.decommission.awareness.put.TransportDecommissionAction;
49+
import org.opensearch.action.admin.cluster.filecache.PruneFileCacheAction;
50+
import org.opensearch.action.admin.cluster.filecache.TransportPruneFileCacheAction;
4951
import org.opensearch.action.admin.cluster.health.ClusterHealthAction;
5052
import org.opensearch.action.admin.cluster.health.TransportClusterHealthAction;
5153
import org.opensearch.action.admin.cluster.node.hotthreads.NodesHotThreadsAction;
@@ -378,6 +380,7 @@
378380
import org.opensearch.rest.action.admin.cluster.RestNodesStatsAction;
379381
import org.opensearch.rest.action.admin.cluster.RestNodesUsageAction;
380382
import org.opensearch.rest.action.admin.cluster.RestPendingClusterTasksAction;
383+
import org.opensearch.rest.action.admin.cluster.RestPruneCacheAction;
381384
import org.opensearch.rest.action.admin.cluster.RestPutRepositoryAction;
382385
import org.opensearch.rest.action.admin.cluster.RestPutStoredScriptAction;
383386
import org.opensearch.rest.action.admin.cluster.RestReloadSecureSettingsAction;
@@ -661,6 +664,7 @@ public <Request extends ActionRequest, Response extends ActionResponse> void reg
661664
actions.register(ClusterRerouteAction.INSTANCE, TransportClusterRerouteAction.class);
662665
actions.register(ClusterSearchShardsAction.INSTANCE, TransportClusterSearchShardsAction.class);
663666
actions.register(PendingClusterTasksAction.INSTANCE, TransportPendingClusterTasksAction.class);
667+
actions.register(PruneFileCacheAction.INSTANCE, TransportPruneFileCacheAction.class);
664668
actions.register(PutRepositoryAction.INSTANCE, TransportPutRepositoryAction.class);
665669
actions.register(GetRepositoriesAction.INSTANCE, TransportGetRepositoriesAction.class);
666670
actions.register(DeleteRepositoryAction.INSTANCE, TransportDeleteRepositoryAction.class);
@@ -870,6 +874,8 @@ public void initRestHandlers(Supplier<DiscoveryNodes> nodesInCluster) {
870874
registerHandler.accept(new RestClusterRerouteAction(settingsFilter));
871875
registerHandler.accept(new RestClusterSearchShardsAction());
872876
registerHandler.accept(new RestPendingClusterTasksAction());
877+
// FileCache API
878+
registerHandler.accept(new RestPruneCacheAction());
873879
registerHandler.accept(new RestPutRepositoryAction());
874880
registerHandler.accept(new RestGetRepositoriesAction(settingsFilter));
875881
registerHandler.accept(new RestDeleteRepositoryAction());

0 commit comments

Comments
 (0)