Skip to content

Commit 9c3b973

Browse files
authored
ReplicationOperation exceptions should not escape (#116074)
We introduce ActionListener.run() in order to ensure the RefCountingListener introduced by PR #115341 , is the single point that is failed upon exceptions, and no exception escapes through the ReplicationOperation.execute() method. Fixes #116071 Fixes #116081 Fixes #116073
1 parent a6cd265 commit 9c3b973

File tree

1 file changed

+52
-49
lines changed

1 file changed

+52
-49
lines changed

server/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java

Lines changed: 52 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -115,28 +115,25 @@ public void execute() throws Exception {
115115
);
116116
resultListener.onResponse(primaryResult);
117117
}, resultListener::onFailure))) {
118-
final String activeShardCountFailure = checkActiveShardCount();
119-
final ShardRouting primaryRouting = primary.routingEntry();
120-
final ShardId primaryId = primaryRouting.shardId();
121-
if (activeShardCountFailure != null) {
122-
pendingActionsListener.acquire()
123-
.onFailure(
124-
new UnavailableShardsException(
125-
primaryId,
126-
"{} Timeout: [{}], request: [{}]",
127-
activeShardCountFailure,
128-
request.timeout(),
129-
request
130-
)
118+
ActionListener.run(pendingActionsListener.acquire(), (primaryCoordinationListener) -> { // triggered when we finish coordination
119+
final String activeShardCountFailure = checkActiveShardCount();
120+
final ShardRouting primaryRouting = primary.routingEntry();
121+
final ShardId primaryId = primaryRouting.shardId();
122+
if (activeShardCountFailure != null) {
123+
throw new UnavailableShardsException(
124+
primaryId,
125+
"{} Timeout: [{}], request: [{}]",
126+
activeShardCountFailure,
127+
request.timeout(),
128+
request
131129
);
132-
return;
133-
}
130+
}
134131

135-
totalShards.incrementAndGet();
136-
var primaryCoordinationPendingActionListener = pendingActionsListener.acquire(); // triggered when we finish all coordination
137-
primary.perform(request, primaryCoordinationPendingActionListener.delegateFailureAndWrap((l, primaryResult) -> {
138-
handlePrimaryResult(primaryResult, l, pendingActionsListener);
139-
}));
132+
totalShards.incrementAndGet();
133+
primary.perform(request, primaryCoordinationListener.delegateFailureAndWrap((l, primaryResult) -> {
134+
handlePrimaryResult(primaryResult, l, pendingActionsListener);
135+
}));
136+
});
140137
}
141138
}
142139

@@ -153,24 +150,25 @@ private void handlePrimaryResult(
153150
}
154151
final ReplicationGroup replicationGroup = primary.getReplicationGroup();
155152

156-
var primaryOperationPendingActionListener = pendingActionsListener.acquire();
157-
replicasProxy.onPrimaryOperationComplete(
158-
replicaRequest,
159-
replicationGroup.getRoutingTable(),
160-
ActionListener.wrap(ignored -> primaryOperationPendingActionListener.onResponse(null), exception -> {
161-
totalShards.incrementAndGet();
162-
shardReplicaFailures.add(
163-
new ReplicationResponse.ShardInfo.Failure(
164-
primary.routingEntry().shardId(),
165-
null,
166-
exception,
167-
ExceptionsHelper.status(exception),
168-
false
169-
)
170-
);
171-
primaryOperationPendingActionListener.onResponse(null);
172-
})
173-
);
153+
ActionListener.run(pendingActionsListener.acquire(), primaryOperationPendingActionListener -> {
154+
replicasProxy.onPrimaryOperationComplete(
155+
replicaRequest,
156+
replicationGroup.getRoutingTable(),
157+
ActionListener.wrap(ignored -> primaryOperationPendingActionListener.onResponse(null), exception -> {
158+
totalShards.incrementAndGet();
159+
shardReplicaFailures.add(
160+
new ReplicationResponse.ShardInfo.Failure(
161+
primary.routingEntry().shardId(),
162+
null,
163+
exception,
164+
ExceptionsHelper.status(exception),
165+
false
166+
)
167+
);
168+
primaryOperationPendingActionListener.onResponse(null);
169+
})
170+
);
171+
});
174172

175173
// we have to get the replication group after successfully indexing into the primary in order to honour recovery semantics.
176174
// we have to make sure that every operation indexed into the primary after recovery start will also be replicated
@@ -234,13 +232,14 @@ private void markUnavailableShardsAsStale(
234232
) {
235233
// if inSyncAllocationIds contains allocation ids of shards that don't exist in RoutingTable, mark copies as stale
236234
for (String allocationId : replicationGroup.getUnavailableInSyncShards()) {
237-
var staleCopyPendingActionListener = pendingActionsListener.acquire();
238-
replicasProxy.markShardCopyAsStaleIfNeeded(
239-
replicaRequest.shardId(),
240-
allocationId,
241-
primaryTerm,
242-
staleCopyPendingActionListener.delegateResponse((l, e) -> onNoLongerPrimary(e, l))
243-
);
235+
ActionListener.run(pendingActionsListener.acquire(), (staleCopyPendingActionListener) -> {
236+
replicasProxy.markShardCopyAsStaleIfNeeded(
237+
replicaRequest.shardId(),
238+
allocationId,
239+
primaryTerm,
240+
staleCopyPendingActionListener.delegateResponse((l, e) -> onNoLongerPrimary(e, l))
241+
);
242+
});
244243
}
245244
}
246245

@@ -285,13 +284,17 @@ private void performOnReplica(
285284
logger.trace("[{}] sending op [{}] to replica {} for request [{}]", shard.shardId(), opType, shard, replicaRequest);
286285
}
287286
totalShards.incrementAndGet();
288-
var replicationPendingActionListener = pendingActionsListener.acquire();
289-
ActionListener.run(replicationPendingActionListener, (listener) -> {
287+
ActionListener.run(pendingActionsListener.acquire(), (replicationPendingActionListener) -> {
290288
final ActionListener<ReplicaResponse> replicationListener = new ActionListener<>() {
291289
@Override
292290
public void onResponse(ReplicaResponse response) {
293291
successfulShards.incrementAndGet();
294-
updateCheckPoints(shard, response::localCheckpoint, response::globalCheckpoint, () -> listener.onResponse(null));
292+
updateCheckPoints(
293+
shard,
294+
response::localCheckpoint,
295+
response::globalCheckpoint,
296+
() -> replicationPendingActionListener.onResponse(null)
297+
);
295298
}
296299

297300
@Override
@@ -325,7 +328,7 @@ public void onFailure(Exception replicaException) {
325328
primaryTerm,
326329
message,
327330
replicaException,
328-
listener.delegateResponse((l, e) -> onNoLongerPrimary(e, l))
331+
replicationPendingActionListener.delegateResponse((l, e) -> onNoLongerPrimary(e, l))
329332
);
330333
}
331334

0 commit comments

Comments
 (0)