diff --git a/be/src/runtime/load_stream_writer.cpp b/be/src/runtime/load_stream_writer.cpp index 5a77ee283c3327..f5e4c718a7eda2 100644 --- a/be/src/runtime/load_stream_writer.cpp +++ b/be/src/runtime/load_stream_writer.cpp @@ -23,7 +23,9 @@ #include #include +#include #include +#include #include #include #include @@ -93,6 +95,14 @@ Status LoadStreamWriter::init() { { return Status::InternalError("fault injection"); }); RETURN_IF_ERROR(_rowset_builder->init()); _rowset_writer = _rowset_builder->rowset_writer(); + + if (auto tablet_res = ExecEnv::get_tablet(_req.tablet_id); tablet_res.has_value()) { + auto tablet = std::dynamic_pointer_cast(tablet_res.value()); + if (tablet && tablet->data_dir()) { + _data_dir = tablet->data_dir(); + } + } + _is_init = true; return Status::OK(); } @@ -100,6 +110,22 @@ Status LoadStreamWriter::init() { Status LoadStreamWriter::append_data(uint32_t segid, uint64_t offset, butil::IOBuf buf, FileType file_type) { SCOPED_ATTACH_TASK(_resource_ctx); + + // When memtable-on-sink is enabled + // Check disk capacity before writing data + if (!_is_init) { + return Status::Corruption("append_data failed, LoadStreamWriter is not inited"); + } + + if (_data_dir) { + int64_t incoming_data_size = static_cast(buf.size()); + if (_data_dir->reach_capacity_limit(incoming_data_size)) { + return Status::Error( + "disk {} exceed capacity limit when writing stream data, tablet_id={}", + _data_dir->path_hash(), _req.tablet_id); + } + } + io::FileWriter* file_writer = nullptr; auto& file_writers = file_type == FileType::SEGMENT_FILE ? _segment_file_writers : _inverted_file_writers; diff --git a/be/src/runtime/load_stream_writer.h b/be/src/runtime/load_stream_writer.h index e27db9462315f6..ce12f9451f26d9 100644 --- a/be/src/runtime/load_stream_writer.h +++ b/be/src/runtime/load_stream_writer.h @@ -38,6 +38,7 @@ class SlotDescriptor; class OlapTableSchemaParam; class RowsetWriter; class RuntimeProfile; +class DataDir; struct SegmentStatistics; using SegmentStatisticsSharedPtr = std::shared_ptr; class BaseRowsetBuilder; @@ -89,6 +90,7 @@ class LoadStreamWriter { std::vector _segment_file_writers; std::vector _inverted_file_writers; std::shared_ptr _resource_ctx; + DataDir* _data_dir = nullptr; }; using LoadStreamWriterSharedPtr = std::shared_ptr;