@@ -143,106 +143,121 @@ protected void doExecute(Task task, OpenPointInTimeRequest request, ActionListen
143143 );
144144 return ;
145145 }
146+
147+ final boolean resolveCrossProject = crossProjectModeDecider .resolvesCrossProject (request );
148+ if (resolveCrossProject ) {
149+ executeOpenPitCrossProject ((SearchTask ) task , request , listener );
150+ } else {
151+ executeOpenPit ((SearchTask ) task , request , listener );
152+ }
153+ }
154+
155+ private void executeOpenPitCrossProject (
156+ SearchTask task ,
157+ OpenPointInTimeRequest request ,
158+ ActionListener <OpenPointInTimeResponse > listener
159+ ) {
160+ String [] indices = request .indices ();
161+ IndicesOptions originalIndicesOptions = request .indicesOptions ();
146162 // in CPS before executing the open pit request we need to get index resolution and possibly throw based on merged project view
147163 // rules. This should happen only if either ignore_unavailable or allow_no_indices is set to false (strict) if instead both are true
148164 // we can continue with the "normal" pit execution.
149- final boolean resolveCrossProject = crossProjectModeDecider .resolvesCrossProject (request );
150- String [] indices = request .indices ();
151- if (resolveCrossProject ) {
152- IndicesOptions originalIndicesOptions = request .indicesOptions ();
153- if (false == originalIndicesOptions .ignoreUnavailable () || false == originalIndicesOptions .allowNoIndices ()) {
154- final ResolvedIndexExpressions localResolvedIndexExpressions = request .getResolvedIndexExpressions ();
155- RemoteClusterService remoteClusterService = searchTransportService .getRemoteClusterService ();
156- final Map <String , OriginalIndices > indicesPerCluster = remoteClusterService .groupIndices (
157- indicesOptionsForCrossProjectFanout (originalIndicesOptions ),
158- indices
165+ if (originalIndicesOptions .ignoreUnavailable () && originalIndicesOptions .allowNoIndices ()) {
166+ // lenient indicesOptions thus execute standard pit
167+ executeOpenPit (task , request , listener );
168+ return ;
169+ }
170+ final ResolvedIndexExpressions localResolvedIndexExpressions = request .getResolvedIndexExpressions ();
171+ RemoteClusterService remoteClusterService = searchTransportService .getRemoteClusterService ();
172+ final Map <String , OriginalIndices > indicesPerCluster = remoteClusterService .groupIndices (
173+ indicesOptionsForCrossProjectFanout (originalIndicesOptions ),
174+ indices
175+ );
176+ // local indices resolution was already taken care of by the Security Action Filter
177+ indicesPerCluster .remove (RemoteClusterAware .LOCAL_CLUSTER_GROUP_KEY );
178+
179+ if (indicesPerCluster .isEmpty ()) {
180+ // for CPS requests that are targeting origin only, could be because of project_routing or other reasons, execute standard pit.
181+ executeOpenPit (task , request , listener );
182+ return ;
183+ }
184+
185+ // CPS
186+ final int linkedProjectsToQuery = indicesPerCluster .size ();
187+ final Map <String , ResolveIndexAction .Response > remoteResponses = new ConcurrentHashMap <>(linkedProjectsToQuery );
188+ final CountDown completionCounter = new CountDown (linkedProjectsToQuery );
189+ final Runnable terminalHandler = () -> {
190+ if (completionCounter .countDown ()) {
191+ Map <String , ResolvedIndexExpressions > resolvedRemoteExpressions = remoteResponses .entrySet ()
192+ .stream ()
193+ .filter (e -> e .getValue ().getResolvedIndexExpressions () != null )
194+ .collect (
195+ Collectors .toMap (
196+ Map .Entry ::getKey ,
197+ e -> e .getValue ().getResolvedIndexExpressions ()
198+
199+ )
200+ );
201+
202+ final Exception ex = CrossProjectIndexResolutionValidator .validate (
203+ originalIndicesOptions ,
204+ request .getProjectRouting (),
205+ localResolvedIndexExpressions ,
206+ resolvedRemoteExpressions
159207 );
160- // local indices resolution was already taken care of by the Security Action Filter
161- indicesPerCluster .remove (RemoteClusterAware .LOCAL_CLUSTER_GROUP_KEY );
162- if (false == indicesPerCluster .isEmpty ()) {
163- final int linkedProjectsToQuery = indicesPerCluster .size ();
164- final Map <String , ResolveIndexAction .Response > remoteResponses = new ConcurrentHashMap <>(linkedProjectsToQuery );
165- final CountDown completionCounter = new CountDown (linkedProjectsToQuery );
166- final Runnable terminalHandler = () -> {
167- if (completionCounter .countDown ()) {
168- Map <String , ResolvedIndexExpressions > resolvedRemoteExpressions = remoteResponses .entrySet ()
169- .stream ()
170- .filter (e -> e .getValue ().getResolvedIndexExpressions () != null )
171- .collect (
172- Collectors .toMap (
173- Map .Entry ::getKey ,
174- e -> e .getValue ().getResolvedIndexExpressions ()
175-
176- )
177- );
178-
179- final Exception ex = CrossProjectIndexResolutionValidator .validate (
180- originalIndicesOptions ,
181- request .getProjectRouting (),
182- localResolvedIndexExpressions ,
183- resolvedRemoteExpressions
208+ if (ex != null ) {
209+ listener .onFailure (ex );
210+ return ;
211+ }
212+ Set <String > collectedIndices = new HashSet <>(indices .length );
213+
214+ for (Map .Entry <String , ResolvedIndexExpressions > resolvedRemoteExpressionEntry : resolvedRemoteExpressions .entrySet ()) {
215+ String remoteAlias = resolvedRemoteExpressionEntry .getKey ();
216+ for (ResolvedIndexExpression expression : resolvedRemoteExpressionEntry .getValue ().expressions ()) {
217+ ResolvedIndexExpression .LocalExpressions oneRemoteExpression = expression .localExpressions ();
218+ if (false == oneRemoteExpression .indices ().isEmpty ()
219+ && oneRemoteExpression
220+ .localIndexResolutionResult () == ResolvedIndexExpression .LocalIndexResolutionResult .SUCCESS ) {
221+ collectedIndices .addAll (
222+ oneRemoteExpression .indices ()
223+ .stream ()
224+ .map (i -> buildRemoteIndexName (remoteAlias , i ))
225+ .collect (Collectors .toSet ())
184226 );
185- if (ex != null ) {
186- listener .onFailure (ex );
187- return ;
188- }
189- Set <String > collectedIndices = new HashSet <>(indices .length );
190-
191- for (Map .Entry <String , ResolvedIndexExpressions > resolvedRemoteExpressionEntry : resolvedRemoteExpressions
192- .entrySet ()) {
193- String remoteAlias = resolvedRemoteExpressionEntry .getKey ();
194- for (ResolvedIndexExpression expression : resolvedRemoteExpressionEntry .getValue ().expressions ()) {
195- ResolvedIndexExpression .LocalExpressions oneRemoteExpression = expression .localExpressions ();
196- if (false == oneRemoteExpression .indices ().isEmpty ()
197- && oneRemoteExpression
198- .localIndexResolutionResult () == ResolvedIndexExpression .LocalIndexResolutionResult .SUCCESS ) {
199- collectedIndices .addAll (
200- oneRemoteExpression .indices ()
201- .stream ()
202- .map (i -> buildRemoteIndexName (remoteAlias , i ))
203- .collect (Collectors .toSet ())
204- );
205- }
206- }
207- }
208- if (localResolvedIndexExpressions != null ) { // this should never be null in CPS
209- collectedIndices .addAll (localResolvedIndexExpressions .getLocalIndicesList ());
210- }
211- request .indices (collectedIndices .stream ().toArray (String []::new ));
212- executeOpenPit ((SearchTask ) task , request , listener );
213227 }
214- };
215-
216- // make CPS calls
217- for (Map .Entry <String , OriginalIndices > remoteClusterIndices : indicesPerCluster .entrySet ()) {
218- String clusterAlias = remoteClusterIndices .getKey ();
219- OriginalIndices originalIndices = remoteClusterIndices .getValue ();
220- // form indicesOptionsForCrossProjectFanout
221- IndicesOptions relaxedFanoutIndicesOptions = originalIndices .indicesOptions ();
222- var remoteClusterClient = remoteClusterService .getRemoteClusterClient (
223- clusterAlias ,
224- EsExecutors .DIRECT_EXECUTOR_SERVICE ,
225- RemoteClusterService .DisconnectedStrategy .FAIL_IF_DISCONNECTED
226- );
227- ResolveIndexAction .Request remoteRequest = new ResolveIndexAction .Request (
228- originalIndices .indices (),
229- relaxedFanoutIndicesOptions
230- );
231- remoteClusterClient .execute (ResolveIndexAction .REMOTE_TYPE , remoteRequest , ActionListener .wrap (response -> {
232- remoteResponses .put (clusterAlias , response );
233- terminalHandler .run ();
234- }, failure -> {
235- logger .info ("failed to resolve indices on remote cluster [" + clusterAlias + "]" , failure );
236- terminalHandler .run ();
237- }));
238228 }
239- } else {
240- // for CPS requests that are targeting origin only, could be because of project_routing or other reasons.
241- executeOpenPit ((SearchTask ) task , request , listener );
242229 }
230+ if (localResolvedIndexExpressions != null ) { // this should never be null in CPS
231+ collectedIndices .addAll (localResolvedIndexExpressions .getLocalIndicesList ());
232+ }
233+ request .indices (collectedIndices .toArray (String []::new ));
234+ executeOpenPit (task , request , listener );
243235 }
244- } else {
245- executeOpenPit ((SearchTask ) task , request , listener );
236+ };
237+
238+ // make CPS calls
239+ for (Map .Entry <String , OriginalIndices > remoteClusterIndices : indicesPerCluster .entrySet ()) {
240+ String clusterAlias = remoteClusterIndices .getKey ();
241+ OriginalIndices originalIndices = remoteClusterIndices .getValue ();
242+ // form indicesOptionsForCrossProjectFanout
243+ IndicesOptions relaxedFanoutIndicesOptions = originalIndices .indicesOptions ();
244+ var remoteClusterClient = remoteClusterService .getRemoteClusterClient (
245+ clusterAlias ,
246+ EsExecutors .DIRECT_EXECUTOR_SERVICE ,
247+ RemoteClusterService .DisconnectedStrategy .FAIL_IF_DISCONNECTED
248+ );
249+ ResolveIndexAction .Request remoteRequest = new ResolveIndexAction .Request (
250+ originalIndices .indices (),
251+ relaxedFanoutIndicesOptions
252+ );
253+ remoteClusterClient .execute (ResolveIndexAction .REMOTE_TYPE , remoteRequest , ActionListener .wrap (response -> {
254+ remoteResponses .put (clusterAlias , response );
255+ terminalHandler .run ();
256+ }, failure -> {
257+ logger .info ("failed to resolve indices on remote cluster [" + clusterAlias + "]" , failure );
258+ terminalHandler .run ();
259+ }));
260+
246261 }
247262 }
248263
0 commit comments