Skip to content

Commit 7291a69

Browse files
Merge branch 'master' into sss
2 parents 487f53a + b515f86 commit 7291a69

File tree

170 files changed

+2334
-1598
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

170 files changed

+2334
-1598
lines changed

be/src/cloud/cloud_tablet.cpp

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ namespace doris {
5454
using namespace ErrorCode;
5555

5656
static constexpr int COMPACTION_DELETE_BITMAP_LOCK_ID = -1;
57+
static constexpr int LOAD_INITIATOR_ID = -1;
5758

5859
CloudTablet::CloudTablet(CloudStorageEngine& engine, TabletMetaSharedPtr tablet_meta)
5960
: BaseTablet(std::move(tablet_meta)), _engine(engine) {}
@@ -504,13 +505,19 @@ Result<std::unique_ptr<RowsetWriter>> CloudTablet::create_rowset_writer(
504505
Result<std::unique_ptr<RowsetWriter>> CloudTablet::create_transient_rowset_writer(
505506
const Rowset& rowset, std::shared_ptr<PartialUpdateInfo> partial_update_info,
506507
int64_t txn_expiration) {
507-
if (rowset.rowset_meta()->rowset_state() != RowsetStatePB::BEGIN_PARTIAL_UPDATE) [[unlikely]] {
508-
// May cause the segment files generated by the transient rowset writer unable to be
509-
// recycled, see `CloudRowsetWriter::build` for detail.
510-
LOG(WARNING) << "Wrong rowset state: " << rowset.rowset_meta()->rowset_state();
511-
DCHECK(false) << rowset.rowset_meta()->rowset_state();
508+
if (rowset.rowset_meta_state() != RowsetStatePB::BEGIN_PARTIAL_UPDATE &&
509+
rowset.rowset_meta_state() != RowsetStatePB::COMMITTED) [[unlikely]] {
510+
auto msg = fmt::format(
511+
"wrong rowset state when create_transient_rowset_writer, rowset state should be "
512+
"BEGIN_PARTIAL_UPDATE or COMMITTED, but found {}, rowset_id={}, tablet_id={}",
513+
RowsetStatePB_Name(rowset.rowset_meta_state()), rowset.rowset_id().to_string(),
514+
tablet_id());
515+
// see `CloudRowsetWriter::build` for detail.
516+
// if this is in a retry task, the rowset state may have been changed to RowsetStatePB::COMMITTED
517+
// in `RowsetMeta::merge_rowset_meta()` in previous trials.
518+
LOG(WARNING) << msg;
519+
DCHECK(false) << msg;
512520
}
513-
514521
RowsetWriterContext context;
515522
context.rowset_state = PREPARED;
516523
context.segments_overlap = OVERLAPPING;
@@ -719,8 +726,8 @@ Status CloudTablet::save_delete_bitmap(const TabletTxnInfo* txn_info, int64_t tx
719726
}
720727

721728
auto ms_lock_id = lock_id == -1 ? txn_id : lock_id;
722-
RETURN_IF_ERROR(_engine.meta_mgr().update_delete_bitmap(
723-
*this, ms_lock_id, COMPACTION_DELETE_BITMAP_LOCK_ID, new_delete_bitmap.get()));
729+
RETURN_IF_ERROR(_engine.meta_mgr().update_delete_bitmap(*this, ms_lock_id, LOAD_INITIATOR_ID,
730+
new_delete_bitmap.get()));
724731

725732
// store the delete bitmap with sentinel marks in txn_delete_bitmap_cache because if the txn is retried for some reason,
726733
// it will use the delete bitmap from txn_delete_bitmap_cache when re-calculating the delete bitmap, during which it will do

be/src/cloud/cloud_tablet_hotspot.cpp

Lines changed: 56 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -57,18 +57,55 @@ TabletHotspot::~TabletHotspot() {
5757
}
5858
}
5959

60-
struct MapKeyHash {
61-
int64_t operator()(const std::pair<int64_t, int64_t>& key) const {
62-
return std::hash<int64_t> {}(key.first) + std::hash<int64_t> {}(key.second);
60+
void get_return_partitions(
61+
const std::unordered_map<TabletHotspotMapKey,
62+
std::unordered_map<int64_t, TabletHotspotMapValue>, MapKeyHash>&
63+
hot_partition,
64+
const std::unordered_map<TabletHotspotMapKey,
65+
std::unordered_map<int64_t, TabletHotspotMapValue>, MapKeyHash>&
66+
last_hot_partition,
67+
std::vector<THotTableMessage>* hot_tables, int& return_partitions, int N) {
68+
for (const auto& [key, partition_to_value] : hot_partition) {
69+
THotTableMessage msg;
70+
msg.table_id = key.first;
71+
msg.index_id = key.second;
72+
for (const auto& [partition_id, value] : partition_to_value) {
73+
if (return_partitions > N) {
74+
return;
75+
}
76+
auto last_value_iter = last_hot_partition.find(key);
77+
if (last_value_iter != last_hot_partition.end()) {
78+
auto last_partition_iter = last_value_iter->second.find(partition_id);
79+
if (last_partition_iter != last_value_iter->second.end()) {
80+
const auto& last_value = last_partition_iter->second;
81+
if (std::abs(static_cast<int64_t>(value.qpd) -
82+
static_cast<int64_t>(last_value.qpd)) < 5 &&
83+
std::abs(static_cast<int64_t>(value.qpw) -
84+
static_cast<int64_t>(last_value.qpw)) < 10 &&
85+
std::abs(static_cast<int64_t>(value.last_access_time) -
86+
static_cast<int64_t>(last_value.last_access_time)) < 60) {
87+
LOG(INFO) << "skip partition_id=" << partition_id << " qpd=" << value.qpd
88+
<< " qpw=" << value.qpw
89+
<< " last_access_time=" << value.last_access_time
90+
<< " last_qpd=" << last_value.qpd
91+
<< " last_qpw=" << last_value.qpw
92+
<< " last_access_time=" << last_value.last_access_time;
93+
continue;
94+
}
95+
}
96+
}
97+
THotPartition hot_partition;
98+
hot_partition.__set_partition_id(partition_id);
99+
hot_partition.__set_query_per_day(value.qpd);
100+
hot_partition.__set_query_per_week(value.qpw);
101+
hot_partition.__set_last_access_time(value.last_access_time);
102+
msg.hot_partitions.push_back(hot_partition);
103+
return_partitions++;
104+
}
105+
msg.__isset.hot_partitions = !msg.hot_partitions.empty();
106+
hot_tables->push_back(std::move(msg));
63107
}
64-
};
65-
struct TabletHotspotMapValue {
66-
uint64_t qpd = 0; // query per day
67-
uint64_t qpw = 0; // query per week
68-
int64_t last_access_time;
69-
};
70-
71-
using TabletHotspotMapKey = std::pair<int64_t, int64_t>;
108+
}
72109

73110
void TabletHotspot::get_top_n_hot_partition(std::vector<THotTableMessage>* hot_tables) {
74111
// map<pair<table_id, index_id>, map<partition_id, value>> for day
@@ -108,33 +145,14 @@ void TabletHotspot::get_top_n_hot_partition(std::vector<THotTableMessage>* hot_t
108145
});
109146
constexpr int N = 50;
110147
int return_partitions = 0;
111-
auto get_return_partitions =
112-
[=, &return_partitions](
113-
const std::unordered_map<TabletHotspotMapKey,
114-
std::unordered_map<int64_t, TabletHotspotMapValue>,
115-
MapKeyHash>& hot_partition) {
116-
for (const auto& [key, partition_to_value] : hot_partition) {
117-
THotTableMessage msg;
118-
msg.table_id = key.first;
119-
msg.index_id = key.second;
120-
for (const auto& [partition_id, value] : partition_to_value) {
121-
if (return_partitions > N) {
122-
return;
123-
}
124-
THotPartition hot_partition;
125-
hot_partition.__set_partition_id(partition_id);
126-
hot_partition.__set_query_per_day(value.qpd);
127-
hot_partition.__set_query_per_week(value.qpw);
128-
hot_partition.__set_last_access_time(value.last_access_time);
129-
msg.hot_partitions.push_back(hot_partition);
130-
return_partitions++;
131-
}
132-
msg.__isset.hot_partitions = !msg.hot_partitions.empty();
133-
hot_tables->push_back(std::move(msg));
134-
}
135-
};
136-
get_return_partitions(day_hot_partitions);
137-
get_return_partitions(week_hot_partitions);
148+
149+
get_return_partitions(day_hot_partitions, _last_day_hot_partitions, hot_tables,
150+
return_partitions, N);
151+
get_return_partitions(week_hot_partitions, _last_week_hot_partitions, hot_tables,
152+
return_partitions, N);
153+
154+
_last_day_hot_partitions = std::move(day_hot_partitions);
155+
_last_week_hot_partitions = std::move(week_hot_partitions);
138156
}
139157

140158
void HotspotCounter::make_dot_point() {

be/src/cloud/cloud_tablet_hotspot.h

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,19 @@ struct HotspotCounter {
4949
};
5050

5151
using HotspotCounterPtr = std::shared_ptr<HotspotCounter>;
52+
using TabletHotspotMapKey = std::pair<int64_t, int64_t>;
53+
54+
struct TabletHotspotMapValue {
55+
uint64_t qpd = 0; // query per day
56+
uint64_t qpw = 0; // query per week
57+
int64_t last_access_time;
58+
};
59+
60+
struct MapKeyHash {
61+
int64_t operator()(const std::pair<int64_t, int64_t>& key) const {
62+
return std::hash<int64_t> {}(key.first) + std::hash<int64_t> {}(key.second);
63+
}
64+
};
5265

5366
class TabletHotspot {
5467
public:
@@ -71,6 +84,12 @@ class TabletHotspot {
7184
bool _closed {false};
7285
std::mutex _mtx;
7386
std::condition_variable _cond;
87+
std::unordered_map<TabletHotspotMapKey, std::unordered_map<int64_t, TabletHotspotMapValue>,
88+
MapKeyHash>
89+
_last_day_hot_partitions;
90+
std::unordered_map<TabletHotspotMapKey, std::unordered_map<int64_t, TabletHotspotMapValue>,
91+
MapKeyHash>
92+
_last_week_hot_partitions;
7493
};
7594

7695
} // namespace doris

be/src/common/config.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1406,6 +1406,8 @@ DEFINE_Bool(enable_table_size_correctness_check, "false");
14061406
DEFINE_Bool(force_regenerate_rowsetid_on_start_error, "false");
14071407
DEFINE_mBool(enable_sleep_between_delete_cumu_compaction, "false");
14081408

1409+
DEFINE_mInt32(compaction_num_per_round, "1");
1410+
14091411
// clang-format off
14101412
#ifdef BE_TEST
14111413
// test s3

be/src/common/config.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1493,6 +1493,8 @@ DECLARE_Bool(enable_table_size_correctness_check);
14931493
// Enable sleep 5s between delete cumulative compaction.
14941494
DECLARE_mBool(enable_sleep_between_delete_cumu_compaction);
14951495

1496+
DECLARE_mInt32(compaction_num_per_round);
1497+
14961498
#ifdef BE_TEST
14971499
// test s3
14981500
DECLARE_String(test_s3_resource);

be/src/common/daemon.cpp

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -230,6 +230,11 @@ void refresh_memory_state_after_memory_change() {
230230
}
231231

232232
void refresh_cache_capacity() {
233+
if (doris::GlobalMemoryArbitrator::cache_adjust_capacity_notify.load(
234+
std::memory_order_relaxed)) {
235+
// the last cache capacity adjustment has not been completed.
236+
return;
237+
}
233238
if (refresh_cache_capacity_sleep_time_ms <= 0) {
234239
auto cache_capacity_reduce_mem_limit = int64_t(
235240
doris::MemInfo::soft_mem_limit() * config::cache_capacity_reduce_mem_limit_frac);
@@ -247,6 +252,8 @@ void refresh_cache_capacity() {
247252
new_cache_capacity_adjust_weighted;
248253
doris::GlobalMemoryArbitrator::notify_cache_adjust_capacity();
249254
refresh_cache_capacity_sleep_time_ms = config::memory_gc_sleep_time_ms;
255+
} else {
256+
refresh_cache_capacity_sleep_time_ms = 0;
250257
}
251258
}
252259
refresh_cache_capacity_sleep_time_ms -= config::memory_maintenance_sleep_time_ms;

be/src/common/status.h

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -293,7 +293,8 @@ namespace ErrorCode {
293293
E(ENTRY_NOT_FOUND, -7002, false); \
294294
E(INVALID_TABLET_STATE, -7211, false); \
295295
E(ROWSETS_EXPIRED, -7311, false); \
296-
E(CGROUP_ERROR, -7411, false);
296+
E(CGROUP_ERROR, -7411, false); \
297+
E(FATAL_ERROR, -7412, false);
297298

298299
// Define constexpr int error_code_name = error_code_value
299300
#define M(NAME, ERRORCODE, ENABLESTACKTRACE) constexpr int NAME = ERRORCODE;
@@ -446,6 +447,14 @@ class [[nodiscard]] Status {
446447

447448
static Status OK() { return {}; }
448449

450+
template <bool stacktrace = true, typename... Args>
451+
static Status FatalError(std::string_view msg, Args&&... args) {
452+
#ifndef NDEBUG
453+
LOG(FATAL) << fmt::format(msg, std::forward<Args>(args)...);
454+
#endif
455+
return Error<ErrorCode::FATAL_ERROR, stacktrace>(msg, std::forward<Args>(args)...);
456+
}
457+
449458
// default have stacktrace. could disable manually.
450459
#define ERROR_CTOR(name, code) \
451460
template <bool stacktrace = true, typename... Args> \

be/src/gutil/strings/escaping.cc

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@
1010
#include <limits>
1111
#include <ostream>
1212

13+
#include "common/exception.h"
14+
1315
using std::numeric_limits;
1416
#include <vector>
1517

@@ -1084,7 +1086,8 @@ int Base64UnescapeInternal(const char* src, int szsrc, char* dest, int szdest,
10841086

10851087
default:
10861088
// state should have no other values at this point.
1087-
LOG(FATAL) << "This can't happen; base64 decoder state = " << state;
1089+
throw doris::Exception(
1090+
doris::Status::FatalError("This can't happen; base64 decoder state = {}", state));
10881091
}
10891092

10901093
// The remainder of the string should be all whitespace, mixed with

be/src/gutil/strings/numbers.cc

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919
#include <limits>
2020
#include <ostream>
2121

22+
#include "common/exception.h"
23+
2224
using std::numeric_limits;
2325
#include <string>
2426

@@ -772,8 +774,8 @@ uint64 atoi_kmgt(const char* s) {
772774
scale = GG_ULONGLONG(1) << 40;
773775
break;
774776
default:
775-
LOG(FATAL) << "Invalid mnemonic: `" << c << "';"
776-
<< " should be one of `K', `M', `G', and `T'.";
777+
throw doris::Exception(doris::Status::FatalError(
778+
"Invalid mnemonic: `{}'; should be one of `K', `M', `G', and `T'.", c));
777779
}
778780
}
779781
return n * scale;

be/src/gutil/strings/util.cc

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919
#include <mutex>
2020
#include <ostream>
2121

22+
#include "common/exception.h"
23+
2224
using std::copy;
2325
using std::max;
2426
using std::min;
@@ -489,8 +491,7 @@ const char* strstr_delimited(const char* haystack, const char* needle, char deli
489491
++haystack;
490492
}
491493
}
492-
LOG(FATAL) << "Unreachable statement";
493-
return nullptr;
494+
throw doris::Exception(doris::Status::FatalError("Unreachable statement"));
494495
}
495496

496497
// ----------------------------------------------------------------------

0 commit comments

Comments
 (0)