Skip to content

Commit 9350bb2

Browse files
[coordinator] optimize coordinator event metric update logic (#1465)
Co-authored-by: ocean.wy <ocean.wy@alibaba-inc.com>
1 parent b458997 commit 9350bb2

File tree

2 files changed

+167
-108
lines changed

2 files changed

+167
-108
lines changed

fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorEventProcessor.java

Lines changed: 49 additions & 107 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,6 @@
3434
import com.alibaba.fluss.metadata.TableInfo;
3535
import com.alibaba.fluss.metadata.TablePartition;
3636
import com.alibaba.fluss.metadata.TablePath;
37-
import com.alibaba.fluss.metrics.MetricNames;
3837
import com.alibaba.fluss.rpc.messages.AdjustIsrResponse;
3938
import com.alibaba.fluss.rpc.messages.CommitKvSnapshotResponse;
4039
import com.alibaba.fluss.rpc.messages.CommitLakeTableSnapshotResponse;
@@ -60,7 +59,6 @@
6059
import com.alibaba.fluss.server.coordinator.event.NotifyLeaderAndIsrResponseReceivedEvent;
6160
import com.alibaba.fluss.server.coordinator.event.watcher.TableChangeWatcher;
6261
import com.alibaba.fluss.server.coordinator.event.watcher.TabletServerChangeWatcher;
63-
import com.alibaba.fluss.server.coordinator.statemachine.ReplicaState;
6462
import com.alibaba.fluss.server.coordinator.statemachine.ReplicaStateMachine;
6563
import com.alibaba.fluss.server.coordinator.statemachine.TableBucketStateMachine;
6664
import com.alibaba.fluss.server.entity.AdjustIsrResultForBucket;
@@ -136,13 +134,6 @@ public class CoordinatorEventProcessor implements EventProcessor {
136134

137135
private final CompletedSnapshotStoreManager completedSnapshotStoreManager;
138136

139-
// metrics
140-
private volatile int tabletServerCount;
141-
private volatile int offlineBucketCount;
142-
private volatile int tableCount;
143-
private volatile int bucketCount;
144-
private volatile int replicasToDeleteCount;
145-
146137
public CoordinatorEventProcessor(
147138
ZooKeeperClient zooKeeperClient,
148139
CoordinatorMetadataCache serverMetadataCache,
@@ -198,18 +189,6 @@ public CoordinatorEventProcessor(
198189
this.lakeTableTieringManager = lakeTableTieringManager;
199190
this.coordinatorMetricGroup = coordinatorMetricGroup;
200191
this.internalListenerName = conf.getString(ConfigOptions.INTERNAL_LISTENER_NAME);
201-
registerMetrics();
202-
}
203-
204-
private void registerMetrics() {
205-
coordinatorMetricGroup.gauge(MetricNames.ACTIVE_COORDINATOR_COUNT, () -> 1);
206-
coordinatorMetricGroup.gauge(
207-
MetricNames.ACTIVE_TABLET_SERVER_COUNT, () -> tabletServerCount);
208-
coordinatorMetricGroup.gauge(MetricNames.OFFLINE_BUCKET_COUNT, () -> offlineBucketCount);
209-
coordinatorMetricGroup.gauge(MetricNames.BUCKET_COUNT, () -> bucketCount);
210-
coordinatorMetricGroup.gauge(MetricNames.TABLE_COUNT, () -> tableCount);
211-
coordinatorMetricGroup.gauge(
212-
MetricNames.REPLICAS_TO_DELETE_COUNT, () -> replicasToDeleteCount);
213192
}
214193

215194
public CoordinatorEventManager getCoordinatorEventManager() {
@@ -242,7 +221,6 @@ public void startup() {
242221

243222
// start table manager
244223
tableManager.startup();
245-
updateMetrics();
246224

247225
// start the event manager which will then process the event
248226
coordinatorEventManager.start();
@@ -447,92 +425,56 @@ private void onShutdown() {
447425

448426
@Override
449427
public void process(CoordinatorEvent event) {
450-
try {
451-
if (event instanceof CreateTableEvent) {
452-
processCreateTable((CreateTableEvent) event);
453-
} else if (event instanceof CreatePartitionEvent) {
454-
processCreatePartition((CreatePartitionEvent) event);
455-
} else if (event instanceof DropTableEvent) {
456-
processDropTable((DropTableEvent) event);
457-
} else if (event instanceof DropPartitionEvent) {
458-
processDropPartition((DropPartitionEvent) event);
459-
} else if (event instanceof NotifyLeaderAndIsrResponseReceivedEvent) {
460-
processNotifyLeaderAndIsrResponseReceivedEvent(
461-
(NotifyLeaderAndIsrResponseReceivedEvent) event);
462-
} else if (event instanceof DeleteReplicaResponseReceivedEvent) {
463-
processDeleteReplicaResponseReceived((DeleteReplicaResponseReceivedEvent) event);
464-
} else if (event instanceof NewTabletServerEvent) {
465-
processNewTabletServer((NewTabletServerEvent) event);
466-
} else if (event instanceof DeadTabletServerEvent) {
467-
processDeadTabletServer((DeadTabletServerEvent) event);
468-
} else if (event instanceof AdjustIsrReceivedEvent) {
469-
AdjustIsrReceivedEvent adjustIsrReceivedEvent = (AdjustIsrReceivedEvent) event;
470-
CompletableFuture<AdjustIsrResponse> callback =
471-
adjustIsrReceivedEvent.getRespCallback();
472-
completeFromCallable(
473-
callback,
474-
() ->
475-
makeAdjustIsrResponse(
476-
tryProcessAdjustIsr(
477-
adjustIsrReceivedEvent.getLeaderAndIsrMap())));
478-
} else if (event instanceof CommitKvSnapshotEvent) {
479-
CommitKvSnapshotEvent commitKvSnapshotEvent = (CommitKvSnapshotEvent) event;
480-
CompletableFuture<CommitKvSnapshotResponse> callback =
481-
commitKvSnapshotEvent.getRespCallback();
482-
completeFromCallable(
483-
callback, () -> tryProcessCommitKvSnapshot(commitKvSnapshotEvent));
484-
} else if (event instanceof CommitRemoteLogManifestEvent) {
485-
CommitRemoteLogManifestEvent commitRemoteLogManifestEvent =
486-
(CommitRemoteLogManifestEvent) event;
487-
completeFromCallable(
488-
commitRemoteLogManifestEvent.getRespCallback(),
489-
() -> tryProcessCommitRemoteLogManifest(commitRemoteLogManifestEvent));
490-
} else if (event instanceof CommitLakeTableSnapshotEvent) {
491-
CommitLakeTableSnapshotEvent commitLakeTableSnapshotEvent =
492-
(CommitLakeTableSnapshotEvent) event;
493-
completeFromCallable(
494-
commitLakeTableSnapshotEvent.getRespCallback(),
495-
() -> tryProcessCommitLakeTableSnapshot(commitLakeTableSnapshotEvent));
496-
} else if (event instanceof AccessContextEvent) {
497-
AccessContextEvent<?> accessContextEvent = (AccessContextEvent<?>) event;
498-
processAccessContext(accessContextEvent);
499-
} else {
500-
LOG.warn("Unknown event type: {}", event.getClass().getName());
501-
}
502-
} finally {
503-
updateMetrics();
504-
}
505-
}
506-
507-
private void updateMetrics() {
508-
tabletServerCount = coordinatorContext.getLiveTabletServers().size();
509-
tableCount = coordinatorContext.allTables().size();
510-
bucketCount = coordinatorContext.bucketLeaderAndIsr().size();
511-
offlineBucketCount = coordinatorContext.getOfflineBucketCount();
512-
513-
int replicasToDeletes = 0;
514-
// for replica in partitions to be deleted
515-
for (TablePartition tablePartition : coordinatorContext.getPartitionsToBeDeleted()) {
516-
for (TableBucketReplica replica :
517-
coordinatorContext.getAllReplicasForPartition(
518-
tablePartition.getTableId(), tablePartition.getPartitionId())) {
519-
replicasToDeletes =
520-
isReplicaToDelete(replica) ? replicasToDeletes + 1 : replicasToDeletes;
521-
}
522-
}
523-
// for replica in tables to be deleted
524-
for (long tableId : coordinatorContext.getTablesToBeDeleted()) {
525-
for (TableBucketReplica replica : coordinatorContext.getAllReplicasForTable(tableId)) {
526-
replicasToDeletes =
527-
isReplicaToDelete(replica) ? replicasToDeletes + 1 : replicasToDeletes;
528-
}
428+
if (event instanceof CreateTableEvent) {
429+
processCreateTable((CreateTableEvent) event);
430+
} else if (event instanceof CreatePartitionEvent) {
431+
processCreatePartition((CreatePartitionEvent) event);
432+
} else if (event instanceof DropTableEvent) {
433+
processDropTable((DropTableEvent) event);
434+
} else if (event instanceof DropPartitionEvent) {
435+
processDropPartition((DropPartitionEvent) event);
436+
} else if (event instanceof NotifyLeaderAndIsrResponseReceivedEvent) {
437+
processNotifyLeaderAndIsrResponseReceivedEvent(
438+
(NotifyLeaderAndIsrResponseReceivedEvent) event);
439+
} else if (event instanceof DeleteReplicaResponseReceivedEvent) {
440+
processDeleteReplicaResponseReceived((DeleteReplicaResponseReceivedEvent) event);
441+
} else if (event instanceof NewTabletServerEvent) {
442+
processNewTabletServer((NewTabletServerEvent) event);
443+
} else if (event instanceof DeadTabletServerEvent) {
444+
processDeadTabletServer((DeadTabletServerEvent) event);
445+
} else if (event instanceof AdjustIsrReceivedEvent) {
446+
AdjustIsrReceivedEvent adjustIsrReceivedEvent = (AdjustIsrReceivedEvent) event;
447+
CompletableFuture<AdjustIsrResponse> callback =
448+
adjustIsrReceivedEvent.getRespCallback();
449+
completeFromCallable(
450+
callback,
451+
() ->
452+
makeAdjustIsrResponse(
453+
tryProcessAdjustIsr(
454+
adjustIsrReceivedEvent.getLeaderAndIsrMap())));
455+
} else if (event instanceof CommitKvSnapshotEvent) {
456+
CommitKvSnapshotEvent commitKvSnapshotEvent = (CommitKvSnapshotEvent) event;
457+
CompletableFuture<CommitKvSnapshotResponse> callback =
458+
commitKvSnapshotEvent.getRespCallback();
459+
completeFromCallable(callback, () -> tryProcessCommitKvSnapshot(commitKvSnapshotEvent));
460+
} else if (event instanceof CommitRemoteLogManifestEvent) {
461+
CommitRemoteLogManifestEvent commitRemoteLogManifestEvent =
462+
(CommitRemoteLogManifestEvent) event;
463+
completeFromCallable(
464+
commitRemoteLogManifestEvent.getRespCallback(),
465+
() -> tryProcessCommitRemoteLogManifest(commitRemoteLogManifestEvent));
466+
} else if (event instanceof CommitLakeTableSnapshotEvent) {
467+
CommitLakeTableSnapshotEvent commitLakeTableSnapshotEvent =
468+
(CommitLakeTableSnapshotEvent) event;
469+
completeFromCallable(
470+
commitLakeTableSnapshotEvent.getRespCallback(),
471+
() -> tryProcessCommitLakeTableSnapshot(commitLakeTableSnapshotEvent));
472+
} else if (event instanceof AccessContextEvent) {
473+
AccessContextEvent<?> accessContextEvent = (AccessContextEvent<?>) event;
474+
processAccessContext(accessContextEvent);
475+
} else {
476+
LOG.warn("Unknown event type: {}", event.getClass().getName());
529477
}
530-
this.replicasToDeleteCount = replicasToDeletes;
531-
}
532-
533-
private boolean isReplicaToDelete(TableBucketReplica replica) {
534-
ReplicaState replicaState = coordinatorContext.getReplicaState(replica);
535-
return replicaState != null && replicaState != ReplicaDeletionSuccessful;
536478
}
537479

538480
private void processCreateTable(CreateTableEvent createTableEvent) {

fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/event/CoordinatorEventManager.java

Lines changed: 118 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,13 @@
1818
package com.alibaba.fluss.server.coordinator.event;
1919

2020
import com.alibaba.fluss.annotation.Internal;
21+
import com.alibaba.fluss.metadata.TableBucketReplica;
22+
import com.alibaba.fluss.metadata.TablePartition;
2123
import com.alibaba.fluss.metrics.DescriptiveStatisticsHistogram;
2224
import com.alibaba.fluss.metrics.Histogram;
2325
import com.alibaba.fluss.metrics.MetricNames;
26+
import com.alibaba.fluss.server.coordinator.CoordinatorContext;
27+
import com.alibaba.fluss.server.coordinator.statemachine.ReplicaState;
2428
import com.alibaba.fluss.server.metrics.group.CoordinatorMetricGroup;
2529
import com.alibaba.fluss.utils.concurrent.ShutdownableThread;
2630

@@ -31,6 +35,7 @@
3135
import java.util.concurrent.locks.Lock;
3236
import java.util.concurrent.locks.ReentrantLock;
3337

38+
import static com.alibaba.fluss.server.coordinator.statemachine.ReplicaState.ReplicaDeletionSuccessful;
3439
import static com.alibaba.fluss.utils.concurrent.LockUtils.inLock;
3540

3641
/**
@@ -56,7 +61,15 @@ public final class CoordinatorEventManager implements EventManager {
5661
private Histogram eventProcessingTime;
5762
private Histogram eventQueueTime;
5863

64+
// Coordinator metrics moved from CoordinatorEventProcessor
65+
private volatile int tabletServerCount;
66+
private volatile int offlineBucketCount;
67+
private volatile int tableCount;
68+
private volatile int bucketCount;
69+
private volatile int replicasToDeleteCount;
70+
5971
private static final int WINDOW_SIZE = 100;
72+
private static final long METRICS_UPDATE_INTERVAL_MS = 5000; // 5 seconds
6073

6174
public CoordinatorEventManager(
6275
EventProcessor eventProcessor, CoordinatorMetricGroup coordinatorMetricGroup) {
@@ -77,6 +90,80 @@ private void registerMetrics() {
7790
coordinatorMetricGroup.histogram(
7891
MetricNames.EVENT_QUEUE_TIME_MS,
7992
new DescriptiveStatisticsHistogram(WINDOW_SIZE));
93+
94+
// Register coordinator metrics
95+
coordinatorMetricGroup.gauge(MetricNames.ACTIVE_COORDINATOR_COUNT, () -> 1);
96+
coordinatorMetricGroup.gauge(
97+
MetricNames.ACTIVE_TABLET_SERVER_COUNT, () -> tabletServerCount);
98+
coordinatorMetricGroup.gauge(MetricNames.OFFLINE_BUCKET_COUNT, () -> offlineBucketCount);
99+
coordinatorMetricGroup.gauge(MetricNames.BUCKET_COUNT, () -> bucketCount);
100+
coordinatorMetricGroup.gauge(MetricNames.TABLE_COUNT, () -> tableCount);
101+
coordinatorMetricGroup.gauge(
102+
MetricNames.REPLICAS_TO_DELETE_COUNT, () -> replicasToDeleteCount);
103+
}
104+
105+
/** Not thread safety! this method can only be executed in the CoordinatorEventThread. */
106+
private void updateMetricsViaAccessContext() {
107+
// Create AccessContextEvent to safely access CoordinatorContext
108+
AccessContextEvent<MetricsData> accessContextEvent =
109+
new AccessContextEvent<>(
110+
context -> {
111+
int tabletServerCount = context.getLiveTabletServers().size();
112+
int tableCount = context.allTables().size();
113+
int bucketCount = context.bucketLeaderAndIsr().size();
114+
int offlineBucketCount = context.getOfflineBucketCount();
115+
116+
int replicasToDeletes = 0;
117+
// for replica in partitions to be deleted
118+
for (TablePartition tablePartition :
119+
context.getPartitionsToBeDeleted()) {
120+
for (TableBucketReplica replica :
121+
context.getAllReplicasForPartition(
122+
tablePartition.getTableId(),
123+
tablePartition.getPartitionId())) {
124+
replicasToDeletes =
125+
isReplicaToDelete(replica, context)
126+
? replicasToDeletes + 1
127+
: replicasToDeletes;
128+
}
129+
}
130+
// for replica in tables to be deleted
131+
for (long tableId : context.getTablesToBeDeleted()) {
132+
for (TableBucketReplica replica :
133+
context.getAllReplicasForTable(tableId)) {
134+
replicasToDeletes =
135+
isReplicaToDelete(replica, context)
136+
? replicasToDeletes + 1
137+
: replicasToDeletes;
138+
}
139+
}
140+
141+
return new MetricsData(
142+
tabletServerCount,
143+
tableCount,
144+
bucketCount,
145+
offlineBucketCount,
146+
replicasToDeletes);
147+
});
148+
149+
eventProcessor.process(accessContextEvent);
150+
151+
// Wait for the result and update local metrics
152+
try {
153+
MetricsData metricsData = accessContextEvent.getResultFuture().get();
154+
this.tabletServerCount = metricsData.tabletServerCount;
155+
this.tableCount = metricsData.tableCount;
156+
this.bucketCount = metricsData.bucketCount;
157+
this.offlineBucketCount = metricsData.offlineBucketCount;
158+
this.replicasToDeleteCount = metricsData.replicasToDeleteCount;
159+
} catch (Exception e) {
160+
LOG.warn("Failed to update metrics via AccessContextEvent", e);
161+
}
162+
}
163+
164+
private boolean isReplicaToDelete(TableBucketReplica replica, CoordinatorContext context) {
165+
ReplicaState replicaState = context.getReplicaState(replica);
166+
return replicaState != null && replicaState != ReplicaDeletionSuccessful;
80167
}
81168

82169
public void start() {
@@ -123,12 +210,21 @@ public void clearAndPut(CoordinatorEvent event) {
123210

124211
private class CoordinatorEventThread extends ShutdownableThread {
125212

213+
private long lastMetricsUpdateTime = System.currentTimeMillis();
214+
126215
public CoordinatorEventThread(String name) {
127216
super(name, false);
128217
}
129218

130219
@Override
131220
public void doWork() throws Exception {
221+
// Check if it's time to update metrics (before taking event from queue)
222+
long currentTime = System.currentTimeMillis();
223+
if (currentTime - lastMetricsUpdateTime >= METRICS_UPDATE_INTERVAL_MS) {
224+
updateMetricsViaAccessContext();
225+
lastMetricsUpdateTime = currentTime;
226+
}
227+
132228
QueuedEvent queuedEvent = queue.take();
133229
CoordinatorEvent coordinatorEvent = queuedEvent.event;
134230

@@ -144,7 +240,7 @@ public void doWork() throws Exception {
144240
eventProcessor.process(coordinatorEvent);
145241
}
146242
} catch (Throwable e) {
147-
log.error("Uncaught error processing event {}.", coordinatorEvent, e);
243+
LOG.error("Uncaught error processing event {}.", coordinatorEvent, e);
148244
} finally {
149245
long costTimeMs = System.currentTimeMillis() - eventStartTimeMs;
150246
eventProcessingTime.update(costTimeMs);
@@ -166,4 +262,25 @@ public QueuedEvent(CoordinatorEvent event, long enqueueTimeMs) {
166262
this.enqueueTimeMs = enqueueTimeMs;
167263
}
168264
}
265+
266+
private static class MetricsData {
267+
private final int tabletServerCount;
268+
private final int tableCount;
269+
private final int bucketCount;
270+
private final int offlineBucketCount;
271+
private final int replicasToDeleteCount;
272+
273+
public MetricsData(
274+
int tabletServerCount,
275+
int tableCount,
276+
int bucketCount,
277+
int offlineBucketCount,
278+
int replicasToDeleteCount) {
279+
this.tabletServerCount = tabletServerCount;
280+
this.tableCount = tableCount;
281+
this.bucketCount = bucketCount;
282+
this.offlineBucketCount = offlineBucketCount;
283+
this.replicasToDeleteCount = replicasToDeleteCount;
284+
}
285+
}
169286
}

0 commit comments

Comments
 (0)