Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ public static class SearcherWrapperPlugin extends Plugin {
public void onIndexModule(IndexModule indexModule) {
super.onIndexModule(indexModule);
if (enabled) {
indexModule.setReaderWrapper(indexService -> {
indexModule.addReaderWrapper(indexService -> {
CheckedFunction<DirectoryReader, DirectoryReader, IOException> wrapper = EngineTestCase.randomReaderWrapper();
return reader -> {
calls.incrementAndGet();
Expand Down
34 changes: 26 additions & 8 deletions server/src/main/java/org/elasticsearch/index/IndexModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -166,8 +166,8 @@ public interface DirectoryWrapper {
private final AnalysisRegistry analysisRegistry;
private final EngineFactory engineFactory;
private final SetOnce<DirectoryWrapper> indexDirectoryWrapper = new SetOnce<>();
private final SetOnce<Function<IndexService, CheckedFunction<DirectoryReader, DirectoryReader, IOException>>> indexReaderWrapper =
new SetOnce<>();
private final List<Function<IndexService, CheckedFunction<DirectoryReader, DirectoryReader, IOException>>> indexReaderWrappers =
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jimczi i see you've done work in this area previously. Do you see any problem with this?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My main worry is the ordering. Are we sure that it's ok if the ordering is not guaranteed? Like would we take different decisions in the extension if the live docs are different depending on which directory is last?
If not, we should ensure that the directory implementation applies the recommendation in the javadoc (like delegating cache helper, ...). Maybe that should be enforced too if that's not the case already (in tests or asserts).

Copy link
Contributor Author

@lkts lkts Sep 15, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's okay. Both readers take live docs from the wrapped reader and apply a bitset on top of them e.g.
https://github.com/elastic/elasticsearch/blob/main/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authz/accesscontrol/DocumentSubsetReader.java#L207.

My new implementation essentially boils down to:

static class FilterBits implements Bits {
        private final Bits original;
        private final Bits filteredOut;

        FilterBits(Bits original, BitSet filteredOut) {
            this.original = original;
            this.filteredOut = filteredOut;
        }

        @Override
        public boolean get(int index) {
            return original.get(index) && (filteredOut.get(index) == false);
        }

        @Override
        public int length() {
            return original.length();
        }
    }

If security wrapper comes first then all "unsecure" docs are not live and stay that way since original.get(index) is false. If a new wrapper comes first then security works normally except some of the docs that need to be excluded are already excluded by the new wrapper.

There are checks for the contract of the wrappers already in here

final ElasticsearchDirectoryReader elasticsearchDirectoryReader = ElasticsearchDirectoryReader.getElasticsearchDirectoryReader(

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The only dangerous code path i can see is if wrapper uses something like FilterLeafReader.unwrap() which we don't do and IMO it is kinda obvious that this is a dangerous path.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep we only to ensure that the ElasticsearchDirectoryReader can be extracted to get the shard infos.
From the description of the feature, it looks fine. Let's make sure that we don't add more to it, this is still a very expert thing to do and hopefully not an extension point for any plugin.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have prior art in making sure that certain extension points are only used by Elastic supplied plugins (modules) and not by arbitrary 3rd party plugins

For example

if (plugin instanceof RestServerActionPlugin restPlugin) {
var newInstance = function.apply(restPlugin);
if (newInstance != null) {
logger.debug("Using custom {} from plugin {}", type, plugin.getClass().getName());
if (isInternalPlugin(plugin) == false) {
throw new IllegalArgumentException(
"The "
+ plugin.getClass().getName()
+ " plugin tried to install a custom "
+ type
+ ". This functionality is not available to external plugins."
);
}
if (result != null) {
throw new IllegalArgumentException("Cannot have more than one plugin implementing a " + type);
}
result = newInstance;
}
}

@rjernst probably has a view on the preferred way to enforce that

new ArrayList<>();
private final Set<IndexEventListener> indexEventListeners = new HashSet<>();
private final Map<String, TriFunction<Settings, IndexVersion, ScriptService, Similarity>> similarities = new HashMap<>();
private final Map<String, IndexStorePlugin.DirectoryFactory> directoryFactories;
Expand Down Expand Up @@ -368,9 +368,9 @@ public void addSimilarity(String name, TriFunction<Settings, IndexVersion, Scrip
}

/**
* Sets the factory for creating new {@link DirectoryReader} wrapper instances.
* Adds a new instance of factory creating new {@link DirectoryReader} wrapper instances.
* The factory ({@link Function}) is called once the IndexService is fully constructed.
* NOTE: this method can only be called once per index. Multiple wrappers are not supported.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you know whether there was any reason beyond "not implemented yet" why there might have been a restriction on more than a single wrapper? I ask because the implementation is so small it doesn't seem like it would necessarily have been an impediment on its own.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It was added as a protection to disallow other plugins than security to mess with this low level api.

* The order of execution of wrappers is not guaranteed.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we do execute them in order and should define it instead. It might be important to performance which wrapper goes first.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do you mean something like if the first wrapper filters out many documents, the second wrapper has less work to do? How do we decide what the correct order should be?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can keep the order of application of wrappers based on the order in which they were added (we already do). The problem with the order here is that this is called by plugins and the order in which plugins are set up is not defined.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So real clients of this API can't really rely on the order in reality.

* <p>
* The {@link CheckedFunction} is invoked each time a {@link Engine.Searcher} is requested to do an operation,
* for example search, and must return a new directory reader wrapping the provided directory reader or if no
Expand All @@ -384,11 +384,11 @@ public void addSimilarity(String name, TriFunction<Settings, IndexVersion, Scrip
* The returned reader is closed once it goes out of scope.
* </p>
*/
public void setReaderWrapper(
public void addReaderWrapper(
Function<IndexService, CheckedFunction<DirectoryReader, DirectoryReader, IOException>> indexReaderWrapperFactory
) {
ensureNotFrozen();
this.indexReaderWrapper.set(indexReaderWrapperFactory);
this.indexReaderWrappers.add(indexReaderWrapperFactory);
}

/**
Expand Down Expand Up @@ -499,8 +499,26 @@ public IndexService newIndexService(
QueryRewriteInterceptor queryRewriteInterceptor
) throws IOException {
final IndexEventListener eventListener = freeze();
Function<IndexService, CheckedFunction<DirectoryReader, DirectoryReader, IOException>> readerWrapperFactory = indexReaderWrapper
.get() == null ? (shard) -> null : indexReaderWrapper.get();
Function<IndexService, CheckedFunction<DirectoryReader, DirectoryReader, IOException>> readerWrapperFactory =
switch (indexReaderWrappers.size()) {
case 0 -> indexService -> null;
case 1 -> indexReaderWrappers.get(0);
default -> indexService -> {
// Call factories only one time when creating index service to avoid duplicate work.
var wrappers = new ArrayList<CheckedFunction<DirectoryReader, DirectoryReader, IOException>>();
for (var indexReaderWrapper : indexReaderWrappers) {
wrappers.add(indexReaderWrapper.apply(indexService));
}

return directoryReader -> {
var wrapped = wrappers.get(0).apply(directoryReader);
for (int i = 1; i < wrappers.size(); i++) {
wrapped = wrappers.get(i).apply(wrapped);
}
return wrapped;
};
};
};
eventListener.beforeIndexCreated(indexSettings.getIndex(), indexSettings.getSettings());
final IndexStorePlugin.DirectoryFactory directoryFactory = getDirectoryFactory(indexSettings, directoryFactories);
final IndexStorePlugin.RecoveryStateFactory recoveryStateFactory = getRecoveryStateFactory(indexSettings, recoveryStateFactories);
Expand Down
117 changes: 115 additions & 2 deletions server/src/test/java/org/elasticsearch/index/IndexModuleTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,10 @@
import org.apache.lucene.analysis.standard.StandardTokenizer;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.FieldInvertState;
import org.apache.lucene.index.FilterDirectoryReader;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.LeafReader;
import org.apache.lucene.search.CollectionStatistics;
import org.apache.lucene.search.QueryCachingPolicy;
import org.apache.lucene.search.TermStatistics;
Expand Down Expand Up @@ -267,14 +270,78 @@ public void testWrapperIsBound() throws IOException {
new SearchStatsSettings(ClusterSettings.createBuiltInClusterSettings()),
MergeMetrics.NOOP
);
module.setReaderWrapper(s -> new Wrapper());
module.addReaderWrapper(s -> new Wrapper());

IndexService indexService = newIndexService(module);
assertTrue(indexService.getReaderWrapper() instanceof Wrapper);
assertSame(indexService.getEngineFactory(), module.getEngineFactory());
closeIndexService(indexService);
}

public void testMultipleReaderWrappers() throws IOException {
final MockEngineFactory engineFactory = new MockEngineFactory(AssertingDirectoryReader.class);
IndexModule module = new IndexModule(
indexSettings,
emptyAnalysisRegistry,
engineFactory,
Collections.emptyMap(),
() -> true,
indexNameExpressionResolver,
Collections.emptyMap(),
mock(SlowLogFieldProvider.class),
MapperMetrics.NOOP,
emptyList(),
new IndexingStatsSettings(ClusterSettings.createBuiltInClusterSettings()),
new SearchStatsSettings(ClusterSettings.createBuiltInClusterSettings()),
MergeMetrics.NOOP
);

class Wrapper extends FilterDirectoryReader {
final String name;
final DirectoryReader wrappedReader;

Wrapper(String name, DirectoryReader in) throws IOException {
super(in, new SubReaderWrapper() {
@Override
public LeafReader wrap(LeafReader reader) {
return reader;
}
});
this.name = name;
this.wrappedReader = in;
}

@Override
protected DirectoryReader doWrapDirectoryReader(DirectoryReader in) throws IOException {
return null;
}

@Override
public CacheHelper getReaderCacheHelper() {
return null;
}
}

module.addReaderWrapper(s -> reader -> new Wrapper("A", reader));
module.addReaderWrapper(s -> reader -> new Wrapper("B", reader));

IndexService indexService = newIndexService(module);

var wrapper = indexService.getReaderWrapper();

try (var directory = newDirectory(); var reader = new DummyDirectoryReader(directory)) {
var wrapped = wrapper.apply(reader);
// B is the outermost wrapper since it was added last.
assertTrue(wrapped instanceof Wrapper w && w.name.equals("B"));
var secondLevel = ((Wrapper) wrapped).wrappedReader;
assertTrue(secondLevel instanceof Wrapper w && w.name.equals("A"));
var thirdLevel = ((Wrapper) secondLevel).wrappedReader;
assertTrue(thirdLevel instanceof DummyDirectoryReader);
}

closeIndexService(indexService);
}

public void testRegisterIndexStore() throws IOException {
final Settings settings = Settings.builder()
.put(IndexMetadata.SETTING_VERSION_CREATED, IndexVersion.current())
Expand Down Expand Up @@ -489,7 +556,7 @@ public void testFrozen() {
assertEquals(msg, expectThrows(IllegalStateException.class, () -> module.addIndexEventListener(null)).getMessage());
assertEquals(msg, expectThrows(IllegalStateException.class, () -> module.addIndexOperationListener(null)).getMessage());
assertEquals(msg, expectThrows(IllegalStateException.class, () -> module.addSimilarity(null, null)).getMessage());
assertEquals(msg, expectThrows(IllegalStateException.class, () -> module.setReaderWrapper(null)).getMessage());
assertEquals(msg, expectThrows(IllegalStateException.class, () -> module.addReaderWrapper(null)).getMessage());
assertEquals(msg, expectThrows(IllegalStateException.class, () -> module.forceQueryCacheProvider(null)).getMessage());
assertEquals(msg, expectThrows(IllegalStateException.class, () -> module.setDirectoryWrapper(null)).getMessage());
assertEquals(msg, expectThrows(IllegalStateException.class, () -> module.setIndexCommitListener(null)).getMessage());
Expand Down Expand Up @@ -894,4 +961,50 @@ protected WrappedDirectory(Directory in, ShardRouting shardRouting) {
this.shardRouting = shardRouting;
}
}

private static class DummyDirectoryReader extends DirectoryReader {
DummyDirectoryReader(Directory directory) throws IOException {
super(directory, new LeafReader[0], null);
}

@Override
protected DirectoryReader doOpenIfChanged() throws IOException {
return null;
}

@Override
protected DirectoryReader doOpenIfChanged(IndexCommit commit) throws IOException {
return null;
}

@Override
protected DirectoryReader doOpenIfChanged(IndexWriter writer, boolean applyAllDeletes) throws IOException {
return null;
}

@Override
public long getVersion() {
return 0;
}

@Override
public boolean isCurrent() throws IOException {
return false;
}

@Override
public IndexCommit getIndexCommit() throws IOException {
return null;
}

@Override
protected void doClose() throws IOException {

}

@Override
public CacheHelper getReaderCacheHelper() {
return null;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ protected Collection<Class<? extends Plugin>> getPlugins() {
public static class ReaderWrapperCountPlugin extends Plugin {
@Override
public void onIndexModule(IndexModule indexModule) {
indexModule.setReaderWrapper(service -> SearchServiceSingleNodeTests::apply);
indexModule.addReaderWrapper(service -> SearchServiceSingleNodeTests::apply);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1598,7 +1598,7 @@ public void onIndexModule(IndexModule module) {
assert getLicenseState() != null;
if (XPackSettings.DLS_FLS_ENABLED.get(settings)) {
assert dlsBitsetCache.get() != null;
module.setReaderWrapper(
module.addReaderWrapper(
indexService -> new SecurityIndexReaderWrapper(
shardId -> indexService.newSearchExecutionContext(
shardId.id(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -477,7 +477,7 @@ public void testOnIndexModuleIsNoOpWithSecurityDisabled() throws Exception {
);
security.onIndexModule(indexModule);
// indexReaderWrapper is a SetOnce so if Security#onIndexModule had already set an ReaderWrapper we would get an exception here
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This comment seems outdated by this change. I did not "debug" why the comment was made, might be worth looking into. Sounds like this "setReaderWrapper" really verifies that no reader wrapper was set yet?

indexModule.setReaderWrapper(null);
indexModule.addReaderWrapper(null);
}

public void testFilteredSettings() throws Exception {
Expand Down