Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add support for enabling pluggable data formats, starting with phase-1 of decoupling shard from engine, and introducing basic abstractions ([#20675](https://github.com/opensearch-project/OpenSearch/pull/20675))

- Add warmup phase to wait for lag to catch up in pull-based ingestion before serving ([#20526](https://github.com/opensearch-project/OpenSearch/pull/20526))
- Add tiered-storage module with stored fields prefetch support ([#20962](https://github.com/opensearch-project/OpenSearch/pull/20962))

### Changed
- Make telemetry `Tags` immutable ([#20788](https://github.com/opensearch-project/OpenSearch/pull/20788))
- Move Randomness from server to libs/common ([#20570](https://github.com/opensearch-project/OpenSearch/pull/20570))
Expand Down
21 changes: 21 additions & 0 deletions modules/tiered-storage/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*/

opensearchplugin {
description = 'Module for tiered storage and writable warm index support'
classname = 'org.opensearch.storage.TieredStoragePlugin'
}

test {
include '**/*Tests.class'
include '**/*Test.class'
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.storage;

import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.env.Environment;
import org.opensearch.env.NodeEnvironment;
import org.opensearch.index.IndexModule;
import org.opensearch.plugins.Plugin;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.script.ScriptService;
import org.opensearch.storage.prefetch.StoredFieldsPrefetch;
import org.opensearch.storage.prefetch.TieredStoragePrefetchSettings;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.client.Client;
import org.opensearch.watcher.ResourceWatcherService;

import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.function.Supplier;

/**
* Plugin to support writable warm index and other related features
*/
public class TieredStoragePlugin extends Plugin { // implements IndexStorePlugin, ActionPlugin, TelemetryAwarePlugin //

/**
* Default constructor.
*/
public TieredStoragePlugin() {}

/**
* Index type for optimised downloads on hot indices.
*/
// public static final String HOT_BLOCK_EAGER_FETCH_INDEX_TYPE = "hot_block_eager_fetch";
// private static final String REMOTE_DOWNLOAD = "remote_download";
// private final SetOnce<ThreadPool> threadpool = new SetOnce<>();
// public static final String TIERED_COMPOSITE_INDEX_TYPE = "tiered-storage";
private TieredStoragePrefetchSettings tieredStoragePrefetchSettings;
// private TierActionMetrics tierActionMetrics;

// private final List<Setting<?>> tieredStorageSettings = Stream.concat(
// Stream.of(
// // TieringUtils.H2W_MAX_CONCURRENT_TIEIRNG_REQUESTS,
// // TieringUtils.W2H_MAX_CONCURRENT_TIEIRNG_REQUESTS,
// // TieringUtils.JVM_USAGE_TIERING_THRESHOLD_PERCENT,
// // TieringUtils.FILECACHE_ACTIVE_USAGE_TIERING_THRESHOLD_PERCENT,
// TieredStoragePrefetchSettings.READ_AHEAD_BLOCK_COUNT,
// TieredStoragePrefetchSettings.STORED_FIELDS_PREFETCH_ENABLED_SETTING
// ),
// // TIERED_STORAGE_SEARCH_SLOWLOG_SETTINGS.stream()
// ).toList();

// @Override
// public Map<String, IndexStorePlugin.DirectoryFactory> getDirectoryFactories() {
// return Collections.emptyMap();
// }

// @Override
// public Map<String, IndexStorePlugin.CompositeDirectoryFactory> getCompositeDirectoryFactories() {
// final Map<String, IndexStorePlugin.CompositeDirectoryFactory> registry = new HashMap<>();
// registry.put(HOT_BLOCK_EAGER_FETCH_INDEX_TYPE, new OSBlockHotDirectoryFactory(() -> threadpool.get()));
// registry.put(TIERED_COMPOSITE_INDEX_TYPE, new TieredDirectoryFactory(getPrefetchSettingsSupplier()));
// return registry;
// }

@Override
public List<Setting<?>> getSettings() {
return List.of(
TieredStoragePrefetchSettings.READ_AHEAD_BLOCK_COUNT,
TieredStoragePrefetchSettings.STORED_FIELDS_PREFETCH_ENABLED_SETTING
);
}

// public List<ExecutorBuilder<?>> getExecutorBuilders(Settings settings) {
// final int allocatedProcessors = OpenSearchExecutors.allocatedProcessors(settings);
// ExecutorBuilder<?> executorBuilder = new ScalingExecutorBuilder(
// REMOTE_DOWNLOAD,
// 1,
// ThreadPool.twiceAllocatedProcessors(allocatedProcessors),
// TimeValue.timeValueMinutes(5)
// );
// return List.of(executorBuilder);
// }

// public ThreadPool getThreadpool() {
// return threadpool.get();
// }

/**
* Returns a supplier for the tiered storage prefetch settings.
* @return supplier of {@link TieredStoragePrefetchSettings}
*/
public Supplier<TieredStoragePrefetchSettings> getPrefetchSettingsSupplier() {
return () -> this.tieredStoragePrefetchSettings;
}

// @Override
// public List<ActionHandler<? extends ActionRequest, ? extends ActionResponse>> getActions() {
// if (FeatureFlags.isEnabled(FeatureFlags.WRITABLE_WARM_INDEX_EXPERIMENTAL_FLAG)) {
// return List.of(
// new ActionHandler<>(HotToWarmTierAction.INSTANCE, TransportHotToWarmTierAction.class),
// new ActionHandler<>(WarmToHotTierAction.INSTANCE, TransportWarmToHotTierAction.class),
// new ActionHandler<>(CancelTieringAction.INSTANCE, TransportCancelTierAction.class),
// new ActionHandler<>(ListTieringStatusAction.INSTANCE, TransportListTieringStatusAction.class),
// new ActionHandler<>(GetTieringStatusAction.INSTANCE, TransportGetTieringStatusAction.class));
// } else {
// return List.of();
// }
// }

// @Override
// public List<RestHandler> getRestHandlers(
// Settings settings, RestController restController,
// ClusterSettings clusterSettings,
// IndexScopedSettings indexScopedSettings, SettingsFilter settingsFilter,
// IndexNameExpressionResolver indexNameExpressionResolver,
// Supplier<DiscoveryNodes> nodesInCluster
// ) {
// if (FeatureFlags.isEnabled(FeatureFlags.WRITABLE_WARM_INDEX_EXPERIMENTAL_FLAG)) {
// return List.of(
// new RestHotToWarmTierAction(),
// new RestWarmToHotTierAction(),
// new RestCancelTierAction(),
// new RestGetTieringStatusAction(),
// new RestListTieringStatusAction());
// } else {
// return List.of();
// }
// }

@Override
public Collection<Object> createComponents(
Client client,
ClusterService clusterService,
ThreadPool threadPool,
ResourceWatcherService resourceWatcherService,
ScriptService scriptService,
NamedXContentRegistry xContentRegistry,
Environment environment,
NodeEnvironment nodeEnvironment,
NamedWriteableRegistry namedWriteableRegistry,
IndexNameExpressionResolver indexNameExpressionResolver,
Supplier<RepositoriesService> repositoriesServiceSupplier
// Tracer tracer,
// MetricsRegistry metricsRegistry
) {
// this.tierActionMetrics = new TierActionMetrics(metricsRegistry);
this.tieredStoragePrefetchSettings = new TieredStoragePrefetchSettings(clusterService.getClusterSettings());
// this.threadpool.set(threadPool);
return Collections.emptyList();
}

@Override
public void onIndexModule(IndexModule indexModule) {
if (FeatureFlags.isEnabled(FeatureFlags.WRITABLE_WARM_INDEX_EXPERIMENTAL_FLAG)) {
// indexModule.addSearchOperationListener(new TieredStorageSearchSlowLog(indexModule.getIndexSettings()));
indexModule.addSearchOperationListener(new StoredFieldsPrefetch(getPrefetchSettingsSupplier()));
}
}

// @Override
// public Collection<Module> createGuiceModules() {
// List<Module> modules = new ArrayList<>();
// if (FeatureFlags.isEnabled(FeatureFlags.WRITABLE_WARM_INDEX_EXPERIMENTAL_FLAG)) {
// modules.add(new AbstractModule() {
// @Override
// protected void configure() {
// bind(HotToWarmTieringService.class).asEagerSingleton();
// bind(WarmToHotTieringService.class).asEagerSingleton();
// bind(TierActionMetrics.class).toInstance(tierActionMetrics);
// }
// });
// }
// return Collections.unmodifiableCollection(modules);
// }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

/**
* Tiered storage plugin for writable warm index support.
*/
package org.opensearch.storage;
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.storage.prefetch;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.index.FilterLeafReader;
import org.apache.lucene.index.LeafReader;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.ReaderUtil;
import org.apache.lucene.index.SegmentReader;
import org.apache.lucene.index.StoredFields;
import org.apache.lucene.util.BitSet;
import org.opensearch.common.lucene.search.Queries;
import org.opensearch.index.shard.SearchOperationListener;
import org.opensearch.search.internal.SearchContext;

import java.io.IOException;
import java.util.function.Supplier;

/**
* Search operation listener that prefetches stored fields for tiered storage indices.
*/
public class StoredFieldsPrefetch implements SearchOperationListener {

private static final Logger log = LogManager.getLogger(StoredFieldsPrefetch.class);
private final Supplier<TieredStoragePrefetchSettings> tieredStoragePrefetchSettingsSupplier;

/**
* Creates a new StoredFieldsPrefetch instance.
* @param tieredStoragePrefetchSettingsSupplier supplier for prefetch settings
*/
public StoredFieldsPrefetch(Supplier<TieredStoragePrefetchSettings> tieredStoragePrefetchSettingsSupplier) {
this.tieredStoragePrefetchSettingsSupplier = tieredStoragePrefetchSettingsSupplier;
}

@Override
public void onPreFetchPhase(SearchContext searchContext) {
// Based on cluster settings
if (checkIfStoredFieldsPrefetchEnabled()) {
executePrefetch(searchContext);
// TieredStorageQueryMetricService.getInstance().recordStoredFieldsPrefetch(true);
}
// else {
// TieredStorageQueryMetricService.getInstance().recordStoredFieldsPrefetch(false);
// }
}

private void executePrefetch(SearchContext context) {
int currentReaderIndex = -1;
LeafReaderContext currentReaderContext = null;
StoredFields currentReader = null;
log.debug("Stored Field Execute prefetch was triggered: {}", context.docIdsToLoadSize());
for (int index = 0; index < context.docIdsToLoadSize(); index++) {
int docId = context.docIdsToLoad()[context.docIdsToLoadFrom() + index];
try {
int readerIndex = ReaderUtil.subIndex(docId, context.searcher().getIndexReader().leaves());
if (currentReaderIndex != readerIndex) {
currentReaderContext = context.searcher().getIndexReader().leaves().get(readerIndex);
currentReaderIndex = readerIndex;

// Unwrap the reader here
LeafReader innerLeafReader = currentReaderContext.reader();
while (innerLeafReader instanceof FilterLeafReader) {
innerLeafReader = ((FilterLeafReader) innerLeafReader).getDelegate();
}
// never be the case, just sanity check
if (!(innerLeafReader instanceof SegmentReader)) {
// disable prefetch on stored fields for this segment
log.warn("Unexpected reader type [{}], skipping stored fields prefetch", innerLeafReader.getClass().getName());
currentReader = null;
continue;
}
currentReader = innerLeafReader.storedFields();
}
assert currentReaderContext != null;
if (currentReader == null) {
continue;
}
log.debug(
"Prefetching stored fields for index shard: {}, docId: {}, readerIndex: {}",
context.indexShard().shardId(),
docId,
readerIndex
);

// nested docs logic
final int subDocId = docId - currentReaderContext.docBase;
final int rootDocId = findRootDocumentIfNested(context, currentReaderContext, subDocId);
if (rootDocId != -1) {
currentReader.prefetch(rootDocId);
}
currentReader.prefetch(subDocId);
} catch (Exception e) {
log.warn("Failed to prefetch stored fields for docId: " + docId, e);
}
}
}

private int findRootDocumentIfNested(SearchContext context, LeafReaderContext subReaderContext, int subDocId) throws IOException {
if (context.mapperService().hasNested()) {
BitSet bits = context.bitsetFilterCache().getBitSetProducer(Queries.newNonNestedFilter()).getBitSet(subReaderContext);
if (bits != null && !bits.get(subDocId)) {
return bits.nextSetBit(subDocId);
}
}
return -1;
}

private boolean checkIfStoredFieldsPrefetchEnabled() {
TieredStoragePrefetchSettings settings = tieredStoragePrefetchSettingsSupplier.get();
return settings != null && settings.isStoredFieldsPrefetchEnabled();
}
}
Loading
Loading