1919import org .elasticsearch .action .ActionType ;
2020import org .elasticsearch .action .IndicesRequest ;
2121import org .elasticsearch .action .OriginalIndices ;
22+ import org .elasticsearch .action .ResolvedIndexExpression ;
23+ import org .elasticsearch .action .ResolvedIndexExpressions ;
24+ import org .elasticsearch .action .admin .indices .resolve .ResolveIndexAction ;
2225import org .elasticsearch .action .support .ActionFilters ;
2326import org .elasticsearch .action .support .ChannelActionListener ;
27+ import org .elasticsearch .action .support .GroupedActionListener ;
2428import org .elasticsearch .action .support .HandledTransportAction ;
2529import org .elasticsearch .action .support .IndicesOptions ;
30+ import org .elasticsearch .action .support .SubscribableListener ;
2631import org .elasticsearch .cluster .ClusterState ;
2732import org .elasticsearch .cluster .service .ClusterService ;
2833import org .elasticsearch .common .bytes .BytesReference ;
3843import org .elasticsearch .search .SearchPhaseResult ;
3944import org .elasticsearch .search .SearchService ;
4045import org .elasticsearch .search .builder .SearchSourceBuilder ;
46+ import org .elasticsearch .search .crossproject .CrossProjectIndexResolutionValidator ;
47+ import org .elasticsearch .search .crossproject .CrossProjectModeDecider ;
4148import org .elasticsearch .search .internal .AliasFilter ;
4249import org .elasticsearch .search .internal .ShardSearchContextId ;
4350import org .elasticsearch .tasks .Task ;
4451import org .elasticsearch .threadpool .ThreadPool ;
4552import org .elasticsearch .transport .AbstractTransportRequest ;
53+ import org .elasticsearch .transport .RemoteClusterAware ;
54+ import org .elasticsearch .transport .RemoteClusterService ;
4655import org .elasticsearch .transport .Transport ;
4756import org .elasticsearch .transport .TransportActionProxy ;
4857import org .elasticsearch .transport .TransportChannel ;
4958import org .elasticsearch .transport .TransportRequestHandler ;
59+ import org .elasticsearch .transport .TransportRequestOptions ;
5060import org .elasticsearch .transport .TransportResponseHandler ;
5161import org .elasticsearch .transport .TransportService ;
5262
5363import java .io .IOException ;
64+ import java .util .Collection ;
65+ import java .util .HashSet ;
5466import java .util .List ;
5567import java .util .Map ;
68+ import java .util .Set ;
5669import java .util .concurrent .Executor ;
5770import java .util .function .BiFunction ;
71+ import java .util .stream .Collectors ;
5872
5973import static org .elasticsearch .core .Strings .format ;
74+ import static org .elasticsearch .search .crossproject .CrossProjectIndexResolutionValidator .indicesOptionsForCrossProjectFanout ;
75+ import static org .elasticsearch .transport .RemoteClusterAware .buildRemoteIndexName ;
6076
6177public class TransportOpenPointInTimeAction extends HandledTransportAction <OpenPointInTimeRequest , OpenPointInTimeResponse > {
6278
@@ -72,6 +88,8 @@ public class TransportOpenPointInTimeAction extends HandledTransportAction<OpenP
7288 private final SearchService searchService ;
7389 private final ClusterService clusterService ;
7490 private final SearchResponseMetrics searchResponseMetrics ;
91+ private final CrossProjectModeDecider crossProjectModeDecider ;
92+ private final TimeValue forceConnectTimeoutSecs ;
7593
7694 @ Inject
7795 public TransportOpenPointInTimeAction (
@@ -92,6 +110,9 @@ public TransportOpenPointInTimeAction(
92110 this .namedWriteableRegistry = namedWriteableRegistry ;
93111 this .clusterService = clusterService ;
94112 this .searchResponseMetrics = searchResponseMetrics ;
113+ this .crossProjectModeDecider = new CrossProjectModeDecider (clusterService .getSettings ());
114+ this .forceConnectTimeoutSecs = clusterService .getSettings ()
115+ .getAsTime ("search.ccs.force_connect_timeout" , TimeValue .timeValueSeconds (3L ));
95116 transportService .registerRequestHandler (
96117 OPEN_SHARD_READER_CONTEXT_NAME ,
97118 EsExecutors .DIRECT_EXECUTOR_SERVICE ,
@@ -123,6 +144,146 @@ protected void doExecute(Task task, OpenPointInTimeRequest request, ActionListen
123144 );
124145 return ;
125146 }
147+
148+ final boolean resolveCrossProject = crossProjectModeDecider .resolvesCrossProject (request );
149+ if (resolveCrossProject ) {
150+ executeOpenPitCrossProject ((SearchTask ) task , request , listener );
151+ } else {
152+ executeOpenPit ((SearchTask ) task , request , listener );
153+ }
154+ }
155+
156+ private void executeOpenPitCrossProject (
157+ SearchTask task ,
158+ OpenPointInTimeRequest request ,
159+ ActionListener <OpenPointInTimeResponse > listener
160+ ) {
161+ String [] indices = request .indices ();
162+ IndicesOptions originalIndicesOptions = request .indicesOptions ();
163+ // in CPS before executing the open pit request we need to get index resolution and possibly throw based on merged project view
164+ // rules. This should happen only if either ignore_unavailable or allow_no_indices is set to false (strict).
165+ // If instead both are true we can continue with the "normal" pit execution.
166+ if (originalIndicesOptions .ignoreUnavailable () && originalIndicesOptions .allowNoIndices ()) {
167+ // lenient indicesOptions thus execute standard pit
168+ executeOpenPit (task , request , listener );
169+ return ;
170+ }
171+
172+ // ResolvedIndexExpression for the origin cluster (only) as determined by the Security Action Filter
173+ final ResolvedIndexExpressions localResolvedIndexExpressions = request .getResolvedIndexExpressions ();
174+
175+ RemoteClusterService remoteClusterService = searchTransportService .getRemoteClusterService ();
176+ final Map <String , OriginalIndices > indicesPerCluster = remoteClusterService .groupIndices (
177+ indicesOptionsForCrossProjectFanout (originalIndicesOptions ),
178+ indices
179+ );
180+ // local indices resolution was already taken care of by the Security Action Filter
181+ indicesPerCluster .remove (RemoteClusterAware .LOCAL_CLUSTER_GROUP_KEY );
182+
183+ if (indicesPerCluster .isEmpty ()) {
184+ // for CPS requests that are targeting origin only, could be because of project_routing or other reasons, execute standard pit.
185+ final Exception ex = CrossProjectIndexResolutionValidator .validate (
186+ originalIndicesOptions ,
187+ request .getProjectRouting (),
188+ localResolvedIndexExpressions ,
189+ Map .of ()
190+ );
191+ if (ex != null ) {
192+ listener .onFailure (ex );
193+ return ;
194+ }
195+ executeOpenPit (task , request , listener );
196+ return ;
197+ }
198+
199+ // CPS
200+ final int linkedProjectsToQuery = indicesPerCluster .size ();
201+ ActionListener <Collection <Map .Entry <String , ResolveIndexAction .Response >>> responsesListener = listener .delegateFailureAndWrap (
202+ (l , responses ) -> {
203+ Map <String , ResolvedIndexExpressions > resolvedRemoteExpressions = responses .stream ()
204+ .filter (e -> e .getValue ().getResolvedIndexExpressions () != null )
205+ .collect (
206+ Collectors .toMap (
207+ Map .Entry ::getKey ,
208+ e -> e .getValue ().getResolvedIndexExpressions ()
209+
210+ )
211+ );
212+ final Exception ex = CrossProjectIndexResolutionValidator .validate (
213+ originalIndicesOptions ,
214+ request .getProjectRouting (),
215+ localResolvedIndexExpressions ,
216+ resolvedRemoteExpressions
217+ );
218+ if (ex != null ) {
219+ listener .onFailure (ex );
220+ return ;
221+ }
222+ Set <String > collectedIndices = new HashSet <>(indices .length );
223+
224+ for (Map .Entry <String , ResolvedIndexExpressions > resolvedRemoteExpressionEntry : resolvedRemoteExpressions .entrySet ()) {
225+ String remoteAlias = resolvedRemoteExpressionEntry .getKey ();
226+ for (ResolvedIndexExpression expression : resolvedRemoteExpressionEntry .getValue ().expressions ()) {
227+ ResolvedIndexExpression .LocalExpressions oneRemoteExpression = expression .localExpressions ();
228+ if (false == oneRemoteExpression .indices ().isEmpty ()
229+ && oneRemoteExpression
230+ .localIndexResolutionResult () == ResolvedIndexExpression .LocalIndexResolutionResult .SUCCESS ) {
231+ collectedIndices .addAll (
232+ oneRemoteExpression .indices ()
233+ .stream ()
234+ .map (i -> buildRemoteIndexName (remoteAlias , i ))
235+ .collect (Collectors .toSet ())
236+ );
237+ }
238+ }
239+ }
240+ if (localResolvedIndexExpressions != null ) { // this should never be null in CPS
241+ collectedIndices .addAll (localResolvedIndexExpressions .getLocalIndicesList ());
242+ }
243+ request .indices (collectedIndices .toArray (String []::new ));
244+ executeOpenPit (task , request , listener );
245+ }
246+ );
247+ ActionListener <Map .Entry <String , ResolveIndexAction .Response >> groupedListener = new GroupedActionListener <>(
248+ linkedProjectsToQuery ,
249+ responsesListener
250+ );
251+
252+ // make CPS calls
253+ for (Map .Entry <String , OriginalIndices > remoteClusterIndices : indicesPerCluster .entrySet ()) {
254+ String clusterAlias = remoteClusterIndices .getKey ();
255+ OriginalIndices originalIndices = remoteClusterIndices .getValue ();
256+ IndicesOptions relaxedFanoutIdxOptions = originalIndices .indicesOptions (); // from indicesOptionsForCrossProjectFanout
257+ ResolveIndexAction .Request remoteRequest = new ResolveIndexAction .Request (originalIndices .indices (), relaxedFanoutIdxOptions );
258+
259+ SubscribableListener <Transport .Connection > connectionListener = new SubscribableListener <>();
260+ connectionListener .addTimeout (forceConnectTimeoutSecs , transportService .getThreadPool (), EsExecutors .DIRECT_EXECUTOR_SERVICE );
261+
262+ connectionListener .addListener (groupedListener .delegateResponse ((l , failure ) -> {
263+ logger .info ("failed to resolve indices on remote cluster [" + clusterAlias + "]" , failure );
264+ l .onFailure (failure );
265+ })
266+ .delegateFailure (
267+ (ignored , connection ) -> transportService .sendRequest (
268+ connection ,
269+ ResolveIndexAction .REMOTE_TYPE .name (),
270+ remoteRequest ,
271+ TransportRequestOptions .EMPTY ,
272+ new ActionListenerResponseHandler <>(groupedListener .delegateResponse ((l , failure ) -> {
273+ logger .info ("Error occurred on remote cluster [" + clusterAlias + "]" , failure );
274+ l .onFailure (failure );
275+ }).map (resolveIndexResponse -> Map .entry (clusterAlias , resolveIndexResponse )),
276+ ResolveIndexAction .Response ::new ,
277+ EsExecutors .DIRECT_EXECUTOR_SERVICE
278+ )
279+ )
280+ ));
281+
282+ remoteClusterService .maybeEnsureConnectedAndGetConnection (clusterAlias , true , connectionListener );
283+ }
284+ }
285+
286+ private void executeOpenPit (SearchTask task , OpenPointInTimeRequest request , ActionListener <OpenPointInTimeResponse > listener ) {
126287 final SearchRequest searchRequest = new SearchRequest ().indices (request .indices ())
127288 .indicesOptions (request .indicesOptions ())
128289 .preference (request .preference ())
@@ -132,7 +293,7 @@ protected void doExecute(Task task, OpenPointInTimeRequest request, ActionListen
132293 searchRequest .setMaxConcurrentShardRequests (request .maxConcurrentShardRequests ());
133294 searchRequest .setCcsMinimizeRoundtrips (false );
134295
135- transportSearchAction .executeOpenPit (( SearchTask ) task , searchRequest , listener .map (r -> {
296+ transportSearchAction .executeOpenPit (task , searchRequest , listener .map (r -> {
136297 assert r .pointInTimeId () != null : r ;
137298 return new OpenPointInTimeResponse (
138299 r .pointInTimeId (),
0 commit comments