Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 3 additions & 5 deletions be/src/vec/aggregate_functions/aggregate_function.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,11 @@
#include "common/status.h"
#include "util/defer_op.h"
#include "vec/columns/column_complex.h"
#include "vec/columns/column_fixed_length_object.h"
#include "vec/columns/column_string.h"
#include "vec/common/assert_cast.h"
#include "vec/common/hash_table/phmap_fwd_decl.h"
#include "vec/common/string_buffer.hpp"
#include "vec/core/block.h"
#include "vec/core/column_numbers.h"
#include "vec/core/field.h"
#include "vec/core/types.h"
#include "vec/data_types/data_type_nullable.h"
#include "vec/data_types/data_type_string.h"
Expand Down Expand Up @@ -110,7 +108,7 @@ class IAggregateFunction {
virtual void reset(AggregateDataPtr place) const = 0;

/// It is not necessary to delete data.
virtual bool has_trivial_destructor() const = 0;
virtual bool is_trivial() const = 0;

/// Get `sizeof` of structure with data.
virtual size_t size_of_data() const = 0;
Expand Down Expand Up @@ -641,7 +639,7 @@ class IAggregateFunctionDataHelper : public IAggregateFunctionHelper<Derived> {

void destroy(AggregateDataPtr __restrict place) const noexcept override { data(place).~Data(); }

bool has_trivial_destructor() const override { return std::is_trivially_destructible_v<Data>; }
bool is_trivial() const override { return false; }

size_t size_of_data() const override { return sizeof(Data); }

Expand Down
4 changes: 2 additions & 2 deletions be/src/vec/aggregate_functions/aggregate_function_avg.h
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,8 @@ class AggregateFunctionAvg<T, TResult, Data> final
}
}

bool is_trivial() const override { return true; }

template <bool is_add>
NO_SANITIZE_UNDEFINED void update_value(AggregateDataPtr __restrict place,
const IColumn** columns, ssize_t row_num) const {
Expand Down Expand Up @@ -271,15 +273,13 @@ class AggregateFunctionAvg<T, TResult, Data> final
AggregateDataPtr rhs, const IColumn* column, Arena& arena,
const size_t num_rows) const override {
this->deserialize_from_column(rhs, *column, arena, num_rows);
DEFER({ this->destroy_vec(rhs, num_rows); });
this->merge_vec(places, offset, rhs, arena, num_rows);
}

void deserialize_and_merge_vec_selected(const AggregateDataPtr* places, size_t offset,
AggregateDataPtr rhs, const IColumn* column,
Arena& arena, const size_t num_rows) const override {
this->deserialize_from_column(rhs, *column, arena, num_rows);
DEFER({ this->destroy_vec(rhs, num_rows); });
this->merge_vec_selected(places, offset, rhs, arena, num_rows);
}

Expand Down
4 changes: 2 additions & 2 deletions be/src/vec/aggregate_functions/aggregate_function_foreach.h
Original file line number Diff line number Diff line change
Expand Up @@ -158,8 +158,8 @@ class AggregateFunctionForEach : public IAggregateFunctionDataHelper<AggregateFu
}
}

bool has_trivial_destructor() const override {
return std::is_trivially_destructible_v<Data> && nested_function->has_trivial_destructor();
bool is_trivial() const override {
return std::is_trivial_v<Data> && nested_function->is_trivial();
}

void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs,
Expand Down
89 changes: 83 additions & 6 deletions be/src/vec/aggregate_functions/aggregate_function_null.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,16 @@
#include <glog/logging.h>

#include <array>
#include <memory>

#include "common/exception.h"
#include "common/logging.h"
#include "common/status.h"
#include "vec/aggregate_functions/aggregate_function.h"
#include "vec/aggregate_functions/aggregate_function_distinct.h"
#include "vec/columns/column_nullable.h"
#include "vec/common/assert_cast.h"
#include "vec/common/string_buffer.hpp"
#include "vec/core/types.h"
#include "vec/data_types/data_type_nullable.h"

Expand Down Expand Up @@ -161,9 +164,7 @@ class AggregateFunctionNullBaseInline : public IAggregateFunctionHelper<Derived>
nested_function->reset(nested_place(place));
}

bool has_trivial_destructor() const override {
return nested_function->has_trivial_destructor();
}
bool is_trivial() const override { return false; }

size_t size_of_data() const override { return prefix_size + nested_function->size_of_data(); }

Expand All @@ -177,11 +178,10 @@ class AggregateFunctionNullBaseInline : public IAggregateFunctionHelper<Derived>

void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs,
Arena& arena) const override {
if (result_is_nullable && get_flag(rhs)) {
if (get_flag(rhs)) {
set_flag(place);
nested_function->merge(nested_place(place), nested_place(rhs), arena);
}

nested_function->merge(nested_place(place), nested_place(rhs), arena);
}

void serialize(ConstAggregateDataPtr __restrict place, BufferWritable& buf) const override {
Expand All @@ -206,6 +206,83 @@ class AggregateFunctionNullBaseInline : public IAggregateFunctionHelper<Derived>
}
}

