Skip to content
Merged
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,11 @@
import org.elasticsearch.common.collect.Iterators;
import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.common.util.Maps;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.ThrottledTaskRunner;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndicesService;
Expand Down Expand Up @@ -145,6 +148,7 @@ private void doExecuteForked(
if (ccsCheckCompatibility) {
checkCCSVersionCompatibility(request);
}
final Executor singleThreadedExecutor = buildSingleThreadedExecutor();
assert task instanceof CancellableTask;
final CancellableTask fieldCapTask = (CancellableTask) task;
// retrieve the initial timestamp in case the action is a cross cluster search
Expand All @@ -168,9 +172,9 @@ private void doExecuteForked(

checkIndexBlocks(projectState, concreteIndices);
final FailureCollector indexFailures = new FailureCollector();
final Map<String, FieldCapabilitiesIndexResponse> indexResponses = Collections.synchronizedMap(new HashMap<>());
final Map<String, FieldCapabilitiesIndexResponse> indexResponses = new HashMap<>();
// This map is used to share the index response for indices which have the same index mapping hash to reduce the memory usage.
final Map<String, FieldCapabilitiesIndexResponse> indexMappingHashToResponses = Collections.synchronizedMap(new HashMap<>());
final Map<String, FieldCapabilitiesIndexResponse> indexMappingHashToResponses = new HashMap<>();
final Runnable releaseResourcesOnCancel = () -> {
LOGGER.trace("clear index responses on cancellation");
indexFailures.clear();
Expand Down Expand Up @@ -229,7 +233,7 @@ private void doExecuteForked(
final var finishedOrCancelled = new AtomicBoolean();
fieldCapTask.addListener(() -> {
if (finishedOrCancelled.compareAndSet(false, true)) {
releaseResourcesOnCancel.run();
singleThreadedExecutor.execute(releaseResourcesOnCancel);
}
});
try (RefCountingRunnable refs = new RefCountingRunnable(() -> {
Expand All @@ -250,7 +254,7 @@ private void doExecuteForked(
localIndices,
nowInMillis,
concreteIndices,
searchCoordinationExecutor,
singleThreadedExecutor,
handleIndexResponse,
handleIndexFailure,
refs.acquire()::close
Expand All @@ -265,7 +269,7 @@ private void doExecuteForked(
var remoteClusterClient = transportService.getRemoteClusterService()
.getRemoteClusterClient(
clusterAlias,
searchCoordinationExecutor,
singleThreadedExecutor,
RemoteClusterService.DisconnectedStrategy.RECONNECT_UNLESS_SKIP_UNAVAILABLE
);
FieldCapabilitiesRequest remoteRequest = prepareRemoteRequest(request, originalIndices, nowInMillis);
Expand Down Expand Up @@ -300,7 +304,7 @@ private void doExecuteForked(
// This fork is a workaround to ensure that the merging of field-caps always occurs on the search_coordinator.
// TODO: remove this workaround after we fixed https://github.com/elastic/elasticsearch/issues/107439
new ForkingOnFailureActionListener<>(
searchCoordinationExecutor,
singleThreadedExecutor,
true,
ActionListener.releaseAfter(remoteListener, refs.acquire())
)
Expand All @@ -309,6 +313,29 @@ private void doExecuteForked(
}
}

/**
 * Builds an {@link Executor} that funnels all submitted tasks through a
 * {@link ThrottledTaskRunner} with a concurrency of one, so at most a single
 * task runs at a time on the underlying {@code searchCoordinationExecutor}.
 * This gives single-threaded (serialized) execution semantics without
 * dedicating a thread.
 */
private Executor buildSingleThreadedExecutor() {
    // Concurrency of 1 serializes every task submitted through the returned executor.
    final var taskRunner = new ThrottledTaskRunner("field_caps", 1, searchCoordinationExecutor);
    return command -> taskRunner.enqueueTask(new ActionListener<>() {
        @Override
        public void onResponse(Releasable permit) {
            // Release the throttle permit once the task completes, even on throw.
            try (permit) {
                command.run();
            }
        }

        @Override
        public void onFailure(Exception e) {
            if (command instanceof AbstractRunnable abstractRunnable) {
                abstractRunnable.onFailure(e);
                return;
            }
            // should be impossible, we should always submit an AbstractRunnable
            logger.error("unexpected failure running " + command, e);
            assert false : new AssertionError("unexpected failure running " + command, e);
        }
    });
}

public interface RemoteRequestExecutor {
void executeRemoteRequest(
RemoteClusterClient remoteClient,
Expand Down Expand Up @@ -546,7 +573,7 @@ private static void innerMerge(
* list, these failures will be skipped because they have no affect on the final response.
*/
private static final class FailureCollector {
private final Map<String, Exception> failuresByIndex = Collections.synchronizedMap(new HashMap<>());
private final Map<String, Exception> failuresByIndex = new HashMap<>();

List<FieldCapabilitiesFailure> build(Set<String> successfulIndices) {
Map<Tuple<String, String>, FieldCapabilitiesFailure> indexFailures = new HashMap<>();
Expand Down