Skip to content

Commit dc0fb02

Browse files
committed
Change
1 parent aa8505e commit dc0fb02

File tree

3 files changed

+650
-45
lines changed

3 files changed

+650
-45
lines changed

server/src/main/java/org/elasticsearch/action/support/replication/ReplicationSplitHelper.java

Lines changed: 11 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -62,24 +62,22 @@ public ReplicationSplitHelper(
6262
}
6363

6464
public static <Request extends ReplicationRequest<Request>> boolean needsSplitCoordination(
65-
final TransportReplicationAction.ConcreteShardRequest<Request> primaryRequest,
65+
final Request primaryRequest,
6666
final IndexMetadata indexMetadata
6767
) {
68-
SplitShardCountSummary requestSplitSummary = primaryRequest.getRequest().reshardSplitShardCountSummary();
68+
SplitShardCountSummary requestSplitSummary = primaryRequest.reshardSplitShardCountSummary();
6969
// TODO: We currently only set the request split summary transport shard bulk. Only evaluate this at the moment or else every
7070
// request would say it needs a split.
7171
return requestSplitSummary.isUnset() == false
72-
&& requestSplitSummary.equals(
73-
SplitShardCountSummary.forIndexing(indexMetadata, primaryRequest.getRequest().shardId().getId())
74-
) == false;
72+
&& requestSplitSummary.equals(SplitShardCountSummary.forIndexing(indexMetadata, primaryRequest.shardId().getId())) == false;
7573
}
7674

