Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -167,18 +167,18 @@ protected Writeable.Reader<GetResponse> getResponseReader() {
}

@Override
protected Executor getExecutor(GetRequest request, ShardId shardId) {
protected Executor getExecutor(ShardId shardId) {
final ClusterState clusterState = clusterService.state();
if (projectResolver.getProjectMetadata(clusterState).getIndexSafe(shardId.getIndex()).isSystem()) {
return threadPool.executor(executorSelector.executorForGet(shardId.getIndexName()));
} else {
return super.getExecutor(request, shardId);
return super.getExecutor(shardId);
}
}

private void asyncGet(GetRequest request, ShardId shardId, ActionListener<GetResponse> listener) throws IOException {
if (request.refresh() && request.realtime() == false) {
getExecutor(request, shardId).execute(ActionRunnable.wrap(listener, l -> {
getExecutor(shardId).execute(ActionRunnable.wrap(listener, l -> {
var indexShard = getIndexShard(shardId);
indexShard.externalRefresh("refresh_flag_get", l.map(r -> shardOperation(request, shardId)));
}));
Expand Down Expand Up @@ -300,7 +300,7 @@ private void tryGetFromTranslog(GetRequest request, IndexShard indexShard, Disco
indexShard.waitForPrimaryTermAndGeneration(r.primaryTerm(), r.segmentGeneration(), termAndGenerationListener);
}
}
}), TransportGetFromTranslogAction.Response::new, getExecutor(request, shardId))
}), TransportGetFromTranslogAction.Response::new, getExecutor(shardId))
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,12 +156,12 @@ protected MultiGetShardResponse shardOperation(MultiGetShardRequest request, Sha
}

@Override
protected Executor getExecutor(MultiGetShardRequest request, ShardId shardId) {
protected Executor getExecutor(ShardId shardId) {
final ClusterState clusterState = clusterService.state();
if (projectResolver.getProjectMetadata(clusterState).index(shardId.getIndex()).isSystem()) {
return threadPool.executor(executorSelector.executorForGet(shardId.getIndexName()));
} else {
return super.getExecutor(request, shardId);
return super.getExecutor(shardId);
}
}

Expand Down Expand Up @@ -290,7 +290,7 @@ private void tryShardMultiGetFromTranslog(
assert r.primaryTerm() > Engine.UNKNOWN_PRIMARY_TERM;
final ActionListener<Long> termAndGenerationListener = ContextPreservingActionListener.wrapPreservingContext(
listener.delegateFailureAndWrap(
(ll, aLong) -> getExecutor(request, shardId).execute(
(ll, aLong) -> getExecutor(shardId).execute(
ActionRunnable.supply(ll, () -> handleLocalGets(request, r.multiGetShardResponse(), shardId))
)
),
Expand All @@ -299,7 +299,7 @@ private void tryShardMultiGetFromTranslog(
indexShard.waitForPrimaryTermAndGeneration(r.primaryTerm(), r.segmentGeneration(), termAndGenerationListener);
}
}
}), TransportShardMultiGetFomTranslogAction.Response::new, getExecutor(request, shardId))
}), TransportShardMultiGetFomTranslogAction.Response::new, getExecutor(shardId))
);
}

Expand Down Expand Up @@ -353,7 +353,7 @@ private void getAndAddToResponse(
private void asyncShardMultiGet(MultiGetShardRequest request, ShardId shardId, ActionListener<MultiGetShardResponse> listener)
throws IOException {
if (request.refresh() && request.realtime() == false) {
getExecutor(request, shardId).execute(ActionRunnable.wrap(listener, l -> {
getExecutor(shardId).execute(ActionRunnable.wrap(listener, l -> {
var indexShard = getIndexShard(shardId);
indexShard.externalRefresh("refresh_flag_mget", l.map(r -> shardOperation(request, shardId)));
}));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ protected void doExecute(Task task, Request request, ActionListener<Response> li
protected abstract Response shardOperation(Request request, ShardId shardId) throws IOException;

protected void asyncShardOperation(Request request, ShardId shardId, ActionListener<Response> listener) throws IOException {
getExecutor(request, shardId).execute(ActionRunnable.supplyAndDecRef(listener, () -> shardOperation(request, shardId)));
getExecutor(shardId).execute(ActionRunnable.supplyAndDecRef(listener, () -> shardOperation(request, shardId)));
}

protected abstract Writeable.Reader<Response> getResponseReader();
Expand Down Expand Up @@ -300,7 +300,7 @@ public String concreteIndex() {
}
}

protected Executor getExecutor(Request request, ShardId shardId) {
protected Executor getExecutor(ShardId shardId) {
return executor;
}
}