Skip to content

Commit 0b34aa8

Browse files
committed
replication action
1 parent d88b476 commit 0b34aa8

File tree

22 files changed

+235
-137
lines changed

22 files changed

+235
-137
lines changed

server/src/internalClusterTest/java/org/elasticsearch/action/support/replication/TransportReplicationActionBypassCircuitBreakerOnReplicaIT.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -114,12 +114,12 @@ protected void shardOperationOnPrimary(
114114
IndexShard primary,
115115
ActionListener<PrimaryResult<Request, Response>> listener
116116
) {
117-
listener.onResponse(new PrimaryResult<>(shardRequest, new Response()));
117+
listener.onResponse(new PrimaryResult<>(primary, shardRequest, new Response()));
118118
}
119119

120120
@Override
121121
protected void shardOperationOnReplica(Request shardRequest, IndexShard replica, ActionListener<ReplicaResult> listener) {
122-
listener.onResponse(new ReplicaResult());
122+
listener.onResponse(new ReplicaResult(replica));
123123
}
124124
}
125125

server/src/internalClusterTest/java/org/elasticsearch/action/support/replication/TransportReplicationActionRetryOnClosedNodeIT.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -124,12 +124,12 @@ protected void shardOperationOnPrimary(
124124
IndexShard primary,
125125
ActionListener<PrimaryResult<Request, Response>> listener
126126
) {
127-
listener.onResponse(new PrimaryResult<>(shardRequest, new Response()));
127+
listener.onResponse(new PrimaryResult<>(primary, shardRequest, new Response()));
128128
}
129129

130130
@Override
131131
protected void shardOperationOnReplica(Request shardRequest, IndexShard replica, ActionListener<ReplicaResult> listener) {
132-
listener.onResponse(new ReplicaResult());
132+
listener.onResponse(new ReplicaResult(replica));
133133
}
134134
}
135135

server/src/main/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseAction.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -110,15 +110,15 @@ protected void shardOperationOnPrimary(
110110
) {
111111
ActionListener.completeWith(listener, () -> {
112112
executeShardOperation(shardRequest, primary);
113-
return new PrimaryResult<>(shardRequest, new ReplicationResponse());
113+
return new PrimaryResult<>(primary, shardRequest, new ReplicationResponse());
114114
});
115115
}
116116

117117
@Override
118118
protected void shardOperationOnReplica(ShardRequest shardRequest, IndexShard replica, ActionListener<ReplicaResult> listener) {
119119
ActionListener.completeWith(listener, () -> {
120120
executeShardOperation(shardRequest, replica);
121-
return new ReplicaResult();
121+
return new ReplicaResult(replica);
122122
});
123123
}
124124

server/src/main/java/org/elasticsearch/action/admin/indices/flush/TransportShardFlushAction.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -85,15 +85,15 @@ protected void shardOperationOnPrimary(
8585
) {
8686
primary.flush(shardRequest.getRequest(), listener.map(flushed -> {
8787
logger.trace("{} flush request executed on primary", primary.shardId());
88-
return new PrimaryResult<>(shardRequest, new ReplicationResponse());
88+
return new PrimaryResult<>(primary, shardRequest, new ReplicationResponse());
8989
}));
9090
}
9191

9292
@Override
9393
protected void shardOperationOnReplica(ShardFlushRequest request, IndexShard replica, ActionListener<ReplicaResult> listener) {
9494
replica.flush(request.getRequest(), listener.map(flushed -> {
9595
logger.trace("{} flush request executed on replica", replica.shardId());
96-
return new ReplicaResult();
96+
return new ReplicaResult(replica);
9797
}));
9898
}
9999

server/src/main/java/org/elasticsearch/action/admin/indices/readonly/TransportVerifyShardIndexBlockAction.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -114,15 +114,15 @@ protected void shardOperationOnPrimary(
114114
) {
115115
ActionListener.completeWith(listener, () -> {
116116
executeShardOperation(shardRequest, primary);
117-
return new PrimaryResult<>(shardRequest, new ReplicationResponse());
117+
return new PrimaryResult<>(primary, shardRequest, new ReplicationResponse());
118118
});
119119
}
120120

121121
@Override
122122
protected void shardOperationOnReplica(ShardRequest shardRequest, IndexShard replica, ActionListener<ReplicaResult> listener) {
123123
ActionListener.completeWith(listener, () -> {
124124
executeShardOperation(shardRequest, replica);
125-
return new ReplicaResult();
125+
return new ReplicaResult(replica);
126126
});
127127
}
128128

server/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportShardRefreshAction.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -101,15 +101,15 @@ protected void shardOperationOnPrimary(
101101
ShardRefreshReplicaRequest replicaRequest = new ShardRefreshReplicaRequest(shardRequest.shardId(), refreshResult);
102102
replicaRequest.setParentTask(shardRequest.getParentTask());
103103
logger.trace("{} refresh request executed on primary", primary.shardId());
104-
return new PrimaryResult<>(replicaRequest, new ReplicationResponse());
104+
return new PrimaryResult<>(primary, replicaRequest, new ReplicationResponse());
105105
}));
106106
}
107107

