Skip to content

Commit 597e2dc

Browse files
committed
Fixes and unit tests
Signed-off-by: zhengyu <zhangzhengyu@selectdb.com>
1 parent a1fb36d commit 597e2dc

File tree

2 files changed

+206
-0
lines changed

2 files changed

+206
-0
lines changed

be/src/cloud/cloud_warm_up_manager.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,9 @@ void CloudWarmUpManager::submit_download_tasks(io::Path path, int64_t file_size,
130130
int64_t expiration_time,
131131
std::shared_ptr<bthread::CountdownEvent> wait,
132132
bool is_index, std::function<void(Status)> done_cb) {
133+
VLOG_DEBUG << "submit warm up task for file: " << path << ", file_size: " << file_size
134+
<< ", expiration_time: " << expiration_time
135+
<< ", is_index: " << (is_index ? "true" : "false");
133136
if (file_size < 0) {
134137
auto st = file_system->file_size(path, &file_size);
135138
if (!st.ok()) [[unlikely]] {
Lines changed: 203 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,203 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
import org.apache.doris.regression.suite.ClusterOptions
19+
import org.apache.doris.regression.util.NodeType
20+
import groovy.json.JsonSlurper
21+
22+
// Regression test: with BE config enable_only_warm_up_idx=true, "WARM UP CLUSTER
// ... WITH TABLE" should populate the file cache with index data only, so the
// post-warm-up cache is non-empty but strictly smaller than after a full load,
// and an inverted-index query afterwards causes no S3 cache misses.
suite('test_warmup_table_docker', 'docker') {
    def options = new ClusterOptions()
    options.feConfigs += [
        'cloud_cluster_check_interval_second=1',
    ]
    options.beConfigs += [
        'file_cache_enter_disk_resource_limit_mode_percent=99',
        'enable_evict_file_cache_in_advance=false',
        'enable_only_warm_up_idx=true',
    ]
    options.cloudMode = true
    options.beNum = 1
    options.feNum = 1

    def testTable = "test_warmup_table"

    // Synchronously clear the file cache of one BE via its HTTP API.
    // Throws when the BE does not report status "OK".
    def clearFileCache = { ip, port ->
        def url = "http://${ip}:${port}/api/file_cache?op=clear&sync=true"
        def response = new URL(url).text
        def json = new JsonSlurper().parseText(response)
        if (json.status != "OK") {
            throw new RuntimeException("Clear cache on ${ip}:${port} failed: ${json.status}")
        }
    }

    // Clear the file cache on every backend returned by SHOW BACKENDS.
    def clearFileCacheOnAllBackends = {
        def backends = sql """SHOW BACKENDS"""
        for (be in backends) {
            def ip = be[1]
            def port = be[4]
            clearFileCache(ip, port)
        }
        // Cache clearing completes asynchronously on the BE side; wait for it.
        sleep(5000)
    }

    // Scrape a single numeric metric from a BE's brpc metrics endpoint.
    // Returns 0 when the metric is absent (e.g. nothing has been cached yet).
    def getBrpcMetrics = { ip, port, name ->
        def url = "http://${ip}:${port}/brpc_metrics"
        def metrics = new URL(url).text
        def matcher = metrics =~ ~"${name}\\s+(\\d+)"
        if (matcher.find()) {
            return matcher[0][1] as long
        }
        return 0
    }

    // Apply a BE config change to every backend of the given compute group.
    def updateBeConf = { cluster, key, value ->
        def backends = sql """SHOW BACKENDS"""
        def cluster_bes = backends.findAll { it[19].contains("""\"compute_group_name\" : \"${cluster}\"""") }
        for (be in cluster_bes) {
            def ip = be[1]
            def port = be[4]
            def (code, out, err) = update_be_config(ip, port, key, value)
            logger.info("update config: code=" + code + ", out=" + out + ", err=" + err)
        }
    }

    docker(options) {
        def clusterName = "warmup_cluster"

        // Add one cluster
        cluster.addBackend(1, clusterName)

        // Ensure we are in the cluster
        sql """use @${clusterName}"""

        // Best-effort: the audit plugin would add extra reads that skew the
        // cache-size and s3-read metrics checked below.
        try {
            sql "set global enable_audit_plugin = false"
        } catch (Exception e) {
            logger.info("set global enable_audit_plugin = false failed: " + e.getMessage())
        }

        sql """ DROP TABLE IF EXISTS ${testTable} """
        sql """ CREATE TABLE IF NOT EXISTS ${testTable} (
                `k1` int(11) NULL,
                `k2` int(11) NULL,
                `v3` int(11) NULL,
                `text` text NULL,
                INDEX idx_text (`text`) USING INVERTED PROPERTIES("parser" = "english")
            ) unique KEY(`k1`, `k2`)
            DISTRIBUTED BY HASH(`k1`) BUCKETS 1
            PROPERTIES (
                "replication_num" = "1"
            );
        """

        // Load data: even-numbered rows contain the token "doris",
        // odd-numbered rows contain "world" — exactly half match each token.
        int dataCount = 4000000
        streamLoad {
            table "${testTable}"
            set 'column_separator', ','
            inputIterator(new Iterator<String>() {
                int current = 0
                @Override
                boolean hasNext() {
                    return current < dataCount
                }
                @Override
                String next() {
                    current++
                    if (current % 2 == 0) {
                        return "${current},${current},${current},hello doris ${current}"
                    } else {
                        return "${current},${current},${current},hello world ${current}"
                    }
                }
            })

            check { res, exception, startTime, endTime ->
                if (exception != null) {
                    throw exception
                }
                def json = parseJson(res)
                assertEquals("success", json.Status.toLowerCase())
            }
        }
        sql "sync"

        def backends = sql """SHOW BACKENDS"""
        def ip = backends[0][1]
        def brpcPort = backends[0][5]

        sleep(3000)
        def cache_size_after_load = getBrpcMetrics(ip, brpcPort, "cache_cache_size")

        // Clear file cache to ensure warm up actually does something
        clearFileCacheOnAllBackends()

        sleep(3000)
        def cache_size_after_clear = getBrpcMetrics(ip, brpcPort, "cache_cache_size")
        // FIX: assertEquals takes (expected, actual) — expected value goes first,
        // otherwise a failure reports "expected <measured> but was <0>".
        assertEquals(0L, cache_size_after_clear)

        // Re-apply enable_only_warm_up_idx=true (also set statically via beConfigs)
        // so the backend added by cluster.addBackend is covered too.
        updateBeConf(clusterName, "enable_only_warm_up_idx", "true")

        // Trigger warm up
        def jobId = sql "WARM UP CLUSTER ${clusterName} WITH TABLE ${testTable}"
        assertNotNull(jobId)
        def id = jobId[0][0]

        // Poll SHOW WARM UP JOB once per second until FINISHED; fail fast on
        // CANCELLED; give up (return false) after 60 attempts.
        def waitJobFinished = { job_id ->
            for (int i = 0; i < 60; i++) {
                def result = sql "SHOW WARM UP JOB WHERE ID = ${job_id}"
                if (result.size() > 0) {
                    def status = result[0][3]
                    logger.info("Warm up job ${job_id} status: ${status}")
                    if (status == "FINISHED") {
                        return true
                    } else if (status == "CANCELLED") {
                        throw new RuntimeException("Warm up job ${job_id} cancelled")
                    }
                }
                sleep(1000)
            }
            return false
        }

        assertTrue(waitJobFinished(id), "Warm up job ${id} did not finish in time")
        sleep(3000)
        def cache_size_after_warm = getBrpcMetrics(ip, brpcPort, "cache_cache_size")

        logger.info("Cache size after load: ${cache_size_after_load}, after clear: ${cache_size_after_clear}, after warm up: ${cache_size_after_warm}")
        // FIX: also require a non-empty cache — without this, a warm up that did
        // nothing at all (size 0) would trivially satisfy the "<" check below.
        assertTrue(cache_size_after_warm > 0, "warm up should have populated the cache")
        // Index-only warm up must cache strictly less than a full data load.
        assertTrue(cache_size_after_warm < cache_size_after_load)

        def s3ReadBefore = getBrpcMetrics(ip, brpcPort, "cached_remote_reader_s3_read")

        // Verify data can be read — and FIX: actually check the result, which was
        // previously fetched but never asserted. Exactly the even half of the
        // loaded rows contain the token "doris".
        def result = sql "SELECT COUNT() FROM ${testTable} WHERE text MATCH_ANY 'doris'"
        assertEquals((dataCount.intdiv(2)) as long, result[0][0] as long)
        sleep(3000)
        // Get metrics after query
        def s3ReadAfter = getBrpcMetrics(ip, brpcPort, "cached_remote_reader_s3_read")

        // Check no cache miss (s3 read count should not increase)
        assertEquals(s3ReadBefore, s3ReadAfter)
    }
}

0 commit comments

Comments
 (0)