Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -49,21 +49,18 @@ public void refreshShard(
) {
switch (policy) {
case NONE -> listener.onResponse(false);
case WAIT_UNTIL -> waitUntil(indexShard, location, new ActionListener<>() {
@Override
public void onResponse(Boolean forced) {
if (location != null && indexShard.routingEntry().isSearchable() == false) {
refreshUnpromotables(indexShard, location, listener, forced, postWriteRefreshTimeout);
} else {
listener.onResponse(forced);
}
}

@Override
public void onFailure(Exception e) {
listener.onFailure(e);
case WAIT_UNTIL -> {
ActionListener<Boolean> wrapped;
if (location != null && indexShard.routingEntry().isSearchable() == false) {
var engineOrNull = indexShard.getEngineOrNull();
wrapped = listener.delegateFailure(
(l, forced) -> refreshUnpromotables(indexShard, engineOrNull, location, listener, forced, postWriteRefreshTimeout)
);
} else {
wrapped = listener;
}
});
waitUntil(indexShard, location, wrapped);
}
case IMMEDIATE -> immediate(indexShard, listener.delegateFailureAndWrap((l, r) -> {
if (indexShard.getReplicationGroup().getRoutingTable().unpromotableShards().size() > 0) {
sendUnpromotableRequests(indexShard, r.generation(), true, l, postWriteRefreshTimeout);
Expand Down Expand Up @@ -103,17 +100,16 @@ private static void waitUntil(IndexShard indexShard, Translog.Location location,

private void refreshUnpromotables(
IndexShard indexShard,
Engine engineOrNull, // to avoid accessing it under the RefreshListener's refreshLock
Translog.Location location,
ActionListener<Boolean> listener,
boolean forced,
@Nullable TimeValue postWriteRefreshTimeout
) {
Engine engineOrNull = indexShard.getEngineOrNull();
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This invocation has been caught by the new SafeEngineAccessThreadLocal

if (engineOrNull == null) {
listener.onFailure(new AlreadyClosedException("Engine closed during refresh."));
return;
}

engineOrNull.addFlushListener(location, listener.delegateFailureAndWrap((l, generation) -> {
try (
ThreadContext.StoredContext ignore = transportService.getThreadPool()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -501,29 +501,31 @@ public void handleException(TransportException exp) {
);
} else {
setPhase(replicationTask, "primary");
// Resolve the engine upfront to avoid an unsafe access if the responseListener is called by a refresh listener
final var engineOrNull = syncGlobalCheckpointAfterOperation ? primaryShardReference.indexShard.getEngineOrNull() : null;

final ActionListener<Response> responseListener = ActionListener.wrap(response -> {
adaptResponse(response, primaryShardReference.indexShard);

if (syncGlobalCheckpointAfterOperation) {
final var primary = primaryShardReference.indexShard;
if (engineOrNull != null) {
try {
primaryShardReference.indexShard.maybeSyncGlobalCheckpoint("post-operation");
// TODO ensure engine is still open
var seqNoStats = engineOrNull.getSeqNoStats(primary.getLastKnownGlobalCheckpoint());
primary.maybeSyncGlobalCheckpoint("post-operation", seqNoStats);
} catch (final Exception e) {
// only log non-closed exceptions
if (ExceptionsHelper.unwrap(e, AlreadyClosedException.class, IndexShardClosedException.class) == null) {
// intentionally swallow, a missed global checkpoint sync should not fail this operation
logger.info(
() -> format(
"%s failed to execute post-operation global checkpoint sync",
primaryShardReference.indexShard.shardId()
),
() -> format("%s failed to execute post-operation global checkpoint sync", primary.shardId()),
e
);
}
}
}

assert primaryShardReference.indexShard.isPrimaryMode();
assert primary.isPrimaryMode();
primaryShardReference.close(); // release shard operation lock before responding to caller
setPhase(replicationTask, "finished");
onCompletionListener.onResponse(response);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.index.engine;

import org.apache.lucene.search.ReferenceManager;
import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader;
import org.elasticsearch.core.Assertions;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.SuppressForbidden;

import java.io.IOException;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;

@SuppressForbidden(reason = "reference counting is required here")
public abstract class AbstractReaderManager extends ReferenceManager<ElasticsearchDirectoryReader> {

@Nullable // if assertions are disabled
private final Map<RefreshListener, AssertingRefreshListener> assertingListeners = Assertions.ENABLED ? new ConcurrentHashMap<>() : null;

@Override
protected boolean tryIncRef(ElasticsearchDirectoryReader reference) {
return reference.tryIncRef();
}

@Override
protected int getRefCount(ElasticsearchDirectoryReader reference) {
return reference.getRefCount();
}

@Override
protected void decRef(ElasticsearchDirectoryReader reference) throws IOException {
reference.decRef();
}

@Override
public final void addListener(RefreshListener listener) {
if (Assertions.ENABLED == false) {
super.addListener(listener);
return;
}

final var assertingListener = new AssertingRefreshListener(listener);
var previous = assertingListeners.put(listener, assertingListener);
assert previous == null : "listener already added";
super.addListener(assertingListener);
}

@Override
public final void removeListener(RefreshListener listener) {
if (Assertions.ENABLED == false) {
super.removeListener(listener);
return;
}

final var assertingListener = assertingListeners.remove(listener);
assert assertingListener != null : "listener already removed";
super.removeListener(assertingListener);
}

/**
* A delegating {@link RefreshListener} used to assert that refresh listeners are not accessing the engine within before/after refresh
* methods.
*/
private static class AssertingRefreshListener implements RefreshListener {

private final RefreshListener delegate;

private AssertingRefreshListener(RefreshListener delegate) {
this.delegate = Objects.requireNonNull(delegate);
if (Assertions.ENABLED == false) {
throw new AssertionError("Only use this when assertions are enabled");
}
}

@Override
public void beforeRefresh() throws IOException {
SafeEngineAccessThreadLocal.accessStart();
try {
delegate.beforeRefresh();
} finally {
SafeEngineAccessThreadLocal.accessEnd();
}
}

@Override
public void afterRefresh(boolean didRefresh) throws IOException {
SafeEngineAccessThreadLocal.accessStart();
try {
delegate.afterRefresh(didRefresh);
} finally {
SafeEngineAccessThreadLocal.accessEnd();
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,8 @@
package org.elasticsearch.index.engine;

import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.search.ReferenceManager;
import org.apache.lucene.search.SearcherManager;
import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader;
import org.elasticsearch.core.SuppressForbidden;

import java.io.IOException;

Expand All @@ -25,8 +23,7 @@
* @see SearcherManager
*
*/
@SuppressForbidden(reason = "reference counting is required here")
public class ElasticsearchReaderManager extends ReferenceManager<ElasticsearchDirectoryReader> {
public class ElasticsearchReaderManager extends AbstractReaderManager {

/**
* Creates and returns a new ElasticsearchReaderManager from the given
Expand All @@ -39,23 +36,8 @@ public ElasticsearchReaderManager(ElasticsearchDirectoryReader reader) {
this.current = reader;
}

@Override
protected void decRef(ElasticsearchDirectoryReader reference) throws IOException {
reference.decRef();
}

@Override
protected ElasticsearchDirectoryReader refreshIfNeeded(ElasticsearchDirectoryReader referenceToRefresh) throws IOException {
return (ElasticsearchDirectoryReader) DirectoryReader.openIfChanged(referenceToRefresh);
}

@Override
protected boolean tryIncRef(ElasticsearchDirectoryReader reference) {
return reference.tryIncRef();
}

@Override
protected int getRefCount(ElasticsearchDirectoryReader reference) {
return reference.getRefCount();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.index.engine;

import org.apache.lucene.search.ReferenceManager;

/**
* A type of {@link ReferenceManager.RefreshListener} that is called back when a new {@link Engine} is instanciated.
*/
public interface EngineAwareRefreshListener extends ReferenceManager.RefreshListener {

void onNewEngine(Engine engine);
}
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@
import org.elasticsearch.core.IOUtils;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.SuppressForbidden;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.env.Environment;
import org.elasticsearch.index.IndexMode;
Expand Down Expand Up @@ -305,9 +304,15 @@ public InternalEngine(EngineConfig engineConfig) {
// don't allow commits until we are done with recovering
pendingTranslogRecovery.set(true);
for (ReferenceManager.RefreshListener listener : engineConfig.getExternalRefreshListener()) {
if (listener instanceof EngineAwareRefreshListener engineListener) {
engineListener.onNewEngine(this);
}
this.externalReaderManager.addListener(listener);
}
for (ReferenceManager.RefreshListener listener : engineConfig.getInternalRefreshListener()) {
if (listener instanceof EngineAwareRefreshListener engineListener) {
engineListener.onNewEngine(this);
}
this.internalReaderManager.addListener(listener);
}
this.lastRefreshedCheckpointListener = new LastRefreshedCheckpointListener(localCheckpointTracker.getProcessedCheckpoint());
Expand Down Expand Up @@ -453,8 +458,7 @@ public CompletionStats completionStats(String... fieldNamePatterns) {
* this specialized implementation an external refresh will immediately be reflected on the internal reader
* and old segments can be released in the same way previous version did this (as a side-effect of _refresh)
*/
@SuppressForbidden(reason = "reference counting is required here")
private static final class ExternalReaderManager extends ReferenceManager<ElasticsearchDirectoryReader> {
private static final class ExternalReaderManager extends AbstractReaderManager {
private final BiConsumer<ElasticsearchDirectoryReader, ElasticsearchDirectoryReader> refreshListener;
private final ElasticsearchReaderManager internalReaderManager;
private boolean isWarmedUp; // guarded by refreshLock
Expand Down Expand Up @@ -495,21 +499,6 @@ protected ElasticsearchDirectoryReader refreshIfNeeded(ElasticsearchDirectoryRea
return newReader; // steal the reference
}
}

@Override
protected boolean tryIncRef(ElasticsearchDirectoryReader reference) {
return reference.tryIncRef();
}

@Override
protected int getRefCount(ElasticsearchDirectoryReader reference) {
return reference.getRefCount();
}

@Override
protected void decRef(ElasticsearchDirectoryReader reference) throws IOException {
reference.decRef();
}
}

@Override
Expand Down
Loading