Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.TransportVersion;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.NoShardAvailableActionException;
import org.elasticsearch.action.OriginalIndices;
Expand Down Expand Up @@ -79,11 +78,9 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
protected final SearchTask task;
protected final SearchPhaseResults<Result> results;
private final long clusterStateVersion;
private final TransportVersion minTransportVersion;
protected final Map<String, AliasFilter> aliasFilter;
protected final Map<String, Float> concreteIndexBoosts;
private final SetOnce<AtomicArray<ShardSearchFailure>> shardFailures = new SetOnce<>();
private final Object shardFailuresMutex = new Object();
private final AtomicBoolean hasShardResponse = new AtomicBoolean(false);
private final AtomicInteger successfulOps;
protected final SearchTimeProvider timeProvider;
Expand All @@ -93,8 +90,7 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
protected final SearchShardIterator[] shardIterators;
private final AtomicInteger outstandingShards;
private final int maxConcurrentRequestsPerNode;
private final Map<String, PendingExecutions> pendingExecutionsPerNode = new ConcurrentHashMap<>();
private final boolean throttleConcurrentRequests;
private final Map<String, PendingExecutions> pendingExecutionsPerNode;
private final AtomicBoolean requestCancelled = new AtomicBoolean();
private final int skippedCount;

Expand Down Expand Up @@ -142,7 +138,7 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
Arrays.sort(shardIterators);
this.maxConcurrentRequestsPerNode = maxConcurrentRequestsPerNode;
// in the case where we have fewer shards than maxConcurrentRequestsPerNode we don't need to throttle
this.throttleConcurrentRequests = maxConcurrentRequestsPerNode < shardsIts.size();
this.pendingExecutionsPerNode = maxConcurrentRequestsPerNode < shardsIts.size() ? new ConcurrentHashMap<>() : null;
this.timeProvider = timeProvider;
this.logger = logger;
this.searchTransportService = searchTransportService;
Expand All @@ -153,7 +149,6 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
this.nodeIdToConnection = nodeIdToConnection;
this.concreteIndexBoosts = concreteIndexBoosts;
this.clusterStateVersion = clusterState.version();
this.minTransportVersion = clusterState.getMinTransportVersion();
this.aliasFilter = aliasFilter;
this.results = resultConsumer;
// register the release of the query consumer to free up the circuit breaker memory
Expand Down Expand Up @@ -254,7 +249,8 @@ protected void doRun(Map<SearchShardIterator, Integer> shardIndexMap) {
}

protected final void performPhaseOnShard(final int shardIndex, final SearchShardIterator shardIt, final SearchShardTarget shard) {
if (throttleConcurrentRequests) {
var pendingExecutionsPerNode = this.pendingExecutionsPerNode;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why assign this to a local variable?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Still faster than repeatedly accessing a field https://openjdk.org/jeps/8349536 :) Doesn't matter too much here, but I had a follow-up planned where some of these fields stop being final and get nulled out. I code up a bigger change and then break pieces out of it for review :) and while it's a somewhat irrelevant line here, it avoids some git-history churn and a trivial amount of CPU/memory work doing it this way I guess :D

if (pendingExecutionsPerNode != null) {
var pendingExecutions = pendingExecutionsPerNode.computeIfAbsent(
shard.getNodeId(),
n -> new PendingExecutions(maxConcurrentRequestsPerNode)
Expand Down Expand Up @@ -464,7 +460,7 @@ void onShardFailure(final int shardIndex, SearchShardTarget shardTarget, Excepti
AtomicArray<ShardSearchFailure> shardFailures = this.shardFailures.get();
// lazily create shard failures, so we can early build the empty shard failure list in most cases (no failures)
if (shardFailures == null) { // this is double checked locking but it's fine since SetOnce uses a volatile read internally
synchronized (shardFailuresMutex) {
synchronized (this.shardFailures) {
shardFailures = this.shardFailures.get(); // read again otherwise somebody else has created it?
if (shardFailures == null) { // still null so we are the first and create a new instance
shardFailures = new AtomicArray<>(getNumShards());
Expand Down Expand Up @@ -585,10 +581,6 @@ private SearchResponse buildSearchResponse(
);
}

boolean buildPointInTimeFromSearchResults() {
return false;
}

/**
* Builds and sends the final search response back to the user.
*
Expand All @@ -602,23 +594,25 @@ public void sendSearchResponse(SearchResponseSections internalSearchResponse, At
if (allowPartialResults == false && failures.length > 0) {
raisePhaseFailure(new SearchPhaseExecutionException("", "Shard failures", null, failures));
} else {
final String scrollId = request.scroll() != null ? TransportSearchHelper.buildScrollId(queryResults) : null;
final BytesReference searchContextId;
if (buildPointInTimeFromSearchResults()) {
searchContextId = SearchContextId.encode(queryResults.asList(), aliasFilter, minTransportVersion, failures);
} else {
if (request.source() != null
&& request.source().pointInTimeBuilder() != null
&& request.source().pointInTimeBuilder().singleSession() == false) {
searchContextId = request.source().pointInTimeBuilder().getEncodedId();
} else {
searchContextId = null;
}
}
ActionListener.respondAndRelease(listener, buildSearchResponse(internalSearchResponse, failures, scrollId, searchContextId));
ActionListener.respondAndRelease(
listener,
buildSearchResponse(
internalSearchResponse,
failures,
request.scroll() != null ? TransportSearchHelper.buildScrollId(queryResults) : null,
buildSearchContextId(failures)
)
);
}
}

protected BytesReference buildSearchContextId(ShardSearchFailure[] failures) {
var source = request.source();
return source != null && source.pointInTimeBuilder() != null && source.pointInTimeBuilder().singleSession() == false
? source.pointInTimeBuilder().getEncodedId()
: null;
}

/**
* This method will communicate a fatal phase failure back to the user. In contrast to a shard failure
* will this method immediately fail the search request and return the failure to the issuer of the request
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.TransportVersion;
import org.elasticsearch.TransportVersions;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionListenerResponseHandler;
Expand All @@ -24,6 +25,7 @@
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
Expand Down Expand Up @@ -217,6 +219,7 @@ void runOpenPointInTimePhase(
) {
assert searchRequest.getMaxConcurrentShardRequests() == pitRequest.maxConcurrentShardRequests()
: searchRequest.getMaxConcurrentShardRequests() + " != " + pitRequest.maxConcurrentShardRequests();
TransportVersion minTransportVersion = clusterState.getMinTransportVersion();
new AbstractSearchAsyncAction<>(
actionName,
logger,
Expand Down Expand Up @@ -266,8 +269,8 @@ protected void run() {
}

@Override
boolean buildPointInTimeFromSearchResults() {
return true;
protected BytesReference buildSearchContextId(ShardSearchFailure[] failures) {
return SearchContextId.encode(results.getAtomicArray().asList(), aliasFilter, minTransportVersion, failures);
}
}.start();
}
Expand Down