Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
197 changes: 190 additions & 7 deletions be/src/core/block/block.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,51 @@ template void clear_blocks<Block>(moodycamel::ConcurrentQueue<Block>&,
template void clear_blocks<BlockUPtr>(moodycamel::ConcurrentQueue<BlockUPtr>&,
RuntimeProfile::Counter* memory_used_counter);

namespace {

// The no-clone fast path is only safe when the whole column tree is uniquely
// owned. A composite column with shared children still needs COW detachment.
bool is_recursively_exclusive(const IColumn& column) {
if (!column.is_exclusive()) {
return false;
}

bool exclusive = true;
IColumn::ColumnCallback callback = [&](IColumn::WrappedPtr& subcolumn) {
if (!exclusive) {
return;
}
const ColumnPtr& subcolumn_ptr = const_cast<const IColumn::WrappedPtr&>(subcolumn);
DCHECK(subcolumn_ptr);
exclusive = is_recursively_exclusive(*subcolumn_ptr);
};
// `for_each_subcolumn` only exposes a mutable callback type. This callback
// only reads the wrapped pointers and never calls the non-const accessors.
const_cast<IColumn&>(column).for_each_subcolumn(callback);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is_recursively_exclusive() is intended to be a read-only ownership probe, but this calls the non-const for_each_subcolumn() through const_cast. ColumnVariant::for_each_subcolumn() is not read-only: after invoking the callback it assigns num_rows = serialized_sparse_column->size() and runs ENABLE_CHECK_CONSISTENCY(this). That means a plain Block::mutate_columns_scoped() acquisition can mutate a ColumnVariant (or throw from the consistency check) before any caller-requested mutation happens. Please add a const/read-only subcolumn traversal or avoid using for_each_subcolumn() for this exclusivity check.

return exclusive;
}

// Acquire one live Block slot transactionally. Shared columns are detached while
// the original slot is still intact, so a clone failure cannot leave Block with
// a moved-from/null column. Exclusive column trees keep the stealing fast path.
MutableColumnPtr scoped_mutate_column(ColumnPtr& column, const DataTypePtr& type) {
DCHECK(type);
if (!column) {
return type->create_column();
}

MutableColumnPtr mutable_column;
if (is_recursively_exclusive(*column)) {
mutable_column = std::move(*column).mutate();
} else {
mutable_column = IColumn::mutate(column);
}
column = nullptr;
return mutable_column;
}

} // namespace

Block::Block(std::initializer_list<ColumnWithTypeAndName> il) : data {il} {}

Block::Block(ColumnsWithTypeAndName data_) : data {std::move(data_)} {}
Expand Down Expand Up @@ -576,12 +621,127 @@ Columns Block::get_columns_and_convert() {
return columns;
}

MutableColumns Block::mutate_columns() {
Block::ScopedMutableColumns::ScopedMutableColumns(Block& block) : _block(&block) {
const size_t num_columns = block.data.size();
_columns.resize(num_columns);
size_t acquired_columns = 0;
try {
for (; acquired_columns < num_columns; ++acquired_columns) {
auto& column_with_type_and_name = block.data[acquired_columns];
_columns[acquired_columns] = scoped_mutate_column(column_with_type_and_name.column,
column_with_type_and_name.type);
}
} catch (...) {
for (size_t i = 0; i < acquired_columns; ++i) {
block.data[i].column = std::move(_columns[i]);
}
_block = nullptr;
throw;
}
}

Block::ScopedMutableColumns::~ScopedMutableColumns() {
restore();
}

Block::ScopedMutableColumns::ScopedMutableColumns(ScopedMutableColumns&& other) noexcept
: _block(std::exchange(other._block, nullptr)), _columns(std::move(other._columns)) {}

Block::ScopedMutableColumns& Block::ScopedMutableColumns::operator=(
ScopedMutableColumns&& other) noexcept {
if (this != &other) {
restore();
_block = std::exchange(other._block, nullptr);
_columns = std::move(other._columns);
}
return *this;
}

const DataTypePtr& Block::ScopedMutableColumns::get_datatype_by_position(size_t position) const {
DCHECK(_block != nullptr);
return _block->get_by_position(position).type;
}

const std::string& Block::ScopedMutableColumns::get_name_by_position(size_t position) const {
DCHECK(_block != nullptr);
return _block->get_by_position(position).name;
}

MutableColumns Block::ScopedMutableColumns::release() {
DCHECK(_block != nullptr);
_block = nullptr;
return std::move(_columns);
}

void Block::ScopedMutableColumns::restore() {
if (_block != nullptr) {
_block->set_columns(std::move(_columns));
_block = nullptr;
}
}

Block::ScopedMutableColumn::ScopedMutableColumn(Block& block, size_t position)
: _block(&block), _position(position) {
DCHECK_LT(_position, _block->data.size());
auto& column_with_type_and_name = _block->data[_position];
DCHECK(column_with_type_and_name.type);
_column =
scoped_mutate_column(column_with_type_and_name.column, column_with_type_and_name.type);
}

Block::ScopedMutableColumn::~ScopedMutableColumn() {
restore();
}

Block::ScopedMutableColumn::ScopedMutableColumn(ScopedMutableColumn&& other) noexcept
: _block(std::exchange(other._block, nullptr)),
_position(other._position),
_column(std::move(other._column)) {}

Block::ScopedMutableColumn& Block::ScopedMutableColumn::operator=(
ScopedMutableColumn&& other) noexcept {
if (this != &other) {
restore();
_block = std::exchange(other._block, nullptr);
_position = other._position;
_column = std::move(other._column);
}
return *this;
}

void Block::ScopedMutableColumn::restore() {
if (_block != nullptr) {
DCHECK_LT(_position, _block->data.size());
_block->data[_position].column = std::move(_column);
_block = nullptr;
}
}

Block::ScopedMutableColumns Block::mutate_columns_scoped() & {
return ScopedMutableColumns(*this);
}

Block::ScopedMutableColumn Block::mutate_column_scoped(size_t position) & {
return ScopedMutableColumn(*this, position);
}

ScopedMutableBlock::ScopedMutableBlock(Block* block) {
DCHECK(block != nullptr);
DataTypes data_types = block->get_data_types();
std::vector<std::string> names = block->get_names();
auto columns_guard = block->mutate_columns_scoped();
_mutable_block.data_types() = std::move(data_types);
_mutable_block.get_names() = std::move(names);
_mutable_block.set_mutable_columns(columns_guard.release());
_block = block;
}

MutableColumns Block::mutate_columns() && {
size_t num_columns = data.size();
MutableColumns columns(num_columns);
for (size_t i = 0; i < num_columns; ++i) {
DCHECK(data[i].type);
columns[i] = data[i].column ? (*std::move(data[i].column)).mutate()
columns[i] = data[i].column ? IColumn::mutate(std::move(data[i].column))
: data[i].type->create_column();
}
return columns;
Expand Down Expand Up @@ -644,7 +804,7 @@ void Block::clear() {
data.clear();
}

void Block::clear_column_data(int64_t column_size) noexcept {
void Block::clear_column_data(int64_t column_size) {
SCOPED_SKIP_MEMORY_CHECK();
// data.size() greater than column_size, means here have some
// function exec result in block, need erase it here
Expand All @@ -655,9 +815,26 @@ void Block::clear_column_data(int64_t column_size) noexcept {
}
for (auto& d : data) {
if (d.column) {
// Temporarily disable reference count check because a column might be referenced multiple times within a block.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This fast path only checks top-level exclusivity, but composite columns can be uniquely owned at the parent while sharing child columns. For example, a cloned ColumnArray can have an exclusive array wrapper with shared data/offset children; assume_mutable()->clear() then clears the shared children and corrupts the alias. The new scoped_mutate_column() already uses recursive exclusivity for the same reason, so clear_column_data() should either use that recursive check or clone an empty column whenever the whole column tree is not exclusive. The vector overload below has the same pattern.

// Queries like this: `select c, c from t1;`
(*std::move(d.column)).assume_mutable()->clear();
if (d.column->is_exclusive()) {
d.column->assume_mutable()->clear();
} else {
d.column = d.column->clone_empty();
}
}
}
}
Comment thread
zclllyybb marked this conversation as resolved.

void Block::clear_column_data(const std::vector<uint32_t>& columns_to_clear) {
SCOPED_SKIP_MEMORY_CHECK();
for (auto col : columns_to_clear) {
DCHECK_LT(col, data.size());
auto& column = data[col].column;
if (column) {
if (column->is_exclusive()) {
column->assume_mutable()->clear();
} else {
column = column->clone_empty();
}
}
}
}
Expand Down Expand Up @@ -1085,7 +1262,13 @@ void Block::shrink_char_type_column_suffix_zero(const std::vector<size_t>& char_
for (auto idx : char_type_idx) {
if (idx < data.size()) {
auto& col_and_name = this->get_by_position(idx);
col_and_name.column->assume_mutable()->shrink_padding_chars();
if (col_and_name.column->is_exclusive()) {
col_and_name.column->assume_mutable()->shrink_padding_chars();
} else {
auto mutable_col = std::move(*col_and_name.column).mutate();
mutable_col->shrink_padding_chars();
col_and_name.column = std::move(mutable_col);
}
}
}
}
Expand Down
Loading
Loading