Skip to content

Commit 2f8bdee

Browse files
committed
fix
1 parent 9df5609 commit 2f8bdee

File tree

9 files changed

+134
-43
lines changed

9 files changed

+134
-43
lines changed

fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -266,6 +266,13 @@ public long getBackendId() throws UserException {
266266
return this.backendId;
267267
}
268268

269+
// just record read,write ops
270+
public long getBackendIdRecordAccessInfo() throws UserException {
271+
// Use async version to avoid blocking getBackendIdImpl which is called frequently
272+
TabletAccessStats.getInstance().recordAccessAsync(getId());
273+
return getBackendId();
274+
}
275+
269276
// just for ut
270277
public void setBackendId(long backendId) {
271278
this.backendId = backendId;

fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -278,7 +278,7 @@ private Multimap<Long, Long> getNormalReplicaBackendPathMapImpl(String beEndpoin
278278
// return map of (BE id -> path hash) of normal replicas
279279
// for load plan.
280280
public Multimap<Long, Long> getNormalReplicaBackendPathMap() throws UserException {
281-
return getNormalReplicaBackendPathMapImpl(null, (rep, be) -> rep.getBackendId());
281+
return getNormalReplicaBackendPathMapImpl(null, (rep, be) -> rep.getBackendIdAndRecordAccessInfo());
282282
}
283283

284284
// for cloud mode without ConnectContext. use BE IP to find replica
@@ -312,7 +312,15 @@ public List<Replica> getQueryableReplicas(long visibleVersion, Map<Long, Set<Lon
312312
continue;
313313
}
314314

315-
Set<Long> thisBeAlivePaths = backendAlivePathHashs.get(replica.getBackendIdWithoutException());
315+
long beId = -1;
316+
try {
317+
beId = replica.getBackendIdAndRecordAccessInfo();
318+
} catch (UserException e) {
319+
if (LOG.isDebugEnabled()) {
320+
LOG.debug("getBackendIdWithoutException: ", e);
321+
}
322+
}
323+
Set<Long> thisBeAlivePaths = backendAlivePathHashs.get(beId);
316324
ReplicaState state = replica.getState();
317325
// if thisBeAlivePaths contains pathHash = 0, it mean this be hadn't report disks state.
318326
// should ignore this case.

fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletAccessStats.java renamed to fe/fe-core/src/main/java/org/apache/doris/catalog/TabletAccessStats.java

Lines changed: 38 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,8 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18-
package org.apache.doris.cloud.catalog;
18+
package org.apache.doris.catalog;
1919

20-
import org.apache.doris.catalog.Env;
21-
import org.apache.doris.catalog.TabletInvertedIndex;
2220
import org.apache.doris.common.Config;
2321
import org.apache.doris.common.util.MasterDaemon;
2422

@@ -38,10 +36,15 @@
3836
import java.util.concurrent.atomic.AtomicLongArray;
3937
import java.util.stream.Collectors;
4038

41-
public class CloudTabletAccessStats {
42-
private static final Logger LOG = LogManager.getLogger(CloudTabletAccessStats.class);
39+
public class TabletAccessStats {
40+
private static final Logger LOG = LogManager.getLogger(TabletAccessStats.class);
4341

44-
private static volatile CloudTabletAccessStats instance;
42+
private static volatile TabletAccessStats instance;
43+
44+
// Sort active tablets by accessCount desc, then lastAccessTime desc
45+
private static final Comparator<AccessStatsResult> TOPN_ACTIVE_TABLET_COMPARATOR =
46+
Comparator.comparingLong((AccessStatsResult r) -> r.accessCount).reversed()
47+
.thenComparing(Comparator.comparingLong((AccessStatsResult r) -> r.lastAccessTime).reversed());
4548

4649
// Time window in milliseconds (default: 1 hour)
4750
private final long timeWindowMs;
@@ -186,7 +189,7 @@ private static class AccessStatsShard {
186189
// Default cleanup interval: 5 minutes
187190
private static final long DEFAULT_CLEANUP_INTERVAL_SECOND = 300L;
188191

189-
private CloudTabletAccessStats() {
192+
private TabletAccessStats() {
190193
this.timeWindowMs = DEFAULT_TIME_WINDOW_SECOND * 1000L;
191194
this.bucketSizeMs = DEFAULT_BUCKET_SIZE_SECOND * 1000L;
192195
this.numBuckets = (int) (DEFAULT_TIME_WINDOW_SECOND / DEFAULT_BUCKET_SIZE_SECOND); // 60 buckets
@@ -223,11 +226,11 @@ private CloudTabletAccessStats() {
223226
}
224227
}
225228

226-
public static CloudTabletAccessStats getInstance() {
229+
public static TabletAccessStats getInstance() {
227230
if (instance == null) {
228-
synchronized (CloudTabletAccessStats.class) {
231+
synchronized (TabletAccessStats.class) {
229232
if (instance == null) {
230-
instance = new CloudTabletAccessStats();
233+
instance = new TabletAccessStats();
231234
}
232235
}
233236
}
@@ -246,15 +249,15 @@ private int getShardIndex(long id) {
246249
* This method is non-blocking and should be used in high-frequency call paths
247250
* to avoid blocking the caller thread.
248251
*/
249-
public void recordAccessAsync(long replicaId, long tableId, long partitionId, long indexId) {
252+
public void recordAccessAsync(long replicaId) {
250253
if (!Config.enable_cloud_active_tablet_priority_scheduling || asyncExecutor == null) {
251254
return;
252255
}
253256

254257
try {
255258
asyncExecutor.execute(() -> {
256259
try {
257-
recordAccess(replicaId, tableId, partitionId, indexId);
260+
recordAccess(replicaId);
258261
} catch (Exception e) {
259262
// Log but don't propagate exception to avoid affecting caller
260263
LOG.debug("Failed to record access asynchronously for replicaId={}", replicaId, e);
@@ -272,7 +275,7 @@ public void recordAccessAsync(long replicaId, long tableId, long partitionId, lo
272275
/**
273276
* Record an access to a replica
274277
*/
275-
public void recordAccess(long replicaId, long tableId, long partitionId, long indexId) {
278+
public void recordAccess(long replicaId) {
276279
if (!Config.enable_cloud_active_tablet_priority_scheduling) {
277280
return;
278281
}
@@ -298,21 +301,24 @@ public void recordAccess(long replicaId, long tableId, long partitionId, long in
298301
/**
299302
* Get access count for a tablet within the time window
300303
*/
301-
public long getTabletAccessCount(long tabletId) {
304+
public AccessStatsResult getTabletAccessInfo(long tabletId) {
302305
if (!Config.enable_cloud_active_tablet_priority_scheduling) {
303-
return 0;
306+
return null;
304307
}
305308

306309
int shardIndex = getShardIndex(tabletId);
307310
AccessStatsShard shard = shards[shardIndex];
308311
SlidingWindowCounter counter = shard.tabletCounters.get(tabletId);
309312

310313
if (counter == null) {
311-
return 0;
314+
return null;
312315
}
313316

314317
long currentTime = System.currentTimeMillis();
315-
return counter.getCount(currentTime, timeWindowMs, bucketSizeMs, numBuckets);
318+
return new AccessStatsResult(
319+
tabletId,
320+
counter.getCount(currentTime, timeWindowMs, bucketSizeMs, numBuckets),
321+
counter.getLastAccessTime());
316322
}
317323

318324
/**
@@ -328,6 +334,15 @@ public AccessStatsResult(long id, long accessCount, long lastAccessTime) {
328334
this.accessCount = accessCount;
329335
this.lastAccessTime = lastAccessTime;
330336
}
337+
338+
@Override
339+
public String toString() {
340+
return "AccessStatsResult{"
341+
+ "id=" + id
342+
+ ", accessCount=" + accessCount
343+
+ ", lastAccessTime=" + lastAccessTime
344+
+ '}';
345+
}
331346
}
332347

333348
/**
@@ -360,9 +375,7 @@ public List<AccessStatsResult> getTopNActiveTablets(int topN) {
360375
}
361376

362377
// Sort and return top N
363-
results.sort(Comparator
364-
.comparingLong((AccessStatsResult r) -> r.accessCount).reversed()
365-
.thenComparingLong((AccessStatsResult r) -> r.lastAccessTime).reversed());
378+
results.sort(TOPN_ACTIVE_TABLET_COMPARATOR);
366379

367380
return results.stream().limit(topN).collect(Collectors.toList());
368381
}
@@ -421,7 +434,7 @@ public String getStatsSummary() {
421434
}
422435

423436
return String.format(
424-
"CloudTabletAccessStats{timeWindow=%ds, bucketSize=%ds, numBuckets=%d, "
437+
"TabletAccessStats{timeWindow=%ds, bucketSize=%ds, numBuckets=%d, "
425438
+ "shardSize=%d, activeTablets=%d, "
426439
+ "totalTabletAccess=%d, totalAccessCount=%d}",
427440
timeWindowMs / 1000, bucketSizeMs / 1000, numBuckets, SHARD_SIZE,
@@ -444,7 +457,10 @@ protected void runAfterCatalogReady() {
444457

445458
try {
446459
cleanupExpiredRecords();
447-
getStatsSummary();
460+
if (LOG.isDebugEnabled()) {
461+
LOG.debug("tablet stat = {}, top 10 active tablet = {}",
462+
getStatsSummary(), getTopNActiveTablets(10));
463+
}
448464
} catch (Exception e) {
449465
LOG.warn("Failed to cleanup expired access records", e);
450466
}

fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudReplica.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.apache.doris.catalog.Env;
2323
import org.apache.doris.catalog.Partition;
2424
import org.apache.doris.catalog.Replica;
25+
import org.apache.doris.catalog.TabletAccessStats;
2526
import org.apache.doris.cloud.proto.Cloud;
2627
import org.apache.doris.cloud.qe.ComputeGroupException;
2728
import org.apache.doris.cloud.system.CloudSystemInfoService;
@@ -300,7 +301,7 @@ private long getBackendIdImpl(String clusterId) throws ComputeGroupException {
300301
return -1L;
301302
}
302303
// Use async version to avoid blocking getBackendIdImpl which is called frequently
303-
CloudTabletAccessStats.getInstance().recordAccessAsync(getId(), getTableId(), getPartitionId(), getIndexId());
304+
TabletAccessStats.getInstance().recordAccessAsync(getId());
304305

305306
if (isColocated()) {
306307
return getColocatedBeId(clusterId);

fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java

Lines changed: 32 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.apache.doris.catalog.Replica;
2727
import org.apache.doris.catalog.Table;
2828
import org.apache.doris.catalog.Tablet;
29+
import org.apache.doris.catalog.TabletAccessStats;
2930
import org.apache.doris.cloud.persist.UpdateCloudReplicaInfo;
3031
import org.apache.doris.cloud.proto.Cloud;
3132
import org.apache.doris.cloud.qe.ComputeGroupException;
@@ -136,6 +137,8 @@ public class CloudTabletRebalancer extends MasterDaemon {
136137

137138
private BalanceTypeEnum globalBalanceTypeEnum = BalanceTypeEnum.getCloudWarmUpForRebalanceTypeEnum();
138139

140+
private Set<Long> activeTabletIds = new HashSet<>();
141+
139142
/**
140143
* Get the current balance type for a compute group, falling back to global balance type if not found
141144
*/
@@ -451,6 +454,7 @@ protected void runAfterCatalogReady() {
451454

452455
LOG.info("cloud tablet rebalance begin");
453456
long start = System.currentTimeMillis();
457+
activeTabletIds = getActiveTabletIds();
454458
globalBalanceTypeEnum = BalanceTypeEnum.getCloudWarmUpForRebalanceTypeEnum();
455459

456460
buildClusterToBackendMap();
@@ -481,7 +485,7 @@ protected void runAfterCatalogReady() {
481485
balanceEnd += (Config.cloud_tablet_rebalancer_interval_second + 10L) * 1000L;
482486
}
483487
if (balanceEnd - start > Config.cloud_tablet_rebalancer_interval_second * 1000L) {
484-
sleepSeconds = 0L;
488+
sleepSeconds = 1L;
485489
}
486490
setInterval(sleepSeconds * 1000L);
487491
LOG.info("finished to rebalancer. cost: {} ms, rebalancer sche interval {} s",
@@ -1476,9 +1480,9 @@ private void balanceImpl(List<Long> bes, String clusterId, Map<Long, Set<Tablet>
14761480
currentBalanceType, beNum, totalTabletsNum, avgNum, transferNum);
14771481

14781482
// Prefer scheduling active tablets (recently accessed)
1479-
final Set<Long> activeTabletIds = getActiveTabletIds();
14801483
final Map<Long, List<Tablet>> activeTabletsByBeCache =
1481-
activeTabletIds.isEmpty() ? null : new HashMap<>();
1484+
this.activeTabletIds.isEmpty() ? null : new HashMap<>();
1485+
final Set<Long> pickedTabletIds = new HashSet<>();
14821486

14831487
for (int i = 0; i < transferNum; i++) {
14841488
TransferPairInfo pairInfo = new TransferPairInfo();
@@ -1492,11 +1496,13 @@ private void balanceImpl(List<Long> bes, String clusterId, Map<Long, Set<Tablet>
14921496
long destBe = pairInfo.destBe;
14931497

14941498
Tablet pickedTablet = pickTabletPreferActive(srcBe, beToTablets.get(srcBe),
1495-
activeTabletIds, activeTabletsByBeCache);
1499+
this.activeTabletIds, activeTabletsByBeCache, pickedTabletIds);
14961500
if (pickedTablet == null) {
14971501
continue; // No tablet to pick
14981502
}
14991503

1504+
pickedTabletIds.add(pickedTablet.getId());
1505+
15001506
CloudReplica cloudReplica = (CloudReplica) pickedTablet.getReplicas().get(0);
15011507
Backend srcBackend = Env.getCurrentSystemInfo().getBackend(srcBe);
15021508

@@ -1562,13 +1568,13 @@ private void updateBalanceStatus(BalanceType balanceType) {
15621568
private Set<Long> getActiveTabletIds() {
15631569
try {
15641570
// get topN active tablets
1565-
List<CloudTabletAccessStats.AccessStatsResult> active =
1566-
CloudTabletAccessStats.getInstance().getTopNActiveTablets(10000);
1571+
List<TabletAccessStats.AccessStatsResult> active =
1572+
TabletAccessStats.getInstance().getTopNActiveTablets(10000);
15671573
if (active == null || active.isEmpty()) {
15681574
return Collections.emptySet();
15691575
}
15701576
Set<Long> ids = new HashSet<>(active.size() * 2);
1571-
for (CloudTabletAccessStats.AccessStatsResult r : active) {
1577+
for (TabletAccessStats.AccessStatsResult r : active) {
15721578
ids.add(r.id);
15731579
}
15741580
return ids;
@@ -1582,24 +1588,38 @@ private Set<Long> getActiveTabletIds() {
15821588

15831589
// choose active tablet first to re-balance, otherwise random pick
15841590
private Tablet pickTabletPreferActive(long srcBe, Set<Tablet> tablets, Set<Long> activeTabletIds,
1585-
Map<Long, List<Tablet>> activeTabletsByBeCache) {
1591+
Map<Long, List<Tablet>> activeTabletsByBeCache,
1592+
Set<Long> pickedTabletIds) {
15861593
if (tablets == null || tablets.isEmpty()) {
15871594
return null;
15881595
}
1596+
1597+
Set<Tablet> availableTablets = tablets.stream()
1598+
.filter(t -> !pickedTabletIds.contains(t.getId()))
1599+
.collect(Collectors.toSet());
1600+
1601+
if (availableTablets.isEmpty()) {
1602+
return null; // No available tablets
1603+
}
1604+
15891605
if (activeTabletIds == null || activeTabletIds.isEmpty() || activeTabletsByBeCache == null) {
1590-
return pickRandomTablet(tablets);
1606+
return pickRandomTablet(availableTablets);
15911607
}
15921608

15931609
// Compute srcBe active tablets once, cache for subsequent iterations in this balanceImpl call.
15941610
List<Tablet> activeInSrc = activeTabletsByBeCache.computeIfAbsent(srcBe, k -> tablets.stream()
15951611
.filter(t -> activeTabletIds.contains(t.getId()))
15961612
.collect(Collectors.toList()));
15971613

1598-
if (activeInSrc != null && !activeInSrc.isEmpty()) {
1599-
return activeInSrc.get(rand.nextInt(activeInSrc.size()));
1614+
List<Tablet> availableActiveInSrc = activeInSrc.stream()
1615+
.filter(t -> !pickedTabletIds.contains(t.getId()))
1616+
.collect(Collectors.toList());
1617+
1618+
if (availableActiveInSrc != null && !availableActiveInSrc.isEmpty()) {
1619+
return availableActiveInSrc.get(rand.nextInt(availableActiveInSrc.size()));
16001620
}
16011621

1602-
return pickRandomTablet(tablets);
1622+
return pickRandomTablet(availableTablets);
16031623
}
16041624

16051625
private Tablet pickRandomTablet(Set<Tablet> tablets) {

fe/fe-core/src/main/java/org/apache/doris/common/proc/ReplicasProcNode.java

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.apache.doris.catalog.OlapTable;
2323
import org.apache.doris.catalog.Replica;
2424
import org.apache.doris.catalog.Tablet;
25+
import org.apache.doris.catalog.TabletAccessStats;
2526
import org.apache.doris.catalog.TabletMeta;
2627
import org.apache.doris.cloud.catalog.CloudReplica;
2728
import org.apache.doris.common.AnalysisException;
@@ -51,7 +52,7 @@ public class ReplicasProcNode implements ProcNodeInterface {
5152
.add("IsUserDrop")
5253
.add("VisibleVersionCount").add("VersionCount").add("PathHash").add("Path")
5354
.add("MetaUrl").add("CompactionStatus").add("CooldownReplicaId")
54-
.add("CooldownMetaId").add("QueryHits");
55+
.add("CooldownMetaId").add("QueryHits").add("AccessCount1H").add("LastAccessTime");
5556

5657
if (Config.isCloudMode()) {
5758
builder.add("PrimaryBackendId");
@@ -91,6 +92,14 @@ public ProcResult fetchResult() throws AnalysisException {
9192
}
9293

9394
for (Replica replica : replicas) {
95+
TabletAccessStats.AccessStatsResult asr = TabletAccessStats.getInstance()
96+
.getTabletAccessInfo(tabletId);
97+
long accessCount1H = 0;
98+
long lastAccessTime = 0;
99+
if (asr != null) {
100+
accessCount1H = asr.accessCount;
101+
lastAccessTime = asr.lastAccessTime;
102+
}
94103
long beId = replica.getBackendIdWithoutException();
95104
Backend be = backendMap.get(beId);
96105
String host = (be == null ? Backend.DUMMY_IP : be.getHost());
@@ -138,7 +147,10 @@ public ProcResult fetchResult() throws AnalysisException {
138147
compactionUrl,
139148
String.valueOf(tablet.getCooldownConf().first),
140149
cooldownMetaId,
141-
String.valueOf(queryHits));
150+
String.valueOf(queryHits),
151+
String.valueOf(accessCount1H),
152+
String.valueOf(lastAccessTime)
153+
);
142154
if (Config.isCloudMode()) {
143155
replicaInfo.add(String.valueOf(((CloudReplica) replica).getPrimaryBackendId()));
144156
}

0 commit comments

Comments
 (0)