Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 50 additions & 0 deletions server/src/main/java/org/elasticsearch/index/shard/EngineRef.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.index.shard;

import org.apache.lucene.store.AlreadyClosedException;
import org.elasticsearch.core.AbstractRefCounted;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.index.engine.Engine;

/**
 * A releasable handle on a (possibly {@code null}) {@link Engine}, paired with a {@link Releasable} that is closed from
 * {@link #closeInternal()}.
 * <p>
 * {@link #close()} only drops one reference via {@code decRef()}; NOTE(review): {@code closeInternal()} is presumably invoked by
 * {@link AbstractRefCounted} once the last reference has been released -- confirm against that class.
 */
public final class EngineRef extends AbstractRefCounted implements Releasable {

    /** The engine captured when this reference was created; may be {@code null}. */
    private final Engine engine;

    /** Resource closed by {@link #closeInternal()}. */
    private final Releasable onRelease;

    /**
     * @param engine     the engine to expose through this reference, or {@code null} if there is none
     * @param releasable the resource to close from {@link #closeInternal()}
     */
    public EngineRef(Engine engine, Releasable releasable) {
        this.onRelease = releasable;
        this.engine = engine;
    }

    /**
     * @return the engine captured by this reference, or {@code null} if none was captured
     */
    @Nullable
    public Engine getEngineOrNull() {
        return this.engine;
    }

    /**
     * @return the engine captured by this reference, never {@code null}
     * @throws AlreadyClosedException if this reference holds no engine
     */
    public Engine getEngine() {
        final Engine held = getEngineOrNull();
        if (held != null) {
            return held;
        }
        throw new AlreadyClosedException("engine is closed");
    }

    /** Closes the {@link Releasable} supplied at construction time. */
    @Override
    protected void closeInternal() {
        onRelease.close();
    }