108108
@Override
109109
protected void shardOperationOnReplica(ShardRefreshReplicaRequest request, IndexShard replica, ActionListener<ReplicaResult> listener) {
110110
replica.externalRefresh(SOURCE_API, listener.safeMap(refreshResult -> {
111111
logger.trace("{} refresh request executed on replica", replica.shardId());
112-
return new ReplicaResult();
112+
return new ReplicaResult(replica);
113113
}));
114114
}
115115

server/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -194,21 +194,20 @@ private void handlePrimaryResult(
194194
pendingActionsListener
195195
);
196196
}
197-
primaryResult.runPostReplicationActions(new ActionListener<>() {
198-
197+
primaryResult.runPostReplicationActions(new PrimaryPostReplicationActionsListener() {
199198
@Override
200-
public void onResponse(Void aVoid) {
199+
public void onResponse(long globalCheckpoint, long localCheckpoint) {
201200
successfulShards.incrementAndGet();
202201
updateCheckPoints(
203202
primary.routingEntry(),
204-
primary::localCheckpoint,
205-
primary::globalCheckpoint,
203+
() -> localCheckpoint,
204+
() -> globalCheckpoint,
206205
() -> primaryCoordinationPendingActionListener.onResponse(null)
207206
);
208207
}
209208

210209
@Override
211-
public void onFailure(Exception e) {
210+
public void onFailure(long globalCheckpoint, long localCheckpoint, Exception e) {
212211
logger.trace("[{}] op [{}] post replication actions failed for [{}]", primary.routingEntry().shardId(), opType, request);
213212
// TODO: fail shard? This will otherwise have the local / global checkpoint info lagging, or possibly have replicas
214213
// go out of sync with the primary
@@ -217,8 +216,8 @@ public void onFailure(Exception e) {
217216
// is appended into the translog.
218217
updateCheckPoints(
219218
primary.routingEntry(),
220-
primary::localCheckpoint,
221-
primary::globalCheckpoint,
219+
() -> localCheckpoint,
220+
() -> globalCheckpoint,
222221
() -> primaryCoordinationPendingActionListener.onFailure(e)
223222
);
224223
}
@@ -718,7 +717,13 @@ public interface PrimaryResult<RequestT extends ReplicationRequest<RequestT>> {
718717
* Run actions to be triggered post replication
719718
* @param listener callback that is invoked after post replication actions have completed
720719
* */
721-
void runPostReplicationActions(ActionListener<Void> listener);
720+
void runPostReplicationActions(PrimaryPostReplicationActionsListener listener);
722721
}
723722

723+
public interface PrimaryPostReplicationActionsListener {
724+
725+
void onResponse(long globalCheckpoint, long localCheckpoint);
726+
727+
void onFailure(long globalCheckpoint, long localCheckpoint, Exception ex);
728+
}
724729
}

server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java

Lines changed: 39 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -571,14 +571,16 @@ protected void adaptResponse(Response response, IndexShard indexShard) {
571571
public static class PrimaryResult<ReplicaRequest extends ReplicationRequest<ReplicaRequest>, Response extends ReplicationResponse>
572572
implements
573573
ReplicationOperation.PrimaryResult<ReplicaRequest> {
574+
private final IndexShard primary;
574575
private final ReplicaRequest replicaRequest;
575576
public final Response replicationResponse;
576577

577578
/**
578579
* Result of executing a primary operation
579580
* expects <code>replicationResponse</code> to be not-null
580581
*/
581-
public PrimaryResult(ReplicaRequest replicaRequest, Response replicationResponse) {
582+
public PrimaryResult(IndexShard primary, ReplicaRequest replicaRequest, Response replicationResponse) {
583+
this.primary = Objects.requireNonNull(primary);
582584
assert replicaRequest != null : "request is required";
583585
assert replicationResponse != null : "response is required";
584586
this.replicaRequest = replicaRequest;
@@ -596,31 +598,40 @@ public void setShardInfo(ReplicationResponse.ShardInfo shardInfo) {
596598
}
597599

598600
@Override
599-
public void runPostReplicationActions(ActionListener<Void> listener) {
600-
listener.onResponse(null);
601+
public void runPostReplicationActions(ReplicationOperation.PrimaryPostReplicationActionsListener listener) {
602+
listener.onResponse(primary.getLastSyncedGlobalCheckpoint(), primary.getLocalCheckpoint());
601603
}
602604
}
603605

604606
public static class ReplicaResult {
605-
final Exception finalFailure;
607+
protected final IndexShard replica;
608+
protected final Exception finalFailure;
606609

607-
public ReplicaResult(Exception finalFailure) {
610+
public ReplicaResult(IndexShard replica, Exception finalFailure) {
611+
this.replica = Objects.requireNonNull(replica);
608612
this.finalFailure = finalFailure;
609613
}
610614

611-
public ReplicaResult() {
612-
this(null);
615+
public ReplicaResult(IndexShard replica) {
616+
this(replica, null);
613617
}
614618

615-
public void runPostReplicaActions(ActionListener<Void> listener) {
619+
public void runPostReplicaActions(ReplicaPostReplicationActionsListener listener) {
616620
if (finalFailure != null) {
617621
listener.onFailure(finalFailure);
618622
} else {
619-
listener.onResponse(null);
623+
listener.onResponse(replica.getLastSyncedGlobalCheckpoint(), replica.getLocalCheckpoint());
620624
}
621625
}
622626
}
623627

628+
public interface ReplicaPostReplicationActionsListener {
629+
630+
void onResponse(long globalCheckpoint, long localCheckpoint);
631+
632+
void onFailure(Exception ex);
633+
}
634+
624635
protected void handleReplicaRequest(
625636
final ConcreteReplicaRequest<ReplicaRequest> replicaRequest,
626637
final TransportChannel channel,
@@ -681,12 +692,11 @@ private final class AsyncReplicaAction extends AbstractRunnable implements Actio
681692
public void onResponse(Releasable releasable) {
682693
assert replica.getActiveOperationsCount() != 0 : "must perform shard operation under a permit";
683694
try {
684-
shardOperationOnReplica(
685-
replicaRequest.getRequest(),
686-
replica,
687-
ActionListener.wrap((replicaResult) -> {
688-
final var response = new ReplicaResponse(replica.getLocalCheckpoint(), replica.getLastSyncedGlobalCheckpoint());
689-
replicaResult.runPostReplicaActions(ActionListener.wrap(r -> {
695+
shardOperationOnReplica(replicaRequest.getRequest(), replica, ActionListener.wrap((replicaResult) -> {
696+
replicaResult.runPostReplicaActions(new ReplicaPostReplicationActionsListener() {
697+
@Override
698+
public void onResponse(long globalCheckpoint, long localCheckpoint) {
699+
final ReplicaResponse response = new ReplicaResponse(globalCheckpoint, localCheckpoint);
690700
releasable.close(); // release shard operation lock before responding to caller
691701
if (logger.isTraceEnabled()) {
692702
logger.trace(
@@ -698,15 +708,19 @@ public void onResponse(Releasable releasable) {
698708
}
699709
setPhase(task, "finished");
700710
onCompletionListener.onResponse(response);
701-
}, e -> {
702-
Releasables.closeWhileHandlingException(releasable); // release shard operation lock before responding to caller
711+
}
712+
713+
@Override
714+
public void onFailure(Exception e) {
715+
// release shard operation lock before responding to caller
716+
Releasables.closeWhileHandlingException(releasable);
703717
responseWithFailure(e);
704-
}));
705-
}, e -> {
706-
Releasables.closeWhileHandlingException(releasable); // release shard operation lock before responding to caller
707-
AsyncReplicaAction.this.onFailure(e);
708-
})
709-
);
718+
}
719+
});
720+
}, e -> {
721+
Releasables.closeWhileHandlingException(releasable); // release shard operation lock before responding to caller
722+
AsyncReplicaAction.this.onFailure(e);
723+
}));
710724
// TODO: Evaluate if we still need to catch this exception
711725
} catch (Exception e) {
712726
Releasables.closeWhileHandlingException(releasable); // release shard operation lock before responding to caller
@@ -1139,15 +1153,11 @@ class PrimaryShardReference
11391153
protected final IndexShard indexShard;
11401154
private final Releasable operationLock;
11411155
private final Supplier<SeqNoStats> seqNoStatsSupplier;
1142-
private final Supplier<Long> localCheckpointSupplier;
1143-
private final Supplier<Long> globalCheckpointSupplier;
11441156

11451157
PrimaryShardReference(IndexShard indexShard, Releasable operationLock) {
11461158
this.indexShard = indexShard;
11471159
this.operationLock = operationLock;
11481160
this.seqNoStatsSupplier = indexShard.getSeqNoStatsSupplier();
1149-
this.localCheckpointSupplier = indexShard.getLocalCheckpointSupplier();
1150-
this.globalCheckpointSupplier = indexShard.getLastSyncedGlobalCheckpointSupplier();
11511161
}
11521162

11531163
@Override
@@ -1197,12 +1207,12 @@ public void updateGlobalCheckpointForShard(final String allocationId, final long
11971207

11981208
@Override
11991209
public long localCheckpoint() {
1200-
return localCheckpointSupplier.get();
1210+
return indexShard.getLocalCheckpoint();
12011211
}
12021212

12031213
@Override
12041214
public long globalCheckpoint() {
1205-
return globalCheckpointSupplier.get();
1215+
return indexShard.getLastSyncedGlobalCheckpoint();
12061216
}
12071217

12081218
@Override

0 commit comments

Comments
 (0)