void deserialize_and_merge_vec(const AggregateDataPtr* places, size_t offset,
AggregateDataPtr rhs, const IColumn* column, Arena& arena,
const size_t num_rows) const override {
if (nested_function->is_trivial()) {
BufferReadable buf({column->get_data_at(0).data, 0});
size_t size_of_data = this->size_of_data();
if constexpr (result_is_nullable) {
for (int i = 0; i != num_rows; ++i) {
buf.read_binary(*(bool*)(rhs + size_of_data * i));
if (get_flag(rhs + size_of_data * i)) {
nested_function->deserialize(nested_place(rhs + size_of_data * i), buf,
arena);
}
}
for (size_t i = 0; i != num_rows; ++i) {
if (get_flag(rhs + size_of_data * i)) {
set_flag(places[i] + offset);
nested_function->merge(nested_place(places[i] + offset),
nested_place(rhs + size_of_data * i), arena);
}
}
} else {
for (size_t i = 0; i != num_rows; ++i) {
nested_function->deserialize(rhs + size_of_data * i, buf, arena);
}
for (size_t i = 0; i != num_rows; ++i) {
nested_function->merge(places[i] + offset, rhs + size_of_data * i, arena);
}
}
} else {
IAggregateFunctionHelper<Derived>::deserialize_and_merge_vec(places, offset, rhs,
column, arena, num_rows);
}
}

void deserialize_and_merge_vec_selected(const AggregateDataPtr* places, size_t offset,
AggregateDataPtr rhs, const IColumn* column,
Arena& arena, const size_t num_rows) const override {
if (nested_function->is_trivial()) {
BufferReadable buf({column->get_data_at(0).data, 0});
size_t size_of_data = this->size_of_data();
if constexpr (result_is_nullable) {
for (int i = 0; i != num_rows; ++i) {
if (!places[i]) {
continue;
}
buf.read_binary(*(bool*)(rhs + size_of_data * i));
if (get_flag(rhs + size_of_data * i)) {
nested_function->deserialize(nested_place(rhs + size_of_data * i), buf,
arena);
}
}
for (size_t i = 0; i != num_rows; ++i) {
if (places[i] && get_flag(rhs + size_of_data * i)) {
set_flag(places[i] + offset);
nested_function->merge(nested_place(places[i] + offset),
nested_place(rhs + size_of_data * i), arena);
}
}
} else {
for (size_t i = 0; i != num_rows; ++i) {
if (places[i]) {
nested_function->deserialize(rhs + size_of_data * i, buf, arena);
}
}
for (size_t i = 0; i != num_rows; ++i) {
if (places[i]) {
nested_function->merge(places[i] + offset, rhs + size_of_data * i, arena);
}
}
}
} else {
IAggregateFunctionHelper<Derived>::deserialize_and_merge_vec_selected(
places, offset, rhs, column, arena, num_rows);
}
}

void deserialize_and_merge(AggregateDataPtr __restrict place, AggregateDataPtr __restrict rhs,
BufferReadable& buf, Arena& arena) const override {
bool flag = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ class AggregateStateUnion : public IAggregateFunctionHelper<AggregateStateUnion>
_function->destroy(place);
}

bool has_trivial_destructor() const override { return _function->has_trivial_destructor(); }
bool is_trivial() const override { return false; }

size_t size_of_data() const override { return _function->size_of_data(); }

Expand Down
4 changes: 2 additions & 2 deletions be/src/vec/aggregate_functions/aggregate_function_sum.h
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,8 @@ class AggregateFunctionSum<T, TResult, Data> final
}
}

bool is_trivial() const override { return true; }

void add(AggregateDataPtr __restrict place, const IColumn** columns, ssize_t row_num,
Arena&) const override {
const auto& column =
Expand Down Expand Up @@ -192,15 +194,13 @@ class AggregateFunctionSum<T, TResult, Data> final
AggregateDataPtr rhs, const IColumn* column, Arena& arena,
const size_t num_rows) const override {
this->deserialize_from_column(rhs, *column, arena, num_rows);
DEFER({ this->destroy_vec(rhs, num_rows); });
this->merge_vec(places, offset, rhs, arena, num_rows);
}

void deserialize_and_merge_vec_selected(const AggregateDataPtr* places, size_t offset,
AggregateDataPtr rhs, const IColumn* column,
Arena& arena, const size_t num_rows) const override {
this->deserialize_from_column(rhs, *column, arena, num_rows);
DEFER({ this->destroy_vec(rhs, num_rows); });
this->merge_vec_selected(places, offset, rhs, arena, num_rows);
}

Expand Down
3 changes: 2 additions & 1 deletion be/src/vec/common/string_buffer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,8 @@ class BufferReadable {
template <typename Type>
void read_binary(Type& x) {
static_assert(std::is_standard_layout_v<Type>);
read(reinterpret_cast<char*>(&x), sizeof(x));
memcpy_fixed<Type>(reinterpret_cast<char*>(&x), _data);
_data += sizeof(x);
}

template <typename Type>
Expand Down
1 change: 1 addition & 0 deletions be/src/vec/exec/format/table/paimon_jni_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "runtime/descriptors.h"
#include "runtime/runtime_state.h"
#include "runtime/types.h"
#include "vec/core/block.h"
#include "vec/core/types.h"
namespace doris {
class RuntimeProfile;
Expand Down
1 change: 1 addition & 0 deletions be/test/vec/olap/jsonb_value_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include "gtest/gtest_pred_impl.h"
#include "vec/columns/column_string.h"
#include "vec/common/string_ref.h"
#include "vec/core/block.h"
#include "vec/core/columns_with_type_and_name.h"
#include "vec/data_types/serde/data_type_serde.h"
#include "vec/olap/olap_data_convertor.h"
Expand Down
Loading