Skip to content

Commit 9946884

Browse files
committed
Change
1 parent c14623a commit 9946884

File tree

4 files changed

+85
-103
lines changed

4 files changed

+85
-103
lines changed

server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -75,12 +75,12 @@
7575

7676
import java.io.IOException;
7777
import java.util.ArrayList;
78-
import java.util.Collection;
79-
import java.util.Collections;
8078
import java.util.HashMap;
8179
import java.util.List;
8280
import java.util.Map;
8381
import java.util.concurrent.Executor;
82+
import java.util.concurrent.TimeUnit;
83+
import java.util.concurrent.locks.LockSupport;
8484
import java.util.function.Consumer;
8585
import java.util.function.LongSupplier;
8686
import java.util.function.ObjLongConsumer;
@@ -174,6 +174,8 @@ protected void shardOperationOnPrimary(
174174

175175
@Override
176176
protected Map<ShardId, BulkShardRequest> splitRequestOnPrimary(BulkShardRequest request) {
177+
// TODO Needed right now for not in primary mode on the target. Need to make sure we handle that with retries.
178+
LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(100));
177179
final ShardId sourceShardId = request.shardId();
178180
final Index index = sourceShardId.getIndex();
179181

@@ -249,7 +251,9 @@ protected Tuple<BulkShardResponse, Exception> combineSplitResponses(
249251
}
250252
}
251253
}
252-
return new Tuple<>(new BulkShardResponse(originalRequest.shardId(), bulkItemResponses), null);
254+
BulkShardResponse bulkShardResponse = new BulkShardResponse(originalRequest.shardId(), bulkItemResponses);
255+
bulkShardResponse.setShardInfo(responses.get(originalRequest.shardId()).v1().getShardInfo());
256+
return new Tuple<>(bulkShardResponse, null);
253257
}
254258

255259
@Override

