Skip to content

Commit 26438cb

Browse files
committed
Ensure no circular reference in translog tragic exception (#55959)
We generate a circular reference exception in translog in 6.8 in the following scenario: - The first rollGeneration hits "too many open files" exception when it's copying a checkpoint file. We will set the tragic exception and close the translog - The second rollGeneration hits AlreadyClosedException as the current writer is closed. We will suppress the ACE to the current tragic exception. Unfortunately, this leads to a circular reference as ACE already suppresses the tragic exception. Other factors that help to manifest this bug: - We do not fail the engine on AlreadyClosedException in flush - We do not check for ensureOpen before rolling a new generation Closes #55893
1 parent 6f244eb commit 26438cb

File tree

5 files changed

+98
-4
lines changed

5 files changed

+98
-4
lines changed

server/src/main/java/org/elasticsearch/ExceptionsHelper.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,12 +38,14 @@
3838
import java.util.Arrays;
3939
import java.util.Collections;
4040
import java.util.HashSet;
41+
import java.util.IdentityHashMap;
4142
import java.util.LinkedList;
4243
import java.util.List;
4344
import java.util.Objects;
4445
import java.util.Optional;
4546
import java.util.Queue;
4647
import java.util.Set;
48+
import java.util.function.Predicate;
4749
import java.util.stream.Collectors;
4850

4951
public final class ExceptionsHelper {
@@ -252,6 +254,31 @@ public static Optional<Error> maybeError(final Throwable cause, final Logger log
252254
return Optional.empty();
253255
}
254256

257+
@SuppressWarnings("unchecked")
258+
public static <T extends Throwable> Optional<T> unwrapCausesAndSuppressed(Throwable cause, Predicate<Throwable> predicate) {
259+
if (predicate.test(cause)) {
260+
return Optional.of((T) cause);
261+
}
262+
263+
final Queue<Throwable> queue = new LinkedList<>();
264+
queue.add(cause);
265+
final Set<Throwable> seen = Collections.newSetFromMap(new IdentityHashMap<>());
266+
while (queue.isEmpty() == false) {
267+
final Throwable current = queue.remove();
268+
if (seen.add(current) == false) {
269+
continue;
270+
}
271+
if (predicate.test(current)) {
272+
return Optional.of((T) current);
273+
}
274+
Collections.addAll(queue, current.getSuppressed());
275+
if (current.getCause() != null) {
276+
queue.add(current.getCause());
277+
}
278+
}
279+
return Optional.empty();
280+
}
281+
255282
/**
256283
* See {@link #maybeError(Throwable, Logger)}. Uses the class-local logger.
257284
*/

server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1838,6 +1838,7 @@ public CommitId flush(boolean force, boolean waitIfOngoing) throws EngineExcepti
18381838
refresh("version_table_flush", SearcherScope.INTERNAL);
18391839
translog.trimUnreferencedReaders();
18401840
} catch (AlreadyClosedException e) {
1841+
failOnTragicEvent(e);
18411842
throw e;
18421843
} catch (Exception e) {
18431844
throw new FlushFailedEngineException(shardId, e);

server/src/main/java/org/elasticsearch/index/translog/TragicExceptionHolder.java

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,9 @@
1919

2020
package org.elasticsearch.index.translog;
2121

22+
import org.apache.lucene.store.AlreadyClosedException;
23+
import org.elasticsearch.ExceptionsHelper;
24+
2225
import java.util.concurrent.atomic.AtomicReference;
2326

2427
public class TragicExceptionHolder {
@@ -30,10 +33,15 @@ public class TragicExceptionHolder {
3033
*/
3134
public void setTragicException(Exception ex) {
3235
assert ex != null;
33-
if (tragedy.compareAndSet(null, ex) == false) {
34-
if (tragedy.get() != ex) { // to ensure there is no self-suppression
35-
tragedy.get().addSuppressed(ex);
36-
}
36+
if (tragedy.compareAndSet(null, ex)) {
37+
return; // first exception
38+
}
39+
final Exception tragedy = this.tragedy.get();
40+
// ensure no circular reference
41+
if (ExceptionsHelper.unwrapCausesAndSuppressed(ex, e -> e == tragedy).isPresent()) {
42+
assert ex == tragedy || ex instanceof AlreadyClosedException : new AssertionError("must be ACE or tragic exception", ex);
43+
} else {
44+
tragedy.addSuppressed(ex);
3745
}
3846
}
3947

server/src/main/java/org/elasticsearch/index/translog/Translog.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1702,6 +1702,7 @@ public TranslogGeneration getMinGenerationForSeqNo(final long seqNo) {
17021702
*/
17031703
public void rollGeneration() throws IOException {
17041704
try (Releasable ignored = writeLock.acquire()) {
1705+
ensureOpen();
17051706
try {
17061707
final TranslogReader reader = current.closeIntoReader();
17071708
readers.add(reader);

server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,9 +98,12 @@
9898
import java.util.Deque;
9999
import java.util.HashMap;
100100
import java.util.HashSet;
101+
import java.util.IdentityHashMap;
101102
import java.util.Iterator;
103+
import java.util.LinkedList;
102104
import java.util.List;
103105
import java.util.Map;
106+
import java.util.Queue;
104107
import java.util.Set;
105108
import java.util.concurrent.ArrayBlockingQueue;
106109
import java.util.concurrent.BlockingQueue;
@@ -3270,4 +3273,58 @@ public void copy(Path source, Path target, CopyOption... options) throws IOExcep
32703273
}
32713274
}
32723275
}
3276+
3277+
public void testEnsureNoCircularException() throws Exception {
3278+
final AtomicBoolean failedToSyncCheckpoint = new AtomicBoolean();
3279+
final ChannelFactory channelFactory = (file, openOption) -> {
3280+
final FileChannel channel = FileChannel.open(file, openOption);
3281+
return new FilterFileChannel(channel) {
3282+
@Override
3283+
public void force(boolean metaData) throws IOException {
3284+
if (failedToSyncCheckpoint.get()) {
3285+
throw new IOException("simulated");
3286+
}
3287+
super.force(metaData);
3288+
}
3289+
};
3290+
};
3291+
final TranslogConfig config = getTranslogConfig(createTempDir());
3292+
final String translogUUID = Translog.createEmptyTranslog(
3293+
config.getTranslogPath(), SequenceNumbers.NO_OPS_PERFORMED, shardId, channelFactory, primaryTerm.get());
3294+
final Translog translog = new Translog(config, translogUUID, createTranslogDeletionPolicy(),
3295+
() -> SequenceNumbers.NO_OPS_PERFORMED, primaryTerm::get) {
3296+
@Override
3297+
ChannelFactory getChannelFactory() {
3298+
return channelFactory;
3299+
}
3300+
};
3301+
try {
3302+
translog.add(new Translog.Index("1", "_doc", 1, primaryTerm.get(), new byte[]{1}));
3303+
failedToSyncCheckpoint.set(true);
3304+
expectThrows(IOException.class, translog::rollGeneration);
3305+
final AlreadyClosedException alreadyClosedException = expectThrows(AlreadyClosedException.class, translog::rollGeneration);
3306+
if (hasCircularReference(alreadyClosedException)) {
3307+
throw new AssertionError("detect circular reference exception", alreadyClosedException);
3308+
}
3309+
} finally {
3310+
IOUtils.close(translog);
3311+
}
3312+
}
3313+
3314+
static boolean hasCircularReference(Exception cause) {
3315+
final Queue<Throwable> queue = new LinkedList<>();
3316+
queue.add(cause);
3317+
final Set<Throwable> seen = Collections.newSetFromMap(new IdentityHashMap<>());
3318+
while (queue.isEmpty() == false) {
3319+
final Throwable current = queue.remove();
3320+
if (seen.add(current) == false) {
3321+
return true;
3322+
}
3323+
Collections.addAll(queue, current.getSuppressed());
3324+
if (current.getCause() != null) {
3325+
queue.add(current.getCause());
3326+
}
3327+
}
3328+
return false;
3329+
}
32733330
}

0 commit comments

Comments
 (0)