Skip to content

Commit 4e87c8d

Browse files
author
Kavya Aggarwal
committed
Add tiered-storage module with stored fields prefetch support
Signed-off-by: Kavya Aggarwal <kavyaagg@amazon.com>
1 parent fb5d661 commit 4e87c8d

File tree

9 files changed

+596
-0
lines changed

9 files changed

+596
-0
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
2525
- Add support for enabling pluggable data formats, starting with phase-1 of decoupling shard from engine, and introducing basic abstractions ([#20675](https://github.com/opensearch-project/OpenSearch/pull/20675))
2626

2727
- Add warmup phase to wait for lag to catch up in pull-based ingestion before serving ([#20526](https://github.com/opensearch-project/OpenSearch/pull/20526))
28+
- Add tiered-storage module with stored fields prefetch support ([#20962](https://github.com/opensearch-project/OpenSearch/pull/20962))
29+
2830
### Changed
2931
- Make telemetry `Tags` immutable ([#20788](https://github.com/opensearch-project/OpenSearch/pull/20788))
3032
- Move Randomness from server to libs/common ([#20570](https://github.com/opensearch-project/OpenSearch/pull/20570))
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*
8+
*
9+
* Modifications Copyright OpenSearch Contributors. See
10+
* GitHub history for details.
11+
*/
12+
13+
// Plugin descriptor: the human-readable description and the entry-point class of this module.
opensearchplugin {
    description = "Module for tiered storage and writable warm index support"
    classname = "org.opensearch.storage.TieredStoragePlugin"
}
17+
18+
test {
    // Pick up test classes that follow either the *Tests or the *Test naming convention.
    include '**/*Tests.class', '**/*Test.class'
}
Lines changed: 189 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,189 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.storage;
10+
11+
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
12+
import org.opensearch.cluster.service.ClusterService;
13+
import org.opensearch.common.settings.Setting;
14+
import org.opensearch.common.util.FeatureFlags;
15+
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
16+
import org.opensearch.core.xcontent.NamedXContentRegistry;
17+
import org.opensearch.env.Environment;
18+
import org.opensearch.env.NodeEnvironment;
19+
import org.opensearch.index.IndexModule;
20+
import org.opensearch.plugins.Plugin;
21+
import org.opensearch.repositories.RepositoriesService;
22+
import org.opensearch.script.ScriptService;
23+
import org.opensearch.storage.prefetch.StoredFieldsPrefetch;
24+
import org.opensearch.storage.prefetch.TieredStoragePrefetchSettings;
25+
import org.opensearch.threadpool.ThreadPool;
26+
import org.opensearch.transport.client.Client;
27+
import org.opensearch.watcher.ResourceWatcherService;
28+
29+
import java.util.Collection;
30+
import java.util.Collections;
31+
import java.util.List;
32+
import java.util.function.Supplier;
33+
34+
/**
35+
* Plugin to support writable warm index and other related features
36+
*/
37+
public class TieredStoragePlugin extends Plugin { // implements IndexStorePlugin, ActionPlugin, TelemetryAwarePlugin //
38+
39+
/**
40+
* Default constructor.
41+
*/
42+
public TieredStoragePlugin() {}
43+
44+
/**
45+
* Index type for optimised downloads on hot indices.
46+
*/
47+
// public static final String HOT_BLOCK_EAGER_FETCH_INDEX_TYPE = "hot_block_eager_fetch";
48+
// private static final String REMOTE_DOWNLOAD = "remote_download";
49+
// private final SetOnce<ThreadPool> threadpool = new SetOnce<>();
50+
// public static final String TIERED_COMPOSITE_INDEX_TYPE = "tiered-storage";
51+
private TieredStoragePrefetchSettings tieredStoragePrefetchSettings;
52+
// private TierActionMetrics tierActionMetrics;
53+
54+
// private final List<Setting<?>> tieredStorageSettings = Stream.concat(
55+
// Stream.of(
56+
// // TieringUtils.H2W_MAX_CONCURRENT_TIEIRNG_REQUESTS,
57+
// // TieringUtils.W2H_MAX_CONCURRENT_TIEIRNG_REQUESTS,
58+
// // TieringUtils.JVM_USAGE_TIERING_THRESHOLD_PERCENT,
59+
// // TieringUtils.FILECACHE_ACTIVE_USAGE_TIERING_THRESHOLD_PERCENT,
60+
// TieredStoragePrefetchSettings.READ_AHEAD_BLOCK_COUNT,
61+
// TieredStoragePrefetchSettings.STORED_FIELDS_PREFETCH_ENABLED_SETTING
62+
// ),
63+
// // TIERED_STORAGE_SEARCH_SLOWLOG_SETTINGS.stream()
64+
// ).toList();
65+
66+
// @Override
67+
// public Map<String, IndexStorePlugin.DirectoryFactory> getDirectoryFactories() {
68+
// return Collections.emptyMap();
69+
// }
70+
71+
// @Override
72+
// public Map<String, IndexStorePlugin.CompositeDirectoryFactory> getCompositeDirectoryFactories() {
73+
// final Map<String, IndexStorePlugin.CompositeDirectoryFactory> registry = new HashMap<>();
74+
// registry.put(HOT_BLOCK_EAGER_FETCH_INDEX_TYPE, new OSBlockHotDirectoryFactory(() -> threadpool.get()));
75+
// registry.put(TIERED_COMPOSITE_INDEX_TYPE, new TieredDirectoryFactory(getPrefetchSettingsSupplier()));
76+
// return registry;
77+
// }
78+
79+
@Override
80+
public List<Setting<?>> getSettings() {
81+
return List.of(
82+
TieredStoragePrefetchSettings.READ_AHEAD_BLOCK_COUNT,
83+
TieredStoragePrefetchSettings.STORED_FIELDS_PREFETCH_ENABLED_SETTING
84+
);
85+
}
86+
87+
// public List<ExecutorBuilder<?>> getExecutorBuilders(Settings settings) {
88+
// final int allocatedProcessors = OpenSearchExecutors.allocatedProcessors(settings);
89+
// ExecutorBuilder<?> executorBuilder = new ScalingExecutorBuilder(
90+
// REMOTE_DOWNLOAD,
91+
// 1,
92+
// ThreadPool.twiceAllocatedProcessors(allocatedProcessors),
93+
// TimeValue.timeValueMinutes(5)
94+
// );
95+
// return List.of(executorBuilder);
96+
// }
97+
98+
// public ThreadPool getThreadpool() {
99+
// return threadpool.get();
100+
// }
101+
102+
/**
103+
* Returns a supplier for the tiered storage prefetch settings.
104+
* @return supplier of {@link TieredStoragePrefetchSettings}
105+
*/
106+
public Supplier<TieredStoragePrefetchSettings> getPrefetchSettingsSupplier() {
107+
return () -> this.tieredStoragePrefetchSettings;
108+
}
109+
110+
// @Override
111+
// public List<ActionHandler<? extends ActionRequest, ? extends ActionResponse>> getActions() {
112+
// if (FeatureFlags.isEnabled(FeatureFlags.WRITABLE_WARM_INDEX_EXPERIMENTAL_FLAG)) {
113+
// return List.of(
114+
// new ActionHandler<>(HotToWarmTierAction.INSTANCE, TransportHotToWarmTierAction.class),
115+
// new ActionHandler<>(WarmToHotTierAction.INSTANCE, TransportWarmToHotTierAction.class),
116+
// new ActionHandler<>(CancelTieringAction.INSTANCE, TransportCancelTierAction.class),
117+
// new ActionHandler<>(ListTieringStatusAction.INSTANCE, TransportListTieringStatusAction.class),
118+
// new ActionHandler<>(GetTieringStatusAction.INSTANCE, TransportGetTieringStatusAction.class));
119+
// } else {
120+
// return List.of();
121+
// }
122+
// }
123+
124+
// @Override
125+
// public List<RestHandler> getRestHandlers(
126+
// Settings settings, RestController restController,
127+
// ClusterSettings clusterSettings,
128+
// IndexScopedSettings indexScopedSettings, SettingsFilter settingsFilter,
129+
// IndexNameExpressionResolver indexNameExpressionResolver,
130+
// Supplier<DiscoveryNodes> nodesInCluster
131+
// ) {
132+
// if (FeatureFlags.isEnabled(FeatureFlags.WRITABLE_WARM_INDEX_EXPERIMENTAL_FLAG)) {
133+
// return List.of(
134+
// new RestHotToWarmTierAction(),
135+
// new RestWarmToHotTierAction(),
136+
// new RestCancelTierAction(),
137+
// new RestGetTieringStatusAction(),
138+
// new RestListTieringStatusAction());
139+
// } else {
140+
// return List.of();
141+
// }
142+
// }
143+
144+
@Override
145+
public Collection<Object> createComponents(
146+
Client client,
147+
ClusterService clusterService,
148+
ThreadPool threadPool,
149+
ResourceWatcherService resourceWatcherService,
150+
ScriptService scriptService,
151+
NamedXContentRegistry xContentRegistry,
152+
Environment environment,
153+
NodeEnvironment nodeEnvironment,
154+
NamedWriteableRegistry namedWriteableRegistry,
155+
IndexNameExpressionResolver indexNameExpressionResolver,
156+
Supplier<RepositoriesService> repositoriesServiceSupplier
157+
// Tracer tracer,
158+
// MetricsRegistry metricsRegistry
159+
) {
160+
// this.tierActionMetrics = new TierActionMetrics(metricsRegistry);
161+
this.tieredStoragePrefetchSettings = new TieredStoragePrefetchSettings(clusterService.getClusterSettings());
162+
// this.threadpool.set(threadPool);
163+
return Collections.emptyList();
164+
}
165+
166+
@Override
167+
public void onIndexModule(IndexModule indexModule) {
168+
if (FeatureFlags.isEnabled(FeatureFlags.WRITABLE_WARM_INDEX_EXPERIMENTAL_FLAG)) {
169+
// indexModule.addSearchOperationListener(new TieredStorageSearchSlowLog(indexModule.getIndexSettings()));
170+
indexModule.addSearchOperationListener(new StoredFieldsPrefetch(getPrefetchSettingsSupplier()));
171+
}
172+
}
173+
174+
// @Override
175+
// public Collection<Module> createGuiceModules() {
176+
// List<Module> modules = new ArrayList<>();
177+
// if (FeatureFlags.isEnabled(FeatureFlags.WRITABLE_WARM_INDEX_EXPERIMENTAL_FLAG)) {
178+
// modules.add(new AbstractModule() {
179+
// @Override
180+
// protected void configure() {
181+
// bind(HotToWarmTieringService.class).asEagerSingleton();
182+
// bind(WarmToHotTieringService.class).asEagerSingleton();
183+
// bind(TierActionMetrics.class).toInstance(tierActionMetrics);
184+
// }
185+
// });
186+
// }
187+
// return Collections.unmodifiableCollection(modules);
188+
// }
189+
}
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
/**
10+
* Tiered storage plugin for writable warm index support.
11+
*/
12+
package org.opensearch.storage;
Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.storage.prefetch;
10+
11+
import org.apache.logging.log4j.LogManager;
12+
import org.apache.logging.log4j.Logger;
13+
import org.apache.lucene.index.FilterLeafReader;
14+
import org.apache.lucene.index.LeafReader;
15+
import org.apache.lucene.index.LeafReaderContext;
16+
import org.apache.lucene.index.ReaderUtil;
17+
import org.apache.lucene.index.SegmentReader;
18+
import org.apache.lucene.index.StoredFields;
19+
import org.apache.lucene.util.BitSet;
20+
import org.opensearch.common.lucene.search.Queries;
21+
import org.opensearch.index.shard.SearchOperationListener;
22+
import org.opensearch.search.internal.SearchContext;
23+
24+
import java.io.IOException;
25+
import java.util.function.Supplier;
26+
27+
/**
28+
* Search operation listener that prefetches stored fields for tiered storage indices.
29+
*/
30+
public class StoredFieldsPrefetch implements SearchOperationListener {
31+
32+
private static final Logger log = LogManager.getLogger(StoredFieldsPrefetch.class);
33+
private final Supplier<TieredStoragePrefetchSettings> tieredStoragePrefetchSettingsSupplier;
34+
35+
/**
36+
* Creates a new StoredFieldsPrefetch instance.
37+
* @param tieredStoragePrefetchSettingsSupplier supplier for prefetch settings
38+
*/
39+
public StoredFieldsPrefetch(Supplier<TieredStoragePrefetchSettings> tieredStoragePrefetchSettingsSupplier) {
40+
this.tieredStoragePrefetchSettingsSupplier = tieredStoragePrefetchSettingsSupplier;
41+
}
42+
43+
@Override
44+
public void onPreFetchPhase(SearchContext searchContext) {
45+
// Based on cluster settings
46+
if (checkIfStoredFieldsPrefetchEnabled()) {
47+
executePrefetch(searchContext);
48+
// TieredStorageQueryMetricService.getInstance().recordStoredFieldsPrefetch(true);
49+
}
50+
// else {
51+
// TieredStorageQueryMetricService.getInstance().recordStoredFieldsPrefetch(false);
52+
// }
53+
}
54+
55+
private void executePrefetch(SearchContext context) {
56+
int currentReaderIndex = -1;
57+
LeafReaderContext currentReaderContext = null;
58+
StoredFields currentReader = null;
59+
log.debug("Stored Field Execute prefetch was triggered: {}", context.docIdsToLoadSize());
60+
for (int index = 0; index < context.docIdsToLoadSize(); index++) {
61+
int docId = context.docIdsToLoad()[context.docIdsToLoadFrom() + index];
62+
try {
63+
int readerIndex = ReaderUtil.subIndex(docId, context.searcher().getIndexReader().leaves());
64+
if (currentReaderIndex != readerIndex) {
65+
currentReaderContext = context.searcher().getIndexReader().leaves().get(readerIndex);
66+
currentReaderIndex = readerIndex;
67+
68+
// Unwrap the reader here
69+
LeafReader innerLeafReader = currentReaderContext.reader();
70+
while (innerLeafReader instanceof FilterLeafReader) {
71+
innerLeafReader = ((FilterLeafReader) innerLeafReader).getDelegate();
72+
}
73+
// never be the case, just sanity check
74+
if (!(innerLeafReader instanceof SegmentReader)) {
75+
// disable prefetch on stored fields for this segment
76+
log.warn("Unexpected reader type [{}], skipping stored fields prefetch", innerLeafReader.getClass().getName());
77+
currentReader = null;
78+
continue;
79+
}
80+
currentReader = innerLeafReader.storedFields();
81+
}
82+
assert currentReaderContext != null;
83+
if (currentReader == null) {
84+
continue;
85+
}
86+
log.debug(
87+
"Prefetching stored fields for index shard: {}, docId: {}, readerIndex: {}",
88+
context.indexShard().shardId(),
89+
docId,
90+
readerIndex
91+
);
92+
93+
// nested docs logic
94+
final int subDocId = docId - currentReaderContext.docBase;
95+
final int rootDocId = findRootDocumentIfNested(context, currentReaderContext, subDocId);
96+
if (rootDocId != -1) {
97+
currentReader.prefetch(rootDocId);
98+
}
99+
currentReader.prefetch(subDocId);
100+
} catch (Exception e) {
101+
log.warn("Failed to prefetch stored fields for docId: " + docId, e);
102+
}
103+
}
104+
}
105+
106+
private int findRootDocumentIfNested(SearchContext context, LeafReaderContext subReaderContext, int subDocId) throws IOException {
107+
if (context.mapperService().hasNested()) {
108+
BitSet bits = context.bitsetFilterCache().getBitSetProducer(Queries.newNonNestedFilter()).getBitSet(subReaderContext);
109+
if (bits != null && !bits.get(subDocId)) {
110+
return bits.nextSetBit(subDocId);
111+
}
112+
}
113+
return -1;
114+
}
115+
116+
private boolean checkIfStoredFieldsPrefetchEnabled() {
117+
TieredStoragePrefetchSettings settings = tieredStoragePrefetchSettingsSupplier.get();
118+
return settings != null && settings.isStoredFieldsPrefetchEnabled();
119+
}
120+
}

0 commit comments

Comments
 (0)