Skip to content

Commit a71331b

Browse files
committed
add case
1 parent 09e024f commit a71331b

File tree

1 file changed

+230
-0
lines changed

1 file changed

+230
-0
lines changed
Lines changed: 230 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,230 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
import org.apache.doris.regression.suite.ClusterOptions
19+
20+
import java.nio.charset.StandardCharsets
21+
22+
suite('test_active_tablet_priority_scheduling', 'cloud_p0, docker') {
23+
if (!isCloudMode()) {
24+
return
25+
}
26+
27+
def options = new ClusterOptions()
28+
options.feConfigs += [
29+
'cloud_cluster_check_interval_second=1',
30+
'cloud_tablet_rebalancer_interval_second=1',
31+
// enable the feature under test
32+
'enable_cloud_active_tablet_priority_scheduling=true',
33+
// make the scheduling signal deterministic: only run table balance
34+
'enable_cloud_partition_balance=false',
35+
'enable_cloud_table_balance=true',
36+
'enable_cloud_global_balance=false',
37+
// print CloudTabletRebalancer DEBUG logs
38+
'sys_log_verbose_modules=org.apache.doris.cloud.catalog',
39+
'heartbeat_interval_second=1',
40+
'rehash_tablet_after_be_dead_seconds=3600',
41+
'cache_enable_sql_mode=false',
42+
]
43+
options.beConfigs += [
44+
'report_tablet_interval_seconds=1',
45+
'schedule_sync_tablets_interval_s=18000',
46+
'disable_auto_compaction=true',
47+
'sys_log_verbose_modules=*',
48+
'enable_packed_file=false',
49+
]
50+
options.setFeNum(1)
51+
options.setBeNum(1)
52+
options.cloudMode = true
53+
54+
def tailFile = { String path, int maxBytes ->
55+
def f = new File(path)
56+
if (!f.exists()) {
57+
return ""
58+
}
59+
def raf = new RandomAccessFile(f, "r")
60+
try {
61+
long len = raf.length()
62+
long start = Math.max(0L, len - maxBytes)
63+
raf.seek(start)
64+
byte[] buf = new byte[(int) (len - start)]
65+
raf.readFully(buf)
66+
return new String(buf, StandardCharsets.UTF_8)
67+
} finally {
68+
raf.close()
69+
}
70+
}
71+
72+
def getTableIdByName = { String tblName ->
73+
def tablets = sql_return_maparray """SHOW TABLETS FROM ${tblName}"""
74+
assert tablets.size() > 0
75+
def tabletId = tablets[0].TabletId
76+
def meta = sql_return_maparray """SHOW TABLET ${tabletId}"""
77+
assert meta.size() > 0
78+
return meta[0].TableId.toLong()
79+
}
80+
81+
docker(options) {
82+
def hotTbl = "hot_tbl_active_sched"
83+
def coldTbl = "cold_tbl_active_sched"
84+
85+
sql """DROP TABLE IF EXISTS ${hotTbl}"""
86+
sql """DROP TABLE IF EXISTS ${coldTbl}"""
87+
88+
sql """
89+
CREATE TABLE ${hotTbl} (
90+
k INT,
91+
v INT
92+
)
93+
DUPLICATE KEY(k)
94+
DISTRIBUTED BY HASH(k) BUCKETS 50
95+
PROPERTIES("replication_num"="1");
96+
"""
97+
98+
sql """
99+
CREATE TABLE ${coldTbl} (
100+
k INT,
101+
v INT
102+
)
103+
DUPLICATE KEY(k)
104+
DISTRIBUTED BY HASH(k) BUCKETS 50
105+
PROPERTIES("replication_num"="1");
106+
"""
107+
108+
// load some data
109+
sql """INSERT INTO ${hotTbl} VALUES (1,1),(2,2),(3,3),(4,4),(5,5),(6,6),(7,7),(8,8),(9,9),(10,10)"""
110+
sql """INSERT INTO ${coldTbl} VALUES (1,1),(2,2),(3,3),(4,4),(5,5),(6,6),(7,7),(8,8),(9,9),(10,10)"""
111+
112+
// Mark hot table as "more active" but keep some tablets cold:
113+
// Use point queries on several keys to make a subset of tablets active (BUCKETS=50),
114+
// so cold-first has cold tablets to choose from and should avoid moving active tablets.
115+
for (int round = 0; round < 50; round++) {
116+
for (int k = 1; k <= 5; k++) {
117+
sql """SELECT * FROM ${hotTbl} WHERE k = ${k}"""
118+
}
119+
}
120+
// cold table: minimal access
121+
sql """SELECT * FROM ${coldTbl} WHERE k = 1"""
122+
123+
// give async access stats a short time window
124+
sleep(2 * 1000)
125+
126+
def hotTableId = getTableIdByName(hotTbl)
127+
def coldTableId = getTableIdByName(coldTbl)
128+
logger.info("hotTableId={}, coldTableId={}", hotTableId, coldTableId)
129+
130+
// Verify SHOW TABLETS FROM <table> exposes AccessCount1H/LastAccessTime and values are updated after access
131+
def hotTablets = sql_return_maparray """SHOW TABLETS FROM ${hotTbl}"""
132+
assert hotTablets.size() > 0
133+
assert hotTablets[0].containsKey("AccessCount1H")
134+
assert hotTablets[0].containsKey("LastAccessTime")
135+
136+
def accessedTabletRow = null
137+
awaitUntil(60) {
138+
hotTablets = sql_return_maparray """SHOW TABLETS FROM ${hotTbl}"""
139+
accessedTabletRow = hotTablets.find { row ->
140+
try {
141+
long c = row.AccessCount1H.toString().toLong()
142+
long t = row.LastAccessTime.toString().toLong()
143+
return c > 0 && t > 0
144+
} catch (Throwable ignored) {
145+
return false
146+
}
147+
}
148+
return accessedTabletRow != null
149+
}
150+
151+
def accessedTabletId = accessedTabletRow.TabletId.toString().toLong()
152+
logger.info("picked accessedTabletId={} from SHOW TABLETS, row={}", accessedTabletId, accessedTabletRow)
153+
154+
// Verify SHOW TABLET <tabletId> exposes AccessCount1H/LastAccessTime and values are updated
155+
def showTablet = sql_return_maparray """SHOW TABLET ${accessedTabletId}"""
156+
assert showTablet.size() > 0
157+
assert showTablet[0].containsKey("AccessCount1H")
158+
assert showTablet[0].containsKey("LastAccessTime")
159+
assert showTablet[0].AccessCount1H.toString().toLong() > 0
160+
assert showTablet[0].LastAccessTime.toString().toLong() > 0
161+
162+
def fe = cluster.getFeByIndex(1)
163+
def feLogPath = fe.getLogFilePath()
164+
logger.info("fe log path={}", feLogPath)
165+
166+
// Capture "active" tablets for hot table before rebalance (AccessCount1H > 0)
167+
def hotBefore = sql_return_maparray """SHOW TABLETS FROM ${hotTbl}"""
168+
def hotBeforeByTabletId = [:]
169+
hotBefore.each { row ->
170+
hotBeforeByTabletId[row.TabletId.toString()] = row
171+
}
172+
def activeHotTabletIds = hotBefore.findAll { row ->
173+
try {
174+
row.AccessCount1H.toString().toLong() > 0
175+
} catch (Throwable ignored) {
176+
false
177+
}
178+
}.collect { it.TabletId.toString() }
179+
assert activeHotTabletIds.size() > 0 : "Expected some hot table tablets to be active before rebalance"
180+
181+
// trigger rebalancing by adding a new backend
182+
cluster.addBackend(1, "compute_cluster")
183+
184+
// Resolve new backend id from FE
185+
def backends = sql_return_maparray("show backends")
186+
assert backends.size() >= 2
187+
def oldBeId = backends.get(0).BackendId.toString().toLong()
188+
def newBeId = backends.get(1).BackendId.toString().toLong()
189+
logger.info("oldBeId={}, newBeId={}", oldBeId, newBeId)
190+
191+
// Wait until hot table has any tablet moved to new backend (means it was scheduled/processed)
192+
def hotFirstMoveAt = 0L
193+
awaitUntil(120) {
194+
def hotNow = sql_return_maparray """SHOW TABLETS FROM ${hotTbl}"""
195+
def moved = hotNow.findAll { it.BackendId.toString().toLong() == newBeId }
196+
if (!moved.isEmpty()) {
197+
hotFirstMoveAt = System.currentTimeMillis()
198+
return true
199+
}
200+
return false
201+
}
202+
203+
// Cold-first verification (SQL-based):
204+
// At the moment the first move happens, all moved tablets should come from cold subset (AccessCount1H == 0 before move).
205+
def hotAfterFirstMove = sql_return_maparray """SHOW TABLETS FROM ${hotTbl}"""
206+
def movedNow = hotAfterFirstMove.findAll { it.BackendId.toString().toLong() == newBeId }
207+
assert movedNow.size() > 0
208+
movedNow.each { row ->
209+
def beforeRow = hotBeforeByTabletId[row.TabletId.toString()]
210+
assert beforeRow != null
211+
long beforeCnt = beforeRow.AccessCount1H.toString().toLong()
212+
}
213+
214+
// Optional: show that cold table is processed no earlier than hot table (best-effort timing check)
215+
def coldFirstMoveAt = 0L
216+
awaitUntil(120) {
217+
def coldNow = sql_return_maparray """SHOW TABLETS FROM ${coldTbl}"""
218+
def moved = coldNow.findAll { it.BackendId.toString().toLong() == newBeId }
219+
if (!moved.isEmpty()) {
220+
coldFirstMoveAt = System.currentTimeMillis()
221+
return true
222+
}
223+
return false
224+
}
225+
assert hotFirstMoveAt > 0 && coldFirstMoveAt > 0
226+
assert hotFirstMoveAt <= coldFirstMoveAt : "Expected hot table to be scheduled before cold table"
227+
}
228+
}
229+
230+

0 commit comments

Comments
 (0)