Skip to content

Commit eb42398

Browse files
author
Kavya Aggarwal
committed
Add tiered-storage module with stored fields prefetch support
Signed-off-by: Kavya Aggarwal <kavyaagg@amazon.com>
1 parent fb5d661 commit eb42398

File tree

8 files changed

+591
-0
lines changed

8 files changed

+591
-0
lines changed
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*
8+
*
9+
* Modifications Copyright OpenSearch Contributors. See
10+
* GitHub history for details.
11+
*/
12+
13+
opensearchplugin {
14+
description = 'Module for tiered storage and writable warm index support'
15+
classname = 'org.opensearch.storage.TieredStoragePlugin'
16+
}
17+
18+
test {
19+
include '**/*Tests.class'
20+
include '**/*Test.class'
21+
}
Lines changed: 189 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,189 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.storage;
10+
11+
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
12+
import org.opensearch.cluster.service.ClusterService;
13+
import org.opensearch.common.settings.Setting;
14+
import org.opensearch.common.util.FeatureFlags;
15+
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
16+
import org.opensearch.core.xcontent.NamedXContentRegistry;
17+
import org.opensearch.env.Environment;
18+
import org.opensearch.env.NodeEnvironment;
19+
import org.opensearch.index.IndexModule;
20+
import org.opensearch.plugins.Plugin;
21+
import org.opensearch.repositories.RepositoriesService;
22+
import org.opensearch.script.ScriptService;
23+
import org.opensearch.storage.prefetch.StoredFieldsPrefetch;
24+
import org.opensearch.storage.prefetch.TieredStoragePrefetchSettings;
25+
import org.opensearch.threadpool.ThreadPool;
26+
import org.opensearch.transport.client.Client;
27+
import org.opensearch.watcher.ResourceWatcherService;
28+
29+
import java.util.Collection;
30+
import java.util.Collections;
31+
import java.util.List;
32+
import java.util.function.Supplier;
33+
34+
/**
35+
* Plugin to support writable warm index and other related features
36+
*/
37+
public class TieredStoragePlugin extends Plugin { // implements IndexStorePlugin, ActionPlugin, TelemetryAwarePlugin //
38+
39+
/**
40+
* Default constructor.
41+
*/
42+
public TieredStoragePlugin() {}
43+
44+
/**
45+
* Index type for optimised downloads on hot indices.
46+
*/
47+
// public static final String HOT_BLOCK_EAGER_FETCH_INDEX_TYPE = "hot_block_eager_fetch";
48+
// private static final String REMOTE_DOWNLOAD = "remote_download";
49+
// private final SetOnce<ThreadPool> threadpool = new SetOnce<>();
50+
// public static final String TIERED_COMPOSITE_INDEX_TYPE = "tiered-storage";
51+
private TieredStoragePrefetchSettings tieredStoragePrefetchSettings;
52+
// private TierActionMetrics tierActionMetrics;
53+
54+
// private final List<Setting<?>> tieredStorageSettings = Stream.concat(
55+
// Stream.of(
56+
// // TieringUtils.H2W_MAX_CONCURRENT_TIEIRNG_REQUESTS,
57+
// // TieringUtils.W2H_MAX_CONCURRENT_TIEIRNG_REQUESTS,
58+
// // TieringUtils.JVM_USAGE_TIERING_THRESHOLD_PERCENT,
59+
// // TieringUtils.FILECACHE_ACTIVE_USAGE_TIERING_THRESHOLD_PERCENT,
60+
// TieredStoragePrefetchSettings.READ_AHEAD_BLOCK_COUNT,
61+
// TieredStoragePrefetchSettings.STORED_FIELDS_PREFETCH_ENABLED_SETTING
62+
// ),
63+
// // TIERED_STORAGE_SEARCH_SLOWLOG_SETTINGS.stream()
64+
// ).toList();
65+
66+
// @Override
67+
// public Map<String, IndexStorePlugin.DirectoryFactory> getDirectoryFactories() {
68+
// return Collections.emptyMap();
69+
// }
70+
71+
// @Override
72+
// public Map<String, IndexStorePlugin.CompositeDirectoryFactory> getCompositeDirectoryFactories() {
73+
// final Map<String, IndexStorePlugin.CompositeDirectoryFactory> registry = new HashMap<>();
74+
// registry.put(HOT_BLOCK_EAGER_FETCH_INDEX_TYPE, new OSBlockHotDirectoryFactory(() -> threadpool.get()));
75+
// registry.put(TIERED_COMPOSITE_INDEX_TYPE, new TieredDirectoryFactory(getPrefetchSettingsSupplier()));
76+
// return registry;
77+
// }
78+
79+
@Override
80+
public List<Setting<?>> getSettings() {
81+
return List.of(
82+
TieredStoragePrefetchSettings.READ_AHEAD_BLOCK_COUNT,
83+
TieredStoragePrefetchSettings.STORED_FIELDS_PREFETCH_ENABLED_SETTING
84+
);
85+
}
86+
87+
// public List<ExecutorBuilder<?>> getExecutorBuilders(Settings settings) {
88+
// final int allocatedProcessors = OpenSearchExecutors.allocatedProcessors(settings);
89+
// ExecutorBuilder<?> executorBuilder = new ScalingExecutorBuilder(
90+
// REMOTE_DOWNLOAD,
91+
// 1,
92+
// ThreadPool.twiceAllocatedProcessors(allocatedProcessors),
93+
// TimeValue.timeValueMinutes(5)
94+
// );
95+
// return List.of(executorBuilder);
96+
// }
97+
98+
// public ThreadPool getThreadpool() {
99+
// return threadpool.get();
100+
// }
101+
102+
/**
103+
* Returns a supplier for the tiered storage prefetch settings.
104+
* @return supplier of {@link TieredStoragePrefetchSettings}
105+
*/
106+
public Supplier<TieredStoragePrefetchSettings> getPrefetchSettingsSupplier() {
107+
return () -> this.tieredStoragePrefetchSettings;
108+
}
109+
110+
// @Override
111+
// public List<ActionHandler<? extends ActionRequest, ? extends ActionResponse>> getActions() {
112+
// if (FeatureFlags.isEnabled(FeatureFlags.WRITABLE_WARM_INDEX_EXPERIMENTAL_FLAG)) {
113+
// return List.of(
114+
// new ActionHandler<>(HotToWarmTierAction.INSTANCE, TransportHotToWarmTierAction.class),
115+
// new ActionHandler<>(WarmToHotTierAction.INSTANCE, TransportWarmToHotTierAction.class),
116+
// new ActionHandler<>(CancelTieringAction.INSTANCE, TransportCancelTierAction.class),
117+
// new ActionHandler<>(ListTieringStatusAction.INSTANCE, TransportListTieringStatusAction.class),
118+
// new ActionHandler<>(GetTieringStatusAction.INSTANCE, TransportGetTieringStatusAction.class));
119+
// } else {
120+
// return List.of();
121+
// }
122+
// }
123+
124+
// @Override
125+
// public List<RestHandler> getRestHandlers(
126+
// Settings settings, RestController restController,
127+
// ClusterSettings clusterSettings,
128+
// IndexScopedSettings indexScopedSettings, SettingsFilter settingsFilter,
129+
// IndexNameExpressionResolver indexNameExpressionResolver,
130+
// Supplier<DiscoveryNodes> nodesInCluster
131+
// ) {
132+
// if (FeatureFlags.isEnabled(FeatureFlags.WRITABLE_WARM_INDEX_EXPERIMENTAL_FLAG)) {
133+
// return List.of(
134+
// new RestHotToWarmTierAction(),
135+
// new RestWarmToHotTierAction(),
136+
// new RestCancelTierAction(),
137+
// new RestGetTieringStatusAction(),
138+
// new RestListTieringStatusAction());
139+
// } else {
140+
// return List.of();
141+
// }
142+
// }
143+
144+
@Override
145+
public Collection<Object> createComponents(
146+
Client client,
147+
ClusterService clusterService,
148+
ThreadPool threadPool,
149+
ResourceWatcherService resourceWatcherService,
150+
ScriptService scriptService,
151+
NamedXContentRegistry xContentRegistry,
152+
Environment environment,
153+
NodeEnvironment nodeEnvironment,
154+
NamedWriteableRegistry namedWriteableRegistry,
155+
IndexNameExpressionResolver indexNameExpressionResolver,
156+
Supplier<RepositoriesService> repositoriesServiceSupplier
157+
// Tracer tracer,
158+
// MetricsRegistry metricsRegistry
159+
) {
160+
// this.tierActionMetrics = new TierActionMetrics(metricsRegistry);
161+
this.tieredStoragePrefetchSettings = new TieredStoragePrefetchSettings(clusterService.getClusterSettings());
162+
// this.threadpool.set(threadPool);
163+
return Collections.emptyList();
164+
}
165+
166+
@Override
167+
public void onIndexModule(IndexModule indexModule) {
168+
if (FeatureFlags.isEnabled(FeatureFlags.WRITABLE_WARM_INDEX_EXPERIMENTAL_FLAG)) {
169+
// indexModule.addSearchOperationListener(new TieredStorageSearchSlowLog(indexModule.getIndexSettings()));
170+
indexModule.addSearchOperationListener(new StoredFieldsPrefetch(getPrefetchSettingsSupplier()));
171+
}
172+
}
173+
174+
// @Override
175+
// public Collection<Module> createGuiceModules() {
176+
// List<Module> modules = new ArrayList<>();
177+
// if (FeatureFlags.isEnabled(FeatureFlags.WRITABLE_WARM_INDEX_EXPERIMENTAL_FLAG)) {
178+
// modules.add(new AbstractModule() {
179+
// @Override
180+
// protected void configure() {
181+
// bind(HotToWarmTieringService.class).asEagerSingleton();
182+
// bind(WarmToHotTieringService.class).asEagerSingleton();
183+
// bind(TierActionMetrics.class).toInstance(tierActionMetrics);
184+
// }
185+
// });
186+
// }
187+
// return Collections.unmodifiableCollection(modules);
188+
// }
189+
}
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
/**
10+
* Tiered storage plugin for writable warm index support.
11+
*/
12+
package org.opensearch.storage;
Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.storage.prefetch;
10+
11+
import org.apache.logging.log4j.LogManager;
12+
import org.apache.logging.log4j.Logger;
13+
import org.apache.lucene.index.FilterLeafReader;
14+
import org.apache.lucene.index.LeafReader;
15+
import org.apache.lucene.index.LeafReaderContext;
16+
import org.apache.lucene.index.ReaderUtil;
17+
import org.apache.lucene.index.SegmentReader;
18+
import org.apache.lucene.index.StoredFields;
19+
import org.apache.lucene.util.BitSet;
20+
import org.opensearch.ExceptionsHelper;
21+
import org.opensearch.common.lucene.search.Queries;
22+
import org.opensearch.index.shard.SearchOperationListener;
23+
import org.opensearch.search.internal.SearchContext;
24+
25+
import java.io.IOException;
26+
import java.util.function.Supplier;
27+
28+
/**
29+
* Search operation listener that prefetches stored fields for tiered storage indices.
30+
*/
31+
public class StoredFieldsPrefetch implements SearchOperationListener {
32+
33+
private static final Logger log = LogManager.getLogger(StoredFieldsPrefetch.class);
34+
private final Supplier<TieredStoragePrefetchSettings> tieredStoragePrefetchSettingsSupplier;
35+
36+
/**
37+
* Creates a new StoredFieldsPrefetch instance.
38+
* @param tieredStoragePrefetchSettingsSupplier supplier for prefetch settings
39+
*/
40+
public StoredFieldsPrefetch(Supplier<TieredStoragePrefetchSettings> tieredStoragePrefetchSettingsSupplier) {
41+
this.tieredStoragePrefetchSettingsSupplier = tieredStoragePrefetchSettingsSupplier;
42+
}
43+
44+
@Override
45+
public void onPreFetchPhase(SearchContext searchContext) {
46+
// Based on cluster settings
47+
if (checkIfStoredFieldsPrefetchEnabled()) {
48+
executePrefetch(searchContext);
49+
// TieredStorageQueryMetricService.getInstance().recordStoredFieldsPrefetch(true);
50+
}
51+
// else {
52+
// TieredStorageQueryMetricService.getInstance().recordStoredFieldsPrefetch(false);
53+
// }
54+
}
55+
56+
private void executePrefetch(SearchContext context) {
57+
int currentReaderIndex = -1;
58+
LeafReaderContext currentReaderContext = null;
59+
StoredFields currentReader = null;
60+
log.info("Stored Field Execute prefetch was triggered: {}", context.docIdsToLoadSize());
61+
for (int index = 0; index < context.docIdsToLoadSize(); index++) {
62+
int docId = context.docIdsToLoad()[context.docIdsToLoadFrom() + index];
63+
try {
64+
int readerIndex = ReaderUtil.subIndex(docId, context.searcher().getIndexReader().leaves());
65+
if (currentReaderIndex != readerIndex) {
66+
currentReaderContext = context.searcher().getIndexReader().leaves().get(readerIndex);
67+
currentReaderIndex = readerIndex;
68+
69+
// Unwrap the reader here
70+
LeafReader innerLeafReader = currentReaderContext.reader();
71+
while (innerLeafReader instanceof FilterLeafReader) {
72+
innerLeafReader = ((FilterLeafReader) innerLeafReader).getDelegate();
73+
}
74+
// never be the case, just sanity check
75+
if (!(innerLeafReader instanceof SegmentReader)) {
76+
// disable prefetch on stored fields for this case
77+
return;
78+
}
79+
currentReader = innerLeafReader.storedFields();
80+
}
81+
assert currentReaderContext != null;
82+
log.debug(
83+
"Prefetching stored fields for index shard: "
84+
+ context.indexShard().shardId()
85+
+ ", docId: "
86+
+ docId
87+
+ " readerIndex: "
88+
+ readerIndex
89+
);
90+
91+
// nested docs logic
92+
final int subDocId = docId - currentReaderContext.docBase;
93+
final int rootDocId = findRootDocumentIfNested(context, currentReaderContext, subDocId);
94+
if (rootDocId != -1) {
95+
currentReader.prefetch(rootDocId);
96+
}
97+
currentReader.prefetch(subDocId);
98+
} catch (Exception e) {
99+
throw ExceptionsHelper.convertToOpenSearchException(e);
100+
}
101+
}
102+
}
103+
104+
private int findRootDocumentIfNested(SearchContext context, LeafReaderContext subReaderContext, int subDocId) throws IOException {
105+
if (context.mapperService().hasNested()) {
106+
BitSet bits = context.bitsetFilterCache().getBitSetProducer(Queries.newNonNestedFilter()).getBitSet(subReaderContext);
107+
if (!bits.get(subDocId)) {
108+
return bits.nextSetBit(subDocId);
109+
}
110+
}
111+
return -1;
112+
}
113+
114+
private boolean checkIfStoredFieldsPrefetchEnabled() {
115+
return tieredStoragePrefetchSettingsSupplier.get().isStoredFieldsPrefetchEnabled();
116+
}
117+
}

0 commit comments

Comments
 (0)