Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -49,21 +49,18 @@ public void refreshShard(
) {
switch (policy) {
case NONE -> listener.onResponse(false);
case WAIT_UNTIL -> waitUntil(indexShard, location, new ActionListener<>() {
@Override
public void onResponse(Boolean forced) {
if (location != null && indexShard.routingEntry().isSearchable() == false) {
refreshUnpromotables(indexShard, location, listener, forced, postWriteRefreshTimeout);
} else {
listener.onResponse(forced);
}
}

@Override
public void onFailure(Exception e) {
listener.onFailure(e);
case WAIT_UNTIL -> {
ActionListener<Boolean> wrapped;
if (location != null && indexShard.routingEntry().isSearchable() == false) {
var engineOrNull = indexShard.getEngineOrNull();
wrapped = listener.delegateFailure(
(l, forced) -> refreshUnpromotables(indexShard, engineOrNull, location, listener, forced, postWriteRefreshTimeout)
);
} else {
wrapped = listener;
}
});
waitUntil(indexShard, location, wrapped);
}
case IMMEDIATE -> immediate(indexShard, listener.delegateFailureAndWrap((l, r) -> {
if (indexShard.getReplicationGroup().getRoutingTable().unpromotableShards().size() > 0) {
sendUnpromotableRequests(indexShard, r.generation(), true, l, postWriteRefreshTimeout);
Expand Down Expand Up @@ -103,17 +100,16 @@ private static void waitUntil(IndexShard indexShard, Translog.Location location,

private void refreshUnpromotables(
IndexShard indexShard,
Engine engineOrNull, // to avoid accessing it under the RefreshListener's refreshLock
Translog.Location location,
ActionListener<Boolean> listener,
boolean forced,
@Nullable TimeValue postWriteRefreshTimeout
) {
Engine engineOrNull = indexShard.getEngineOrNull();
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This invocation has been caught by the new SafeEngineAccessThreadLocal

if (engineOrNull == null) {
listener.onFailure(new AlreadyClosedException("Engine closed during refresh."));
return;
}

engineOrNull.addFlushListener(location, listener.delegateFailureAndWrap((l, generation) -> {
try (
ThreadContext.StoredContext ignore = transportService.getThreadPool()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.index.engine;

import org.apache.lucene.search.ReferenceManager;
import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader;
import org.elasticsearch.core.Assertions;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.SuppressForbidden;

import java.io.IOException;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;

@SuppressForbidden(reason = "reference counting is required here")
public abstract class AbstractReaderManager extends ReferenceManager<ElasticsearchDirectoryReader> {

@Nullable // if assertions are disabled
private final Map<RefreshListener, AssertingRefreshListener> assertingListeners = Assertions.ENABLED ? new ConcurrentHashMap<>() : null;


@Override
protected boolean tryIncRef(ElasticsearchDirectoryReader reference) {
return reference.tryIncRef();
}

@Override
protected int getRefCount(ElasticsearchDirectoryReader reference) {
return reference.getRefCount();
}

@Override
protected void decRef(ElasticsearchDirectoryReader reference) throws IOException {
reference.decRef();
}

@Override
public final void addListener(RefreshListener listener) {
if (Assertions.ENABLED == false) {
super.addListener(listener);
return;
}

final var assertingListener = new AssertingRefreshListener(listener);
var previous = assertingListeners.put(listener, assertingListener);
assert previous == null : "listener already added";
super.addListener(assertingListener);
}

@Override
public final void removeListener(RefreshListener listener) {
if (Assertions.ENABLED == false) {
super.removeListener(listener);
return;
}

final var assertingListener = assertingListeners.remove(listener);
assert assertingListener != null : "listener already removed";
super.removeListener(assertingListener);
}

/**
* A delegating {@link RefreshListener} used to assert that refresh listeners are not accessing the engine within before/after refresh
* methods.
*/
private static class AssertingRefreshListener implements RefreshListener {

private final RefreshListener delegate;

private AssertingRefreshListener(RefreshListener delegate) {
this.delegate = Objects.requireNonNull(delegate);
if (Assertions.ENABLED == false) {
throw new AssertionError("Only use this when assertions are enabled");
}
}

@Override
public void beforeRefresh() throws IOException {
SafeEngineAccessThreadLocal.accessStart();
try {
delegate.beforeRefresh();
} finally {
SafeEngineAccessThreadLocal.accessEnd();
}
}

@Override
public void afterRefresh(boolean didRefresh) throws IOException {
SafeEngineAccessThreadLocal.accessStart();
try {
delegate.afterRefresh(didRefresh);
} finally {
SafeEngineAccessThreadLocal.accessEnd();
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,8 @@
package org.elasticsearch.index.engine;

import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.search.ReferenceManager;
import org.apache.lucene.search.SearcherManager;
import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader;
import org.elasticsearch.core.SuppressForbidden;

import java.io.IOException;

Expand All @@ -25,8 +23,7 @@
* @see SearcherManager
*
*/
@SuppressForbidden(reason = "reference counting is required here")
public class ElasticsearchReaderManager extends ReferenceManager<ElasticsearchDirectoryReader> {
public class ElasticsearchReaderManager extends AbstractReaderManager {

/**
* Creates and returns a new ElasticsearchReaderManager from the given
Expand All @@ -39,23 +36,8 @@ public ElasticsearchReaderManager(ElasticsearchDirectoryReader reader) {
this.current = reader;
}

@Override
protected void decRef(ElasticsearchDirectoryReader reference) throws IOException {
reference.decRef();
}

@Override
protected ElasticsearchDirectoryReader refreshIfNeeded(ElasticsearchDirectoryReader referenceToRefresh) throws IOException {
return (ElasticsearchDirectoryReader) DirectoryReader.openIfChanged(referenceToRefresh);
}

@Override
protected boolean tryIncRef(ElasticsearchDirectoryReader reference) {
return reference.tryIncRef();
}

@Override
protected int getRefCount(ElasticsearchDirectoryReader reference) {
return reference.getRefCount();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.index.engine;

import org.apache.lucene.search.ReferenceManager;

/**
* A type of {@link ReferenceManager.RefreshListener} that is called back when a new {@link Engine} is instanciated.
*/
public interface EngineAwareRefreshListener extends ReferenceManager.RefreshListener {

void onNewEngine(Engine engine);
}
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@
import org.elasticsearch.core.IOUtils;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.SuppressForbidden;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.env.Environment;
import org.elasticsearch.index.IndexMode;
Expand Down Expand Up @@ -453,8 +452,7 @@ public CompletionStats completionStats(String... fieldNamePatterns) {
* this specialized implementation an external refresh will immediately be reflected on the internal reader
* and old segments can be released in the same way previous version did this (as a side-effect of _refresh)
*/
@SuppressForbidden(reason = "reference counting is required here")
private static final class ExternalReaderManager extends ReferenceManager<ElasticsearchDirectoryReader> {
private static final class ExternalReaderManager extends AbstractReaderManager {
private final BiConsumer<ElasticsearchDirectoryReader, ElasticsearchDirectoryReader> refreshListener;
private final ElasticsearchReaderManager internalReaderManager;
private boolean isWarmedUp; // guarded by refreshLock
Expand Down Expand Up @@ -495,21 +493,6 @@ protected ElasticsearchDirectoryReader refreshIfNeeded(ElasticsearchDirectoryRea
return newReader; // steal the reference
}
}

@Override
protected boolean tryIncRef(ElasticsearchDirectoryReader reference) {
return reference.tryIncRef();
}

@Override
protected int getRefCount(ElasticsearchDirectoryReader reference) {
return reference.getRefCount();
}

@Override
protected void decRef(ElasticsearchDirectoryReader reference) throws IOException {
reference.decRef();
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.index.engine;

import org.apache.lucene.util.SetOnce;
import org.elasticsearch.core.Assertions;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.index.shard.IndexShard;

/**
* This class is used to assert that a thread does not access the current engine reference using the method
* {@link IndexShard#getEngineOrNull()} when it is executing some protected code blocks. A protected code block is a
* block of code which starts when {@link #accessStart()} is called and ends when {@link #accessEnd()} is called.
*/
public final class SafeEngineAccessThreadLocal {

@Nullable // if assertions are disabled
private static final ThreadLocal<Accessor> threadLocalAccessor;
static {
threadLocalAccessor = Assertions.ENABLED ? new ThreadLocal<>() : null;
}

private static class Accessor {

private final Thread thread;
private final SetOnce<AssertionError> failure;

private Accessor(Thread thread) {
this.thread = thread;
this.failure = new SetOnce<>();
}

private void setFailure(AssertionError error) {
failure.set(error);
}

private boolean isFailed() {
return failure.get() != null;
}

private AssertionError getFailure() {
return failure.get();
}

@Override
public String toString() {
return thread.toString();
}
}

private SafeEngineAccessThreadLocal() {}

private static Accessor getAccessorSafe() {
final var accessor = threadLocalAccessor.get();
if (accessor != null && accessor.isFailed()) {
throw new AssertionError("Current thread has made an unsafe access to the engine", accessor.getFailure());
}
return accessor;
}

public static void accessStart() {
ensureAssertionsEnabled();
final var accessor = getAccessorSafe();
assert accessor == null : "current accessor already set";
threadLocalAccessor.set(new Accessor(Thread.currentThread()));
}

public static void accessEnd() {
ensureAssertionsEnabled();
final var accessor = getAccessorSafe();
assert accessor != null : "current accessor not set";
threadLocalAccessor.remove();
}

/**
* Use this method to assert that the current thread has not entered a protected execution code block.
*/
public static boolean assertNoAccessByCurrentThread() {
ensureAssertionsEnabled();
final var accessor = getAccessorSafe();
if (accessor != null) {
var message = "thread [" + accessor + "] should not access the engine using the getEngineOrNull() method";
accessor.setFailure(new AssertionError(message)); // to be thrown later
assert false : message;
}
return true;
}

private static void ensureAssertionsEnabled() {
if (Assertions.ENABLED == false) {
throw new AssertionError("Only use this method when assertions are enabled");
}
assert threadLocalAccessor != null;
}
}
Loading