Skip to content

Commit c49b848

Browse files
committed
[server] Improve the implementation of Coordinator HA
1 parent bbc445a commit c49b848

File tree

13 files changed

+123
-253
lines changed

13 files changed

+123
-253
lines changed

fluss-common/src/main/java/org/apache/fluss/exception/NotCoordinatorLeaderException.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,11 @@
1717

1818
package org.apache.fluss.exception;
1919

20-
/** Exception thrown when a request is sent to a stand by coordinator server. since: 0.9 */
20+
/**
21+
* Exception thrown when a request is sent to a stand by coordinator server.
22+
*
23+
* @since 1.0
24+
*/
2125
public class NotCoordinatorLeaderException extends ApiException {
2226

2327
private static final long serialVersionUID = 1L;

fluss-rpc/src/main/java/org/apache/fluss/rpc/gateway/CoordinatorGateway.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,9 @@
4040
/** The entry point of RPC gateway interface for coordinator server. */
4141
public interface CoordinatorGateway extends RpcGateway, AdminGateway {
4242

43+
/** Checks if the current server is the leader. */
44+
boolean isLeader();
45+
4346
/**
4447
* AdjustIsr request to adjust (expend or shrink) the ISR set for request table bucket.
4548
*

fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/server/FlussRequestHandler.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,11 @@
1717

1818
package org.apache.fluss.rpc.netty.server;
1919

20+
import org.apache.fluss.exception.NotCoordinatorLeaderException;
2021
import org.apache.fluss.rpc.RpcGatewayService;
22+
import org.apache.fluss.rpc.gateway.CoordinatorGateway;
2123
import org.apache.fluss.rpc.messages.ApiMessage;
24+
import org.apache.fluss.rpc.protocol.ApiKeys;
2225
import org.apache.fluss.rpc.protocol.ApiMethod;
2326
import org.apache.fluss.rpc.protocol.RequestType;
2427

@@ -35,9 +38,11 @@ public class FlussRequestHandler implements RequestHandler<FlussRequest> {
3538
private static final Logger LOG = LoggerFactory.getLogger(FlussRequestHandler.class);
3639

3740
private final RpcGatewayService service;
41+
private final boolean isCoordinator;
3842

3943
public FlussRequestHandler(RpcGatewayService service) {
4044
this.service = service;
45+
this.isCoordinator = service instanceof CoordinatorGateway;
4146
}
4247

4348
@Override
@@ -58,6 +63,16 @@ public void processRequest(FlussRequest request) {
5863
request.isInternal(),
5964
request.getAddress(),
6065
request.getPrincipal()));
66+
// check if the coordinator server is the current leader if the API is a coordinator
67+
// TODO: we should only check coordinator APIs instead of all APIs
68+
if (isCoordinator && api.getApiKey() != ApiKeys.API_VERSIONS) {
69+
if (!((CoordinatorGateway) service).isLeader()) {
70+
request.fail(
71+
new NotCoordinatorLeaderException(
72+
"This coordinator server is not the current leader."));
73+
return;
74+
}
75+
}
6176
// invoke the corresponding method on RpcGateway instance.
6277
CompletableFuture<?> responseFuture =
6378
(CompletableFuture<?>) api.getMethod().invoke(service, message);

fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorContext.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -710,10 +710,10 @@ public void resetContext() {
710710
tablesToBeDeleted.clear();
711711
coordinatorEpoch = 0;
712712
clearTablesState();
713-
// clear the live tablet servers
714713
liveTabletServers.clear();
715714
shuttingDownTabletServers.clear();
716715
serverTags.clear();
716+
liveCoordinatorServers.clear();
717717
}
718718

719719
public int getTotalPartitionCount() {

fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorLeaderElection.java

Lines changed: 31 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -53,17 +53,15 @@ public class CoordinatorLeaderElection implements AutoCloseable {
5353
private final String serverId;
5454
private final LeaderLatch leaderLatch;
5555
private final AtomicBoolean isLeader = new AtomicBoolean(false);
56-
private final CompletableFuture<Void> leaderReadyFuture = new CompletableFuture<>();
57-
// Single-threaded executor to run leader init callbacks outside Curator's EventThread.
56+
// Cached thread pool to run leader init/cleanup callbacks outside Curator's EventThread.
5857
// Curator's LeaderLatchListener callbacks run on its internal EventThread; performing
5958
// synchronous ZK operations there causes deadlock because ZK response dispatch also
60-
// needs that same thread.
59+
// needs that same thread. A cached pool is used because these callbacks are transient
60+
// (only run during leadership transitions), so idle threads can be reclaimed.
6161
private final ExecutorService leaderCallbackExecutor;
6262
// Tracks the pending cleanup task so that init can wait for it to complete.
6363
private final AtomicReference<CompletableFuture<Void>> pendingCleanup =
6464
new AtomicReference<>(CompletableFuture.completedFuture(null));
65-
private volatile Runnable initLeaderServices;
66-
private volatile Consumer<Throwable> cleanupLeaderServices;
6765

6866
public CoordinatorLeaderElection(ZooKeeperClient zkClient, String serverId) {
6967
this.serverId = serverId;
@@ -73,34 +71,35 @@ public CoordinatorLeaderElection(ZooKeeperClient zkClient, String serverId) {
7371
ZkData.CoordinatorElectionZNode.path(),
7472
String.valueOf(serverId));
7573
this.leaderCallbackExecutor =
76-
Executors.newSingleThreadExecutor(
74+
Executors.newCachedThreadPool(
7775
r -> {
7876
Thread t = new Thread(r, "coordinator-leader-callback-" + serverId);
77+
// Daemon threads ensure the JVM can exit even if close() is not
78+
// called. Orderly shutdown is handled by close() -> shutdownNow().
7979
t.setDaemon(true);
8080
return t;
8181
});
8282
}
8383

8484
/**
85-
* Starts the leader election process asynchronously. The returned future completes when this
86-
* server becomes the leader for the first time and initializes the leader services.
85+
* Starts the leader election process asynchronously.
8786
*
8887
* <p>After the first election, the server will continue to participate in future elections.
8988
* When re-elected as leader, the initLeaderServices callback will be invoked again.
9089
*
9190
* @param initLeaderServices the callback to initialize leader services once elected
9291
* @param cleanupLeaderServices the callback to clean up leader services when losing leadership
93-
* @return a CompletableFuture that completes when this server becomes leader for the first time
9492
*/
95-
public CompletableFuture<Void> startElectLeaderAsync(
93+
public void startElectLeaderAsync(
9694
Runnable initLeaderServices, Consumer<Throwable> cleanupLeaderServices) {
97-
this.initLeaderServices = initLeaderServices;
98-
this.cleanupLeaderServices = cleanupLeaderServices;
99-
10095
leaderLatch.addListener(
10196
new LeaderLatchListener() {
10297
@Override
10398
public void isLeader() {
99+
// return if already marked as leader to avoid duplicate init calls
100+
if (isLeader.get()) {
101+
return;
102+
}
104103
LOG.info("Coordinator server {} has become the leader.", serverId);
105104
// Capture the pending cleanup future at this point so that
106105
// init waits for it before proceeding.
@@ -124,15 +123,15 @@ public void isLeader() {
124123
}
125124
try {
126125
initLeaderServices.run();
127-
leaderReadyFuture.complete(null);
128126
} catch (Exception e) {
129127
LOG.error(
130128
"Failed to initialize leader services for server {}",
131129
serverId,
132130
e);
133-
leaderReadyFuture.completeExceptionally(e);
134131
}
135132
});
133+
// Set leader flag before init completes, so when zk found this leader, the
134+
// leader can accept requests
136135
isLeader.set(true);
137136
}
138137

@@ -142,31 +141,27 @@ public void notLeader() {
142141
LOG.warn(
143142
"Coordinator server {} has lost the leadership, cleaning up leader services.",
144143
serverId);
145-
// Run cleanup on a separate daemon thread (NOT on the
146-
// leaderCallbackExecutor) to avoid blocking init tasks.
144+
// Submit cleanup to leaderCallbackExecutor. The cached thread
145+
// pool can spawn a new thread even if init is still running.
147146
// The cleanup completion is tracked via pendingCleanup so
148147
// that subsequent init waits for it.
149148
CompletableFuture<Void> cleanupFuture = new CompletableFuture<>();
150149
pendingCleanup.set(cleanupFuture);
151-
Thread cleanupThread =
152-
new Thread(
153-
() -> {
154-
try {
155-
if (cleanupLeaderServices != null) {
156-
cleanupLeaderServices.accept(null);
157-
}
158-
} catch (Exception e) {
159-
LOG.error(
160-
"Failed to cleanup leader services for server {}",
161-
serverId,
162-
e);
163-
} finally {
164-
cleanupFuture.complete(null);
165-
}
166-
},
167-
"coordinator-leader-cleanup-" + serverId);
168-
cleanupThread.setDaemon(true);
169-
cleanupThread.start();
150+
leaderCallbackExecutor.execute(
151+
() -> {
152+
try {
153+
if (cleanupLeaderServices != null) {
154+
cleanupLeaderServices.accept(null);
155+
}
156+
} catch (Exception e) {
157+
LOG.error(
158+
"Failed to cleanup leader services for server {}",
159+
serverId,
160+
e);
161+
} finally {
162+
cleanupFuture.complete(null);
163+
}
164+
});
170165
}
171166
}
172167
});
@@ -176,11 +171,7 @@ public void notLeader() {
176171
LOG.info("Coordinator server {} started leader election.", serverId);
177172
} catch (Exception e) {
178173
LOG.error("Failed to start LeaderLatch for server {}", serverId, e);
179-
leaderReadyFuture.completeExceptionally(
180-
new RuntimeException("Leader election start failed", e));
181174
}
182-
183-
return leaderReadyFuture;
184175
}
185176

186177
@Override
@@ -196,12 +187,6 @@ public void close() {
196187
}
197188

198189
leaderCallbackExecutor.shutdownNow();
199-
200-
// Complete the future exceptionally if it hasn't been completed yet
201-
if (!leaderReadyFuture.isDone()) {
202-
leaderReadyFuture.completeExceptionally(
203-
new RuntimeException("Leader election closed for server " + serverId));
204-
}
205190
}
206191

207192
public boolean isLeader() {

0 commit comments

Comments
 (0)