server/src/main/java/org/elasticsearch/action/support/master/TermOverridingMasterNodeRequest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@
3333
* which it may be necessary to use the test utility {@code MasterNodeRequestHelper#unwrapTermOverride} to remove the wrapper and access the
3434
* inner request.
3535
*/
36-
class TermOverridingMasterNodeRequest extends AbstractTransportRequest {
36+
public class TermOverridingMasterNodeRequest extends AbstractTransportRequest {
3737

3838
private static final Logger logger = LogManager.getLogger(TermOverridingMasterNodeRequest.class);
3939

server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java

Lines changed: 77 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,9 @@
1616
import org.elasticsearch.action.ActionListenerResponseHandler;
1717
import org.elasticsearch.action.ActionResponse;
1818
import org.elasticsearch.action.UnavailableShardsException;
19-
import org.elasticsearch.action.bulk.BulkShardRequest;
20-
import org.elasticsearch.action.bulk.BulkShardResponse;
2119
import org.elasticsearch.action.support.ActionFilters;
2220
import org.elasticsearch.action.support.ActiveShardCount;
2321
import org.elasticsearch.action.support.ChannelActionListener;
24-
import org.elasticsearch.action.support.CountDownActionListener;
25-
import org.elasticsearch.action.support.GroupedActionListener;
2622
import org.elasticsearch.action.support.TransportAction;
2723
import org.elasticsearch.action.support.TransportActions;
2824
import org.elasticsearch.client.internal.transport.NoNodeAvailableException;
@@ -81,16 +77,13 @@
8177
import org.elasticsearch.transport.TransportService;
8278

8379
import java.io.IOException;
84-
import java.util.ArrayList;
8580
import java.util.Collections;
8681
import java.util.Map;
8782
import java.util.Objects;
8883
import java.util.Optional;
8984
import java.util.concurrent.ConcurrentHashMap;
9085
import java.util.concurrent.Executor;
9186
import java.util.concurrent.atomic.AtomicBoolean;
92-
import java.util.concurrent.atomic.AtomicReference;
93-
import java.util.concurrent.atomic.AtomicReferenceArray;
9487

9588
import static org.elasticsearch.core.Strings.format;
9689

@@ -498,10 +491,6 @@ void runWithPrimaryShardReference(final PrimaryShardReference primaryShardRefere
498491
}
499492

500493
SplitShardCountSummary reshardSplitShardCountSummary = primaryRequest.getRequest().reshardSplitShardCountSummary();
501-
assert reshardSplitShardCountSummary.isUnset()
502-
|| reshardSplitShardCountSummary.equals(
503-
SplitShardCountSummary.forIndexing(indexMetadata, primaryRequest.getRequest().shardId().getId())
504-
);
505494
if (primaryShardReference.isRelocated()) {
506495
primaryShardReference.close(); // release shard operation lock as soon as possible
507496
setPhase(replicationTask, "primary_delegation");
@@ -511,7 +500,31 @@ void runWithPrimaryShardReference(final PrimaryShardReference primaryShardRefere
511500
final ShardRouting primary = primaryShardReference.routingEntry();
512501
assert primary.relocating() : "indexShard is marked as relocated but routing isn't" + primary;
513502
DiscoveryNode relocatingNode = clusterState.nodes().get(primary.relocatingNodeId());
514-
delegate(relocatingNode, primary.allocationId().getRelocationId(), onCompletionListener);
503+
String allocationID = primary.allocationId().getRelocationId();
504+
transportService.sendRequest(
505+
relocatingNode,
506+
transportPrimaryAction,
507+
new ConcreteShardRequest<>(primaryRequest.getRequest(), allocationID, primaryRequest.getPrimaryTerm()),
508+
transportOptions,
509+
new ActionListenerResponseHandler<>(
510+
onCompletionListener,
511+
TransportReplicationAction.this::newResponseInstance,
512+
TransportResponseHandler.TRANSPORT_WORKER
513+
) {
514+
515+
@Override
516+
public void handleResponse(Response response) {
517+
setPhase(replicationTask, "finished");
518+
super.handleResponse(response);
519+
}
520+
521+
@Override
522+
public void handleException(TransportException exp) {
523+
setPhase(replicationTask, "finished");
524+
super.handleException(exp);
525+
}
526+
}
527+
);
515528
} else if (reshardSplitShardCountSummary.isUnset()
516529
|| reshardSplitShardCountSummary.equals(
517530
SplitShardCountSummary.forIndexing(indexMetadata, primaryRequest.getRequest().shardId().getId())
@@ -541,7 +554,34 @@ void runWithPrimaryShardReference(final PrimaryShardReference primaryShardRefere
541554
final ShardRouting target = targetShard.routingEntry();
542555
DiscoveryNode targetNode = clusterState.nodes().get(target.currentNodeId());
543556
String allocationID = target.allocationId().getId();
544-
delegate(targetNode, allocationID, onCompletionListener);
557+
transportService.sendRequest(
558+
targetNode,
559+
transportPrimaryAction,
560+
new ConcreteShardRequest<>(
561+
primaryRequest.getRequest(),
562+
allocationID,
563+
indexMetadata.primaryTerm(targetShardId.id())
564+
),
565+
transportOptions,
566+
new ActionListenerResponseHandler<>(
567+
onCompletionListener,
568+
TransportReplicationAction.this::newResponseInstance,
569+
TransportResponseHandler.TRANSPORT_WORKER
570+
) {
571+
572+
@Override
573+
public void handleResponse(Response response) {
574+
setPhase(replicationTask, "finished");
575+
super.handleResponse(response);
576+
}
577+
578+
@Override
579+
public void handleException(TransportException exp) {
580+
setPhase(replicationTask, "finished");
581+
super.handleException(exp);
582+
}
583+
}
584+
);
545585
}
546586
} else {
547587
Map<ShardId, Tuple<Response, Exception>> results = new ConcurrentHashMap<>(splitRequests.size());
@@ -584,7 +624,30 @@ private void finish() {
584624
final ShardRouting target = targetShard.routingEntry();
585625
DiscoveryNode targetNode = clusterState.nodes().get(target.currentNodeId());
586626
String allocationID = target.allocationId().getId();
587-
delegate(targetNode, allocationID, listener);
627+
transportService.sendRequest(
628+
targetNode,
629+
transportPrimaryAction,
630+
new ConcreteShardRequest<>(splitRequest.getValue(), allocationID, primaryRequest.getPrimaryTerm()),
631+
transportOptions,
632+
new ActionListenerResponseHandler<>(
633+
listener,
634+
TransportReplicationAction.this::newResponseInstance,
635+
TransportResponseHandler.TRANSPORT_WORKER
636+
) {
637+
638+
@Override
639+
public void handleResponse(Response response) {
640+
setPhase(replicationTask, "finished");
641+
super.handleResponse(response);
642+
}
643+
644+
@Override
645+
public void handleException(TransportException exp) {
646+
setPhase(replicationTask, "finished");
647+
super.handleException(exp);
648+
}
649+
}
650+
);
588651
}
589652
}
590653
}
@@ -597,33 +660,6 @@ private void finish() {
597660
}
598661
}
599662

600-
private void delegate(DiscoveryNode targetNode, String allocationID, ActionListener<Response> listener) {
601-
transportService.sendRequest(
602-
targetNode,
603-
transportPrimaryAction,
604-
new ConcreteShardRequest<>(primaryRequest.getRequest(), allocationID, primaryRequest.getPrimaryTerm()),
605-
transportOptions,
606-
new ActionListenerResponseHandler<>(
607-
listener,
608-
TransportReplicationAction.this::newResponseInstance,
609-
TransportResponseHandler.TRANSPORT_WORKER
610-
) {
611-
612-
@Override
613-
public void handleResponse(Response response) {
614-
setPhase(replicationTask, "finished");
615-
super.handleResponse(response);
616-
}
617-
618-
@Override
619-
public void handleException(TransportException exp) {
620-
setPhase(replicationTask, "finished");
621-
super.handleException(exp);
622-
}
623-
}
624-
);
625-
}
626-
627663
private void executePrimaryRequest(
628664
final PrimaryShardReference primaryShardReference,
629665
String phase,

server/src/main/java/org/elasticsearch/cluster/routing/IndexRouting.java

Lines changed: 0 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -153,12 +153,6 @@ private static int effectiveRoutingToHash(String effectiveRouting) {
153153
*/
154154
public void checkIndexSplitAllowed() {}
155155

156-
public abstract int rerouteIndexingRequestIfResharding(IndexRequest indexRequest);
157-
158-
public abstract int rerouteDeleteRequestIfResharding(String id, @Nullable String routing);
159-
160-
public abstract int rerouteUpdateRequestIfResharding(String id, @Nullable String routing);
161-
162156
/**
163157
* If this index is in the process of resharding, and the shard to which this request is being routed,
164158
* is a target shard that is not yet in HANDOFF state, then route it to the source shard.
@@ -240,46 +234,20 @@ public int rerouteToTarget(IndexRequest indexRequest) {
240234
return indexShard(indexRequest);
241235
}
242236

243-
@Override
244-
public int rerouteIndexingRequestIfResharding(IndexRequest indexRequest) {
245-
String id = indexRequest.id();
246-
String routing = indexRequest.routing();
247-
if (id == null) {
248-
throw new IllegalStateException("id is required and should have been set by process");
249-
}
250-
checkRoutingRequired(id, routing);
251-
int shardId = shardId(id, routing);
252-
return rerouteWritesIfResharding(shardId);
253-
}
254-
255237
@Override
256238
public int updateShard(String id, @Nullable String routing) {
257239
checkRoutingRequired(id, routing);
258240
int shardId = shardId(id, routing);
259241
return rerouteWritesIfResharding(shardId);
260242
}
261243

262-
@Override
263-
public int rerouteUpdateRequestIfResharding(String id, @Nullable String routing) {
264-
checkRoutingRequired(id, routing);
265-
int shardId = shardId(id, routing);
266-
return rerouteWritesIfResharding(shardId);
267-
}
268-
269244
@Override
270245
public int deleteShard(String id, @Nullable String routing) {
271246
checkRoutingRequired(id, routing);
272247
int shardId = shardId(id, routing);
273248
return rerouteWritesIfResharding(shardId);
274249
}
275250

276-
@Override
277-
public int rerouteDeleteRequestIfResharding(String id, @Nullable String routing) {
278-
checkRoutingRequired(id, routing);
279-
int shardId = shardId(id, routing);
280-
return rerouteWritesIfResharding(shardId);
281-
}
282-
283251
@Override
284252
public int getShard(String id, @Nullable String routing) {
285253
checkRoutingRequired(id, routing);
@@ -403,20 +371,6 @@ public int rerouteToTarget(IndexRequest indexRequest) {
403371

404372
protected abstract int hashSource(IndexRequest indexRequest);
405373

406-
// This is actually same as indexShard above minus the checkNoRouting check because routing
407-
// can be added to the request during postProcess. But we probably need to think of ways to
408-
// make this call cheaper.
409-
@Override
410-
public int rerouteIndexingRequestIfResharding(IndexRequest indexRequest) {
411-
// assert Transports.assertNotTransportThread("parsing the _source can get slow");
412-
// TODO: Is this always necessary ? This can be expensive. postProcess adds some additional metadata
413-
// TODO: to the indexing request, can that be used to get the hash in a cheaper way ? Or maybe we
414-
// TODO: can add the hash to the IndexRequest ?
415-
hash = hashSource(indexRequest);
416-
int shardId = hashToShardId(hash);
417-
return rerouteWritesIfResharding(shardId);
418-
}
419-
420374
private static int defaultOnEmpty() {
421375
throw new IllegalArgumentException("Error extracting routing: source didn't contain any routing fields");
422376
}
@@ -430,25 +384,13 @@ public int updateShard(String id, @Nullable String routing) {
430384
throw new IllegalArgumentException(error("update"));
431385
}
432386

433-
@Override
434-
public int rerouteUpdateRequestIfResharding(String id, @Nullable String routing) {
435-
throw new IllegalArgumentException(error("update"));
436-
}
437-
438387
@Override
439388
public int deleteShard(String id, @Nullable String routing) {
440389
checkNoRouting(routing);
441390
int shardId = idToHash(id);
442391
return rerouteWritesIfResharding(shardId);
443392
}
444393

445-
@Override
446-
public int rerouteDeleteRequestIfResharding(String id, @Nullable String routing) {
447-
checkNoRouting(routing);
448-
int shardId = idToHash(id);
449-
return rerouteWritesIfResharding(shardId);
450-
}
451-
452394
@Override
453395
public int getShard(String id, @Nullable String routing) {
454396
checkNoRouting(routing);

0 commit comments

Comments
 (0)