Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions be/src/olap/delta_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -98,9 +98,9 @@ BaseDeltaWriter::~BaseDeltaWriter() {
}
}

void BaseDeltaWriter::set_tablet_load_rowset_num_info(
void BaseDeltaWriter::collect_tablet_load_rowset_num_info(
BaseTablet* tablet,
google::protobuf::RepeatedPtrField<PTabletLoadRowsetInfo>* tablet_infos) {
auto* tablet = _rowset_builder->tablet().get();
if (tablet == nullptr) {
return;
}
Expand All @@ -114,6 +114,12 @@ void BaseDeltaWriter::set_tablet_load_rowset_num_info(
}
}

void BaseDeltaWriter::set_tablet_load_rowset_num_info(
google::protobuf::RepeatedPtrField<PTabletLoadRowsetInfo>* tablet_infos) {
auto* tablet = _rowset_builder->tablet().get();
collect_tablet_load_rowset_num_info(tablet, tablet_infos);
}

DeltaWriter::~DeltaWriter() = default;

Status BaseDeltaWriter::init() {
Expand Down
4 changes: 4 additions & 0 deletions be/src/olap/delta_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,10 @@ class BaseDeltaWriter {

int64_t num_rows_filtered() const;

static void collect_tablet_load_rowset_num_info(
BaseTablet* tablet,
google::protobuf::RepeatedPtrField<PTabletLoadRowsetInfo>* tablet_infos);

void set_tablet_load_rowset_num_info(
google::protobuf::RepeatedPtrField<PTabletLoadRowsetInfo>* tablet_info);

Expand Down
58 changes: 57 additions & 1 deletion be/src/runtime/load_stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
#include "cloud/config.h"
#include "common/signal_handler.h"
#include "exec/tablet_info.h"
#include "olap/delta_writer.h"
#include "olap/tablet.h"
#include "olap/tablet_fwd.h"
#include "olap/tablet_schema.h"
Expand Down Expand Up @@ -399,6 +400,13 @@ void IndexStream::_init_tablet_stream(TabletStreamSharedPtr& tablet_stream, int6
}
}

void IndexStream::get_all_write_tablet_ids(std::vector<int64_t>* tablet_ids) {
std::lock_guard lock_guard(_lock);
for (const auto& [tablet_id, _] : _tablet_streams_map) {
tablet_ids->push_back(tablet_id);
}
}

void IndexStream::close(const std::vector<PTabletID>& tablets_to_commit,
std::vector<int64_t>* success_tablet_ids, FailedTablets* failed_tablets) {
std::lock_guard lock_guard(_lock);
Expand Down Expand Up @@ -589,6 +597,43 @@ void LoadStream::_report_schema(StreamId stream, const PStreamHeader& hdr) {
}
}

void LoadStream::_report_tablet_load_info(StreamId stream, int64_t index_id) {
std::vector<int64_t> write_tablet_ids;
auto it = _index_streams_map.find(index_id);
if (it != _index_streams_map.end()) {
it->second->get_all_write_tablet_ids(&write_tablet_ids);
}

if (!write_tablet_ids.empty()) {
butil::IOBuf buf;
PLoadStreamResponse response;
auto* tablet_load_infos = response.mutable_tablet_load_rowset_num_infos();
_collect_tablet_load_info_from_tablets(write_tablet_ids, tablet_load_infos);
if (tablet_load_infos->empty()) {
return;
}
buf.append(response.SerializeAsString());
auto wst = _write_stream(stream, buf);
if (!wst.ok()) {
LOG(WARNING) << "report tablet load info failed with " << wst << ", " << *this;
}
}
}

void LoadStream::_collect_tablet_load_info_from_tablets(
const std::vector<int64_t>& tablet_ids,
google::protobuf::RepeatedPtrField<PTabletLoadRowsetInfo>* tablet_load_infos) {
for (auto tablet_id : tablet_ids) {
BaseTabletSPtr tablet;
if (auto res = ExecEnv::get_tablet(tablet_id); res.has_value()) {
tablet = std::move(res).value();
} else {
continue;
}
BaseDeltaWriter::collect_tablet_load_rowset_num_info(tablet.get(), tablet_load_infos);
}
}

Status LoadStream::_write_stream(StreamId stream, butil::IOBuf& buf) {
for (;;) {
int ret = 0;
Expand Down Expand Up @@ -699,7 +744,18 @@ void LoadStream::_dispatch(StreamId id, const PStreamHeader& hdr, butil::IOBuf*
}

switch (hdr.opcode()) {
case PStreamHeader::ADD_SEGMENT: // ADD_SEGMENT will be dispatched inside TabletStream
case PStreamHeader::ADD_SEGMENT: {
auto st = _append_data(hdr, data);
if (!st.ok()) {
_report_failure(id, st, hdr);
} else {
// Report tablet load info only on ADD_SEGMENT to reduce frequency.
// ADD_SEGMENT is sent once per segment, while APPEND_DATA is sent
// for every data batch. This reduces unnecessary writes and avoids
// potential stream write failures when the sender is closing.
_report_tablet_load_info(id, hdr.index_id());
}
} break;
case PStreamHeader::APPEND_DATA: {
auto st = _append_data(hdr, data);
if (!st.ok()) {
Expand Down
6 changes: 6 additions & 0 deletions be/src/runtime/load_stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,8 @@ class IndexStream {
void close(const std::vector<PTabletID>& tablets_to_commit,
std::vector<int64_t>* success_tablet_ids, FailedTablets* failed_tablet_ids);

void get_all_write_tablet_ids(std::vector<int64_t>* tablet_ids);

private:
void _init_tablet_stream(TabletStreamSharedPtr& tablet_stream, int64_t tablet_id,
int64_t partition_id);
Expand Down Expand Up @@ -154,6 +156,10 @@ class LoadStream : public brpc::StreamInputHandler {
const std::vector<int64_t>& success_tablet_ids,
const FailedTablets& failed_tablets, bool eos);
void _report_schema(StreamId stream, const PStreamHeader& hdr);
void _report_tablet_load_info(StreamId stream, int64_t index_id);
void _collect_tablet_load_info_from_tablets(
const std::vector<int64_t>& tablet_ids,
google::protobuf::RepeatedPtrField<PTabletLoadRowsetInfo>* tablet_load_infos);

// report failure for one message
void _report_failure(StreamId stream, const Status& status, const PStreamHeader& header) {
Expand Down
10 changes: 10 additions & 0 deletions be/src/service/internal_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
#include "io/fs/stream_load_pipe.h"
#include "io/io_common.h"
#include "olap/data_dir.h"
#include "olap/delta_writer.h"
#include "olap/olap_common.h"
#include "olap/olap_define.h"
#include "olap/rowset/beta_rowset.h"
Expand Down Expand Up @@ -415,6 +416,7 @@ void PInternalService::open_load_stream(google::protobuf::RpcController* control
LOG(INFO) << "open load stream, load_id=" << request->load_id()
<< ", src_id=" << request->src_id();

std::vector<BaseTabletSPtr> tablets;
for (const auto& req : request->tablets()) {
BaseTabletSPtr tablet;
if (auto res = ExecEnv::get_tablet(req.tablet_id()); !res.has_value()) [[unlikely]] {
Expand All @@ -429,6 +431,14 @@ void PInternalService::open_load_stream(google::protobuf::RpcController* control
resp->set_index_id(req.index_id());
resp->set_enable_unique_key_merge_on_write(tablet->enable_unique_key_merge_on_write());
tablet->tablet_schema()->to_schema_pb(resp->mutable_tablet_schema());
tablets.push_back(tablet);
}
if (!tablets.empty()) {
auto* tablet_load_infos = response->mutable_tablet_load_rowset_num_infos();
for (const auto& tablet : tablets) {
BaseDeltaWriter::collect_tablet_load_rowset_num_info(tablet.get(),
tablet_load_infos);
}
}

LoadStream* load_stream = nullptr;
Expand Down
45 changes: 45 additions & 0 deletions be/src/vec/sink/load_stream_stub.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,10 @@ int LoadStreamReplyHandler::on_received_messages(brpc::StreamId id, butil::IOBuf
ss << ", status: " << st;
LOG(INFO) << ss.str();

if (response.tablet_load_rowset_num_infos_size() > 0) {
stub->_refresh_back_pressure_version_wait_time(response.tablet_load_rowset_num_infos());
}

if (response.has_load_stream_profile()) {
TRuntimeProfileTree tprofile;
const uint8_t* buf =
Expand Down Expand Up @@ -196,6 +200,9 @@ Status LoadStreamStub::open(BrpcClientCache<PBackendService_Stub>* client_cache,
_enable_unique_mow_for_index->emplace(resp.index_id(),
resp.enable_unique_key_merge_on_write());
}
if (response.tablet_load_rowset_num_infos_size() > 0) {
_refresh_back_pressure_version_wait_time(response.tablet_load_rowset_num_infos());
}
if (cntl.Failed()) {
brpc::StreamClose(_stream_id);
_status = Status::InternalError("Failed to connect to backend {}: {}", _dst_id,
Expand Down Expand Up @@ -503,6 +510,44 @@ Status LoadStreamStub::_send_with_retry(butil::IOBuf& buf) {
}
}

void LoadStreamStub::_refresh_back_pressure_version_wait_time(
const ::google::protobuf::RepeatedPtrField<::doris::PTabletLoadRowsetInfo>&
tablet_load_infos) {
int64_t max_rowset_num_gap = 0;
// if any one tablet is under high load pressure, we would make the whole procedure
// sleep to prevent the corresponding BE return -235
std::for_each(
tablet_load_infos.begin(), tablet_load_infos.end(),
[&max_rowset_num_gap](auto& load_info) {
int64_t cur_rowset_num = load_info.current_rowset_nums();
int64_t high_load_point = load_info.max_config_rowset_nums() *
(config::load_back_pressure_version_threshold / 100);
DCHECK(cur_rowset_num > high_load_point);
max_rowset_num_gap = std::max(max_rowset_num_gap, cur_rowset_num - high_load_point);
});
// to slow down the high load pressure
// we would use the rowset num gap to calculate one sleep time
// for example:
// if the max tablet version is 2000, there are 3 BE
// A: ==================== 1800
// B: =================== 1700
// C: ================== 1600
// ================== 1600
// ^
// the high load point
// then then max gap is 1800 - (max tablet version * config::load_back_pressure_version_threshold / 100) = 200,
// we would make the whole send procesure sleep
// 1200ms for compaction to be done toe reduce the high pressure
auto max_time = config::max_load_back_pressure_version_wait_time_ms;
if (UNLIKELY(max_rowset_num_gap > 0)) {
_load_back_pressure_version_wait_time_ms.store(
std::min(max_rowset_num_gap + 1000, max_time));
LOG(INFO) << "try to back pressure version, wait time(ms): "
<< _load_back_pressure_version_wait_time_ms << ", load id: " << print_id(_load_id)
<< ", max_rowset_num_gap: " << max_rowset_num_gap;
}
}

std::string LoadStreamStub::to_string() {
std::ostringstream ss;
ss << *this;
Expand Down
10 changes: 10 additions & 0 deletions be/src/vec/sink/load_stream_stub.h
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,14 @@ class LoadStreamStub : public std::enable_shared_from_this<LoadStreamStub> {
_cancel_st.to_string_no_stack());
}

int64_t get_and_reset_load_back_pressure_version_wait_time_ms() {
return _load_back_pressure_version_wait_time_ms.exchange(0);
}

void _refresh_back_pressure_version_wait_time(
const ::google::protobuf::RepeatedPtrField<::doris::PTabletLoadRowsetInfo>&
tablet_load_infos);

private:
Status _encode_and_send(PStreamHeader& header, std::span<const Slice> data = {});
Status _send_with_buffer(butil::IOBuf& buf, bool sync = false);
Expand Down Expand Up @@ -278,6 +286,8 @@ class LoadStreamStub : public std::enable_shared_from_this<LoadStreamStub> {

bthread::Mutex _write_mutex;
size_t _bytes_written = 0;

std::atomic<int64_t> _load_back_pressure_version_wait_time_ms {0};
};

// a collection of LoadStreams connect to the same node
Expand Down
22 changes: 22 additions & 0 deletions be/src/vec/sink/writer/vtablet_writer_v2.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@
namespace doris::vectorized {
#include "common/compile_check_begin.h"

extern bvar::Adder<int64_t> g_sink_load_back_pressure_version_time_ms;

VTabletWriterV2::VTabletWriterV2(const TDataSink& t_sink, const VExprContextSPtrs& output_exprs,
std::shared_ptr<pipeline::Dependency> dep,
std::shared_ptr<pipeline::Dependency> fin_dep)
Expand Down Expand Up @@ -243,6 +245,8 @@ Status VTabletWriterV2::_init(RuntimeState* state, RuntimeProfile* profile) {
_close_timer = ADD_TIMER(_operator_profile, "CloseWaitTime");
_close_writer_timer = ADD_CHILD_TIMER(_operator_profile, "CloseWriterTime", "CloseWaitTime");
_close_load_timer = ADD_CHILD_TIMER(_operator_profile, "CloseLoadTime", "CloseWaitTime");
_load_back_pressure_version_time_ms =
ADD_TIMER(_operator_profile, "LoadBackPressureVersionTimeMs");

if (config::share_delta_writers) {
_delta_writer_for_tablet = ExecEnv::GetInstance()->delta_writer_v2_pool()->get_or_create(
Expand Down Expand Up @@ -467,6 +471,21 @@ Status VTabletWriterV2::write(RuntimeState* state, Block& input_block) {
return status;
}

int64_t total_wait_time_ms = 0;
auto streams_for_node = _load_stream_map->get_streams_for_node();
for (const auto& [dst_id, streams] : streams_for_node) {
for (const auto& stream : streams->streams()) {
auto wait_time_ms = stream->get_and_reset_load_back_pressure_version_wait_time_ms();
if (wait_time_ms > 0) {
total_wait_time_ms = std::max(total_wait_time_ms, wait_time_ms);
}
}
}
if (UNLIKELY(total_wait_time_ms > 0)) {
std::this_thread::sleep_for(std::chrono::milliseconds(total_wait_time_ms));
_load_back_pressure_version_block_ms.fetch_add(total_wait_time_ms);
}

// check out of limit
RETURN_IF_ERROR(_send_new_partition_batch());

Expand Down Expand Up @@ -641,6 +660,9 @@ Status VTabletWriterV2::close(Status exec_status) {
COUNTER_SET(_send_data_timer, _send_data_ns);
COUNTER_SET(_row_distribution_timer, (int64_t)_row_distribution_watch.elapsed_time());
COUNTER_SET(_validate_data_timer, _block_convertor->validate_data_ns());
auto back_pressure_time_ms = _load_back_pressure_version_block_ms.load();
COUNTER_SET(_load_back_pressure_version_time_ms, back_pressure_time_ms);
g_sink_load_back_pressure_version_time_ms << back_pressure_time_ms;

// close DeltaWriters
{
Expand Down
3 changes: 3 additions & 0 deletions be/src/vec/sink/writer/vtablet_writer_v2.h
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,7 @@ class VTabletWriterV2 final : public AsyncResultWriter {
RuntimeProfile::Counter* _close_writer_timer = nullptr;
RuntimeProfile::Counter* _close_load_timer = nullptr;
RuntimeProfile::Counter* _add_partition_request_timer = nullptr;
RuntimeProfile::Counter* _load_back_pressure_version_time_ms = nullptr;

std::mutex _close_mutex;
bool _is_closed = false;
Expand All @@ -256,6 +257,8 @@ class VTabletWriterV2 final : public AsyncResultWriter {

// tablet_id -> <total replicas num, load required replicas num>
std::unordered_map<int64_t, std::pair<int, int>> _tablet_replica_info;

std::atomic<int64_t> _load_back_pressure_version_block_ms {0};
};

} // namespace vectorized
Expand Down
2 changes: 2 additions & 0 deletions gensrc/proto/internal_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -957,6 +957,7 @@ message PTabletSchemaWithIndex {
message POpenLoadStreamResponse {
optional PStatus status = 1;
repeated PTabletSchemaWithIndex tablet_schemas = 2;
repeated PTabletLoadRowsetInfo tablet_load_rowset_num_infos = 3;
}

message PFailedTablet {
Expand All @@ -971,6 +972,7 @@ message PLoadStreamResponse {
optional bytes load_stream_profile = 4;
repeated PTabletSchemaWithIndex tablet_schemas = 5;
optional bool eos = 6;
repeated PTabletLoadRowsetInfo tablet_load_rowset_num_infos = 7;
}

message PStreamHeader {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,19 @@ suite("test_load_back_pressure_version", "nonConcurrent") {
)
"""

try {
set_be_param("load_back_pressure_version_threshold", "0")
sql "insert into ${testTable} values(1,1)"
def res = sql "select * from ${testTable}"
logger.info("res: " + res.size())
assertTrue(res.size() == 1)
} finally {
set_be_param("load_back_pressure_version_threshold", "80")
sql """ set enable_memtable_on_sink_node=true """
def test_load_back_pressure_version = { int targetRows ->
try {
set_be_param("load_back_pressure_version_threshold", "0")
sql "insert into ${testTable} values(1,1)"
def res = sql "select * from ${testTable}"
logger.info("res: " + res.size())
assertTrue(res.size() == targetRows)
} finally {
set_be_param("load_back_pressure_version_threshold", "80")
}
}

test_load_back_pressure_version(1)
sql """ set enable_memtable_on_sink_node=true """
test_load_back_pressure_version(2)
}