Skip to content
Closed
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 13 additions & 4 deletions server/src/main/java/org/elasticsearch/index/store/Store.java
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.transport.LeakTracker;

import java.io.Closeable;
import java.io.EOFException;
Expand All @@ -90,6 +91,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;
import java.util.function.IntSupplier;
import java.util.function.LongUnaryOperator;
import java.util.function.Predicate;
import java.util.zip.CRC32;
Expand Down Expand Up @@ -165,7 +167,8 @@ private static IOContext createReadOnceContext() {
private final ShardLock shardLock;
private final OnClose onClose;

private final AbstractRefCounted refCounter = AbstractRefCounted.of(this::closeInternal); // close us once we are done
private final IntSupplier refCountSupplier;
private final RefCounted refCounter;
private boolean hasIndexSort;

public Store(ShardId shardId, IndexSettings indexSettings, Directory directory, ShardLock shardLock) {
Expand All @@ -192,6 +195,12 @@ public Store(
assert onClose != null;
assert shardLock != null;
assert shardLock.getShardId().equals(shardId);

// Close once all references are closed
final AbstractRefCounted wrappedRefCounter = AbstractRefCounted.of(this::closeInternal);
// keep a reference to AbstractRefCounted#refCount because the leak detector obscures it
this.refCountSupplier = wrappedRefCounter::refCount;
this.refCounter = LeakTracker.wrap(wrappedRefCounter);
}

public Directory directory() {
Expand Down Expand Up @@ -235,7 +244,7 @@ private static SegmentInfos readSegmentsInfo(IndexCommit commit, Directory direc
}

final void ensureOpen() {
if (this.refCounter.refCount() <= 0) {
if (refCounter.hasReferences() == false) {
throw new AlreadyClosedException("store is already closed");
}
}
Expand Down Expand Up @@ -437,7 +446,7 @@ public void close() {
if (isClosed.compareAndSet(false, true)) {
// only do this once!
decRef();
logger.debug("store reference count on close: {}", refCounter.refCount());
logger.debug("store reference count on close: {}", refCountSupplier.getAsInt());
}
}

Expand Down Expand Up @@ -749,7 +758,7 @@ final void verifyAfterCleanup(MetadataSnapshot sourceMetadata, MetadataSnapshot
* Returns the current reference count.
*/
public int refCount() {
return refCounter.refCount();
return refCountSupplier.getAsInt();
}

public void beforeClose() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.Collections;

import static org.mockito.ArgumentMatchers.anyCollection;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
Expand All @@ -45,6 +46,8 @@ public void setUp() throws Exception {
indexShard = newShard(true);
directory = newMockFSDirectory(indexShard.shardPath().resolveIndex());
directorySpy = spy(directory);
// The underlying directory will already be closed by #closeShards(indexShard)
doNothing().when(directorySpy).close();
store = createStore(indexShard.shardId(), indexShard.indexSettings(), directorySpy);
}

Expand All @@ -53,6 +56,7 @@ public void tearDown() throws Exception {
super.tearDown();
directory.close();
closeShards(indexShard);
store.close();
}

public void testWritesFile() throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@

public class ShardSnapshotTaskRunnerTests extends ESTestCase {

private static List<Store> dummyStores;
private ThreadPool threadPool;
private Executor executor;

Expand All @@ -52,12 +53,16 @@ public void setUp() throws Exception {
super.setUp();
threadPool = new TestThreadPool("test");
executor = threadPool.executor(ThreadPool.Names.SNAPSHOT);
dummyStores = new ArrayList<>();
}

@Override
public void tearDown() throws Exception {
super.tearDown();
TestThreadPool.terminate(threadPool, 30, TimeUnit.SECONDS);
// Don't trigger leak detection
dummyStores.forEach(Store::close);
dummyStores.clear();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this probably does not work, since the dummyContext helper method is also used by a different test class, BlobStoreRepositoryTests. One possible solution is to let the caller manage it: the helper method returns a SnapshotShardContext, which has a store() method the caller can use to access the store and close it.

}

private static class MockedRepo {
Expand Down Expand Up @@ -125,6 +130,7 @@ public static SnapshotShardContext dummyContext(final SnapshotId snapshotId, fin
Settings.EMPTY
);
final var dummyStore = new Store(shardId, indexSettings, new ByteBuffersDirectory(), new DummyShardLock(shardId));
dummyStores.add(dummyStore);
return new SnapshotShardContext(
dummyStore,
null,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@
import org.elasticsearch.common.util.MockBigArrays;
import org.elasticsearch.common.util.concurrent.UncategorizedExecutionException;
import org.elasticsearch.core.CheckedConsumer;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.env.Environment;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.IndexVersion;
Expand Down Expand Up @@ -73,6 +75,7 @@
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.List;
Expand All @@ -88,6 +91,7 @@
public class FsRepositoryTests extends ESTestCase {

public void testSnapshotAndRestore() throws IOException {
final List<Releasable> releasables = new ArrayList<>();
try (Directory directory = newDirectory()) {
Path repo = createTempDir();
Settings settings = Settings.builder()
Expand All @@ -114,6 +118,7 @@ public void testSnapshotAndRestore() throws IOException {
IndexSettings idxSettings = IndexSettingsModule.newIndexSettings("myindex", indexSettings);
ShardId shardId = new ShardId(idxSettings.getIndex(), 1);
Store store = new Store(shardId, idxSettings, directory, new DummyShardLock(shardId));
releasables.add(store::close);
SnapshotId snapshotId = new SnapshotId("test", "test");
IndexId indexId = new IndexId(idxSettings.getIndex().getName(), idxSettings.getUUID());

Expand Down Expand Up @@ -207,10 +212,13 @@ public void testSnapshotAndRestore() throws IOException {
.toList();
assertTrue(recoveredFiles.get(0).name(), recoveredFiles.get(0).name().endsWith(".liv"));
assertTrue(recoveredFiles.get(1).name(), recoveredFiles.get(1).name().endsWith("segments_" + incIndexCommit.getGeneration()));
} finally {
Releasables.close(releasables);
}
}

public void testCleanUpWhenShardDataFilesFailToWrite() throws IOException {
final List<Releasable> releasables = new ArrayList<>();
try (Directory directory = newDirectory()) {
Path repo = createTempDir();
Settings settings = Settings.builder()
Expand Down Expand Up @@ -292,6 +300,7 @@ protected BlobContainer wrapChild(BlobContainer child) {
);
final ShardId shardId1 = new ShardId(idxSettings.getIndex(), 1);
final Store store1 = new Store(shardId1, idxSettings, directory, new DummyShardLock(shardId1));
releasables.add(store1::close);
final SnapshotId snapshotId = new SnapshotId("test", "test");
final IndexId indexId = new IndexId(idxSettings.getIndex().getName(), idxSettings.getUUID());
IndexCommit indexCommit1 = Lucene.getIndexCommit(Lucene.readSegmentInfos(store1.directory()), store1.directory());
Expand Down Expand Up @@ -332,6 +341,7 @@ protected BlobContainer wrapChild(BlobContainer child) {
// Scenario 2 - Shard data files will not be cleaned up if shard level snap file fails to write
final ShardId shardId2 = new ShardId(idxSettings.getIndex(), 2);
final Store store2 = new Store(shardId2, idxSettings, directory, new DummyShardLock(shardId2));
releasables.add(store2::close);
final IndexCommit indexCommit2 = Lucene.getIndexCommit(Lucene.readSegmentInfos(store2.directory()), store2.directory());
final PlainActionFuture<ShardSnapshotResult> snapshot2Future = new PlainActionFuture<>();
canErrorForWriteBlob.set(false);
Expand All @@ -358,6 +368,8 @@ protected BlobContainer wrapChild(BlobContainer child) {
final List<Path> files = pathStream.filter(p -> p.getFileName().toString().startsWith("__")).toList();
assertThat(files, not(empty()));
}
} finally {
Releasables.close(releasables);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ public void snapshotShard(SnapshotShardContext context) {
Path dataPath = ((FSDirectory) unwrap).getDirectory().getParent();
// TODO should we have a snapshot tmp directory per shard that is maintained by the system?
Path snapPath = dataPath.resolve(SNAPSHOT_DIR_NAME);
final List<Closeable> toClose = new ArrayList<>(3);
final List<Closeable> toClose = new ArrayList<>(4);
try {
SourceOnlySnapshot.LinkedFilesDirectory overlayDir = new SourceOnlySnapshot.LinkedFilesDirectory(new NIOFSDirectory(snapPath));
toClose.add(overlayDir);
Expand All @@ -171,6 +171,7 @@ protected void closeInternal() {
// do nothing;
}
}, Store.OnClose.EMPTY, mapperService.getIndexSettings().getIndexSortConfig().hasIndexSort());
toClose.add(tempStore);
Supplier<Query> querySupplier = mapperService.hasNested()
? () -> Queries.newNestedFilter(mapperService.getIndexSettings().getIndexVersionCreated())
: null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -566,8 +566,7 @@ private void testDirectories(
}

final Store store = new Store(shardId, indexSettings, directory, new DummyShardLock(shardId));
store.incRef();
releasables.add(store::decRef);
releasables.add(store::close);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure why the lifecycle was being managed as above, or whether I've fundamentally changed something here. It seems like we should already hold a single reference from creation, so we don't need to call incRef?

try {
final SegmentInfos segmentInfos = Lucene.readSegmentInfos(store.directory());
final IndexCommit indexCommit = Lucene.getIndexCommit(segmentInfos, store.directory());
Expand Down