Skip to content

Commit efebd52

Browse files
committed
Ensure that RefreshListener do not access engine under refresh lock
1 parent 88b5900 commit efebd52

File tree

10 files changed

+314
-74
lines changed

10 files changed

+314
-74
lines changed

server/src/main/java/org/elasticsearch/action/support/replication/PostWriteRefresh.java

Lines changed: 12 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -49,21 +49,18 @@ public void refreshShard(
4949
) {
5050
switch (policy) {
5151
case NONE -> listener.onResponse(false);
52-
case WAIT_UNTIL -> waitUntil(indexShard, location, new ActionListener<>() {
53-
@Override
54-
public void onResponse(Boolean forced) {
55-
if (location != null && indexShard.routingEntry().isSearchable() == false) {
56-
refreshUnpromotables(indexShard, location, listener, forced, postWriteRefreshTimeout);
57-
} else {
58-
listener.onResponse(forced);
59-
}
60-
}
61-
62-
@Override
63-
public void onFailure(Exception e) {
64-
listener.onFailure(e);
52+
case WAIT_UNTIL -> {
53+
ActionListener<Boolean> wrapped;
54+
if (location != null && indexShard.routingEntry().isSearchable() == false) {
55+
var engineOrNull = indexShard.getEngineOrNull();
56+
wrapped = listener.delegateFailure(
57+
(l, forced) -> refreshUnpromotables(indexShard, engineOrNull, location, listener, forced, postWriteRefreshTimeout)
58+
);
59+
} else {
60+
wrapped = listener;
6561
}
66-
});
62+
waitUntil(indexShard, location, wrapped);
63+
}
6764
case IMMEDIATE -> immediate(indexShard, listener.delegateFailureAndWrap((l, r) -> {
6865
if (indexShard.getReplicationGroup().getRoutingTable().unpromotableShards().size() > 0) {
6966
sendUnpromotableRequests(indexShard, r.generation(), true, l, postWriteRefreshTimeout);
@@ -103,17 +100,16 @@ private static void waitUntil(IndexShard indexShard, Translog.Location location,
103100

104101
private void refreshUnpromotables(
105102
IndexShard indexShard,
103+
Engine engineOrNull, // to avoid accessing it under the RefreshListener's refreshLock
106104
Translog.Location location,
107105
ActionListener<Boolean> listener,
108106
boolean forced,
109107
@Nullable TimeValue postWriteRefreshTimeout
110108
) {
111-
Engine engineOrNull = indexShard.getEngineOrNull();
112109
if (engineOrNull == null) {
113110
listener.onFailure(new AlreadyClosedException("Engine closed during refresh."));
114111
return;
115112
}
116-
117113
engineOrNull.addFlushListener(location, listener.delegateFailureAndWrap((l, generation) -> {
118114
try (
119115
ThreadContext.StoredContext ignore = transportService.getThreadPool()
Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.index.engine;
11+
12+
import org.apache.lucene.search.ReferenceManager;
13+
import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader;
14+
import org.elasticsearch.core.Assertions;
15+
import org.elasticsearch.core.Nullable;
16+
import org.elasticsearch.core.SuppressForbidden;
17+
18+
import java.io.IOException;
19+
import java.util.Map;
20+
import java.util.Objects;
21+
import java.util.concurrent.ConcurrentHashMap;
22+
23+
@SuppressForbidden(reason = "reference counting is required here")
24+
public abstract class AbstractReaderManager extends ReferenceManager<ElasticsearchDirectoryReader> {
25+
26+
@Nullable // if assertions are disabled
27+
private final Map<RefreshListener, AssertingRefreshListener> assertingListeners = Assertions.ENABLED ? new ConcurrentHashMap<>() : null;
28+
29+
30+
@Override
31+
protected boolean tryIncRef(ElasticsearchDirectoryReader reference) {
32+
return reference.tryIncRef();
33+
}
34+
35+
@Override
36+
protected int getRefCount(ElasticsearchDirectoryReader reference) {
37+
return reference.getRefCount();
38+
}
39+
40+
@Override
41+
protected void decRef(ElasticsearchDirectoryReader reference) throws IOException {
42+
reference.decRef();
43+
}
44+
45+
@Override
46+
public final void addListener(RefreshListener listener) {
47+
if (Assertions.ENABLED == false) {
48+
super.addListener(listener);
49+
return;
50+
}
51+
52+
final var assertingListener = new AssertingRefreshListener(listener);
53+
var previous = assertingListeners.put(listener, assertingListener);
54+
assert previous == null : "listener already added";
55+
super.addListener(assertingListener);
56+
}
57+
58+
@Override
59+
public final void removeListener(RefreshListener listener) {
60+
if (Assertions.ENABLED == false) {
61+
super.removeListener(listener);
62+
return;
63+
}
64+
65+
final var assertingListener = assertingListeners.remove(listener);
66+
assert assertingListener != null : "listener already removed";
67+
super.removeListener(assertingListener);
68+
}
69+
70+
/**
71+
* A delegating {@link RefreshListener} used to assert that refresh listeners are not accessing the engine within before/after refresh
72+
* methods.
73+
*/
74+
private static class AssertingRefreshListener implements RefreshListener {
75+
76+
private final RefreshListener delegate;
77+
78+
private AssertingRefreshListener(RefreshListener delegate) {
79+
this.delegate = Objects.requireNonNull(delegate);
80+
if (Assertions.ENABLED == false) {
81+
throw new AssertionError("Only use this when assertions are enabled");
82+
}
83+
}
84+
85+
@Override
86+
public void beforeRefresh() throws IOException {
87+
SafeEngineAccessThreadLocal.accessStart();
88+
try {
89+
delegate.beforeRefresh();
90+
} finally {
91+
SafeEngineAccessThreadLocal.accessEnd();
92+
}
93+
}
94+
95+
@Override
96+
public void afterRefresh(boolean didRefresh) throws IOException {
97+
SafeEngineAccessThreadLocal.accessStart();
98+
try {
99+
delegate.afterRefresh(didRefresh);
100+
} finally {
101+
SafeEngineAccessThreadLocal.accessEnd();
102+
}
103+
}
104+
}
105+
}

server/src/main/java/org/elasticsearch/index/engine/ElasticsearchReaderManager.java

Lines changed: 1 addition & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,8 @@
1010
package org.elasticsearch.index.engine;
1111

1212
import org.apache.lucene.index.DirectoryReader;
13-
import org.apache.lucene.search.ReferenceManager;
1413
import org.apache.lucene.search.SearcherManager;
1514
import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader;
16-
import org.elasticsearch.core.SuppressForbidden;
1715

1816
import java.io.IOException;
1917

@@ -25,8 +23,7 @@
2523
* @see SearcherManager
2624
*
2725
*/
28-
@SuppressForbidden(reason = "reference counting is required here")
29-
public class ElasticsearchReaderManager extends ReferenceManager<ElasticsearchDirectoryReader> {
26+
public class ElasticsearchReaderManager extends AbstractReaderManager {
3027

3128
/**
3229
* Creates and returns a new ElasticsearchReaderManager from the given
@@ -39,23 +36,8 @@ public ElasticsearchReaderManager(ElasticsearchDirectoryReader reader) {
3936
this.current = reader;
4037
}
4138

42-
@Override
43-
protected void decRef(ElasticsearchDirectoryReader reference) throws IOException {
44-
reference.decRef();
45-
}
46-
4739
@Override
4840
protected ElasticsearchDirectoryReader refreshIfNeeded(ElasticsearchDirectoryReader referenceToRefresh) throws IOException {
4941
return (ElasticsearchDirectoryReader) DirectoryReader.openIfChanged(referenceToRefresh);
5042
}
51-
52-
@Override
53-
protected boolean tryIncRef(ElasticsearchDirectoryReader reference) {
54-
return reference.tryIncRef();
55-
}
56-
57-
@Override
58-
protected int getRefCount(ElasticsearchDirectoryReader reference) {
59-
return reference.getRefCount();
60-
}
6143
}
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.index.engine;
11+
12+
import org.apache.lucene.search.ReferenceManager;
13+
14+
/**
15+
* A type of {@link ReferenceManager.RefreshListener} that is called back when a new {@link Engine} is instanciated.
16+
*/
17+
public interface EngineAwareRefreshListener extends ReferenceManager.RefreshListener {
18+
19+
void onNewEngine(Engine engine);
20+
}

server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

Lines changed: 1 addition & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,6 @@
7171
import org.elasticsearch.core.IOUtils;
7272
import org.elasticsearch.core.Nullable;
7373
import org.elasticsearch.core.Releasable;
74-
import org.elasticsearch.core.SuppressForbidden;
7574
import org.elasticsearch.core.Tuple;
7675
import org.elasticsearch.env.Environment;
7776
import org.elasticsearch.index.IndexMode;
@@ -453,8 +452,7 @@ public CompletionStats completionStats(String... fieldNamePatterns) {
453452
* this specialized implementation an external refresh will immediately be reflected on the internal reader
454453
* and old segments can be released in the same way previous version did this (as a side-effect of _refresh)
455454
*/
456-
@SuppressForbidden(reason = "reference counting is required here")
457-
private static final class ExternalReaderManager extends ReferenceManager<ElasticsearchDirectoryReader> {
455+
private static final class ExternalReaderManager extends AbstractReaderManager {
458456
private final BiConsumer<ElasticsearchDirectoryReader, ElasticsearchDirectoryReader> refreshListener;
459457
private final ElasticsearchReaderManager internalReaderManager;
460458
private boolean isWarmedUp; // guarded by refreshLock
@@ -495,21 +493,6 @@ protected ElasticsearchDirectoryReader refreshIfNeeded(ElasticsearchDirectoryRea
495493
return newReader; // steal the reference
496494
}
497495
}
498-
499-
@Override
500-
protected boolean tryIncRef(ElasticsearchDirectoryReader reference) {
501-
return reference.tryIncRef();
502-
}
503-
504-
@Override
505-
protected int getRefCount(ElasticsearchDirectoryReader reference) {
506-
return reference.getRefCount();
507-
}
508-
509-
@Override
510-
protected void decRef(ElasticsearchDirectoryReader reference) throws IOException {
511-
reference.decRef();
512-
}
513496
}
514497

515498
@Override
Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.index.engine;
11+
12+
import org.apache.lucene.util.SetOnce;
13+
import org.elasticsearch.core.Assertions;
14+
import org.elasticsearch.core.Nullable;
15+
import org.elasticsearch.index.shard.IndexShard;
16+
17+
/**
18+
* This class is used to assert that a thread does not access the current engine reference using the method
19+
* {@link IndexShard#getEngineOrNull()} when it is executing some protected code blocks. A protected code block is a
20+
* block of code which starts when {@link #accessStart()} is called and ends when {@link #accessEnd()} is called.
21+
*/
22+
public final class SafeEngineAccessThreadLocal {
23+
24+
@Nullable // if assertions are disabled
25+
private static final ThreadLocal<Accessor> threadLocalAccessor;
26+
static {
27+
threadLocalAccessor = Assertions.ENABLED ? new ThreadLocal<>() : null;
28+
}
29+
30+
private static class Accessor {
31+
32+
private final Thread thread;
33+
private final SetOnce<AssertionError> failure;
34+
35+
private Accessor(Thread thread) {
36+
this.thread = thread;
37+
this.failure = new SetOnce<>();
38+
}
39+
40+
private void setFailure(AssertionError error) {
41+
failure.set(error);
42+
}
43+
44+
private boolean isFailed() {
45+
return failure.get() != null;
46+
}
47+
48+
private AssertionError getFailure() {
49+
return failure.get();
50+
}
51+
52+
@Override
53+
public String toString() {
54+
return thread.toString();
55+
}
56+
}
57+
58+
private SafeEngineAccessThreadLocal() {}
59+
60+
private static Accessor getAccessorSafe() {
61+
final var accessor = threadLocalAccessor.get();
62+
if (accessor != null && accessor.isFailed()) {
63+
throw new AssertionError("Current thread has made an unsafe access to the engine", accessor.getFailure());
64+
}
65+
return accessor;
66+
}
67+
68+
public static void accessStart() {
69+
ensureAssertionsEnabled();
70+
final var accessor = getAccessorSafe();
71+
assert accessor == null : "current accessor already set";
72+
threadLocalAccessor.set(new Accessor(Thread.currentThread()));
73+
}
74+
75+
public static void accessEnd() {
76+
ensureAssertionsEnabled();
77+
final var accessor = getAccessorSafe();
78+
assert accessor != null : "current accessor not set";
79+
threadLocalAccessor.remove();
80+
}
81+
82+
/**
83+
* Use this method to assert that the current thread has not entered a protected execution code block.
84+
*/
85+
public static boolean assertNoAccessByCurrentThread() {
86+
ensureAssertionsEnabled();
87+
final var accessor = getAccessorSafe();
88+
if (accessor != null) {
89+
var message = "thread [" + accessor + "] should not access the engine using the getEngineOrNull() method";
90+
accessor.setFailure(new AssertionError(message)); // to be thrown later
91+
assert false : message;
92+
}
93+
return true;
94+
}
95+
96+
private static void ensureAssertionsEnabled() {
97+
if (Assertions.ENABLED == false) {
98+
throw new AssertionError("Only use this method when assertions are enabled");
99+
}
100+
assert threadLocalAccessor != null;
101+
}
102+
}

0 commit comments

Comments
 (0)