|
60 | 60 | import org.elasticsearch.transport.TransportService; |
61 | 61 |
|
62 | 62 | import java.io.IOException; |
63 | | -import java.util.Collections; |
64 | 63 | import java.util.HashSet; |
65 | 64 | import java.util.List; |
66 | 65 | import java.util.Map; |
67 | 66 | import java.util.Set; |
68 | | -import java.util.SortedMap; |
69 | | -import java.util.TreeMap; |
| 67 | +import java.util.concurrent.ConcurrentHashMap; |
70 | 68 | import java.util.concurrent.Executor; |
71 | 69 | import java.util.function.BiFunction; |
72 | 70 | import java.util.stream.Collectors; |
@@ -153,18 +151,18 @@ protected void doExecute(Task task, OpenPointInTimeRequest request, ActionListen |
153 | 151 | if (resolveCrossProject) { |
154 | 152 | IndicesOptions originalIndicesOptions = request.indicesOptions(); |
155 | 153 | if (false == originalIndicesOptions.ignoreUnavailable() || false == originalIndicesOptions.allowNoIndices()) { |
156 | | - final SortedMap<String, ResolveIndexAction.Response> remoteResponses = Collections.synchronizedSortedMap(new TreeMap<>()); |
157 | 154 | final ResolvedIndexExpressions localResolvedIndexExpressions = request.getResolvedIndexExpressions(); |
158 | 155 | RemoteClusterService remoteClusterService = searchTransportService.getRemoteClusterService(); |
159 | | - final Map<String, OriginalIndices> remoteClusterIndices = remoteClusterService.groupIndices( |
| 156 | + final Map<String, OriginalIndices> indicesPerCluster = remoteClusterService.groupIndices( |
160 | 157 | indicesOptionsForCrossProjectFanout(originalIndicesOptions), |
161 | 158 | indices |
162 | 159 | ); |
163 | 160 | // local indices resolution was already taken care of by the Security Action Filter |
164 | | - remoteClusterIndices.remove(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY); |
165 | | - if (false == remoteClusterIndices.isEmpty()) { |
166 | | - final int remoteRequests = remoteClusterIndices.size(); |
167 | | - final CountDown completionCounter = new CountDown(remoteRequests); |
| 161 | + indicesPerCluster.remove(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY); |
| 162 | + if (false == indicesPerCluster.isEmpty()) { |
| 163 | + final int linkedProjectsToQuery = indicesPerCluster.size(); |
| 164 | + final Map<String, ResolveIndexAction.Response> remoteResponses = new ConcurrentHashMap<>(linkedProjectsToQuery); |
| 165 | + final CountDown completionCounter = new CountDown(linkedProjectsToQuery); |
168 | 166 | final Runnable terminalHandler = () -> { |
169 | 167 | if (completionCounter.countDown()) { |
170 | 168 | Map<String, ResolvedIndexExpressions> resolvedRemoteExpressions = remoteResponses.entrySet() |
@@ -216,17 +214,19 @@ protected void doExecute(Task task, OpenPointInTimeRequest request, ActionListen |
216 | 214 | }; |
217 | 215 |
|
218 | 216 | // make CPS calls |
219 | | - for (Map.Entry<String, OriginalIndices> remoteIndices : remoteClusterIndices.entrySet()) { |
220 | | - String clusterAlias = remoteIndices.getKey(); |
221 | | - OriginalIndices originalIndices = remoteIndices.getValue(); |
| 217 | + for (Map.Entry<String, OriginalIndices> remoteClusterIndices : indicesPerCluster.entrySet()) { |
| 218 | + String clusterAlias = remoteClusterIndices.getKey(); |
| 219 | + OriginalIndices originalIndices = remoteClusterIndices.getValue(); |
| 220 | + // form indicesOptionsForCrossProjectFanout |
| 221 | + IndicesOptions relaxedFanoutIndicesOptions = originalIndices.indicesOptions(); |
222 | 222 | var remoteClusterClient = remoteClusterService.getRemoteClusterClient( |
223 | 223 | clusterAlias, |
224 | 224 | EsExecutors.DIRECT_EXECUTOR_SERVICE, |
225 | | - RemoteClusterService.DisconnectedStrategy.RECONNECT_UNLESS_SKIP_UNAVAILABLE |
| 225 | + RemoteClusterService.DisconnectedStrategy.FAIL_IF_DISCONNECTED |
226 | 226 | ); |
227 | 227 | ResolveIndexAction.Request remoteRequest = new ResolveIndexAction.Request( |
228 | 228 | originalIndices.indices(), |
229 | | - originalIndices.indicesOptions() |
| 229 | + relaxedFanoutIndicesOptions |
230 | 230 | ); |
231 | 231 | remoteClusterClient.execute(ResolveIndexAction.REMOTE_TYPE, remoteRequest, ActionListener.wrap(response -> { |
232 | 232 | remoteResponses.put(clusterAlias, response); |
|
0 commit comments