2424import org .elasticsearch .action .admin .indices .resolve .ResolveIndexAction ;
2525import org .elasticsearch .action .support .ActionFilters ;
2626import org .elasticsearch .action .support .ChannelActionListener ;
27+ import org .elasticsearch .action .support .GroupedActionListener ;
2728import org .elasticsearch .action .support .HandledTransportAction ;
2829import org .elasticsearch .action .support .IndicesOptions ;
30+ import org .elasticsearch .action .support .SubscribableListener ;
2931import org .elasticsearch .client .internal .Client ;
3032import org .elasticsearch .cluster .ClusterState ;
3133import org .elasticsearch .cluster .service .ClusterService ;
3234import org .elasticsearch .common .bytes .BytesReference ;
3335import org .elasticsearch .common .io .stream .NamedWriteableRegistry ;
3436import org .elasticsearch .common .io .stream .StreamInput ;
3537import org .elasticsearch .common .io .stream .StreamOutput ;
36- import org .elasticsearch .common .util .concurrent .CountDown ;
3738import org .elasticsearch .common .util .concurrent .EsExecutors ;
3839import org .elasticsearch .core .TimeValue ;
3940import org .elasticsearch .index .shard .ShardId ;
5657import org .elasticsearch .transport .TransportActionProxy ;
5758import org .elasticsearch .transport .TransportChannel ;
5859import org .elasticsearch .transport .TransportRequestHandler ;
60+ import org .elasticsearch .transport .TransportRequestOptions ;
5961import org .elasticsearch .transport .TransportResponseHandler ;
6062import org .elasticsearch .transport .TransportService ;
6163
6264import java .io .IOException ;
65+ import java .util .Collection ;
6366import java .util .HashSet ;
6467import java .util .List ;
6568import java .util .Map ;
6669import java .util .Set ;
67- import java .util .concurrent .ConcurrentHashMap ;
6870import java .util .concurrent .Executor ;
6971import java .util .function .BiFunction ;
7072import java .util .stream .Collectors ;
@@ -178,18 +180,25 @@ private void executeOpenPitCrossProject(
178180
179181 if (indicesPerCluster .isEmpty ()) {
180182 // for CPS requests that are targeting origin only, could be because of project_routing or other reasons, execute standard pit.
183+ final Exception ex = CrossProjectIndexResolutionValidator .validate (
184+ originalIndicesOptions ,
185+ request .getProjectRouting (),
186+ localResolvedIndexExpressions ,
187+ Map .of ()
188+ );
189+ if (ex != null ) {
190+ listener .onFailure (ex );
191+ return ;
192+ }
181193 executeOpenPit (task , request , listener );
182194 return ;
183195 }
184196
185197 // CPS
186198 final int linkedProjectsToQuery = indicesPerCluster .size ();
187- final Map <String , ResolveIndexAction .Response > remoteResponses = new ConcurrentHashMap <>(linkedProjectsToQuery );
188- final CountDown completionCounter = new CountDown (linkedProjectsToQuery );
189- final Runnable terminalHandler = () -> {
190- if (completionCounter .countDown ()) {
191- Map <String , ResolvedIndexExpressions > resolvedRemoteExpressions = remoteResponses .entrySet ()
192- .stream ()
199+ ActionListener <Collection <Map .Entry <String , ResolveIndexAction .Response >>> responsesListener = listener .delegateFailureAndWrap (
200+ (l , responses ) -> {
201+ Map <String , ResolvedIndexExpressions > resolvedRemoteExpressions = responses .stream ()
193202 .filter (e -> e .getValue ().getResolvedIndexExpressions () != null )
194203 .collect (
195204 Collectors .toMap (
@@ -198,7 +207,6 @@ private void executeOpenPitCrossProject(
198207
199208 )
200209 );
201-
202210 final Exception ex = CrossProjectIndexResolutionValidator .validate (
203211 originalIndicesOptions ,
204212 request .getProjectRouting (),
@@ -233,31 +241,47 @@ private void executeOpenPitCrossProject(
233241 request .indices (collectedIndices .toArray (String []::new ));
234242 executeOpenPit (task , request , listener );
235243 }
236- };
244+ );
245+ ActionListener <Map .Entry <String , ResolveIndexAction .Response >> groupedListener = new GroupedActionListener <>(
246+ linkedProjectsToQuery ,
247+ responsesListener
248+ );
237249
238250 // make CPS calls
239251 for (Map .Entry <String , OriginalIndices > remoteClusterIndices : indicesPerCluster .entrySet ()) {
240252 String clusterAlias = remoteClusterIndices .getKey ();
241253 OriginalIndices originalIndices = remoteClusterIndices .getValue ();
242- // form indicesOptionsForCrossProjectFanout
243- IndicesOptions relaxedFanoutIndicesOptions = originalIndices .indicesOptions ();
244- var remoteClusterClient = remoteClusterService .getRemoteClusterClient (
245- clusterAlias ,
246- EsExecutors .DIRECT_EXECUTOR_SERVICE ,
247- RemoteClusterService .DisconnectedStrategy .FAIL_IF_DISCONNECTED
254+ IndicesOptions relaxedFanoutIdxOptions = originalIndices .indicesOptions (); // form indicesOptionsForCrossProjectFanout
255+ ResolveIndexAction .Request remoteRequest = new ResolveIndexAction .Request (originalIndices .indices (), relaxedFanoutIdxOptions );
256+
257+ SubscribableListener <Transport .Connection > connectionListener = new SubscribableListener <>();
258+ connectionListener .addTimeout (
259+ TimeValue .timeValueSeconds (3L ),
260+ transportService .getThreadPool (),
261+ EsExecutors .DIRECT_EXECUTOR_SERVICE
248262 );
249- ResolveIndexAction .Request remoteRequest = new ResolveIndexAction .Request (
250- originalIndices .indices (),
251- relaxedFanoutIndicesOptions
252- );
253- remoteClusterClient .execute (ResolveIndexAction .REMOTE_TYPE , remoteRequest , ActionListener .wrap (response -> {
254- remoteResponses .put (clusterAlias , response );
255- terminalHandler .run ();
256- }, failure -> {
257- logger .info ("failed to resolve indices on remote cluster [" + clusterAlias + "]" , failure );
258- terminalHandler .run ();
259- }));
260263
264+ connectionListener .addListener (groupedListener .delegateResponse ((l , failure ) -> {
265+ logger .info ("failed to resolve indices on remote cluster [{}]" , clusterAlias , failure );
266+ l .onFailure (failure );
267+ })
268+ .delegateFailure (
269+ (ignored , connection ) -> transportService .sendRequest (
270+ connection ,
271+ ResolveIndexAction .REMOTE_TYPE .name (),
272+ remoteRequest ,
273+ TransportRequestOptions .EMPTY ,
274+ new ActionListenerResponseHandler <>(groupedListener .delegateResponse ((l , failure ) -> {
275+ logger .info ("Error occurred on remote cluster [{}]" , clusterAlias , failure );
276+ l .onFailure (failure );
277+ }).map (resolveIndexResponse -> Map .entry (clusterAlias , resolveIndexResponse )),
278+ ResolveIndexAction .Response ::new ,
279+ EsExecutors .DIRECT_EXECUTOR_SERVICE
280+ )
281+ )
282+ ));
283+
284+ remoteClusterService .maybeEnsureConnectedAndGetConnection (clusterAlias , true , connectionListener );
261285 }
262286 }
263287
0 commit comments