Skip to content
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
import org.elasticsearch.core.IOUtils;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.core.SuppressForbidden;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.env.Environment;
Expand Down Expand Up @@ -106,6 +107,7 @@

import java.io.Closeable;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
Expand Down Expand Up @@ -1021,21 +1023,13 @@ private VersionValue resolveDocVersion(final Operation op, boolean loadSeqNo) th
if (versionValue == null) {
assert incrementIndexVersionLookup(); // used for asserting in tests
final VersionsAndSeqNoResolver.DocIdAndVersion docIdAndVersion;
try (Searcher searcher = acquireSearcher("load_version", SearcherScope.INTERNAL)) {
try (var directoryReaderSupplier = acquireDirectoryReaderSupplier(SearcherScope.INTERNAL)) {
var indexReader = directoryReaderSupplier.getDirectoryReader();
if (engineConfig.getIndexSettings().getMode() == IndexMode.TIME_SERIES) {
assert engineConfig.getLeafSorter() == DataStream.TIMESERIES_LEAF_READERS_SORTER;
docIdAndVersion = VersionsAndSeqNoResolver.timeSeriesLoadDocIdAndVersion(
searcher.getIndexReader(),
op.uid(),
op.id(),
loadSeqNo
);
docIdAndVersion = VersionsAndSeqNoResolver.timeSeriesLoadDocIdAndVersion(indexReader, op.uid(), op.id(), loadSeqNo);
} else {
docIdAndVersion = VersionsAndSeqNoResolver.timeSeriesLoadDocIdAndVersion(
searcher.getIndexReader(),
op.uid(),
loadSeqNo
);
docIdAndVersion = VersionsAndSeqNoResolver.timeSeriesLoadDocIdAndVersion(indexReader, op.uid(), loadSeqNo);
}
}
if (docIdAndVersion != null) {
Expand Down Expand Up @@ -3470,4 +3464,70 @@ public LiveVersionMap getLiveVersionMap() {
protected long getPreCommitSegmentGeneration() {
return preCommitSegmentGeneration.get();
}

DirectoryReaderSupplier acquireDirectoryReaderSupplier(SearcherScope scope) throws EngineException {
assert scope == SearcherScope.INTERNAL : "acquireDirectoryReaderSupplier(...) isn't prepared for external usage";

/* Acquire order here is store -> manager since we need
* to make sure that the store is not closed before
* the searcher is acquired. */
if (store.tryIncRef() == false) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah see @tlrx, we don't need to ever acquire a store because we are guaranteed to have an engine reference here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see, so we can just change that into an assertion?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think so yes, assertion should be enough :)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think so too

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pushed: b81fa73

throw new AlreadyClosedException(shardId + " store is closed", failedEngine.get());
}
Releasable releasable = store::decRef;
try {
ReferenceManager<ElasticsearchDirectoryReader> referenceManager = getReferenceManager(scope);
ElasticsearchDirectoryReader acquire = referenceManager.acquire();
releasable = null; // success - hand over the reference to the engine reader
return new DirectoryReaderSupplier(acquire) {

@Override
void doClose() {
try {
referenceManager.release(acquire);
} catch (IOException e) {
throw new UncheckedIOException("failed to close", e);
} catch (AlreadyClosedException e) {
// This means there's a bug somewhere: don't suppress it
throw new AssertionError(e);
} finally {
store.decRef();
}
}
};
} catch (AlreadyClosedException ex) {
throw ex;
} catch (Exception ex) {
maybeFailEngine("acquire_reader", ex);
ensureOpen(ex); // throw EngineCloseException here if we are already closed
logger.error("failed to acquire reader", ex);
throw new EngineException(shardId, "failed to acquire reader", ex);
} finally {
Releasables.close(releasable);
}
}

abstract static class DirectoryReaderSupplier implements Releasable {
Copy link
Contributor

@original-brownbear original-brownbear Feb 13, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is only extended once, we can make this one class for the reader supplier can't we? Or better yet, maybe just inline the logic to save some cycles? :)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I pushed: 1ee2699

private final DirectoryReader directoryReader;
private final AtomicBoolean released = new AtomicBoolean(false);

DirectoryReaderSupplier(DirectoryReader directoryReader) {
this.directoryReader = directoryReader;
}

public DirectoryReader getDirectoryReader() {
return directoryReader;
}

@Override
public final void close() {
if (released.compareAndSet(false, true)) {
doClose();
} else {
assert false : "DirectoryReaderSupplier was released twice";
}
}

abstract void doClose();
}
}