7775
public SplitCoordinator newSplitRequest(
7876
TransportReplicationAction<Request, ReplicaRequest, Response> action,
7977
ReplicationTask task,
8078
ProjectMetadata project,
8179
TransportReplicationAction<Request, ReplicaRequest, Response>.PrimaryShardReference primaryShardReference,
82-
TransportReplicationAction.ConcreteShardRequest<Request> primaryRequest,
80+
Request primaryRequest,
8381
CheckedBiConsumer<
8482
TransportReplicationAction<Request, ReplicaRequest, Response>.PrimaryShardReference,
8583
ActionListener<Response>,
@@ -103,7 +101,7 @@ public class SplitCoordinator {
103101
private final ReplicationTask task;
104102
private final ProjectMetadata project;
105103
private final TransportReplicationAction<Request, ReplicaRequest, Response>.PrimaryShardReference primaryShardReference;
106-
private final TransportReplicationAction.ConcreteShardRequest<Request> originalRequest;
104+
private final Request originalRequest;
107105
private final CheckedBiConsumer<
108106
TransportReplicationAction<Request, ReplicaRequest, Response>.PrimaryShardReference,
109107
ActionListener<Response>,
@@ -115,7 +113,7 @@ public SplitCoordinator(
115113
ReplicationTask task,
116114
ProjectMetadata project,
117115
TransportReplicationAction<Request, ReplicaRequest, Response>.PrimaryShardReference primaryShardReference,
118-
TransportReplicationAction.ConcreteShardRequest<Request> originalRequest,
116+
Request originalRequest,
119117
CheckedBiConsumer<
120118
TransportReplicationAction<Request, ReplicaRequest, Response>.PrimaryShardReference,
121119
ActionListener<Response>,
@@ -132,7 +130,7 @@ public SplitCoordinator(
132130
}
133131

134132
public void coordinate() throws Exception {
135-
Map<ShardId, Request> splitRequests = action.splitRequestOnPrimary(originalRequest.getRequest());
133+
Map<ShardId, Request> splitRequests = action.splitRequestOnPrimary(originalRequest);
136134

137135
int numSplitRequests = splitRequests.size();
138136

@@ -142,7 +140,8 @@ public void coordinate() throws Exception {
142140

143141
if (numSplitRequests == 1) {
144142
// If the request is for source, same behavior as before
145-
if (splitRequests.containsKey(originalRequest.getRequest().shardId())) {
143+
if (splitRequests.containsKey(originalRequest.shardId())) {
144+
TransportReplicationAction.setPhase(task, "primary");
146145
doPrimaryRequest.accept(primaryShardReference, onCompletionListener);
147146
} else {
148147
// If the request is for target, forward request to target.
@@ -198,11 +197,7 @@ public void onFailure(Exception e) {
198197
}
199198

200199
private void finish() {
201-
Tuple<Response, Exception> finalResponse = action.combineSplitResponses(
202-
originalRequest.getRequest(),
203-
splitRequests,
204-
results
205-
);
200+
Tuple<Response, Exception> finalResponse = action.combineSplitResponses(originalRequest, splitRequests, results);
206201
TransportReplicationAction.setPhase(task, "finished");
207202
if (finalResponse.v1() != null) {
208203
onCompletionListener.onResponse(finalResponse.v1());
@@ -211,7 +206,7 @@ private void finish() {
211206
}
212207
}
213208
};
214-
if (splitRequest.getKey().equals(originalRequest.getRequest().shardId())) {
209+
if (splitRequest.getKey().equals(originalRequest.shardId())) {
215210
doPrimaryRequest.accept(primaryShardReference, listener);
216211
} else {
217212
delegateToTarget(splitRequest.getKey(), splitRequest.getValue(), clusterService::state, project, listener);

server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java

Lines changed: 16 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -492,6 +492,10 @@ protected void doRun() throws Exception {
492492
}
493493

494494
void runWithPrimaryShardReference(final PrimaryShardReference primaryShardReference) {
495+
ActionListener<Response> setFinishedListener = ActionListener.runBefore(
496+
onCompletionListener,
497+
() -> setPhase(replicationTask, "finished")
498+
);
495499
try {
496500
final ClusterState clusterState = clusterService.state();
497501
final Index index = primaryShardReference.routingEntry().index();
@@ -521,51 +525,37 @@ void runWithPrimaryShardReference(final PrimaryShardReference primaryShardRefere
521525
new ConcreteShardRequest<>(primaryRequest.getRequest(), allocationID, primaryRequest.getPrimaryTerm()),
522526
transportOptions,
523527
new ActionListenerResponseHandler<>(
524-
onCompletionListener,
528+
setFinishedListener,
525529
TransportReplicationAction.this::newResponseInstance,
526530
TransportResponseHandler.TRANSPORT_WORKER
527-
) {
528-
529-
@Override
530-
public void handleResponse(Response response) {
531-
setPhase(replicationTask, "finished");
532-
super.handleResponse(response);
533-
}
534-
535-
@Override
536-
public void handleException(TransportException exp) {
537-
setPhase(replicationTask, "finished");
538-
super.handleException(exp);
539-
}
540-
}
531+
)
541532
);
542-
} else if (ReplicationSplitHelper.needsSplitCoordination(primaryRequest, indexMetadata)) {
533+
} else if (ReplicationSplitHelper.needsSplitCoordination(primaryRequest.getRequest(), indexMetadata)) {
543534
ReplicationSplitHelper<Request, ReplicaRequest, Response>.SplitCoordinator splitCoordinator = splitHelper
544535
.newSplitRequest(
545536
TransportReplicationAction.this,
546537
replicationTask,
547538
project,
548539
primaryShardReference,
549-
primaryRequest,
540+
primaryRequest.getRequest(),
550541
this::executePrimaryRequest,
551-
onCompletionListener
542+
setFinishedListener
552543
);
553544
splitCoordinator.coordinate();
554545
} else {
555-
executePrimaryRequest(primaryShardReference, onCompletionListener);
546+
setPhase(replicationTask, "primary");
547+
executePrimaryRequest(primaryShardReference, setFinishedListener);
556548
}
557549
} catch (Exception e) {
558550
Releasables.closeWhileHandlingException(primaryShardReference);
559-
onFailure(e);
551+
setFinishedListener.onFailure(e);
560552
}
561553
}
562554

563555
private void executePrimaryRequest(
564556
final TransportReplicationAction<Request, ReplicaRequest, Response>.PrimaryShardReference primaryShardReference,
565557
final ActionListener<Response> listener
566558
) throws Exception {
567-
setPhase(replicationTask, "primary");
568-
569559
final ActionListener<Response> responseListener = ActionListener.wrap(response -> {
570560
adaptResponse(response, primaryShardReference.indexShard);
571561

@@ -589,9 +579,11 @@ private void executePrimaryRequest(
589579

590580
assert primaryShardReference.indexShard.isPrimaryMode();
591581
primaryShardReference.close(); // release shard operation lock before responding to caller
592-
setPhase(replicationTask, "finished");
593582
listener.onResponse(response);
594-
}, e -> handleException(primaryShardReference, e));
583+
}, e -> {
584+
Releasables.closeWhileHandlingException(primaryShardReference);
585+
listener.onFailure(e);
586+
});
595587

596588
new ReplicationOperation<>(
597589
primaryRequest.getRequest(),
@@ -607,11 +599,6 @@ private void executePrimaryRequest(
607599
).execute();
608600
}
609601

610-
private void handleException(PrimaryShardReference primaryShardReference, Exception e) {
611-
Releasables.closeWhileHandlingException(primaryShardReference); // release shard operation lock before responding to caller
612-
onFailure(e);
613-
}
614-
615602
@Override
616603
public void onFailure(Exception e) {
617604
setPhase(replicationTask, "finished");

0 commit comments

Comments
 (0)