3535import org .elasticsearch .cluster .action .shard .ShardStateAction ;
3636import org .elasticsearch .cluster .metadata .ProjectMetadata ;
3737import org .elasticsearch .cluster .project .ProjectResolver ;
38- import org .elasticsearch .cluster .routing .IndexRouting ;
3938import org .elasticsearch .cluster .service .ClusterService ;
4039import org .elasticsearch .common .bytes .BytesReference ;
4140import org .elasticsearch .common .compress .CompressedXContent ;
4645import org .elasticsearch .core .Strings ;
4746import org .elasticsearch .core .TimeValue ;
4847import org .elasticsearch .core .Tuple ;
49- import org .elasticsearch .index .Index ;
5048import org .elasticsearch .index .IndexingPressure ;
5149import org .elasticsearch .index .engine .Engine ;
5250import org .elasticsearch .index .engine .VersionConflictEngineException ;
7472import org .elasticsearch .xcontent .XContentType ;
7573
7674import java .io .IOException ;
77- import java .util .ArrayList ;
78- import java .util .HashMap ;
79- import java .util .List ;
8075import java .util .Map ;
8176import java .util .concurrent .Executor ;
8277import java .util .concurrent .TimeUnit ;
@@ -175,56 +170,11 @@ protected void shardOperationOnPrimary(
175170 @ Override
176171 protected Map <ShardId , BulkShardRequest > splitRequestOnPrimary (BulkShardRequest request ) {
177172 // TODO Needed right now for not in primary mode on the target. Need to make sure we handle that with retries.
178- LockSupport .parkNanos (TimeUnit .MILLISECONDS .toNanos (100 ));
179- final ShardId sourceShardId = request .shardId ();
180- final Index index = sourceShardId .getIndex ();
181-
173+ // LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(100));
182174 ClusterState clusterState = clusterService .state ();
183175 ProjectMetadata project = projectResolver .getProjectMetadata (clusterState );
184176
185- IndexRouting indexRouting = IndexRouting .fromIndexMetadata (project .getIndexSafe (index ));
186- Map <ShardId , List <BulkItemRequest >> requestsByShard = new HashMap <>();
187- Map <ShardId , BulkShardRequest > bulkRequestsPerShard = new HashMap <>();
188-
189- // Iterate through the items in the input request and split them based on the
190- // current resharding-split state.
191- BulkItemRequest [] items = request .items ();
192- if (items .length == 0 ) { // Nothing to split
193- return Map .of (sourceShardId , request );
194- }
195-
196- for (int i = 0 ; i < items .length ; i ++) {
197- BulkItemRequest bulkItemRequest = items [i ];
198- DocWriteRequest <?> docWriteRequest = bulkItemRequest .request ();
199- int newShardId = docWriteRequest .rerouteAtSourceDuringResharding (indexRouting );
200- List <BulkItemRequest > shardRequests = requestsByShard .computeIfAbsent (
201- new ShardId (index , newShardId ),
202- shardNum -> new ArrayList <>()
203- );
204- shardRequests .add (new BulkItemRequest (bulkItemRequest .id (), bulkItemRequest .request ()));
205- }
206-
207- // All items belong to either the source shard or target shard.
208- if (requestsByShard .size () == 1 ) {
209- Map .Entry <ShardId , List <BulkItemRequest >> entry = requestsByShard .entrySet ().iterator ().next ();
210- // Return the original request if no items were split to target.
211- if (entry .getKey ().equals (sourceShardId )) {
212- return Map .of (sourceShardId , request );
213- }
214- }
215-
216- for (Map .Entry <ShardId , List <BulkItemRequest >> entry : requestsByShard .entrySet ()) {
217- final ShardId shardId = entry .getKey ();
218- final List <BulkItemRequest > requests = entry .getValue ();
219- BulkShardRequest bulkShardRequest = new BulkShardRequest (
220- shardId ,
221- request .getRefreshPolicy (),
222- requests .toArray (new BulkItemRequest [0 ]),
223- request .isSimulated ()
224- );
225- bulkRequestsPerShard .put (shardId , bulkShardRequest );
226- }
227- return bulkRequestsPerShard ;
177+ return ShardBulkSplitHelper .splitRequests (request , project );
228178 }
229179
230180 @ Override
@@ -233,27 +183,7 @@ protected Tuple<BulkShardResponse, Exception> combineSplitResponses(
233183 Map <ShardId , BulkShardRequest > splitRequests ,
234184 Map <ShardId , Tuple <BulkShardResponse , Exception >> responses
235185 ) {
236- BulkItemResponse [] bulkItemResponses = new BulkItemResponse [originalRequest .items ().length ];
237- for (Map .Entry <ShardId , Tuple <BulkShardResponse , Exception >> entry : responses .entrySet ()) {
238- ShardId shardId = entry .getKey ();
239- Tuple <BulkShardResponse , Exception > value = entry .getValue ();
240- Exception exception = value .v2 ();
241- if (exception != null ) {
242- BulkShardRequest bulkShardRequest = splitRequests .get (shardId );
243- for (BulkItemRequest item : bulkShardRequest .items ()) {
244- DocWriteRequest <?> request = item .request ();
245- BulkItemResponse .Failure failure = new BulkItemResponse .Failure (item .index (), request .id (), exception );
246- bulkItemResponses [item .id ()] = BulkItemResponse .failure (item .id (), request .opType (), failure );
247- }
248- } else {
249- for (BulkItemResponse bulkItemResponse : value .v1 ().getResponses ()) {
250- bulkItemResponses [bulkItemResponse .getItemId ()] = bulkItemResponse ;
251- }
252- }
253- }
254- BulkShardResponse bulkShardResponse = new BulkShardResponse (originalRequest .shardId (), bulkItemResponses );
255- bulkShardResponse .setShardInfo (responses .get (originalRequest .shardId ()).v1 ().getShardInfo ());
256- return new Tuple <>(bulkShardResponse , null );
186+ return ShardBulkSplitHelper .combineResponses (originalRequest , splitRequests , responses );
257187 }
258188
259189 @ Override
0 commit comments