diff --git a/server/src/main/java/org/elasticsearch/index/store/Store.java b/server/src/main/java/org/elasticsearch/index/store/Store.java index af28bc3bb32d3..1526e371285ba 100644 --- a/server/src/main/java/org/elasticsearch/index/store/Store.java +++ b/server/src/main/java/org/elasticsearch/index/store/Store.java @@ -70,6 +70,7 @@ import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.translog.Translog; +import org.elasticsearch.transport.LeakTracker; import java.io.Closeable; import java.io.EOFException; @@ -90,6 +91,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.Consumer; +import java.util.function.IntSupplier; import java.util.function.LongUnaryOperator; import java.util.function.Predicate; import java.util.zip.CRC32; @@ -165,7 +167,8 @@ private static IOContext createReadOnceContext() { private final ShardLock shardLock; private final OnClose onClose; - private final AbstractRefCounted refCounter = AbstractRefCounted.of(this::closeInternal); // close us once we are done + private final IntSupplier refCountSupplier; + private final RefCounted refCounter; private boolean hasIndexSort; public Store(ShardId shardId, IndexSettings indexSettings, Directory directory, ShardLock shardLock) { @@ -192,6 +195,12 @@ public Store( assert onClose != null; assert shardLock != null; assert shardLock.getShardId().equals(shardId); + + // Close once all references are closed + final AbstractRefCounted wrappedRefCounter = AbstractRefCounted.of(this::closeInternal); + // keep a reference to AbstractRefCounted#refCount because the leak detector obscures it + this.refCountSupplier = wrappedRefCounter::refCount; + this.refCounter = LeakTracker.wrap(wrappedRefCounter); } public Directory directory() { @@ -235,7 +244,7 @@ private static SegmentInfos readSegmentsInfo(IndexCommit commit, Directory direc } final void ensureOpen() { - if (this.refCounter.refCount() <= 0) { + if (refCounter.hasReferences() == false) { throw new AlreadyClosedException("store is already closed"); } } @@ -437,7 +446,7 @@ public void close() { if (isClosed.compareAndSet(false, true)) { // only do this once! decRef(); - logger.debug("store reference count on close: {}", refCounter.refCount()); + logger.debug("store reference count on close: {}", refCountSupplier.getAsInt()); } } @@ -749,7 +758,7 @@ final void verifyAfterCleanup(MetadataSnapshot sourceMetadata, MetadataSnapshot * Returns the current reference count. */ public int refCount() { - return refCounter.refCount(); + return refCountSupplier.getAsInt(); } public void beforeClose() { diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/MultiFileWriterTests.java b/server/src/test/java/org/elasticsearch/indices/recovery/MultiFileWriterTests.java index b8a0ef6ed0419..b57a0d355236b 100644 --- a/server/src/test/java/org/elasticsearch/indices/recovery/MultiFileWriterTests.java +++ b/server/src/test/java/org/elasticsearch/indices/recovery/MultiFileWriterTests.java @@ -27,6 +27,7 @@ import java.util.Collections; import static org.mockito.ArgumentMatchers.anyCollection; +import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.spy; @@ -45,6 +46,8 @@ public void setUp() throws Exception { indexShard = newShard(true); directory = newMockFSDirectory(indexShard.shardPath().resolveIndex()); directorySpy = spy(directory); + // The underlying directory will already be closed by #closeShards(indexShard) + doNothing().when(directorySpy).close(); store = createStore(indexShard.shardId(), indexShard.indexSettings(), directorySpy); } @@ -53,6 +56,7 @@ public void tearDown() throws Exception { super.tearDown(); directory.close(); closeShards(indexShard); + store.close(); } public void testWritesFile() throws IOException { diff --git a/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryTests.java b/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryTests.java index 93e64936031ee..688bcca83f336 100644 --- a/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryTests.java +++ b/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryTests.java @@ -62,6 +62,7 @@ import org.elasticsearch.test.MockLog; import org.elasticsearch.threadpool.ThreadPool; import org.junit.After; +import org.junit.Rule; import java.io.IOException; import java.nio.file.Path; @@ -104,6 +105,9 @@ public class BlobStoreRepositoryTests extends ESSingleNodeTestCase { static final String REPO_TYPE = "fs"; private static final String TEST_REPO_NAME = "test-repo"; + @Rule + public ShardSnapshotTaskRunnerTests.DummyContextFactory dummyContextFactory = new ShardSnapshotTaskRunnerTests.DummyContextFactory(); + public void testRetrieveSnapshots() { final Client client = client(); final Path location = ESIntegTestCase.randomRepoPath(node().settings()); @@ -454,7 +458,7 @@ protected void snapshotFile(SnapshotShardContext context, BlobStoreIndexShardSna repository.updateState(clusterService.state()); repository.start(); // Generate some FileInfo, as the files that get uploaded as part of the shard snapshot - SnapshotShardContext context = ShardSnapshotTaskRunnerTests.dummyContext(); + SnapshotShardContext context = dummyContextFactory.dummyContext(); int noOfFiles = randomIntBetween(10, 100); BlockingQueue files = new LinkedBlockingQueue<>(noOfFiles); PlainActionFuture listenerCalled = new PlainActionFuture<>(); diff --git a/server/src/test/java/org/elasticsearch/repositories/blobstore/ShardSnapshotTaskRunnerTests.java b/server/src/test/java/org/elasticsearch/repositories/blobstore/ShardSnapshotTaskRunnerTests.java index f3c0f419cde7a..e788ec915f433 100644 --- a/server/src/test/java/org/elasticsearch/repositories/blobstore/ShardSnapshotTaskRunnerTests.java +++ b/server/src/test/java/org/elasticsearch/repositories/blobstore/ShardSnapshotTaskRunnerTests.java @@ -32,6 +32,9 @@ import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; +import org.junit.Rule; +import org.junit.rules.ExternalResource; +import org.junit.rules.TestRule; import java.util.ArrayList; import java.util.List; @@ -44,6 +47,8 @@ public class ShardSnapshotTaskRunnerTests extends ESTestCase { + @Rule + public DummyContextFactory dummyContextFactory = new DummyContextFactory(); private ThreadPool threadPool; private Executor executor; @@ -109,34 +114,49 @@ public static BlobStoreIndexShardSnapshot.FileInfo dummyFileInfo() { return new BlobStoreIndexShardSnapshot.FileInfo(filename, metadata, null); } - public static SnapshotShardContext dummyContext() { - return dummyContext(new SnapshotId(randomAlphaOfLength(10), UUIDs.randomBase64UUID()), randomMillisUpToYear9999()); - } + /** + * Creates {@link SnapshotShardContext}s, managing the lifecycle appropriately to avoid leak detection + */ + public static class DummyContextFactory extends ExternalResource implements TestRule { - public static SnapshotShardContext dummyContext(final SnapshotId snapshotId, final long startTime) { - return dummyContext(snapshotId, startTime, randomIdentifier(), 1); - } + private final List dummyStores = new ArrayList<>(); - public static SnapshotShardContext dummyContext(final SnapshotId snapshotId, final long startTime, String indexName, int shardIndex) { - final var indexId = new IndexId(indexName, UUIDs.randomBase64UUID()); - final var shardId = new ShardId(indexId.getName(), UUIDs.randomBase64UUID(), shardIndex); - final var indexSettings = new IndexSettings( - IndexMetadata.builder(indexId.getName()).settings(indexSettings(IndexVersion.current(), 1, 0)).build(), - Settings.EMPTY - ); - final var dummyStore = new Store(shardId, indexSettings, new ByteBuffersDirectory(), new DummyShardLock(shardId)); - return new SnapshotShardContext( - dummyStore, - null, - snapshotId, - indexId, - new SnapshotIndexCommit(new Engine.IndexCommitRef(null, () -> {})), - null, - IndexShardSnapshotStatus.newInitializing(null), - IndexVersion.current(), - startTime, - ActionListener.noop() - ); + public SnapshotShardContext dummyContext() { + return dummyContext(new SnapshotId(randomAlphaOfLength(10), UUIDs.randomBase64UUID()), randomMillisUpToYear9999()); + } + + public SnapshotShardContext dummyContext(final SnapshotId snapshotId, final long startTime) { + return dummyContext(snapshotId, startTime, randomIdentifier(), 1); + } + + public SnapshotShardContext dummyContext(final SnapshotId snapshotId, final long startTime, String indexName, int shardIndex) { + final var indexId = new IndexId(indexName, UUIDs.randomBase64UUID()); + final var shardId = new ShardId(indexId.getName(), UUIDs.randomBase64UUID(), shardIndex); + final var indexSettings = new IndexSettings( + IndexMetadata.builder(indexId.getName()).settings(indexSettings(IndexVersion.current(), 1, 0)).build(), + Settings.EMPTY + ); + final var dummyStore = new Store(shardId, indexSettings, new ByteBuffersDirectory(), new DummyShardLock(shardId)); + dummyStores.add(dummyStore); + return new SnapshotShardContext( + dummyStore, + null, + snapshotId, + indexId, + new SnapshotIndexCommit(new Engine.IndexCommitRef(null, () -> {})), + null, + IndexShardSnapshotStatus.newInitializing(null), + IndexVersion.current(), + startTime, + ActionListener.noop() + ); + } + + @Override + public void after() { + dummyStores.forEach(Store::close); + dummyStores.clear(); + } } public void testShardSnapshotTaskRunner() throws Exception { @@ -146,7 +166,7 @@ public void testShardSnapshotTaskRunner() throws Exception { repo.setTaskRunner(taskRunner); int enqueuedSnapshots = randomIntBetween(maxTasks * 2, maxTasks * 10); for (int i = 0; i < enqueuedSnapshots; i++) { - threadPool.generic().execute(() -> taskRunner.enqueueShardSnapshot(dummyContext())); + threadPool.generic().execute(() -> taskRunner.enqueueShardSnapshot(dummyContextFactory.dummyContext())); } // Eventually all snapshots are finished assertBusy(() -> { @@ -170,13 +190,13 @@ record CapturedTask(SnapshotShardContext context, @Nullable BlobStoreIndexShardS // first snapshot, one shard, one file, but should execute the shard-level task before the file task final var earlyStartTime = randomLongBetween(1L, Long.MAX_VALUE - 1000); - final var s1Context = dummyContext(new SnapshotId(randomIdentifier(), randomUUID()), earlyStartTime); + final var s1Context = dummyContextFactory.dummyContext(new SnapshotId(randomIdentifier(), randomUUID()), earlyStartTime); tasksInExpectedOrder.add(new CapturedTask(s1Context)); tasksInExpectedOrder.add(new CapturedTask(s1Context, dummyFileInfo())); // second snapshot, also one shard and one file, starts later than the first final var laterStartTime = randomLongBetween(earlyStartTime + 1, Long.MAX_VALUE); - final var s2Context = dummyContext(new SnapshotId(randomIdentifier(), "early-uuid"), laterStartTime); + final var s2Context = dummyContextFactory.dummyContext(new SnapshotId(randomIdentifier(), "early-uuid"), laterStartTime); tasksInExpectedOrder.add(new CapturedTask(s2Context)); tasksInExpectedOrder.add(new CapturedTask(s2Context, dummyFileInfo())); @@ -184,9 +204,9 @@ record CapturedTask(SnapshotShardContext context, @Nullable BlobStoreIndexShardS final var snapshotId3 = new SnapshotId(randomIdentifier(), "later-uuid"); // the third snapshot has three shards, and their respective tasks should execute in shard-id then index-name order: - final var s3ContextShard1 = dummyContext(snapshotId3, laterStartTime, "early-index-name", 0); - final var s3ContextShard2 = dummyContext(snapshotId3, laterStartTime, "later-index-name", 0); - final var s3ContextShard3 = dummyContext(snapshotId3, laterStartTime, randomIdentifier(), 1); + final var s3ContextShard1 = dummyContextFactory.dummyContext(snapshotId3, laterStartTime, "early-index-name", 0); + final var s3ContextShard2 = dummyContextFactory.dummyContext(snapshotId3, laterStartTime, "later-index-name", 0); + final var s3ContextShard3 = dummyContextFactory.dummyContext(snapshotId3, laterStartTime, randomIdentifier(), 1); tasksInExpectedOrder.add(new CapturedTask(s3ContextShard1)); tasksInExpectedOrder.add(new CapturedTask(s3ContextShard2)); @@ -213,7 +233,9 @@ record CapturedTask(SnapshotShardContext context, @Nullable BlobStoreIndexShardS // prime the pipeline by executing a dummy task and waiting for it to block the executor, so that the rest of the tasks are sorted // by the underlying PriorityQueue before any of them start to execute - runner.enqueueShardSnapshot(dummyContext(new SnapshotId(randomIdentifier(), UUIDs.randomBase64UUID()), randomNonNegativeLong())); + runner.enqueueShardSnapshot( + dummyContextFactory.dummyContext(new SnapshotId(randomIdentifier(), UUIDs.randomBase64UUID()), randomNonNegativeLong()) + ); safeAwait(readyLatch); tasksInExecutionOrder.clear(); // remove the dummy task diff --git a/server/src/test/java/org/elasticsearch/repositories/fs/FsRepositoryTests.java b/server/src/test/java/org/elasticsearch/repositories/fs/FsRepositoryTests.java index 36cd84a0d32b9..6670e71a5dcff 100644 --- a/server/src/test/java/org/elasticsearch/repositories/fs/FsRepositoryTests.java +++ b/server/src/test/java/org/elasticsearch/repositories/fs/FsRepositoryTests.java @@ -45,6 +45,8 @@ import org.elasticsearch.common.util.MockBigArrays; import org.elasticsearch.common.util.concurrent.UncategorizedExecutionException; import org.elasticsearch.core.CheckedConsumer; +import org.elasticsearch.core.Releasable; +import org.elasticsearch.core.Releasables; import org.elasticsearch.env.Environment; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.IndexVersion; @@ -73,6 +75,7 @@ import java.io.OutputStream; import java.nio.file.Files; import java.nio.file.Path; +import java.util.ArrayList; import java.util.Collection; import java.util.Comparator; import java.util.List; @@ -88,6 +91,7 @@ public class FsRepositoryTests extends ESTestCase { public void testSnapshotAndRestore() throws IOException { + final List releasables = new ArrayList<>(); try (Directory directory = newDirectory()) { Path repo = createTempDir(); Settings settings = Settings.builder() @@ -114,6 +118,7 @@ public void testSnapshotAndRestore() throws IOException { IndexSettings idxSettings = IndexSettingsModule.newIndexSettings("myindex", indexSettings); ShardId shardId = new ShardId(idxSettings.getIndex(), 1); Store store = new Store(shardId, idxSettings, directory, new DummyShardLock(shardId)); + releasables.add(store::close); SnapshotId snapshotId = new SnapshotId("test", "test"); IndexId indexId = new IndexId(idxSettings.getIndex().getName(), idxSettings.getUUID()); @@ -207,10 +212,13 @@ public void testSnapshotAndRestore() throws IOException { .toList(); assertTrue(recoveredFiles.get(0).name(), recoveredFiles.get(0).name().endsWith(".liv")); assertTrue(recoveredFiles.get(1).name(), recoveredFiles.get(1).name().endsWith("segments_" + incIndexCommit.getGeneration())); + } finally { + Releasables.close(releasables); } } public void testCleanUpWhenShardDataFilesFailToWrite() throws IOException { + final List releasables = new ArrayList<>(); try (Directory directory = newDirectory()) { Path repo = createTempDir(); Settings settings = Settings.builder() @@ -292,6 +300,7 @@ protected BlobContainer wrapChild(BlobContainer child) { ); final ShardId shardId1 = new ShardId(idxSettings.getIndex(), 1); final Store store1 = new Store(shardId1, idxSettings, directory, new DummyShardLock(shardId1)); + releasables.add(store1::close); final SnapshotId snapshotId = new SnapshotId("test", "test"); final IndexId indexId = new IndexId(idxSettings.getIndex().getName(), idxSettings.getUUID()); IndexCommit indexCommit1 = Lucene.getIndexCommit(Lucene.readSegmentInfos(store1.directory()), store1.directory()); @@ -332,6 +341,7 @@ protected BlobContainer wrapChild(BlobContainer child) { // Scenario 2 - Shard data files will not be cleaned up if shard level snap file fails to write final ShardId shardId2 = new ShardId(idxSettings.getIndex(), 2); final Store store2 = new Store(shardId2, idxSettings, directory, new DummyShardLock(shardId2)); + releasables.add(store2::close); final IndexCommit indexCommit2 = Lucene.getIndexCommit(Lucene.readSegmentInfos(store2.directory()), store2.directory()); final PlainActionFuture snapshot2Future = new PlainActionFuture<>(); canErrorForWriteBlob.set(false); @@ -358,6 +368,8 @@ protected BlobContainer wrapChild(BlobContainer child) { final List files = pathStream.filter(p -> p.getFileName().toString().startsWith("__")).toList(); assertThat(files, not(empty())); } + } finally { + Releasables.close(releasables); } } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/snapshots/sourceonly/SourceOnlySnapshotRepository.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/snapshots/sourceonly/SourceOnlySnapshotRepository.java index 579e3fb43b194..2a94bfc6b19ab 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/snapshots/sourceonly/SourceOnlySnapshotRepository.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/snapshots/sourceonly/SourceOnlySnapshotRepository.java @@ -161,7 +161,7 @@ public void snapshotShard(SnapshotShardContext context) { Path dataPath = ((FSDirectory) unwrap).getDirectory().getParent(); // TODO should we have a snapshot tmp directory per shard that is maintained by the system? Path snapPath = dataPath.resolve(SNAPSHOT_DIR_NAME); - final List toClose = new ArrayList<>(3); + final List toClose = new ArrayList<>(4); try { SourceOnlySnapshot.LinkedFilesDirectory overlayDir = new SourceOnlySnapshot.LinkedFilesDirectory(new NIOFSDirectory(snapPath)); toClose.add(overlayDir); @@ -171,6 +171,7 @@ protected void closeInternal() { // do nothing; } }, Store.OnClose.EMPTY, mapperService.getIndexSettings().getIndexSortConfig().hasIndexSort()); + toClose.add(tempStore); Supplier querySupplier = mapperService.hasNested() ? () -> Queries.newNestedFilter(mapperService.getIndexSettings().getIndexVersionCreated()) : null; diff --git a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/store/SearchableSnapshotDirectoryTests.java b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/store/SearchableSnapshotDirectoryTests.java index 98df96eca7772..b6487c92641ac 100644 --- a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/store/SearchableSnapshotDirectoryTests.java +++ b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/store/SearchableSnapshotDirectoryTests.java @@ -566,8 +566,7 @@ private void testDirectories( } final Store store = new Store(shardId, indexSettings, directory, new DummyShardLock(shardId)); - store.incRef(); - releasables.add(store::decRef); + releasables.add(store::close); try { final SegmentInfos segmentInfos = Lucene.readSegmentInfos(store.directory()); final IndexCommit indexCommit = Lucene.getIndexCommit(segmentInfos, store.directory());