Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 13 additions & 4 deletions server/src/main/java/org/elasticsearch/index/store/Store.java
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.transport.LeakTracker;

import java.io.Closeable;
import java.io.EOFException;
Expand All @@ -90,6 +91,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;
import java.util.function.IntSupplier;
import java.util.function.LongUnaryOperator;
import java.util.function.Predicate;
import java.util.zip.CRC32;
Expand Down Expand Up @@ -165,7 +167,8 @@ private static IOContext createReadOnceContext() {
private final ShardLock shardLock;
private final OnClose onClose;

private final AbstractRefCounted refCounter = AbstractRefCounted.of(this::closeInternal); // close us once we are done
private final IntSupplier refCountSupplier;
private final RefCounted refCounter;
private boolean hasIndexSort;

public Store(ShardId shardId, IndexSettings indexSettings, Directory directory, ShardLock shardLock) {
Expand All @@ -192,6 +195,12 @@ public Store(
assert onClose != null;
assert shardLock != null;
assert shardLock.getShardId().equals(shardId);

// Close once all references are closed
final AbstractRefCounted wrappedRefCounter = AbstractRefCounted.of(this::closeInternal);
// keep a reference to AbstractRefCounted#refCount because the leak detector obscures it
this.refCountSupplier = wrappedRefCounter::refCount;
this.refCounter = LeakTracker.wrap(wrappedRefCounter);
}

public Directory directory() {
Expand Down Expand Up @@ -235,7 +244,7 @@ private static SegmentInfos readSegmentsInfo(IndexCommit commit, Directory direc
}

final void ensureOpen() {
if (this.refCounter.refCount() <= 0) {
if (refCounter.hasReferences() == false) {
throw new AlreadyClosedException("store is already closed");
}
}
Expand Down Expand Up @@ -437,7 +446,7 @@ public void close() {
if (isClosed.compareAndSet(false, true)) {
// only do this once!
decRef();
logger.debug("store reference count on close: {}", refCounter.refCount());
logger.debug("store reference count on close: {}", refCountSupplier.getAsInt());
}
}

Expand Down Expand Up @@ -749,7 +758,7 @@ final void verifyAfterCleanup(MetadataSnapshot sourceMetadata, MetadataSnapshot
* Returns the current reference count.
*/
public int refCount() {
return refCounter.refCount();
return refCountSupplier.getAsInt();
}

public void beforeClose() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.Collections;

import static org.mockito.ArgumentMatchers.anyCollection;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
Expand All @@ -45,6 +46,8 @@ public void setUp() throws Exception {
indexShard = newShard(true);
directory = newMockFSDirectory(indexShard.shardPath().resolveIndex());
directorySpy = spy(directory);
// The underlying directory is closed explicitly in #tearDown (directory.close()), so make close() on the spy a no-op
// to avoid closing it again when the store is closed
doNothing().when(directorySpy).close();
store = createStore(indexShard.shardId(), indexShard.indexSettings(), directorySpy);
}

Expand All @@ -53,6 +56,7 @@ public void tearDown() throws Exception {
super.tearDown();
directory.close();
closeShards(indexShard);
store.close();
}

public void testWritesFile() throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import org.elasticsearch.test.MockLog;
import org.elasticsearch.threadpool.ThreadPool;
import org.junit.After;
import org.junit.Rule;

import java.io.IOException;
import java.nio.file.Path;
Expand Down Expand Up @@ -104,6 +105,9 @@ public class BlobStoreRepositoryTests extends ESSingleNodeTestCase {
static final String REPO_TYPE = "fs";
private static final String TEST_REPO_NAME = "test-repo";

@Rule
public ShardSnapshotTaskRunnerTests.DummyContextFactory dummyContextFactory = new ShardSnapshotTaskRunnerTests.DummyContextFactory();

public void testRetrieveSnapshots() {
final Client client = client();
final Path location = ESIntegTestCase.randomRepoPath(node().settings());
Expand Down Expand Up @@ -454,7 +458,7 @@ protected void snapshotFile(SnapshotShardContext context, BlobStoreIndexShardSna
repository.updateState(clusterService.state());
repository.start();
// Generate some FileInfo, as the files that get uploaded as part of the shard snapshot
SnapshotShardContext context = ShardSnapshotTaskRunnerTests.dummyContext();
SnapshotShardContext context = dummyContextFactory.dummyContext();
int noOfFiles = randomIntBetween(10, 100);
BlockingQueue<BlobStoreIndexShardSnapshot.FileInfo> files = new LinkedBlockingQueue<>(noOfFiles);
PlainActionFuture<Void> listenerCalled = new PlainActionFuture<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import org.junit.Rule;
import org.junit.rules.ExternalResource;
import org.junit.rules.TestRule;

import java.util.ArrayList;
import java.util.List;
Expand All @@ -44,6 +47,8 @@

public class ShardSnapshotTaskRunnerTests extends ESTestCase {

@Rule
public DummyContextFactory dummyContextFactory = new DummyContextFactory();
private ThreadPool threadPool;
private Executor executor;

Expand Down Expand Up @@ -109,34 +114,49 @@ public static BlobStoreIndexShardSnapshot.FileInfo dummyFileInfo() {
return new BlobStoreIndexShardSnapshot.FileInfo(filename, metadata, null);
}

public static SnapshotShardContext dummyContext() {
return dummyContext(new SnapshotId(randomAlphaOfLength(10), UUIDs.randomBase64UUID()), randomMillisUpToYear9999());
}
/**
 * Creates {@link SnapshotShardContext}s and closes the {@link Store} backing each one when the rule finishes, so that
 * the stores are not reported as leaked
 */
public static class DummyContextFactory extends ExternalResource implements TestRule {

public static SnapshotShardContext dummyContext(final SnapshotId snapshotId, final long startTime) {
return dummyContext(snapshotId, startTime, randomIdentifier(), 1);
}
private final List<Store> dummyStores = new ArrayList<>();

public static SnapshotShardContext dummyContext(final SnapshotId snapshotId, final long startTime, String indexName, int shardIndex) {
final var indexId = new IndexId(indexName, UUIDs.randomBase64UUID());
final var shardId = new ShardId(indexId.getName(), UUIDs.randomBase64UUID(), shardIndex);
final var indexSettings = new IndexSettings(
IndexMetadata.builder(indexId.getName()).settings(indexSettings(IndexVersion.current(), 1, 0)).build(),
Settings.EMPTY
);
final var dummyStore = new Store(shardId, indexSettings, new ByteBuffersDirectory(), new DummyShardLock(shardId));
return new SnapshotShardContext(
dummyStore,
null,
snapshotId,
indexId,
new SnapshotIndexCommit(new Engine.IndexCommitRef(null, () -> {})),
null,
IndexShardSnapshotStatus.newInitializing(null),
IndexVersion.current(),
startTime,
ActionListener.noop()
);
/**
 * Builds a dummy context for a snapshot with a randomly generated name and UUID, and a start time taken from
 * {@code randomMillisUpToYear9999()}.
 *
 * @return a new context created by the two-argument overload
 */
public SnapshotShardContext dummyContext() {
    // Draw the random values in the same order as a single nested call would: snapshot id first, then start time.
    final var generatedSnapshotId = new SnapshotId(randomAlphaOfLength(10), UUIDs.randomBase64UUID());
    final var generatedStartTime = randomMillisUpToYear9999();
    return dummyContext(generatedSnapshotId, generatedStartTime);
}

/**
 * Builds a dummy context for the given snapshot and start time, using a random index name and shard index {@code 1}.
 *
 * @param snapshotId the snapshot the context belongs to
 * @param startTime  the snapshot start time passed through to the context
 * @return a new context created by the four-argument overload
 */
public SnapshotShardContext dummyContext(final SnapshotId snapshotId, final long startTime) {
    final var generatedIndexName = randomIdentifier();
    return dummyContext(snapshotId, startTime, generatedIndexName, 1);
}

/**
 * Builds a {@link SnapshotShardContext} backed by a fresh {@link Store} over a {@link ByteBuffersDirectory}, and
 * records that store so that {@link #after()} can close it.
 * <p>
 * This method is invoked from thread-pool threads as well as from the test thread, so the store is recorded while
 * holding a lock on the list: unsynchronized concurrent {@code add} calls on an {@link ArrayList} can lose entries,
 * which would leave the corresponding stores unclosed.
 *
 * @param snapshotId the snapshot the context belongs to
 * @param startTime  the snapshot start time passed through to the context
 * @param indexName  name of the index; a random UUID is generated for it
 * @param shardIndex numeric id of the shard within the index
 * @return a new context whose listener is a no-op and whose status is freshly initializing
 */
public SnapshotShardContext dummyContext(final SnapshotId snapshotId, final long startTime, String indexName, int shardIndex) {
    final var indexId = new IndexId(indexName, UUIDs.randomBase64UUID());
    final var shardId = new ShardId(indexId.getName(), UUIDs.randomBase64UUID(), shardIndex);
    final var indexSettings = new IndexSettings(
        IndexMetadata.builder(indexId.getName()).settings(indexSettings(IndexVersion.current(), 1, 0)).build(),
        Settings.EMPTY
    );
    final var dummyStore = new Store(shardId, indexSettings, new ByteBuffersDirectory(), new DummyShardLock(shardId));
    // Guard the registration: callers may create contexts concurrently and the backing list is not thread-safe.
    synchronized (dummyStores) {
        dummyStores.add(dummyStore);
    }
    return new SnapshotShardContext(
        dummyStore,
        null,
        snapshotId,
        indexId,
        new SnapshotIndexCommit(new Engine.IndexCommitRef(null, () -> {})),
        null,
        IndexShardSnapshotStatus.newInitializing(null),
        IndexVersion.current(),
        startTime,
        ActionListener.noop()
    );
}

/**
 * Closes every store recorded by this factory, then empties the record so that no store is closed twice by a later
 * invocation.
 */
@Override
public void after() {
    for (final Store recordedStore : dummyStores) {
        recordedStore.close();
    }
    dummyStores.clear();
}
}

public void testShardSnapshotTaskRunner() throws Exception {
Expand All @@ -146,7 +166,7 @@ public void testShardSnapshotTaskRunner() throws Exception {
repo.setTaskRunner(taskRunner);
int enqueuedSnapshots = randomIntBetween(maxTasks * 2, maxTasks * 10);
for (int i = 0; i < enqueuedSnapshots; i++) {
threadPool.generic().execute(() -> taskRunner.enqueueShardSnapshot(dummyContext()));
threadPool.generic().execute(() -> taskRunner.enqueueShardSnapshot(dummyContextFactory.dummyContext()));
}
// Eventually all snapshots are finished
assertBusy(() -> {
Expand All @@ -170,23 +190,23 @@ record CapturedTask(SnapshotShardContext context, @Nullable BlobStoreIndexShardS

// first snapshot, one shard, one file, but should execute the shard-level task before the file task
final var earlyStartTime = randomLongBetween(1L, Long.MAX_VALUE - 1000);
final var s1Context = dummyContext(new SnapshotId(randomIdentifier(), randomUUID()), earlyStartTime);
final var s1Context = dummyContextFactory.dummyContext(new SnapshotId(randomIdentifier(), randomUUID()), earlyStartTime);
tasksInExpectedOrder.add(new CapturedTask(s1Context));
tasksInExpectedOrder.add(new CapturedTask(s1Context, dummyFileInfo()));

// second snapshot, also one shard and one file, starts later than the first
final var laterStartTime = randomLongBetween(earlyStartTime + 1, Long.MAX_VALUE);
final var s2Context = dummyContext(new SnapshotId(randomIdentifier(), "early-uuid"), laterStartTime);
final var s2Context = dummyContextFactory.dummyContext(new SnapshotId(randomIdentifier(), "early-uuid"), laterStartTime);
tasksInExpectedOrder.add(new CapturedTask(s2Context));
tasksInExpectedOrder.add(new CapturedTask(s2Context, dummyFileInfo()));

// third snapshot, starts at the same time as the second but has a later UUID
final var snapshotId3 = new SnapshotId(randomIdentifier(), "later-uuid");

// the third snapshot has three shards, and their respective tasks should execute in shard-id then index-name order:
final var s3ContextShard1 = dummyContext(snapshotId3, laterStartTime, "early-index-name", 0);
final var s3ContextShard2 = dummyContext(snapshotId3, laterStartTime, "later-index-name", 0);
final var s3ContextShard3 = dummyContext(snapshotId3, laterStartTime, randomIdentifier(), 1);
final var s3ContextShard1 = dummyContextFactory.dummyContext(snapshotId3, laterStartTime, "early-index-name", 0);
final var s3ContextShard2 = dummyContextFactory.dummyContext(snapshotId3, laterStartTime, "later-index-name", 0);
final var s3ContextShard3 = dummyContextFactory.dummyContext(snapshotId3, laterStartTime, randomIdentifier(), 1);

tasksInExpectedOrder.add(new CapturedTask(s3ContextShard1));
tasksInExpectedOrder.add(new CapturedTask(s3ContextShard2));
Expand All @@ -213,7 +233,9 @@ record CapturedTask(SnapshotShardContext context, @Nullable BlobStoreIndexShardS

// prime the pipeline by executing a dummy task and waiting for it to block the executor, so that the rest of the tasks are sorted
// by the underlying PriorityQueue before any of them start to execute
runner.enqueueShardSnapshot(dummyContext(new SnapshotId(randomIdentifier(), UUIDs.randomBase64UUID()), randomNonNegativeLong()));
runner.enqueueShardSnapshot(
dummyContextFactory.dummyContext(new SnapshotId(randomIdentifier(), UUIDs.randomBase64UUID()), randomNonNegativeLong())
);
safeAwait(readyLatch);
tasksInExecutionOrder.clear(); // remove the dummy task

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@
import org.elasticsearch.common.util.MockBigArrays;
import org.elasticsearch.common.util.concurrent.UncategorizedExecutionException;
import org.elasticsearch.core.CheckedConsumer;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.env.Environment;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.IndexVersion;
Expand Down Expand Up @@ -73,6 +75,7 @@
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.List;
Expand All @@ -88,6 +91,7 @@
public class FsRepositoryTests extends ESTestCase {

public void testSnapshotAndRestore() throws IOException {
final List<Releasable> releasables = new ArrayList<>();
try (Directory directory = newDirectory()) {
Path repo = createTempDir();
Settings settings = Settings.builder()
Expand All @@ -114,6 +118,7 @@ public void testSnapshotAndRestore() throws IOException {
IndexSettings idxSettings = IndexSettingsModule.newIndexSettings("myindex", indexSettings);
ShardId shardId = new ShardId(idxSettings.getIndex(), 1);
Store store = new Store(shardId, idxSettings, directory, new DummyShardLock(shardId));
releasables.add(store::close);
SnapshotId snapshotId = new SnapshotId("test", "test");
IndexId indexId = new IndexId(idxSettings.getIndex().getName(), idxSettings.getUUID());

Expand Down Expand Up @@ -207,10 +212,13 @@ public void testSnapshotAndRestore() throws IOException {
.toList();
assertTrue(recoveredFiles.get(0).name(), recoveredFiles.get(0).name().endsWith(".liv"));
assertTrue(recoveredFiles.get(1).name(), recoveredFiles.get(1).name().endsWith("segments_" + incIndexCommit.getGeneration()));
} finally {
Releasables.close(releasables);
}
}

public void testCleanUpWhenShardDataFilesFailToWrite() throws IOException {
final List<Releasable> releasables = new ArrayList<>();
try (Directory directory = newDirectory()) {
Path repo = createTempDir();
Settings settings = Settings.builder()
Expand Down Expand Up @@ -292,6 +300,7 @@ protected BlobContainer wrapChild(BlobContainer child) {
);
final ShardId shardId1 = new ShardId(idxSettings.getIndex(), 1);
final Store store1 = new Store(shardId1, idxSettings, directory, new DummyShardLock(shardId1));
releasables.add(store1::close);
final SnapshotId snapshotId = new SnapshotId("test", "test");
final IndexId indexId = new IndexId(idxSettings.getIndex().getName(), idxSettings.getUUID());
IndexCommit indexCommit1 = Lucene.getIndexCommit(Lucene.readSegmentInfos(store1.directory()), store1.directory());
Expand Down Expand Up @@ -332,6 +341,7 @@ protected BlobContainer wrapChild(BlobContainer child) {
// Scenario 2 - Shard data files will not be cleaned up if shard level snap file fails to write
final ShardId shardId2 = new ShardId(idxSettings.getIndex(), 2);
final Store store2 = new Store(shardId2, idxSettings, directory, new DummyShardLock(shardId2));
releasables.add(store2::close);
final IndexCommit indexCommit2 = Lucene.getIndexCommit(Lucene.readSegmentInfos(store2.directory()), store2.directory());
final PlainActionFuture<ShardSnapshotResult> snapshot2Future = new PlainActionFuture<>();
canErrorForWriteBlob.set(false);
Expand All @@ -358,6 +368,8 @@ protected BlobContainer wrapChild(BlobContainer child) {
final List<Path> files = pathStream.filter(p -> p.getFileName().toString().startsWith("__")).toList();
assertThat(files, not(empty()));
}
} finally {
Releasables.close(releasables);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ public void snapshotShard(SnapshotShardContext context) {
Path dataPath = ((FSDirectory) unwrap).getDirectory().getParent();
// TODO should we have a snapshot tmp directory per shard that is maintained by the system?
Path snapPath = dataPath.resolve(SNAPSHOT_DIR_NAME);
final List<Closeable> toClose = new ArrayList<>(3);
final List<Closeable> toClose = new ArrayList<>(4);
try {
SourceOnlySnapshot.LinkedFilesDirectory overlayDir = new SourceOnlySnapshot.LinkedFilesDirectory(new NIOFSDirectory(snapPath));
toClose.add(overlayDir);
Expand All @@ -171,6 +171,7 @@ protected void closeInternal() {
// do nothing;
}
}, Store.OnClose.EMPTY, mapperService.getIndexSettings().getIndexSortConfig().hasIndexSort());
toClose.add(tempStore);
Supplier<Query> querySupplier = mapperService.hasNested()
? () -> Queries.newNestedFilter(mapperService.getIndexSettings().getIndexVersionCreated())
: null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -566,8 +566,7 @@ private void testDirectories(
}

final Store store = new Store(shardId, indexSettings, directory, new DummyShardLock(shardId));
store.incRef();
releasables.add(store::decRef);
releasables.add(store::close);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure why the lifecycle was being managed as above, or whether I've fundamentally changed something here. It seems the store already holds a single reference from creation, so we shouldn't need to incRef?

try {
final SegmentInfos segmentInfos = Lucene.readSegmentInfos(store.directory());
final IndexCommit indexCommit = Lucene.getIndexCommit(segmentInfos, store.directory());
Expand Down