|
75 | 75 | import org.elasticsearch.core.AbstractRefCounted; |
76 | 76 | import org.elasticsearch.core.CheckedConsumer; |
77 | 77 | import org.elasticsearch.core.CheckedFunction; |
78 | | -import org.elasticsearch.core.CheckedRunnable; |
79 | 78 | import org.elasticsearch.core.IOUtils; |
80 | 79 | import org.elasticsearch.core.Nullable; |
81 | 80 | import org.elasticsearch.core.Releasable; |
@@ -970,28 +969,30 @@ public void createShard( |
970 | 969 | RecoveryState recoveryState = indexService.createRecoveryState(shardRouting, targetNode, sourceNode); |
971 | 970 | IndexShard indexShard = indexService.createShard(shardRouting, globalCheckpointSyncer, retentionLeaseSyncer); |
972 | 971 | indexShard.addShardFailureCallback(onShardFailure); |
973 | | - final CheckedRunnable<RuntimeException> recoveryRunnable = () -> indexShard.startRecovery( |
974 | | - recoveryState, |
975 | | - recoveryTargetService, |
976 | | - postRecoveryMerger.maybeMergeAfterRecovery(indexService.getMetadata(), shardRouting, recoveryListener), |
977 | | - repositoriesService, |
978 | | - (mapping, listener) -> { |
979 | | - assert recoveryState.getRecoverySource().getType() == RecoverySource.Type.LOCAL_SHARDS |
980 | | - : "mapping update consumer only required by local shards recovery"; |
981 | | - AcknowledgedRequest<PutMappingRequest> putMappingRequestAcknowledgedRequest = new PutMappingRequest() |
982 | | - // concrete index - no name clash, it uses uuid |
983 | | - .setConcreteIndex(shardRouting.index()) |
984 | | - .source(mapping.source().string(), XContentType.JSON); |
985 | | - client.execute( |
986 | | - TransportAutoPutMappingAction.TYPE, |
987 | | - putMappingRequestAcknowledgedRequest.ackTimeout(TimeValue.MAX_VALUE).masterNodeTimeout(TimeValue.MAX_VALUE), |
988 | | - new RefCountAwareThreadedActionListener<>(threadPool.generic(), listener.map(ignored -> null)) |
989 | | - ); |
990 | | - }, |
991 | | - this, |
992 | | - clusterStateVersion |
| 972 | + projectResolver.executeOnProject( |
| 973 | + projectId, |
| 974 | + () -> indexShard.startRecovery( |
| 975 | + recoveryState, |
| 976 | + recoveryTargetService, |
| 977 | + postRecoveryMerger.maybeMergeAfterRecovery(indexService.getMetadata(), shardRouting, recoveryListener), |
| 978 | + repositoriesService, |
| 979 | + (mapping, listener) -> { |
| 980 | + assert recoveryState.getRecoverySource().getType() == RecoverySource.Type.LOCAL_SHARDS |
| 981 | + : "mapping update consumer only required by local shards recovery"; |
| 982 | + AcknowledgedRequest<PutMappingRequest> putMappingRequestAcknowledgedRequest = new PutMappingRequest() |
| 983 | + // concrete index - no name clash, it uses uuid |
| 984 | + .setConcreteIndex(shardRouting.index()) |
| 985 | + .source(mapping.source().string(), XContentType.JSON); |
| 986 | + client.execute( |
| 987 | + TransportAutoPutMappingAction.TYPE, |
| 988 | + putMappingRequestAcknowledgedRequest.ackTimeout(TimeValue.MAX_VALUE).masterNodeTimeout(TimeValue.MAX_VALUE), |
| 989 | + new RefCountAwareThreadedActionListener<>(threadPool.generic(), listener.map(ignored -> null)) |
| 990 | + ); |
| 991 | + }, |
| 992 | + this, |
| 993 | + clusterStateVersion |
| 994 | + ) |
993 | 995 | ); |
994 | | - projectResolver.executeOnProject(projectId, recoveryRunnable); |
995 | 996 | } |
996 | 997 |
|
997 | 998 | @Override |
|
0 commit comments