1616import org .elasticsearch .TransportVersion ;
1717import org .elasticsearch .action .ActionListener ;
1818import org .elasticsearch .action .ActionListenerResponseHandler ;
19+ import org .elasticsearch .action .ActionResponse ;
1920import org .elasticsearch .action .ActionRunnable ;
2021import org .elasticsearch .action .ActionType ;
2122import org .elasticsearch .action .OriginalIndices ;
3334import org .elasticsearch .cluster .service .ClusterService ;
3435import org .elasticsearch .common .Strings ;
3536import org .elasticsearch .common .collect .Iterators ;
37+ import org .elasticsearch .common .io .stream .StreamInput ;
3638import org .elasticsearch .common .regex .Regex ;
3739import org .elasticsearch .common .util .Maps ;
3840import org .elasticsearch .common .util .concurrent .AbstractRunnable ;
5860import org .elasticsearch .transport .TransportRequestOptions ;
5961import org .elasticsearch .transport .TransportService ;
6062
63+ import java .io .IOException ;
6164import java .util .ArrayList ;
6265import java .util .Arrays ;
6366import java .util .Collections ;
@@ -131,19 +134,54 @@ public TransportFieldCapabilitiesAction(
131134
132135 @ Override
133136 protected void doExecute (Task task , FieldCapabilitiesRequest request , final ActionListener <FieldCapabilitiesResponse > listener ) {
134- executeRequest (task , request , listener );
137+ executeRequest (task , request , new LinkedRequestExecutor <FieldCapabilitiesResponse >() {
138+ @ Override
139+ public void executeRemoteRequest (
140+ TransportService transportService ,
141+ Transport .Connection conn ,
142+ FieldCapabilitiesRequest remoteRequest ,
143+ ActionListenerResponseHandler <FieldCapabilitiesResponse > responseHandler
144+ ) {
145+ transportService .sendRequest (conn , REMOTE_TYPE .name (), remoteRequest , TransportRequestOptions .EMPTY , responseHandler );
146+ }
147+
148+ @ Override
149+ public FieldCapabilitiesResponse read (StreamInput in ) throws IOException {
150+ return new FieldCapabilitiesResponse (in );
151+ }
152+
153+ @ Override
154+ public FieldCapabilitiesResponse wrapPrimary (FieldCapabilitiesResponse primary ) {
155+ return primary ;
156+ }
157+
158+ @ Override
159+ public FieldCapabilitiesResponse unwrapPrimary (FieldCapabilitiesResponse fieldCapabilitiesResponse ) {
160+ return fieldCapabilitiesResponse ;
161+ }
162+ }, listener );
135163 }
136164
137- public void executeRequest (Task task , FieldCapabilitiesRequest request , ActionListener <FieldCapabilitiesResponse > listener ) {
165+ public <R extends ActionResponse > void executeRequest (
166+ Task task ,
167+ FieldCapabilitiesRequest request ,
168+ LinkedRequestExecutor <R > linkedRequestExecutor ,
169+ ActionListener <R > listener
170+ ) {
138171 // workaround for https://github.com/elastic/elasticsearch/issues/97916 - TODO remove this when we can
139- searchCoordinationExecutor .execute (ActionRunnable .wrap (listener , l -> doExecuteForked (task , request , l )));
172+ searchCoordinationExecutor .execute (ActionRunnable .wrap (listener , l -> doExecuteForked (task , request , linkedRequestExecutor , l )));
140173 }
141174
142- private void doExecuteForked (Task task , FieldCapabilitiesRequest request , ActionListener <FieldCapabilitiesResponse > listener ) {
175+ private <R extends ActionResponse > void doExecuteForked (
176+ Task task ,
177+ FieldCapabilitiesRequest request ,
178+ LinkedRequestExecutor <R > linkedRequestExecutor ,
179+ ActionListener <R > listener
180+ ) {
143181 if (ccsCheckCompatibility ) {
144182 checkCCSVersionCompatibility (request );
145183 }
146- final Executor singleThreadedExecutor = buildSingleThreadedExecutor (searchCoordinationExecutor , LOGGER );
184+ final Executor singleThreadedExecutor = buildSingleThreadedExecutor ();
147185 assert task instanceof CancellableTask ;
148186 final CancellableTask fieldCapTask = (CancellableTask ) task ;
149187 // retrieve the initial timestamp in case the action is a cross-cluster search
@@ -159,7 +197,11 @@ private void doExecuteForked(Task task, FieldCapabilitiesRequest request, Action
159197 : Strings .EMPTY_ARRAY ;
160198
161199 if (concreteIndices .length == 0 && remoteClusterIndices .isEmpty ()) {
162- listener .onResponse (FieldCapabilitiesResponse .builder ().withMinTransportVersion (minTransportVersion .get ()).build ());
200+ listener .onResponse (
201+ linkedRequestExecutor .wrapPrimary (
202+ FieldCapabilitiesResponse .builder ().withMinTransportVersion (minTransportVersion .get ()).build ()
203+ )
204+ );
163205 return ;
164206 }
165207
@@ -235,7 +277,14 @@ private void doExecuteForked(Task task, FieldCapabilitiesRequest request, Action
235277 if (fieldCapTask .notifyIfCancelled (listener )) {
236278 releaseResourcesOnCancel .run ();
237279 } else {
238- mergeIndexResponses (request , fieldCapTask , indexResponses , indexFailures , minTransportVersion , listener );
280+ mergeIndexResponses (
281+ request ,
282+ fieldCapTask ,
283+ indexResponses ,
284+ indexFailures ,
285+ minTransportVersion ,
286+ listener .map (linkedRequestExecutor ::wrapPrimary )
287+ );
239288 }
240289 })) {
241290 // local cluster
@@ -307,12 +356,15 @@ private void doExecuteForked(Task task, FieldCapabilitiesRequest request, Action
307356 true ,
308357 ActionListener .releaseAfter (remoteListener , refs .acquire ())
309358 ).delegateFailure (
310- (responseListener , conn ) -> transportService .sendRequest (
359+ (responseListener , conn ) -> linkedRequestExecutor .executeRemoteRequest (
360+ transportService ,
311361 conn ,
312- REMOTE_TYPE .name (),
313362 remoteRequest ,
314- TransportRequestOptions .EMPTY ,
315- new ActionListenerResponseHandler <>(responseListener , FieldCapabilitiesResponse ::new , singleThreadedExecutor )
363+ new ActionListenerResponseHandler <>(
364+ responseListener ,
365+ in -> linkedRequestExecutor .unwrapPrimary (linkedRequestExecutor .read (in )),
366+ singleThreadedExecutor
367+ )
316368 )
317369 )
318370 );
@@ -325,7 +377,7 @@ private void doExecuteForked(Task task, FieldCapabilitiesRequest request, Action
325377 }
326378 }
327379
328- public static Executor buildSingleThreadedExecutor (Executor searchCoordinationExecutor , Logger logger ) {
380+ private Executor buildSingleThreadedExecutor () {
329381 final ThrottledTaskRunner throttledTaskRunner = new ThrottledTaskRunner ("field_caps" , 1 , searchCoordinationExecutor );
330382 return r -> throttledTaskRunner .enqueueTask (new ActionListener <>() {
331383 @ Override
@@ -348,7 +400,22 @@ public void onFailure(Exception e) {
348400 });
349401 }
350402
351- public static void checkIndexBlocks (ProjectState projectState , String [] concreteIndices ) {
403+ public interface LinkedRequestExecutor <R extends ActionResponse > {
404+ void executeRemoteRequest (
405+ TransportService transportService ,
406+ Transport .Connection conn ,
407+ FieldCapabilitiesRequest remoteRequest ,
408+ ActionListenerResponseHandler <FieldCapabilitiesResponse > responseHandler
409+ );
410+
411+ R read (StreamInput in ) throws IOException ;
412+
413+ R wrapPrimary (FieldCapabilitiesResponse primary );
414+
415+ FieldCapabilitiesResponse unwrapPrimary (R r );
416+ }
417+
418+ private static void checkIndexBlocks (ProjectState projectState , String [] concreteIndices ) {
352419 var blocks = projectState .blocks ();
353420 var projectId = projectState .projectId ();
354421 if (blocks .global (projectId ).isEmpty () && blocks .indices (projectId ).isEmpty ()) {
@@ -404,7 +471,7 @@ private static void mergeIndexResponses(
404471 }
405472 }
406473
407- public static FieldCapabilitiesRequest prepareRemoteRequest (
474+ private static FieldCapabilitiesRequest prepareRemoteRequest (
408475 String clusterAlias ,
409476 FieldCapabilitiesRequest request ,
410477 OriginalIndices originalIndices ,
@@ -610,10 +677,10 @@ private static void innerMerge(
610677 * This collector can contain a failure for an index even if one of its shards was successful. When building the final
611678 * list, these failures will be skipped because they have no affect on the final response.
612679 */
613- public static final class FailureCollector {
680+ private static final class FailureCollector {
614681 private final Map <String , Exception > failuresByIndex = new HashMap <>();
615682
616- public List <FieldCapabilitiesFailure > build (Set <String > successfulIndices ) {
683+ List <FieldCapabilitiesFailure > build (Set <String > successfulIndices ) {
617684 Map <Tuple <String , String >, FieldCapabilitiesFailure > indexFailures = new HashMap <>();
618685 for (Map .Entry <String , Exception > failure : failuresByIndex .entrySet ()) {
619686 String index = failure .getKey ();
@@ -642,15 +709,15 @@ public List<FieldCapabilitiesFailure> build(Set<String> successfulIndices) {
642709 return new ArrayList <>(indexFailures .values ());
643710 }
644711
645- public void collect (String index , Exception e ) {
712+ void collect (String index , Exception e ) {
646713 failuresByIndex .putIfAbsent (index , e );
647714 }
648715
649- public void clear () {
716+ void clear () {
650717 failuresByIndex .clear ();
651718 }
652719
653- public boolean isEmpty () {
720+ boolean isEmpty () {
654721 return failuresByIndex .isEmpty ();
655722 }
656723 }
@@ -713,8 +780,8 @@ public void messageReceived(FieldCapabilitiesNodeRequest request, TransportChann
713780 }
714781 }
715782
716- public static class ForkingOnFailureActionListener <Response > extends AbstractThreadedActionListener <Response > {
717- public ForkingOnFailureActionListener (Executor executor , boolean forceExecution , ActionListener <Response > delegate ) {
783+ private static class ForkingOnFailureActionListener <Response > extends AbstractThreadedActionListener <Response > {
784+ ForkingOnFailureActionListener (Executor executor , boolean forceExecution , ActionListener <Response > delegate ) {
718785 super (executor , forceExecution , delegate );
719786 }
720787
0 commit comments