    /** Releases this handle by decrementing its reference count. */
    @Override
    public void close() {
        decRef();
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.index.shard;

import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.index.engine.Engine;

import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Supplier;

/**
 * Holds the current (possibly {@code null}) {@link Engine} and coordinates access to it through a fair
 * {@link ReentrantReadWriteLock}: {@link #getEngineRef()} takes the read lock, {@link #acquireEngineLock()} takes the write lock, and
 * {@link #getEngineAndSet(Supplier)} asserts that the write lock is held by the calling thread before swapping the engine.
 */
class EngineReferenceManager {

    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true); // fair
    private final ReentrantReadWriteLock.ReadLock sharedLock = lock.readLock();
    private final ReentrantReadWriteLock.WriteLock exclusiveLock = lock.writeLock();

    // both releasables are created once and reused to avoid an allocation for each operation
    private final Releasable releaseExclusiveLock = exclusiveLock::unlock;
    private final Releasable releaseLock = sharedLock::unlock;

    private volatile Engine current;

    /**
     * Acquires the read lock and captures the current engine.
     *
     * @return a releasable reference to the current {@link Engine}, preventing it from being changed until the reference is released.
     *         Note that the {@link Engine} referenced by the {@link EngineRef} may be null and may be closed at any time in case of an
     *         engine failure, but is guaranteed not to be changed (and closed) by an engine reset.
     */
    public EngineRef getEngineRef() {
        sharedLock.lock();
        final Engine engine = this.current;
        return new EngineRef(engine, Releasables.assertOnce(releaseLock));
    }

    /**
     * Acquires the write lock, which prevents the {@link Engine} from being changed by other threads until the returned releasable is
     * released.
     *
     * @return a releasable that unlocks the write lock when closed
     */
    Releasable acquireEngineLock() {
        exclusiveLock.lock();
        return Releasables.assertOnce(releaseExclusiveLock);
    }

    /**
     * @return {@code true} if the calling thread holds the write lock taken by {@link #acquireEngineLock()}
     */
    boolean isEngineLockHeldByCurrentThread() {
        return lock.isWriteLockedByCurrentThread();
    }

    /**
     * Reads the current engine without taking any lock.
     *
     * @return the (possibly null) current reference to the {@link Engine}
     */
    @Nullable
    Engine get() {
        return current;
    }

    /**
     * Replaces the current engine with the value produced by {@code supplier}. The previous engine is read before the supplier is
     * invoked. The caller is expected to hold the engine lock (checked by assertion only).
     *
     * @param supplier provides the new engine; may supply {@code null}
     * @return the (possibly null) engine that was current before the call
     */
    @Nullable
    Engine getEngineAndSet(Supplier<Engine> supplier) {
        assert supplier != null : "supplier cannot be null";
        assert isEngineLockHeldByCurrentThread();
        final Engine replaced = current;
        current = supplier.get();
        return replaced;
    }
}
73 changes: 42 additions & 31 deletions server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -240,8 +240,8 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
// ensure happens-before relation between addRefreshListener() and postRecovery()
private volatile SubscribableListener<Void> postRecoveryComplete;
private volatile long pendingPrimaryTerm; // see JavaDocs for getPendingPrimaryTerm
private final Object engineMutex = new Object(); // lock ordering: engineMutex -> mutex
private final AtomicReference<Engine> currentEngineReference = new AtomicReference<>();

private final EngineReferenceManager engineReferenceManager = new EngineReferenceManager();
final EngineFactory engineFactory;

private final IndexingOperationListener indexingOperationListeners;
Expand Down Expand Up @@ -1277,10 +1277,13 @@ private Engine.GetResult innerGet(Engine.Get get, boolean translogOnly, Function
if (indexSettings.getIndexVersionCreated().isLegacyIndexVersion()) {
throw new IllegalStateException("get operations not allowed on a legacy index");
}
if (translogOnly) {
return getEngine().getFromTranslog(get, mappingLookup, mapperService.documentParser(), searcherWrapper);
try (var engineRef = engineReferenceManager.getEngineRef()) {
var engine = engineRef.getEngine();
if (translogOnly) {
return engine.getFromTranslog(get, mappingLookup, mapperService.documentParser(), searcherWrapper);
}
return engine.get(get, mappingLookup, mapperService.documentParser(), searcherWrapper);
}
return getEngine().get(get, mappingLookup, mapperService.documentParser(), searcherWrapper);
}

/**
Expand Down Expand Up @@ -1613,10 +1616,10 @@ public Store.MetadataSnapshot snapshotStoreMetadata() throws IOException {
Engine.IndexCommitRef indexCommit = null;
store.incRef();
try {
synchronized (engineMutex) {
try (var engineRef = engineReferenceManager.getEngineRef()) {
// if the engine is not running, we can access the store directly, but we need to make sure no one starts
// the engine on us. If the engine is running, we can get a snapshot via the deletion policy of the engine.
final Engine engine = getEngineOrNull();
final Engine engine = engineRef.getEngineOrNull();
if (engine != null) {
indexCommit = engine.acquireLastIndexCommit(false);
}
Expand Down Expand Up @@ -1776,14 +1779,14 @@ public CacheHelper getReaderCacheHelper() {
}

public void close(String reason, boolean flushEngine, Executor closeExecutor, ActionListener<Void> closeListener) throws IOException {
synchronized (engineMutex) {
try (var ignored = engineReferenceManager.acquireEngineLock()) {
try {
synchronized (mutex) {
changeState(IndexShardState.CLOSED, reason);
}
checkAndCallWaitForEngineOrClosedShardListeners();
} finally {
final Engine engine = this.currentEngineReference.getAndSet(null);
final Engine engine = engineReferenceManager.getEngineAndSet(() -> null);
closeExecutor.execute(ActionRunnable.run(closeListener, new CheckedRunnable<>() {
@Override
public void run() throws Exception {
Expand Down Expand Up @@ -1857,7 +1860,7 @@ public void prepareForIndexRecovery() {
throw new IndexShardNotRecoveringException(shardId, state);
}
recoveryState.setStage(RecoveryState.Stage.INDEX);
assert currentEngineReference.get() == null;
assert assertEngineReferenceIsNull("prepare for recovery, engine should be null");
}

/**
Expand Down Expand Up @@ -1936,8 +1939,8 @@ private void doLocalRecovery(
// First, start a temporary engine, recover the local translog up to the given checkpoint, and then close the engine again.
.<Void>newForked(l -> ActionListener.runWithResource(ActionListener.assertOnce(l), () -> () -> {
assert Thread.holdsLock(mutex) == false : "must not hold the mutex here";
synchronized (engineMutex) {
IOUtils.close(currentEngineReference.getAndSet(null));
try (var ignored = engineReferenceManager.acquireEngineLock()) {
IOUtils.close(engineReferenceManager.getEngineAndSet(() -> null));
}
}, (recoveryCompleteListener, ignoredRef) -> {
assert Thread.holdsLock(mutex) == false : "must not hold the mutex here";
Expand Down Expand Up @@ -2167,13 +2170,14 @@ private void innerOpenEngineAndTranslog(LongSupplier globalCheckpointSupplier) t
+ recoveryState.getRecoverySource()
+ "] but got "
+ getRetentionLeases();
synchronized (engineMutex) {
assert currentEngineReference.get() == null : "engine is running";
try (var ignored = engineReferenceManager.acquireEngineLock()) {
assert assertEngineReferenceIsNull("engine is running");
verifyNotClosed();
// we must create a new engine under mutex (see IndexShard#snapshotStoreMetadata).
// we must create a new engine under lock (see IndexShard#snapshotStoreMetadata).
final Engine newEngine = createEngine(config);
onNewEngine(newEngine);
currentEngineReference.set(newEngine);
var previous = engineReferenceManager.getEngineAndSet(() -> newEngine);
assert previous == null;
// We set active because we are now writing operations to the engine; this way,
// we can flush if we go idle after some time and become inactive.
active.set(true);
Expand Down Expand Up @@ -2241,7 +2245,7 @@ private boolean assertLastestCommitUserData() throws IOException {
}

private void onNewEngine(Engine newEngine) {
assert Thread.holdsLock(engineMutex);
assert engineReferenceManager.isEngineLockHeldByCurrentThread();
refreshListeners.setCurrentRefreshLocationSupplier(newEngine::getTranslogLastWriteLocation);
refreshListeners.setCurrentProcessedCheckpointSupplier(newEngine::getProcessedLocalCheckpoint);
refreshListeners.setMaxIssuedSeqNoSupplier(newEngine::getMaxSeqNo);
Expand All @@ -2252,9 +2256,9 @@ private void onNewEngine(Engine newEngine) {
*/
public void performRecoveryRestart() throws IOException {
assert Thread.holdsLock(mutex) == false : "restart recovery under mutex";
synchronized (engineMutex) {
try (var ignored = engineReferenceManager.acquireEngineLock()) {
assert refreshListeners.pendingCount() == 0 : "we can't restart with pending listeners";
IOUtils.close(currentEngineReference.getAndSet(null));
IOUtils.close(engineReferenceManager.getEngineAndSet(() -> null));
resetRecoveryStage();
}
}
Expand All @@ -2264,7 +2268,7 @@ public void performRecoveryRestart() throws IOException {
*/
public void resetRecoveryStage() {
assert routingEntry().recoverySource().getType() == RecoverySource.Type.PEER : "not a peer recovery [" + routingEntry() + "]";
assert currentEngineReference.get() == null;
assert assertEngineReferenceIsNull("reset recovery stage, engine should be null");
if (state != IndexShardState.RECOVERING) {
throw new IndexShardNotRecoveringException(shardId, state);
}
Expand Down Expand Up @@ -3298,7 +3302,7 @@ Engine getEngine() {
* closed.
*/
public Engine getEngineOrNull() {
return this.currentEngineReference.get();
return engineReferenceManager.get();
}

public void startRecovery(
Expand Down Expand Up @@ -4312,11 +4316,11 @@ public void resetEngine() {
assert Thread.holdsLock(mutex) == false : "resetting engine under mutex";
assert waitForEngineOrClosedShardListeners.isDone();
try {
synchronized (engineMutex) {
try (var ignored = engineReferenceManager.acquireEngineLock()) {
verifyNotClosed();
getEngine().prepareForEngineReset();
var newEngine = createEngine(newEngineConfig(replicationTracker));
IOUtils.close(currentEngineReference.getAndSet(newEngine));
IOUtils.close(engineReferenceManager.getEngineAndSet(() -> newEngine));
onNewEngine(newEngine);
}
onSettingsChanged();
Expand All @@ -4342,7 +4346,7 @@ assert getActiveOperationsCount() == OPERATIONS_BLOCKED
SetOnce<Engine> newEngineReference = new SetOnce<>();
final long globalCheckpoint = getLastKnownGlobalCheckpoint();
assert globalCheckpoint == getLastSyncedGlobalCheckpoint();
synchronized (engineMutex) {
try (var ignored = engineReferenceManager.acquireEngineLock()) {
verifyNotClosed();
// we must create both the new read-only engine and the new read-write engine under the engine write lock to ensure that
// snapshotStoreMetadata, acquireXXXCommit and close work.
Expand All @@ -4357,7 +4361,7 @@ assert getActiveOperationsCount() == OPERATIONS_BLOCKED
) {
@Override
public IndexCommitRef acquireLastIndexCommit(boolean flushFirst) {
synchronized (engineMutex) {
try (var ignored = engineReferenceManager.getEngineRef()) {
if (newEngineReference.get() == null) {
throw new AlreadyClosedException("engine was closed");
}
Expand All @@ -4368,7 +4372,7 @@ public IndexCommitRef acquireLastIndexCommit(boolean flushFirst) {

@Override
public IndexCommitRef acquireSafeIndexCommit() {
synchronized (engineMutex) {
try (var ignored = engineReferenceManager.getEngineRef()) {
if (newEngineReference.get() == null) {
throw new AlreadyClosedException("engine was closed");
}
Expand All @@ -4379,17 +4383,17 @@ public IndexCommitRef acquireSafeIndexCommit() {
@Override
public void close() throws IOException {
Engine newEngine;
synchronized (engineMutex) {
try (var engineRef = engineReferenceManager.getEngineRef()) {
newEngine = newEngineReference.get();
if (newEngine == currentEngineReference.get()) {
if (newEngine == engineRef.getEngineOrNull()) {
// we successfully installed the new engine so do not close it.
newEngine = null;
}
}
IOUtils.close(super::close, newEngine);
}
};
IOUtils.close(currentEngineReference.getAndSet(readOnlyEngine));
IOUtils.close(engineReferenceManager.getEngineAndSet(() -> readOnlyEngine));
newEngineReference.set(engineFactory.newReadWriteEngine(newEngineConfig(replicationTracker)));
onNewEngine(newEngineReference.get());
}
Expand All @@ -4403,9 +4407,9 @@ public void close() throws IOException {
);
newEngineReference.get().recoverFromTranslog(translogRunner, globalCheckpoint);
newEngineReference.get().refresh("reset_engine");
synchronized (engineMutex) {
try (var ignored = engineReferenceManager.acquireEngineLock()) {
verifyNotClosed();
IOUtils.close(currentEngineReference.getAndSet(newEngineReference.get()));
IOUtils.close(engineReferenceManager.getEngineAndSet(newEngineReference::get));
// We set active because we are now writing operations to the engine; this way,
// if we go idle after some time and become inactive, we still give sync'd flush a chance to run.
active.set(true);
Expand Down Expand Up @@ -4516,4 +4520,11 @@ public void ensureMutable(ActionListener<Void> listener) {
l.onResponse(null);
}));
}

/**
 * Assertion helper that checks no engine is currently installed. Intended to be used as {@code assert assertEngineReferenceIsNull(...)}.
 * <p>
 * The engine is read through the lock-free accessor because this check can be called from anywhere, including under the refresh lock
 * of the index reader.
 *
 * @param message the assertion message used when an engine is present
 * @return always {@code true}, so the call can itself be wrapped in an {@code assert}
 */
private boolean assertEngineReferenceIsNull(String message) {
    final Engine installed = engineReferenceManager.get();
    assert installed == null : message;
    return true;
}
}