Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import org.apache.lucene.util.BitSetIterator;
import org.apache.lucene.util.Bits;
import org.apache.lucene.util.SparseFixedBitSet;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.lucene.util.CombinedBitSet;
import org.elasticsearch.search.dfs.AggregatedDfs;
Expand All @@ -53,7 +52,6 @@
import java.util.List;
import java.util.Objects;
import java.util.PriorityQueue;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;

Expand All @@ -80,7 +78,6 @@ public class ContextIndexSearcher extends IndexSearcher implements Releasable {
// don't create slices with less than this number of docs
private final int minimumDocsPerSlice;

private final Set<Thread> timeoutOverwrites = ConcurrentCollections.newConcurrentSet();
private volatile boolean timeExceeded = false;

/** constructor for non-concurrent search */
Expand Down Expand Up @@ -356,6 +353,8 @@ private <C extends Collector, T> T search(Weight weight, CollectorManager<C, T>
}
}

private static final ThreadLocal<Boolean> timeoutOverwrites = ThreadLocal.withInitial(() -> false);

/**
* Similar to the lucene implementation, with the following changes made:
* 1) postCollection is performed after each segment is collected. This is needed for aggregations, performed by search threads
Expand All @@ -379,12 +378,12 @@ public void search(List<LeafReaderContext> leaves, Weight weight, Collector coll
try {
// Search phase has finished, no longer need to check for timeout
// otherwise the aggregation post-collection phase might get cancelled.
boolean added = timeoutOverwrites.add(Thread.currentThread());
assert added;
assert timeoutOverwrites.get() == false;
timeoutOverwrites.set(true);
doAggregationPostCollection(collector);
} finally {
boolean removed = timeoutOverwrites.remove(Thread.currentThread());
assert removed;
assert timeoutOverwrites.get();
timeoutOverwrites.set(false);
}
}
}
Expand All @@ -402,7 +401,7 @@ public boolean timeExceeded() {
}

public void throwTimeExceededException() {
if (timeoutOverwrites.contains(Thread.currentThread()) == false) {
if (timeoutOverwrites.get() == false) {
throw new TimeExceededException();
}
}
Expand Down