Skip to content

Commit 91eef60

Browse files
committed
fix
1 parent 9df5609 commit 91eef60

File tree

7 files changed

+111
-36
lines changed

7 files changed

+111
-36
lines changed

fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -263,6 +263,8 @@ public long getBackendIdWithoutException() {
263263
}
264264

265265
public long getBackendId() throws UserException {
266+
// Record the access asynchronously so getBackendId, which is called frequently, is not blocked
267+
TabletAccessStats.getInstance().recordAccessAsync(getId());
266268
return this.backendId;
267269
}
268270

fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletAccessStats.java renamed to fe/fe-core/src/main/java/org/apache/doris/catalog/TabletAccessStats.java

Lines changed: 39 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,8 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18-
package org.apache.doris.cloud.catalog;
18+
package org.apache.doris.catalog;
1919

20-
import org.apache.doris.catalog.Env;
21-
import org.apache.doris.catalog.TabletInvertedIndex;
2220
import org.apache.doris.common.Config;
2321
import org.apache.doris.common.util.MasterDaemon;
2422

@@ -38,10 +36,15 @@
3836
import java.util.concurrent.atomic.AtomicLongArray;
3937
import java.util.stream.Collectors;
4038

41-
public class CloudTabletAccessStats {
42-
private static final Logger LOG = LogManager.getLogger(CloudTabletAccessStats.class);
39+
public class TabletAccessStats {
40+
private static final Logger LOG = LogManager.getLogger(TabletAccessStats.class);
4341

44-
private static volatile CloudTabletAccessStats instance;
42+
private static volatile TabletAccessStats instance;
43+
44+
// Sort active tablets by accessCount desc, then lastAccessTime desc
45+
private static final Comparator<AccessStatsResult> TOPN_ACTIVE_TABLET_COMPARATOR =
46+
Comparator.comparingLong((AccessStatsResult r) -> r.accessCount).reversed()
47+
.thenComparing(Comparator.comparingLong((AccessStatsResult r) -> r.lastAccessTime).reversed());
4548

4649
// Time window in milliseconds (default: 1 hour)
4750
private final long timeWindowMs;
@@ -186,7 +189,7 @@ private static class AccessStatsShard {
186189
// Default cleanup interval: 5 minutes
187190
private static final long DEFAULT_CLEANUP_INTERVAL_SECOND = 300L;
188191

189-
private CloudTabletAccessStats() {
192+
private TabletAccessStats() {
190193
this.timeWindowMs = DEFAULT_TIME_WINDOW_SECOND * 1000L;
191194
this.bucketSizeMs = DEFAULT_BUCKET_SIZE_SECOND * 1000L;
192195
this.numBuckets = (int) (DEFAULT_TIME_WINDOW_SECOND / DEFAULT_BUCKET_SIZE_SECOND); // 60 buckets
@@ -223,11 +226,11 @@ private CloudTabletAccessStats() {
223226
}
224227
}
225228

226-
public static CloudTabletAccessStats getInstance() {
229+
public static TabletAccessStats getInstance() {
227230
if (instance == null) {
228-
synchronized (CloudTabletAccessStats.class) {
231+
synchronized (TabletAccessStats.class) {
229232
if (instance == null) {
230-
instance = new CloudTabletAccessStats();
233+
instance = new TabletAccessStats();
231234
}
232235
}
233236
}
@@ -246,15 +249,15 @@ private int getShardIndex(long id) {
246249
* This method is non-blocking and should be used in high-frequency call paths
247250
* to avoid blocking the caller thread.
248251
*/
249-
public void recordAccessAsync(long replicaId, long tableId, long partitionId, long indexId) {
252+
public void recordAccessAsync(long replicaId) {
250253
if (!Config.enable_cloud_active_tablet_priority_scheduling || asyncExecutor == null) {
251254
return;
252255
}
253256

254257
try {
255258
asyncExecutor.execute(() -> {
256259
try {
257-
recordAccess(replicaId, tableId, partitionId, indexId);
260+
recordAccess(replicaId);
258261
} catch (Exception e) {
259262
// Log but don't propagate exception to avoid affecting caller
260263
LOG.debug("Failed to record access asynchronously for replicaId={}", replicaId, e);
@@ -272,7 +275,7 @@ public void recordAccessAsync(long replicaId, long tableId, long partitionId, lo
272275
/**
273276
* Record an access to a replica
274277
*/
275-
public void recordAccess(long replicaId, long tableId, long partitionId, long indexId) {
278+
public void recordAccess(long replicaId) {
276279
if (!Config.enable_cloud_active_tablet_priority_scheduling) {
277280
return;
278281
}
@@ -297,22 +300,26 @@ public void recordAccess(long replicaId, long tableId, long partitionId, long in
297300

298301
/**
299302
* Get access count for a tablet within the time window
303+
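* @return the tablet's access count and last access time, or null if the feature is disabled or no counter exists for the tablet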
300304
*/
301-
public long getTabletAccessCount(long tabletId) {
305+
public AccessStatsResult getTabletAccessInfo(long tabletId) {
302306
if (!Config.enable_cloud_active_tablet_priority_scheduling) {
303-
return 0;
307+
return null;
304308
}
305309

306310
int shardIndex = getShardIndex(tabletId);
307311
AccessStatsShard shard = shards[shardIndex];
308312
SlidingWindowCounter counter = shard.tabletCounters.get(tabletId);
309313

310314
if (counter == null) {
311-
return 0;
315+
return null;
312316
}
313317

314318
long currentTime = System.currentTimeMillis();
315-
return counter.getCount(currentTime, timeWindowMs, bucketSizeMs, numBuckets);
319+
return new AccessStatsResult(
320+
tabletId,
321+
counter.getCount(currentTime, timeWindowMs, bucketSizeMs, numBuckets),
322+
counter.getLastAccessTime());
316323
}
317324

318325
/**
@@ -328,6 +335,15 @@ public AccessStatsResult(long id, long accessCount, long lastAccessTime) {
328335
this.accessCount = accessCount;
329336
this.lastAccessTime = lastAccessTime;
330337
}
338+
339+
@Override
340+
public String toString() {
341+
return "AccessStatsResult{"
342+
+ "id=" + id
343+
+ ", accessCount=" + accessCount
344+
+ ", lastAccessTime=" + lastAccessTime
345+
+ '}';
346+
}
331347
}
332348

333349
/**
@@ -360,9 +376,7 @@ public List<AccessStatsResult> getTopNActiveTablets(int topN) {
360376
}
361377

362378
// Sort and return top N
363-
results.sort(Comparator
364-
.comparingLong((AccessStatsResult r) -> r.accessCount).reversed()
365-
.thenComparingLong((AccessStatsResult r) -> r.lastAccessTime).reversed());
379+
results.sort(TOPN_ACTIVE_TABLET_COMPARATOR);
366380

367381
return results.stream().limit(topN).collect(Collectors.toList());
368382
}
@@ -421,7 +435,7 @@ public String getStatsSummary() {
421435
}
422436

423437
return String.format(
424-
"CloudTabletAccessStats{timeWindow=%ds, bucketSize=%ds, numBuckets=%d, "
438+
"TabletAccessStats{timeWindow=%ds, bucketSize=%ds, numBuckets=%d, "
425439
+ "shardSize=%d, activeTablets=%d, "
426440
+ "totalTabletAccess=%d, totalAccessCount=%d}",
427441
timeWindowMs / 1000, bucketSizeMs / 1000, numBuckets, SHARD_SIZE,
@@ -444,7 +458,10 @@ protected void runAfterCatalogReady() {
444458

445459
try {
446460
cleanupExpiredRecords();
447-
getStatsSummary();
461+
if (LOG.isDebugEnabled()) {
462+
LOG.debug("tablet stat = {}, top 10 active tablet = {}",
463+
getStatsSummary(), getTopNActiveTablets(10));
464+
}
448465
} catch (Exception e) {
449466
LOG.warn("Failed to cleanup expired access records", e);
450467
}

fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudReplica.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.apache.doris.catalog.Env;
2323
import org.apache.doris.catalog.Partition;
2424
import org.apache.doris.catalog.Replica;
25+
import org.apache.doris.catalog.TabletAccessStats;
2526
import org.apache.doris.cloud.proto.Cloud;
2627
import org.apache.doris.cloud.qe.ComputeGroupException;
2728
import org.apache.doris.cloud.system.CloudSystemInfoService;
@@ -300,7 +301,7 @@ private long getBackendIdImpl(String clusterId) throws ComputeGroupException {
300301
return -1L;
301302
}
302303
// Use async version to avoid blocking getBackendIdImpl which is called frequently
303-
CloudTabletAccessStats.getInstance().recordAccessAsync(getId(), getTableId(), getPartitionId(), getIndexId());
304+
TabletAccessStats.getInstance().recordAccessAsync(getId());
304305

305306
if (isColocated()) {
306307
return getColocatedBeId(clusterId);

fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java

Lines changed: 27 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.apache.doris.catalog.Replica;
2727
import org.apache.doris.catalog.Table;
2828
import org.apache.doris.catalog.Tablet;
29+
import org.apache.doris.catalog.TabletAccessStats;
2930
import org.apache.doris.cloud.persist.UpdateCloudReplicaInfo;
3031
import org.apache.doris.cloud.proto.Cloud;
3132
import org.apache.doris.cloud.qe.ComputeGroupException;
@@ -1479,6 +1480,7 @@ private void balanceImpl(List<Long> bes, String clusterId, Map<Long, Set<Tablet>
14791480
final Set<Long> activeTabletIds = getActiveTabletIds();
14801481
final Map<Long, List<Tablet>> activeTabletsByBeCache =
14811482
activeTabletIds.isEmpty() ? null : new HashMap<>();
1483+
final Set<Long> pickedTabletIds = new HashSet<>();
14821484

14831485
for (int i = 0; i < transferNum; i++) {
14841486
TransferPairInfo pairInfo = new TransferPairInfo();
@@ -1492,11 +1494,13 @@ private void balanceImpl(List<Long> bes, String clusterId, Map<Long, Set<Tablet>
14921494
long destBe = pairInfo.destBe;
14931495

14941496
Tablet pickedTablet = pickTabletPreferActive(srcBe, beToTablets.get(srcBe),
1495-
activeTabletIds, activeTabletsByBeCache);
1497+
activeTabletIds, activeTabletsByBeCache, pickedTabletIds);
14961498
if (pickedTablet == null) {
14971499
continue; // No tablet to pick
14981500
}
14991501

1502+
pickedTabletIds.add(pickedTablet.getId());
1503+
15001504
CloudReplica cloudReplica = (CloudReplica) pickedTablet.getReplicas().get(0);
15011505
Backend srcBackend = Env.getCurrentSystemInfo().getBackend(srcBe);
15021506

@@ -1562,13 +1566,13 @@ private void updateBalanceStatus(BalanceType balanceType) {
15621566
private Set<Long> getActiveTabletIds() {
15631567
try {
15641568
// get topN active tablets
1565-
List<CloudTabletAccessStats.AccessStatsResult> active =
1566-
CloudTabletAccessStats.getInstance().getTopNActiveTablets(10000);
1569+
List<TabletAccessStats.AccessStatsResult> active =
1570+
TabletAccessStats.getInstance().getTopNActiveTablets(10000);
15671571
if (active == null || active.isEmpty()) {
15681572
return Collections.emptySet();
15691573
}
15701574
Set<Long> ids = new HashSet<>(active.size() * 2);
1571-
for (CloudTabletAccessStats.AccessStatsResult r : active) {
1575+
for (TabletAccessStats.AccessStatsResult r : active) {
15721576
ids.add(r.id);
15731577
}
15741578
return ids;
@@ -1582,24 +1586,38 @@ private Set<Long> getActiveTabletIds() {
15821586

15831587
// choose active tablet first to re-balance, otherwise random pick
15841588
private Tablet pickTabletPreferActive(long srcBe, Set<Tablet> tablets, Set<Long> activeTabletIds,
1585-
Map<Long, List<Tablet>> activeTabletsByBeCache) {
1589+
Map<Long, List<Tablet>> activeTabletsByBeCache,
1590+
Set<Long> pickedTabletIds) {
15861591
if (tablets == null || tablets.isEmpty()) {
15871592
return null;
15881593
}
1594+
1595+
Set<Tablet> availableTablets = tablets.stream()
1596+
.filter(t -> !pickedTabletIds.contains(t.getId()))
1597+
.collect(Collectors.toSet());
1598+
1599+
if (availableTablets.isEmpty()) {
1600+
return null; // No available tablets
1601+
}
1602+
15891603
if (activeTabletIds == null || activeTabletIds.isEmpty() || activeTabletsByBeCache == null) {
1590-
return pickRandomTablet(tablets);
1604+
return pickRandomTablet(availableTablets);
15911605
}
15921606

15931607
// Compute srcBe active tablets once, cache for subsequent iterations in this balanceImpl call.
15941608
List<Tablet> activeInSrc = activeTabletsByBeCache.computeIfAbsent(srcBe, k -> tablets.stream()
15951609
.filter(t -> activeTabletIds.contains(t.getId()))
15961610
.collect(Collectors.toList()));
15971611

1598-
if (activeInSrc != null && !activeInSrc.isEmpty()) {
1599-
return activeInSrc.get(rand.nextInt(activeInSrc.size()));
1612+
List<Tablet> availableActiveInSrc = activeInSrc.stream()
1613+
.filter(t -> !pickedTabletIds.contains(t.getId()))
1614+
.collect(Collectors.toList());
1615+
1616+
if (availableActiveInSrc != null && !availableActiveInSrc.isEmpty()) {
1617+
return availableActiveInSrc.get(rand.nextInt(availableActiveInSrc.size()));
16001618
}
16011619

1602-
return pickRandomTablet(tablets);
1620+
return pickRandomTablet(availableTablets);
16031621
}
16041622

16051623
private Tablet pickRandomTablet(Set<Tablet> tablets) {

fe/fe-core/src/main/java/org/apache/doris/common/proc/ReplicasProcNode.java

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.apache.doris.catalog.OlapTable;
2323
import org.apache.doris.catalog.Replica;
2424
import org.apache.doris.catalog.Tablet;
25+
import org.apache.doris.catalog.TabletAccessStats;
2526
import org.apache.doris.catalog.TabletMeta;
2627
import org.apache.doris.cloud.catalog.CloudReplica;
2728
import org.apache.doris.common.AnalysisException;
@@ -51,7 +52,7 @@ public class ReplicasProcNode implements ProcNodeInterface {
5152
.add("IsUserDrop")
5253
.add("VisibleVersionCount").add("VersionCount").add("PathHash").add("Path")
5354
.add("MetaUrl").add("CompactionStatus").add("CooldownReplicaId")
54-
.add("CooldownMetaId").add("QueryHits");
55+
.add("CooldownMetaId").add("QueryHits").add("AccessCount1H").add("LastAccessTime");
5556

5657
if (Config.isCloudMode()) {
5758
builder.add("PrimaryBackendId");
@@ -117,6 +118,13 @@ public ProcResult fetchResult() throws AnalysisException {
117118
if (Config.enable_query_hit_stats) {
118119
queryHits = QueryStatsUtil.getMergedReplicaStats(replica.getId());
119120
}
121+
TabletAccessStats asr = TabletAccessStats.getInstance();
122+
long accessCount1H = 0;
123+
long lastAccessTime = 0;
124+
if (asr != null) {
125+
accessCount1H = TabletAccessStats.getInstance().getTabletAccessInfo(tabletId).accessCount;
126+
lastAccessTime = TabletAccessStats.getInstance().getTabletAccessInfo(tabletId).lastAccessTime;
127+
}
120128
List<String> replicaInfo = Lists.newArrayList(String.valueOf(replica.getId()),
121129
String.valueOf(beId),
122130
String.valueOf(replica.getVersion()),
@@ -138,7 +146,10 @@ public ProcResult fetchResult() throws AnalysisException {
138146
compactionUrl,
139147
String.valueOf(tablet.getCooldownConf().first),
140148
cooldownMetaId,
141-
String.valueOf(queryHits));
149+
String.valueOf(queryHits),
150+
String.valueOf(accessCount1H),
151+
String.valueOf(lastAccessTime)
152+
);
142153
if (Config.isCloudMode()) {
143154
replicaInfo.add(String.valueOf(((CloudReplica) replica).getPrimaryBackendId()));
144155
}

fe/fe-core/src/main/java/org/apache/doris/common/proc/TabletsProcDir.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.apache.doris.catalog.Replica;
2424
import org.apache.doris.catalog.Table;
2525
import org.apache.doris.catalog.Tablet;
26+
import org.apache.doris.catalog.TabletAccessStats;
2627
import org.apache.doris.cloud.catalog.CloudReplica;
2728
import org.apache.doris.common.AnalysisException;
2829
import org.apache.doris.common.Config;
@@ -56,7 +57,8 @@ public class TabletsProcDir implements ProcDirInterface {
5657
.add("LstSuccessVersion").add("LstFailedVersion").add("LstFailedTime")
5758
.add("LocalDataSize").add("RemoteDataSize").add("RowCount").add("State")
5859
.add("LstConsistencyCheckTime").add("CheckVersion")
59-
.add("VisibleVersionCount").add("VersionCount").add("QueryHits").add("PathHash").add("Path")
60+
.add("VisibleVersionCount").add("VersionCount").add("QueryHits").add("AccessCount1H")
61+
.add("LastAccessTime").add("PathHash").add("Path")
6062
.add("MetaUrl").add("CompactionStatus")
6163
.add("CooldownReplicaId").add("CooldownMetaId");
6264

@@ -128,6 +130,8 @@ public List<List<Comparable>> fetchComparableResult(long version, long backendId
128130
tabletInfo.add(-1); // visible version count
129131
tabletInfo.add(-1); // total version count
130132
tabletInfo.add(0L); // query hits
133+
tabletInfo.add(0L); // query AccessCount1H
134+
tabletInfo.add(0L); // query LastAccessTime
131135
tabletInfo.add(-1); // path hash
132136
tabletInfo.add(FeConstants.null_string); // path
133137
tabletInfo.add(FeConstants.null_string); // meta url
@@ -140,6 +144,13 @@ public List<List<Comparable>> fetchComparableResult(long version, long backendId
140144

141145
tabletInfos.add(tabletInfo);
142146
} else {
147+
TabletAccessStats asr = TabletAccessStats.getInstance();
148+
long accessCount1H = 0;
149+
long lastAccessTime = 0;
150+
if (asr != null) {
151+
accessCount1H = TabletAccessStats.getInstance().getTabletAccessInfo(tabletId).accessCount;
152+
lastAccessTime = TabletAccessStats.getInstance().getTabletAccessInfo(tabletId).lastAccessTime;
153+
}
143154
for (Replica replica : tablet.getReplicas()) {
144155
long beId = replica.getBackendIdWithoutException();
145156
if ((version > -1 && replica.getVersion() != version)
@@ -167,6 +178,8 @@ public List<List<Comparable>> fetchComparableResult(long version, long backendId
167178
tabletInfo.add(replica.getVisibleVersionCount());
168179
tabletInfo.add(replica.getTotalVersionCount());
169180
tabletInfo.add(replicaIdToQueryHits.getOrDefault(replica.getId(), 0L));
181+
tabletInfo.add(String.valueOf(accessCount1H));
182+
tabletInfo.add(String.valueOf(lastAccessTime));
170183
tabletInfo.add(replica.getPathHash());
171184
tabletInfo.add(pathHashToRoot.getOrDefault(replica.getPathHash(), ""));
172185
Backend be = backendMap.get(beId);

0 commit comments

Comments
 (0)