Skip to content

Commit b760563

Browse files
authored
IGNITE-24499 Extract Primary Replica negotiation logic from ReplicaImpl to dedicated class (#5225)
1 parent b613af6 commit b760563

File tree

4 files changed

+329
-235
lines changed

4 files changed

+329
-235
lines changed
Lines changed: 283 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,283 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.ignite.internal.replicator;
19+
20+
import static java.lang.System.currentTimeMillis;
21+
import static java.util.concurrent.CompletableFuture.completedFuture;
22+
import static java.util.concurrent.CompletableFuture.failedFuture;
23+
import static org.apache.ignite.internal.util.ExceptionUtils.unwrapCause;
24+
import static org.apache.ignite.internal.util.IgniteUtils.retryOperationUntilSuccess;
25+
26+
import java.util.UUID;
27+
import java.util.concurrent.CompletableFuture;
28+
import java.util.concurrent.ExecutorService;
29+
import java.util.concurrent.TimeUnit;
30+
import java.util.concurrent.TimeoutException;
31+
import java.util.function.BiFunction;
32+
import org.apache.ignite.internal.hlc.ClockService;
33+
import org.apache.ignite.internal.hlc.HybridTimestamp;
34+
import org.apache.ignite.internal.lang.NodeStoppingException;
35+
import org.apache.ignite.internal.logger.IgniteLogger;
36+
import org.apache.ignite.internal.logger.Loggers;
37+
import org.apache.ignite.internal.network.NetworkMessage;
38+
import org.apache.ignite.internal.placementdriver.PlacementDriver;
39+
import org.apache.ignite.internal.placementdriver.message.LeaseGrantedMessage;
40+
import org.apache.ignite.internal.placementdriver.message.LeaseGrantedMessageResponse;
41+
import org.apache.ignite.internal.placementdriver.message.PlacementDriverMessagesFactory;
42+
import org.apache.ignite.internal.placementdriver.message.PlacementDriverReplicaMessage;
43+
import org.apache.ignite.internal.raft.Peer;
44+
import org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupService;
45+
import org.apache.ignite.internal.replicator.message.PrimaryReplicaChangeCommand;
46+
import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory;
47+
import org.apache.ignite.internal.util.PendingComparableValuesTracker;
48+
import org.apache.ignite.internal.util.TrackerClosedException;
49+
import org.apache.ignite.network.ClusterNode;
50+
import org.jetbrains.annotations.Nullable;
51+
52+
/**
53+
* Replica specific placement driver message processor.
54+
*/
55+
public class PlacementDriverMessageProcessor {
56+
private static final IgniteLogger LOG = Loggers.forClass(PlacementDriverMessageProcessor.class);
57+
58+
private static final PlacementDriverMessagesFactory PLACEMENT_DRIVER_MESSAGES_FACTORY = new PlacementDriverMessagesFactory();
59+
60+
private static final ReplicaMessagesFactory REPLICA_MESSAGES_FACTORY = new ReplicaMessagesFactory();
61+
62+
private final ReplicationGroupId groupId;
63+
64+
private final ClusterNode localNode;
65+
66+
private final PlacementDriver placementDriver;
67+
68+
private final ClockService clockService;
69+
70+
private final BiFunction<ReplicationGroupId, HybridTimestamp, Boolean> replicaReservationClosure;
71+
72+
// TODO: IGNITE-20063 Maybe get rid of it
73+
private final ExecutorService executor;
74+
75+
/** Latest lease expiration time. */
76+
private volatile HybridTimestamp leaseExpirationTime;
77+
78+
private final PendingComparableValuesTracker<Long, Void> storageIndexTracker;
79+
80+
// TODO IGNITE-19120 after replica inoperability logic is introduced, this future should be replaced with something like
81+
// VersionedValue (so that PlacementDriverMessages would wait for new leader election)
82+
/** Completes when leader is elected. */
83+
private final CompletableFuture<Void> leaderReadyFuture = new CompletableFuture<>();
84+
85+
/** Container of the elected leader. */
86+
private volatile @Nullable ClusterNode leaderRef = null;
87+
88+
private final TopologyAwareRaftGroupService raftClient;
89+
90+
/**
91+
* The constructor of a replica server.
92+
*
93+
* @param groupId Replication group id.
94+
* @param localNode Instance of the local node.
95+
* @param placementDriver Placement driver.
96+
* @param clockService Clock service.
97+
* @param replicaReservationClosure Closure that will be called to reserve the replica for becoming primary. It returns whether
98+
* the reservation was successful.
99+
* @param executor External executor.
100+
* @param storageIndexTracker Storage index tracker.
101+
* @param raftClient Raft client.
102+
*/
103+
104+
PlacementDriverMessageProcessor(
105+
ReplicationGroupId groupId,
106+
ClusterNode localNode,
107+
PlacementDriver placementDriver,
108+
ClockService clockService,
109+
BiFunction<ReplicationGroupId, HybridTimestamp, Boolean> replicaReservationClosure,
110+
ExecutorService executor,
111+
PendingComparableValuesTracker<Long, Void> storageIndexTracker,
112+
TopologyAwareRaftGroupService raftClient
113+
) {
114+
this.groupId = groupId;
115+
this.localNode = localNode;
116+
this.placementDriver = placementDriver;
117+
this.clockService = clockService;
118+
this.replicaReservationClosure = replicaReservationClosure;
119+
this.executor = executor;
120+
this.storageIndexTracker = storageIndexTracker;
121+
this.raftClient = raftClient;
122+
123+
raftClient.subscribeLeader(this::onLeaderElected);
124+
}
125+
126+
/**
127+
* Process placement driver message.
128+
*
129+
* @param msg Message to process.
130+
* @return Future that contains a result.
131+
*/
132+
CompletableFuture<? extends NetworkMessage> processPlacementDriverMessage(PlacementDriverReplicaMessage msg) {
133+
if (msg instanceof LeaseGrantedMessage) {
134+
return processLeaseGrantedMessage((LeaseGrantedMessage) msg)
135+
.handle((v, e) -> {
136+
if (e != null) {
137+
Throwable ex = unwrapCause(e);
138+
139+
if (!(ex instanceof NodeStoppingException) && !(ex instanceof TrackerClosedException)) {
140+
LOG.warn("Failed to process the lease granted message [msg={}].", ex, msg);
141+
}
142+
143+
// Just restart the negotiation in case of exception.
144+
return PLACEMENT_DRIVER_MESSAGES_FACTORY.leaseGrantedMessageResponse()
145+
.accepted(false)
146+
.build();
147+
} else {
148+
return v;
149+
}
150+
});
151+
}
152+
153+
return failedFuture(new AssertionError("Unknown message type, msg=" + msg));
154+
}
155+
156+
/**
157+
* Process lease granted message. Can either accept lease or decline with redirection proposal. In the case of lease acceptance,
158+
* initiates the leadership transfer, if this replica is not a group leader.
159+
*
160+
* @param msg Message to process.
161+
* @return Future that contains a result.
162+
*/
163+
private CompletableFuture<LeaseGrantedMessageResponse> processLeaseGrantedMessage(LeaseGrantedMessage msg) {
164+
LOG.info("Received LeaseGrantedMessage for replica [groupId={}, leaseStartTime={}, force={}].", groupId, msg.leaseStartTime(),
165+
msg.force());
166+
167+
return placementDriver.previousPrimaryExpired(groupId).thenCompose(unused -> leaderFuture().thenCompose(leader -> {
168+
HybridTimestamp leaseExpirationTime = this.leaseExpirationTime;
169+
170+
assert leaseExpirationTime == null || clockService.after(msg.leaseExpirationTime(), leaseExpirationTime)
171+
: "Invalid lease expiration time in message, msg=" + msg;
172+
173+
if (msg.force()) {
174+
// Replica must wait till storage index reaches the current leader's index to make sure that all updates made on the
175+
// group leader are received.
176+
return waitForActualState(msg.leaseStartTime(), msg.leaseExpirationTime().getPhysical())
177+
.thenCompose(v -> sendPrimaryReplicaChangeToReplicationGroup(
178+
msg.leaseStartTime().longValue(),
179+
localNode.id(),
180+
localNode.name()
181+
))
182+
.thenCompose(v -> {
183+
CompletableFuture<LeaseGrantedMessageResponse> respFut =
184+
acceptLease(msg.leaseStartTime(), msg.leaseExpirationTime());
185+
186+
if (leader.equals(localNode)) {
187+
return respFut;
188+
} else {
189+
return raftClient.transferLeadership(new Peer(localNode.name()))
190+
.thenCompose(ignored -> respFut);
191+
}
192+
});
193+
} else {
194+
if (leader.equals(localNode)) {
195+
return waitForActualState(msg.leaseStartTime(), msg.leaseExpirationTime().getPhysical())
196+
.thenCompose(v -> sendPrimaryReplicaChangeToReplicationGroup(
197+
msg.leaseStartTime().longValue(),
198+
localNode.id(),
199+
localNode.name()
200+
))
201+
.thenCompose(v -> acceptLease(msg.leaseStartTime(), msg.leaseExpirationTime()));
202+
} else {
203+
return proposeLeaseRedirect(leader);
204+
}
205+
}
206+
}));
207+
}
208+
209+
private CompletableFuture<Void> sendPrimaryReplicaChangeToReplicationGroup(
210+
long leaseStartTime,
211+
UUID primaryReplicaNodeId,
212+
String primaryReplicaNodeName
213+
) {
214+
PrimaryReplicaChangeCommand cmd = REPLICA_MESSAGES_FACTORY.primaryReplicaChangeCommand()
215+
.leaseStartTime(leaseStartTime)
216+
.primaryReplicaNodeId(primaryReplicaNodeId)
217+
.primaryReplicaNodeName(primaryReplicaNodeName)
218+
.build();
219+
220+
return raftClient.run(cmd);
221+
}
222+
223+
private CompletableFuture<LeaseGrantedMessageResponse> acceptLease(
224+
HybridTimestamp leaseStartTime,
225+
HybridTimestamp leaseExpirationTime
226+
) {
227+
LOG.info("Lease accepted [group=" + groupId + ", leaseStartTime=" + leaseStartTime + "].");
228+
229+
this.leaseExpirationTime = leaseExpirationTime;
230+
231+
LeaseGrantedMessageResponse resp = PLACEMENT_DRIVER_MESSAGES_FACTORY.leaseGrantedMessageResponse()
232+
.accepted(true)
233+
.build();
234+
235+
return completedFuture(resp);
236+
}
237+
238+
private CompletableFuture<LeaseGrantedMessageResponse> proposeLeaseRedirect(ClusterNode groupLeader) {
239+
LOG.info("Proposing lease redirection [groupId={}, proposed node={}].", groupId, groupLeader);
240+
241+
LeaseGrantedMessageResponse resp = PLACEMENT_DRIVER_MESSAGES_FACTORY.leaseGrantedMessageResponse()
242+
.accepted(false)
243+
.redirectProposal(groupLeader.name())
244+
.build();
245+
246+
return completedFuture(resp);
247+
}
248+
249+
/**
250+
* Tries to read index from group leader and wait for this index to appear in local storage. Can possible return failed future with
251+
* timeout exception, and in this case, replica would not answer to placement driver, because the response is useless. Placement driver
252+
* should handle this.
253+
*
254+
* @param startTime Lease start time.
255+
* @param expirationTime Lease expiration time.
256+
* @return Future that is completed when local storage catches up the index that is actual for leader on the moment of request.
257+
*/
258+
private CompletableFuture<Void> waitForActualState(HybridTimestamp startTime, long expirationTime) {
259+
LOG.info("Waiting for actual storage state, group=" + groupId);
260+
261+
if (!replicaReservationClosure.apply(groupId, startTime)) {
262+
throw new IllegalStateException("Replica reservation failed [groupId=" + groupId + ", leaseStartTime=" + startTime + "].");
263+
}
264+
265+
long timeout = expirationTime - currentTimeMillis();
266+
if (timeout <= 0) {
267+
return failedFuture(new TimeoutException());
268+
}
269+
270+
return retryOperationUntilSuccess(raftClient::readIndex, e -> currentTimeMillis() > expirationTime, executor)
271+
.orTimeout(timeout, TimeUnit.MILLISECONDS)
272+
.thenCompose(storageIndexTracker::waitFor);
273+
}
274+
275+
private void onLeaderElected(ClusterNode clusterNode, long term) {
276+
leaderRef = clusterNode;
277+
leaderReadyFuture.complete(null);
278+
}
279+
280+
private CompletableFuture<ClusterNode> leaderFuture() {
281+
return leaderReadyFuture.thenApply(ignored -> leaderRef);
282+
}
283+
}

0 commit comments

Comments
 (0)