Skip to content

Commit 14faf38

Browse files
authored
Make stale segments cleanup logic depend on segments map size as well (#20976)
Signed-off-by: rayshrey <rayshrey@amazon.com>
1 parent 2ea2500 commit 14faf38

File tree

7 files changed

+192
-1
lines changed

7 files changed

+192
-1
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
6363
- Delegate getMin/getMax methods for ExitableTerms ([#20775](https://github.com/opensearch-project/OpenSearch/pull/20775))
6464
- Fix terms lookup subquery fetch limit reading from non-existent index setting instead of cluster `max_clause_count` ([#20823](https://github.com/opensearch-project/OpenSearch/pull/20823))
6565
- Fix array_index_out_of_bounds_exception with wildcard and aggregations ([#20842](https://github.com/opensearch-project/OpenSearch/pull/20842))
66+
- Fix stale segment cleanup logic for remote store ([#20976](https://github.com/opensearch-project/OpenSearch/pull/20976))
6667
- Ensure that transient ThreadContext headers with propagators survive restore ([#20854](https://github.com/opensearch-project/OpenSearch/pull/20854))
6768
- Handle dependencies between analyzers ([#19248](https://github.com/opensearch-project/OpenSearch/pull/19248))
6869
- Fix `_field_caps` returning empty results and corrupted field names for `disable_objects: true` mappings ([#20800](https://github.com/opensearch-project/OpenSearch/pull/20800))

server/src/main/java/org/opensearch/common/settings/ClusterSettings.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -819,6 +819,7 @@ public void apply(Settings value, Settings current, Settings previous) {
819819
SearchService.CONCURRENT_SEGMENT_SEARCH_PARTITION_MIN_SEGMENT_SIZE,
820820

821821
RemoteStoreSettings.CLUSTER_REMOTE_INDEX_SEGMENT_METADATA_RETENTION_MAX_COUNT_SETTING,
822+
RemoteStoreSettings.CLUSTER_REMOTE_UPLOADED_SEGMENTS_CLEANUP_THRESHOLD_SETTING,
822823
RemoteStoreSettings.CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING,
823824
RemoteStoreSettings.CLUSTER_REMOTE_TRANSLOG_TRANSFER_TIMEOUT_SETTING,
824825
RemoteStoreSettings.CLUSTER_REMOTE_SEGMENT_TRANSFER_TIMEOUT_SETTING,

server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -244,7 +244,9 @@ private boolean syncSegments() {
244244
// if a new segments_N file is present in local that is not uploaded to remote store yet, it
245245
// is considered as a first refresh post commit. A cleanup of stale commit files is triggered.
246246
// This is done to avoid triggering a delete after every refresh.
247-
if (isRefreshAfterCommit()) {
247+
// Also trigger cleanup if the uploaded segments map exceeds the configured threshold,
248+
// to prevent unbounded memory growth when flushes do not happen.
249+
if (isRefreshAfterCommit() || uploadedSegmentsMapExceedsThreshold()) {
248250
remoteDirectory.deleteStaleSegmentsAsync(indexShard.getRemoteStoreSettings().getMinRemoteSegmentMetadataFiles());
249251
}
250252

@@ -449,6 +451,11 @@ private boolean isRefreshAfterCommitSafe() {
449451
return false;
450452
}
451453

454+
private boolean uploadedSegmentsMapExceedsThreshold() {
455+
int threshold = indexShard.getRemoteStoreSettings().getUploadedSegmentsCleanupThreshold();
456+
return threshold != -1 && remoteDirectory.getSegmentsUploadedToRemoteStoreSize() > threshold;
457+
}
458+
452459
void uploadMetadata(Collection<String> localSegmentsPostRefresh, SegmentInfos segmentInfos, ReplicationCheckpoint replicationCheckpoint)
453460
throws IOException {
454461
final long maxSeqNo = indexShard.getIndexer().currentOngoingRefreshCheckpoint();

server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -893,6 +893,10 @@ public Map<String, UploadedSegmentMetadata> getSegmentsUploadedToRemoteStore() {
893893
return Collections.unmodifiableMap(this.segmentsUploadedToRemoteStore);
894894
}
895895

896+
public int getSegmentsUploadedToRemoteStoreSize() {
897+
return segmentsUploadedToRemoteStore.size();
898+
}
899+
896900
// Visible for testing
897901
Set<String> getMetadataFilesToFilterActiveSegments(
898902
final int lastNMetadataFilesToKeep,

server/src/main/java/org/opensearch/indices/RemoteStoreSettings.java

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,9 @@
2626
@PublicApi(since = "2.14.0")
2727
public class RemoteStoreSettings {
2828
private static final int MIN_CLUSTER_REMOTE_MAX_TRANSLOG_READERS = 100;
29+
private static final int MIN_UPLOADED_SEGMENTS_CLEANUP_THRESHOLD = 100;
30+
private static final int MAX_UPLOADED_SEGMENTS_CLEANUP_THRESHOLD = 100000;
31+
private static final int DEFAULT_UPLOADED_SEGMENTS_CLEANUP_THRESHOLD = 1000;
2932

3033
/**
3134
* Used to specify the default translog buffer interval for remote store backed indexes.
@@ -174,6 +177,29 @@ public class RemoteStoreSettings {
174177
Property.Final
175178
);
176179

180+
/**
181+
* Controls the size threshold for the map that tracks segments uploaded to the remote store.
182+
* When the map size exceeds this threshold, stale segment cleanup is triggered even without a flush/commit.
183+
* {@code -1} disables threshold-based cleanup.
184+
*/
185+
public static final Setting<Integer> CLUSTER_REMOTE_UPLOADED_SEGMENTS_CLEANUP_THRESHOLD_SETTING = Setting.intSetting(
186+
"cluster.remote_store.uploaded_segments_cleanup_threshold",
187+
DEFAULT_UPLOADED_SEGMENTS_CLEANUP_THRESHOLD,
188+
-1,
189+
v -> {
190+
if (v != -1 && (v < MIN_UPLOADED_SEGMENTS_CLEANUP_THRESHOLD || v > MAX_UPLOADED_SEGMENTS_CLEANUP_THRESHOLD)) {
191+
throw new IllegalArgumentException(
192+
"Value must be -1 or between "
193+
+ MIN_UPLOADED_SEGMENTS_CLEANUP_THRESHOLD
194+
+ " and "
195+
+ MAX_UPLOADED_SEGMENTS_CLEANUP_THRESHOLD
196+
);
197+
}
198+
},
199+
Property.NodeScope,
200+
Property.Dynamic
201+
);
202+
177203
/**
178204
* Controls the fixed prefix for the segments path on remote store.
179205
*/
@@ -208,6 +234,7 @@ public class RemoteStoreSettings {
208234
private static volatile TimeValue pinnedTimestampsLookbackInterval;
209235
private final String translogPathFixedPrefix;
210236
private final String segmentsPathFixedPrefix;
237+
private volatile int uploadedSegmentsCleanupThreshold;
211238

212239
public RemoteStoreSettings(Settings settings, ClusterSettings clusterSettings) {
213240
clusterRemoteTranslogBufferInterval = CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING.get(settings);
@@ -255,6 +282,12 @@ public RemoteStoreSettings(Settings settings, ClusterSettings clusterSettings) {
255282

256283
translogPathFixedPrefix = CLUSTER_REMOTE_STORE_TRANSLOG_PATH_PREFIX.get(settings);
257284
segmentsPathFixedPrefix = CLUSTER_REMOTE_STORE_SEGMENTS_PATH_PREFIX.get(settings);
285+
286+
uploadedSegmentsCleanupThreshold = CLUSTER_REMOTE_UPLOADED_SEGMENTS_CLEANUP_THRESHOLD_SETTING.get(settings);
287+
clusterSettings.addSettingsUpdateConsumer(
288+
CLUSTER_REMOTE_UPLOADED_SEGMENTS_CLEANUP_THRESHOLD_SETTING,
289+
this::setUploadedSegmentsCleanupThreshold
290+
);
258291
}
259292

260293
public TimeValue getClusterRemoteTranslogBufferInterval() {
@@ -355,4 +388,12 @@ public String getTranslogPathFixedPrefix() {
355388
public String getSegmentsPathFixedPrefix() {
356389
return segmentsPathFixedPrefix;
357390
}
391+
392+
public int getUploadedSegmentsCleanupThreshold() {
393+
return uploadedSegmentsCleanupThreshold;
394+
}
395+
396+
private void setUploadedSegmentsCleanupThreshold(int uploadedSegmentsCleanupThreshold) {
397+
this.uploadedSegmentsCleanupThreshold = uploadedSegmentsCleanupThreshold;
398+
}
358399
}

server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -906,4 +906,78 @@ public void testRemoteSegmentStoreNotInSync() throws IOException {
906906
}
907907
}
908908

909+
public void testCleanupTriggeredWhenMapExceedsThreshold() throws IOException {
910+
int threshold = 10;
911+
RemoteSegmentStoreDirectory remoteSegmentStoreDirectory = setupDirectoryWithThreshold(threshold);
912+
913+
indexAndRefreshWithoutFlush(100);
914+
915+
int mapSize = remoteSegmentStoreDirectory.getSegmentsUploadedToRemoteStoreSize();
916+
assertTrue("Map size should be bounded by threshold cleanup, but was: " + mapSize, mapSize < 100);
917+
}
918+
919+
public void testCleanupNotTriggeredWhenThresholdDisabled() throws IOException {
920+
RemoteSegmentStoreDirectory remoteSegmentStoreDirectory = setupDirectoryWithThreshold(-1);
921+
int initialMapSize = remoteSegmentStoreDirectory.getSegmentsUploadedToRemoteStoreSize();
922+
923+
indexAndRefreshWithoutFlush(100);
924+
925+
int finalMapSize = remoteSegmentStoreDirectory.getSegmentsUploadedToRemoteStoreSize();
926+
assertTrue(
927+
"Map size should have grown with threshold disabled, initial=" + initialMapSize + " final=" + finalMapSize,
928+
finalMapSize > initialMapSize
929+
);
930+
}
931+
932+
private RemoteSegmentStoreDirectory setupDirectoryWithThreshold(int threshold) throws IOException {
933+
indexShard = newStartedShard(
934+
true,
935+
Settings.builder()
936+
.put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, true)
937+
.put(IndexMetadata.SETTING_REMOTE_SEGMENT_STORE_REPOSITORY, "temp-fs")
938+
.put(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY, "temp-fs")
939+
.put(SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
940+
.build(),
941+
new InternalEngineFactory()
942+
);
943+
944+
indexDocs(1, 3);
945+
indexShard.refresh("test");
946+
947+
clusterService = ClusterServiceUtils.createClusterService(
948+
Settings.EMPTY,
949+
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
950+
threadPool
951+
);
952+
remoteStoreStatsTrackerFactory = new RemoteStoreStatsTrackerFactory(clusterService, Settings.EMPTY);
953+
remoteStoreStatsTrackerFactory.afterIndexShardCreated(indexShard);
954+
RemoteSegmentTransferTracker tracker = remoteStoreStatsTrackerFactory.getRemoteSegmentTransferTracker(indexShard.shardId());
955+
956+
RemoteStoreSettings mockSettings = mock(RemoteStoreSettings.class);
957+
when(mockSettings.getUploadedSegmentsCleanupThreshold()).thenReturn(threshold);
958+
when(mockSettings.getMinRemoteSegmentMetadataFiles()).thenReturn(10);
959+
when(mockSettings.getClusterRemoteSegmentTransferTimeout()).thenReturn(TimeValue.timeValueMinutes(30));
960+
961+
IndexShard spyShard = spy(indexShard);
962+
when(spyShard.getRemoteStoreSettings()).thenReturn(mockSettings);
963+
964+
remoteStoreRefreshListener = new RemoteStoreRefreshListener(
965+
spyShard,
966+
SegmentReplicationCheckpointPublisher.EMPTY,
967+
tracker,
968+
mockSettings
969+
);
970+
971+
return (RemoteSegmentStoreDirectory) ((FilterDirectory) ((FilterDirectory) indexShard.remoteStore().directory()).getDelegate())
972+
.getDelegate();
973+
}
974+
975+
private void indexAndRefreshWithoutFlush(int iterations) throws IOException {
976+
for (int i = 0; i < iterations; i++) {
977+
indexDocs(10 + (i * 5), 5);
978+
indexShard.refresh("test");
979+
remoteStoreRefreshListener.afterRefresh(true);
980+
}
981+
}
982+
909983
}

server/src/test/java/org/opensearch/indices/RemoteStoreSettingsDynamicUpdateTests.java

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,4 +127,67 @@ public void testDisableMaxRemoteReferencedTranslogFiles() {
127127
);
128128
assertEquals(-1, remoteStoreSettings.getMaxRemoteTranslogReaders());
129129
}
130+
131+
public void testUploadedSegmentsCleanupThreshold() {
132+
// Test default value
133+
assertEquals(1000, remoteStoreSettings.getUploadedSegmentsCleanupThreshold());
134+
135+
// Test override with valid value
136+
clusterSettings.applySettings(
137+
Settings.builder().put(RemoteStoreSettings.CLUSTER_REMOTE_UPLOADED_SEGMENTS_CLEANUP_THRESHOLD_SETTING.getKey(), 5000).build()
138+
);
139+
assertEquals(5000, remoteStoreSettings.getUploadedSegmentsCleanupThreshold());
140+
141+
// Test disable with -1
142+
clusterSettings.applySettings(
143+
Settings.builder().put(RemoteStoreSettings.CLUSTER_REMOTE_UPLOADED_SEGMENTS_CLEANUP_THRESHOLD_SETTING.getKey(), -1).build()
144+
);
145+
assertEquals(-1, remoteStoreSettings.getUploadedSegmentsCleanupThreshold());
146+
147+
// Test value below -1 should fail
148+
assertThrows(
149+
IllegalArgumentException.class,
150+
() -> clusterSettings.applySettings(
151+
Settings.builder().put(RemoteStoreSettings.CLUSTER_REMOTE_UPLOADED_SEGMENTS_CLEANUP_THRESHOLD_SETTING.getKey(), -5).build()
152+
)
153+
);
154+
assertEquals(-1, remoteStoreSettings.getUploadedSegmentsCleanupThreshold());
155+
156+
// Test value below minimum (but not -1) should fail
157+
assertThrows(
158+
IllegalArgumentException.class,
159+
() -> clusterSettings.applySettings(
160+
Settings.builder().put(RemoteStoreSettings.CLUSTER_REMOTE_UPLOADED_SEGMENTS_CLEANUP_THRESHOLD_SETTING.getKey(), 50).build()
161+
)
162+
);
163+
164+
// Test value above maximum should fail
165+
assertThrows(
166+
IllegalArgumentException.class,
167+
() -> clusterSettings.applySettings(
168+
Settings.builder()
169+
.put(RemoteStoreSettings.CLUSTER_REMOTE_UPLOADED_SEGMENTS_CLEANUP_THRESHOLD_SETTING.getKey(), 200000)
170+
.build()
171+
)
172+
);
173+
174+
// Test boundary values
175+
clusterSettings.applySettings(
176+
Settings.builder().put(RemoteStoreSettings.CLUSTER_REMOTE_UPLOADED_SEGMENTS_CLEANUP_THRESHOLD_SETTING.getKey(), 100).build()
177+
);
178+
assertEquals(100, remoteStoreSettings.getUploadedSegmentsCleanupThreshold());
179+
180+
clusterSettings.applySettings(
181+
Settings.builder().put(RemoteStoreSettings.CLUSTER_REMOTE_UPLOADED_SEGMENTS_CLEANUP_THRESHOLD_SETTING.getKey(), 100000).build()
182+
);
183+
assertEquals(100000, remoteStoreSettings.getUploadedSegmentsCleanupThreshold());
184+
185+
// Test 0 should fail (not -1 and below minimum)
186+
assertThrows(
187+
IllegalArgumentException.class,
188+
() -> clusterSettings.applySettings(
189+
Settings.builder().put(RemoteStoreSettings.CLUSTER_REMOTE_UPLOADED_SEGMENTS_CLEANUP_THRESHOLD_SETTING.getKey(), 0).build()
190+
)
191+
);
192+
}
130193
}

0 commit comments

Comments
 (0)