1111
1212import org .apache .logging .log4j .LogManager ;
1313import org .apache .logging .log4j .Logger ;
14+ import org .elasticsearch .action .ActionListener ;
1415import org .elasticsearch .action .ActionResponse ;
1516import org .elasticsearch .action .admin .cluster .reroute .ClusterRerouteUtils ;
1617import org .elasticsearch .action .admin .cluster .shards .ClusterSearchShardsGroup ;
2122import org .elasticsearch .action .search .OpenPointInTimeRequest ;
2223import org .elasticsearch .action .search .TransportClosePointInTimeAction ;
2324import org .elasticsearch .action .search .TransportOpenPointInTimeAction ;
25+ import org .elasticsearch .action .support .ActionFilters ;
26+ import org .elasticsearch .action .support .TransportAction ;
2427import org .elasticsearch .action .support .broadcast .BroadcastResponse ;
2528import org .elasticsearch .cluster .ClusterChangedEvent ;
2629import org .elasticsearch .cluster .ClusterState ;
3841import org .elasticsearch .common .settings .ClusterSettings ;
3942import org .elasticsearch .common .settings .Settings ;
4043import org .elasticsearch .common .util .CollectionUtils ;
44+ import org .elasticsearch .common .util .concurrent .EsExecutors ;
4145import org .elasticsearch .core .Nullable ;
4246import org .elasticsearch .core .TimeValue ;
4347import org .elasticsearch .index .IndexService ;
4852import org .elasticsearch .index .engine .NoOpEngine ;
4953import org .elasticsearch .index .shard .IndexShard ;
5054import org .elasticsearch .indices .IndicesService ;
55+ import org .elasticsearch .indices .recovery .PeerRecoveryTargetService ;
56+ import org .elasticsearch .indices .recovery .RecoveryState ;
57+ import org .elasticsearch .indices .recovery .StatelessUnpromotableRelocationAction ;
58+ import org .elasticsearch .injection .guice .Inject ;
59+ import org .elasticsearch .plugins .ActionPlugin ;
5160import org .elasticsearch .plugins .ClusterPlugin ;
5261import org .elasticsearch .plugins .EnginePlugin ;
5362import org .elasticsearch .plugins .Plugin ;
5463import org .elasticsearch .plugins .PluginsService ;
5564import org .elasticsearch .search .builder .PointInTimeBuilder ;
5665import org .elasticsearch .snapshots .SnapshotState ;
66+ import org .elasticsearch .tasks .Task ;
5767import org .elasticsearch .test .ESIntegTestCase ;
5868import org .elasticsearch .test .XContentTestUtils ;
5969import org .elasticsearch .test .transport .MockTransportService ;
@@ -91,7 +101,7 @@ public class ShardRoutingRoleIT extends ESIntegTestCase {
91101
92102 private static final Logger logger = LogManager .getLogger (ShardRoutingRoleIT .class );
93103
94- public static class TestPlugin extends Plugin implements ClusterPlugin , EnginePlugin {
104+ public static class TestPlugin extends Plugin implements ClusterPlugin , EnginePlugin , ActionPlugin {
95105
96106 volatile int numIndexingCopies = 1 ;
97107 static final String NODE_ATTR_UNPROMOTABLE_ONLY = "unpromotableonly" ;
@@ -112,6 +122,61 @@ public ShardRouting.Role newEmptyRole(int copyIndex) {
112122 };
113123 }
114124
125+ // This is implemented in stateless, but for the tests we need to provide a simple implementation
126+ public static class TransportTestUnpromotableRelocationAction extends TransportAction <
127+ StatelessUnpromotableRelocationAction .Request ,
128+ ActionResponse .Empty > {
129+
130+ private final IndicesService indicesService ;
131+ private final PeerRecoveryTargetService peerRecoveryTargetService ;
132+
133+ @ Inject
134+ public TransportTestUnpromotableRelocationAction (
135+ ActionFilters actionFilters ,
136+ IndicesService indicesService ,
137+ PeerRecoveryTargetService peerRecoveryTargetService ,
138+ TransportService transportService
139+ ) {
140+ super (
141+ StatelessUnpromotableRelocationAction .TYPE .name (),
142+ actionFilters ,
143+ transportService .getTaskManager (),
144+ EsExecutors .DIRECT_EXECUTOR_SERVICE
145+ );
146+ this .indicesService = indicesService ;
147+ this .peerRecoveryTargetService = peerRecoveryTargetService ;
148+ }
149+
150+ @ Override
151+ protected void doExecute (
152+ Task task ,
153+ StatelessUnpromotableRelocationAction .Request request ,
154+ ActionListener <ActionResponse .Empty > listener
155+ ) {
156+ try (var recoveryRef = peerRecoveryTargetService .getRecoveryRef (request .getRecoveryId (), request .getShardId ())) {
157+ final var indexService = indicesService .indexServiceSafe (request .getShardId ().getIndex ());
158+ final var indexShard = indexService .getShard (request .getShardId ().id ());
159+ final var recoveryTarget = recoveryRef .target ();
160+ final var recoveryState = recoveryTarget .state ();
161+
162+ ActionListener .completeWith (listener , () -> {
163+ indexShard .prepareForIndexRecovery ();
164+ recoveryState .setStage (RecoveryState .Stage .VERIFY_INDEX );
165+ recoveryState .setStage (RecoveryState .Stage .TRANSLOG );
166+ indexShard .openEngineAndSkipTranslogRecovery ();
167+ recoveryState .getIndex ().setFileDetailsComplete ();
168+ recoveryState .setStage (RecoveryState .Stage .FINALIZE );
169+ return ActionResponse .Empty .INSTANCE ;
170+ });
171+ }
172+ }
173+ }
174+
175+ @ Override
176+ public Collection <ActionHandler > getActions () {
177+ return List .of (new ActionHandler (StatelessUnpromotableRelocationAction .TYPE , TransportTestUnpromotableRelocationAction .class ));
178+ }
179+
115180 @ Override
116181 public Collection <AllocationDecider > createAllocationDeciders (Settings settings , ClusterSettings clusterSettings ) {
117182 return List .of (new AllocationDecider () {
0 commit comments