Skip to content

Commit aa9a6a4

Browse files
Merge branch 'master' into dev-ms
2 parents 5f777f1 + 2951187 commit aa9a6a4

File tree

374 files changed

+13296
-5591
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

374 files changed

+13296
-5591
lines changed

be/src/cloud/cloud_delta_writer.cpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,9 @@
2626

2727
namespace doris {
2828

29+
bvar::Adder<int64_t> g_cloud_commit_rowset_count("cloud_commit_rowset_count");
30+
bvar::Adder<int64_t> g_cloud_commit_empty_rowset_count("cloud_commit_empty_rowset_count");
31+
2932
CloudDeltaWriter::CloudDeltaWriter(CloudStorageEngine& engine, const WriteRequest& req,
3033
RuntimeProfile* profile, const UniqueId& load_id)
3134
: BaseDeltaWriter(req, profile, load_id), _engine(engine) {
@@ -108,10 +111,12 @@ const RowsetMetaSharedPtr& CloudDeltaWriter::rowset_meta() {
108111
}
109112

110113
Status CloudDeltaWriter::commit_rowset() {
114+
g_cloud_commit_rowset_count << 1;
111115
std::lock_guard<bthread::Mutex> lock(_mtx);
112116

113117
// Handle empty rowset (no data written)
114118
if (!_is_init) {
119+
g_cloud_commit_empty_rowset_count << 1;
115120
return _commit_empty_rowset();
116121
}
117122

be/src/cloud/cloud_internal_service.cpp

Lines changed: 56 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
#include "io/cache/block_file_cache.h"
2727
#include "io/cache/block_file_cache_downloader.h"
2828
#include "io/cache/block_file_cache_factory.h"
29+
#include "util/debug_points.h"
2930

3031
namespace doris {
3132
#include "common/compile_check_avoid_begin.h"
@@ -190,6 +191,7 @@ void CloudInternalServiceImpl::warm_up_rowset(google::protobuf::RpcController* c
190191
continue;
191192
}
192193
int64_t tablet_id = rs_meta.tablet_id();
194+
auto rowset_id = rs_meta.rowset_id();
193195
bool local_only = !(request->has_skip_existence_check() && request->skip_existence_check());
194196
auto res = _engine.tablet_mgr().get_tablet(tablet_id, /* warmup_data = */ false,
195197
/* sync_delete_bitmap = */ true,
@@ -216,7 +218,7 @@ void CloudInternalServiceImpl::warm_up_rowset(google::protobuf::RpcController* c
216218
g_file_cache_warm_up_rowset_request_to_handle_slow_count << 1;
217219
LOG(INFO) << "warm up rowset (request to handle) took " << handle_ts - request_ts
218220
<< " us, tablet_id: " << rs_meta.tablet_id()
219-
<< ", rowset_id: " << rs_meta.rowset_id().to_string();
221+
<< ", rowset_id: " << rowset_id.to_string();
220222
}
221223
int64_t expiration_time =
222224
tablet_meta->ttl_seconds() == 0 || rs_meta.newest_write_timestamp() <= 0
@@ -227,16 +229,26 @@ void CloudInternalServiceImpl::warm_up_rowset(google::protobuf::RpcController* c
227229
}
228230

229231
if (!tablet->add_rowset_warmup_state(rs_meta, WarmUpState::TRIGGERED_BY_JOB)) {
230-
LOG(INFO) << "found duplicate warmup task for rowset " << rs_meta.rowset_id()
232+
LOG(INFO) << "found duplicate warmup task for rowset " << rowset_id.to_string()
231233
<< ", skip it";
232234
continue;
233235
}
234236

235237
for (int64_t segment_id = 0; segment_id < rs_meta.num_segments(); segment_id++) {
236-
auto download_done = [&, tablet_id = rs_meta.tablet_id(),
237-
rowset_id = rs_meta.rowset_id().to_string(),
238-
segment_size = rs_meta.segment_file_size(segment_id),
239-
wait](Status st) {
238+
auto segment_size = rs_meta.segment_file_size(segment_id);
239+
auto download_done = [=, version = rs_meta.version()](Status st) {
240+
DBUG_EXECUTE_IF("CloudInternalServiceImpl::warm_up_rowset.download_segment", {
241+
auto sleep_time = dp->param<int>("sleep", 3);
242+
LOG_INFO("[verbose] block download for rowset={}, version={}, sleep={}",
243+
rowset_id.to_string(), version.to_string(), sleep_time);
244+
std::this_thread::sleep_for(std::chrono::seconds(sleep_time));
245+
});
246+
DBUG_EXECUTE_IF(
247+
"CloudInternalServiceImpl::warm_up_rowset.download_segment.inject_error", {
248+
st = Status::InternalError("injected error");
249+
LOG_INFO("[verbose] inject error, tablet={}, rowset={}, st={}",
250+
tablet_id, rowset_id.to_string(), st.to_string());
251+
});
240252
if (st.ok()) {
241253
g_file_cache_event_driven_warm_up_finished_segment_num << 1;
242254
g_file_cache_event_driven_warm_up_finished_segment_size << segment_size;
@@ -250,25 +262,27 @@ void CloudInternalServiceImpl::warm_up_rowset(google::protobuf::RpcController* c
250262
now_ts - request_ts > config::warm_up_rowset_slow_log_ms * 1000) {
251263
g_file_cache_warm_up_rowset_slow_count << 1;
252264
LOG(INFO) << "warm up rowset took " << now_ts - request_ts
253-
<< " us, tablet_id: " << tablet_id << ", rowset_id: " << rowset_id
265+
<< " us, tablet_id: " << tablet_id
266+
<< ", rowset_id: " << rowset_id.to_string()
254267
<< ", segment_id: " << segment_id;
255268
}
256269
if (now_ts - handle_ts > config::warm_up_rowset_slow_log_ms * 1000) {
257270
g_file_cache_warm_up_rowset_handle_to_finish_slow_count << 1;
258271
LOG(INFO) << "warm up rowset (handle to finish) took " << now_ts - handle_ts
259-
<< " us, tablet_id: " << tablet_id << ", rowset_id: " << rowset_id
272+
<< " us, tablet_id: " << tablet_id
273+
<< ", rowset_id: " << rowset_id.to_string()
260274
<< ", segment_id: " << segment_id;
261275
}
262276
} else {
263277
g_file_cache_event_driven_warm_up_failed_segment_num << 1;
264278
g_file_cache_event_driven_warm_up_failed_segment_size << segment_size;
265279
LOG(WARNING) << "download segment failed, tablet_id: " << tablet_id
266-
<< " rowset_id: " << rowset_id << ", error: " << st;
280+
<< " rowset_id: " << rowset_id.to_string() << ", error: " << st;
267281
}
268-
if (tablet->complete_rowset_segment_warmup(rs_meta.rowset_id(), st) ==
282+
if (tablet->complete_rowset_segment_warmup(rowset_id, st, 1, 0) ==
269283
WarmUpState::DONE) {
270-
VLOG_DEBUG << "warmup rowset " << rs_meta.version() << "(" << rowset_id
271-
<< ") completed";
284+
VLOG_DEBUG << "warmup rowset " << version.to_string() << "("
285+
<< rowset_id.to_string() << ") completed";
272286
}
273287
if (wait) {
274288
wait->signal();
@@ -277,31 +291,35 @@ void CloudInternalServiceImpl::warm_up_rowset(google::protobuf::RpcController* c
277291

278292
io::DownloadFileMeta download_meta {
279293
.path = storage_resource.value()->remote_segment_path(rs_meta, segment_id),
280-
.file_size = rs_meta.segment_file_size(segment_id),
294+
.file_size = segment_size,
281295
.offset = 0,
282-
.download_size = rs_meta.segment_file_size(segment_id),
296+
.download_size = segment_size,
283297
.file_system = storage_resource.value()->fs,
284-
.ctx =
285-
{
286-
.is_index_data = false,
287-
.expiration_time = expiration_time,
288-
.is_dryrun =
289-
config::enable_reader_dryrun_when_download_file_cache,
290-
},
298+
.ctx = {.is_index_data = false,
299+
.expiration_time = expiration_time,
300+
.is_dryrun = config::enable_reader_dryrun_when_download_file_cache,
301+
.is_warmup = true},
291302
.download_done = std::move(download_done),
292303
};
293304
g_file_cache_event_driven_warm_up_submitted_segment_num << 1;
294-
g_file_cache_event_driven_warm_up_submitted_segment_size
295-
<< rs_meta.segment_file_size(segment_id);
305+
g_file_cache_event_driven_warm_up_submitted_segment_size << segment_size;
296306
if (wait) {
297307
wait->add_count();
298308
}
299309
_engine.file_cache_block_downloader().submit_download_task(download_meta);
300310

301-
auto download_inverted_index = [&](std::string index_path, uint64_t idx_size) {
311+
auto download_inverted_index = [&, tablet](std::string index_path, uint64_t idx_size) {
302312
auto storage_resource = rs_meta.remote_storage_resource();
303-
auto download_done = [=, tablet_id = rs_meta.tablet_id(),
304-
rowset_id = rs_meta.rowset_id().to_string()](Status st) {
313+
auto download_done = [=, version = rs_meta.version()](Status st) {
314+
DBUG_EXECUTE_IF(
315+
"CloudInternalServiceImpl::warm_up_rowset.download_inverted_idx", {
316+
auto sleep_time = dp->param<int>("sleep", 3);
317+
LOG_INFO(
318+
"[verbose] block download for rowset={}, inverted index "
319+
"file={}, sleep={}",
320+
rowset_id.to_string(), index_path, sleep_time);
321+
std::this_thread::sleep_for(std::chrono::seconds(sleep_time));
322+
});
305323
if (st.ok()) {
306324
g_file_cache_event_driven_warm_up_finished_index_num << 1;
307325
g_file_cache_event_driven_warm_up_finished_index_size << idx_size;
@@ -318,14 +336,14 @@ void CloudInternalServiceImpl::warm_up_rowset(google::protobuf::RpcController* c
318336
g_file_cache_warm_up_rowset_slow_count << 1;
319337
LOG(INFO) << "warm up rowset took " << now_ts - request_ts
320338
<< " us, tablet_id: " << tablet_id
321-
<< ", rowset_id: " << rowset_id
339+
<< ", rowset_id: " << rowset_id.to_string()
322340
<< ", segment_id: " << segment_id;
323341
}
324342
if (now_ts - handle_ts > config::warm_up_rowset_slow_log_ms * 1000) {
325343
g_file_cache_warm_up_rowset_handle_to_finish_slow_count << 1;
326344
LOG(INFO) << "warm up rowset (handle to finish) took "
327345
<< now_ts - handle_ts << " us, tablet_id: " << tablet_id
328-
<< ", rowset_id: " << rowset_id
346+
<< ", rowset_id: " << rowset_id.to_string()
329347
<< ", segment_id: " << segment_id;
330348
}
331349
} else {
@@ -334,6 +352,11 @@ void CloudInternalServiceImpl::warm_up_rowset(google::protobuf::RpcController* c
334352
LOG(WARNING) << "download inverted index failed, tablet_id: " << tablet_id
335353
<< " rowset_id: " << rowset_id << ", error: " << st;
336354
}
355+
if (tablet->complete_rowset_segment_warmup(rowset_id, st, 0, 1) ==
356+
WarmUpState::DONE) {
357+
VLOG_DEBUG << "warmup rowset " << version.to_string() << "("
358+
<< rowset_id.to_string() << ") completed";
359+
}
337360
if (wait) {
338361
wait->signal();
339362
}
@@ -342,18 +365,15 @@ void CloudInternalServiceImpl::warm_up_rowset(google::protobuf::RpcController* c
342365
.path = io::Path(index_path),
343366
.file_size = static_cast<int64_t>(idx_size),
344367
.file_system = storage_resource.value()->fs,
345-
.ctx =
346-
{
347-
.is_index_data = false, // DORIS-20877
348-
.expiration_time = expiration_time,
349-
.is_dryrun = config::
350-
enable_reader_dryrun_when_download_file_cache,
351-
},
368+
.ctx = {.is_index_data = false, // DORIS-20877
369+
.expiration_time = expiration_time,
370+
.is_dryrun = config::enable_reader_dryrun_when_download_file_cache,
371+
.is_warmup = true},
352372
.download_done = std::move(download_done),
353373
};
354374
g_file_cache_event_driven_warm_up_submitted_index_num << 1;
355375
g_file_cache_event_driven_warm_up_submitted_index_size << idx_size;
356-
376+
tablet->update_rowset_warmup_state_inverted_idx_num(rowset_id, 1);
357377
if (wait) {
358378
wait->add_count();
359379
}

0 commit comments

Comments
 (0)