Skip to content

Commit d95d647

Browse files
authored
[Improvement](serialize) use streamvbyte_encode in DataTypeFixedLengthObject::serialize (#60526)
This pull request introduces a new serialization format for `DataTypeFixedLengthObject` columns, leveraging streamvbyte encoding for efficient storage and transmission of large data blocks. The new format is activated for BE exec version 10 and above, which is now set as the maximum supported version. Additionally, the `AggregateFunctionCount` and `AggregateFunctionCountNotNullUnary` functions are marked as trivial, likely for optimization purposes. Below are the most important changes: ### Serialization and Deserialization Improvements * Introduced a new serialization/deserialization format for `DataTypeFixedLengthObject` that uses streamvbyte encoding for large data, improving efficiency for big data columns. The new logic is gated behind BE exec version 10 and includes fallback to the previous format for older versions (`be/src/vec/data_types/data_type_fixed_length_object.cpp`). [[1]](diffhunk://#diff-7d29ab3e43d23db58f2216e23cc131705067e133fb7ab2da72f2e67c725beb48L36-L41) [[2]](diffhunk://#diff-7d29ab3e43d23db58f2216e23cc131705067e133fb7ab2da72f2e67c725beb48L56-R130) [[3]](diffhunk://#diff-7d29ab3e43d23db58f2216e23cc131705067e133fb7ab2da72f2e67c725beb48L84-R144) [[4]](diffhunk://#diff-7d29ab3e43d23db58f2216e23cc131705067e133fb7ab2da72f2e67c725beb48R156-R172) * Updated the calculation of uncompressed serialized bytes to account for the new serialization format and potential streamvbyte compression (`be/src/vec/data_types/data_type_fixed_length_object.cpp`). * Added the `streamvbyte` library include to support the new encoding/decoding logic (`be/src/vec/data_types/data_type_fixed_length_object.cpp`). ### Version Management * Increased `BeExecVersionManager::max_be_exec_version` from 8 to 10, with detailed documentation and warnings about the sensitivity of this field. The new version enables the updated serialization logic (`be/src/agent/be_exec_version_manager.cpp`). * Defined a new constant `USE_NEW_FIXED_OBJECT_SERIALIZATION_VERSION = 10` to clearly mark the threshold for the new serialization format (`be/src/agent/be_exec_version_manager.h`). ### Aggregate Function Optimization * Marked `AggregateFunctionCount` and `AggregateFunctionCountNotNullUnary` as trivial by overriding the `is_trivial()` method to return `true`, which may allow for performance optimizations in the aggregation engine (`be/src/vec/aggregate_functions/aggregate_function_count.h`). [[1]](diffhunk://#diff-a5dbb09237f197bffdcbd3bec4fdd089913ec143d96806618c8eeb4c5dbb8cfeR64-R65) [[2]](diffhunk://#diff-a5dbb09237f197bffdcbd3bec4fdd089913ec143d96806618c8eeb4c5dbb8cfeR212-R213)
1 parent 3ad74c0 commit d95d647

13 files changed

+135
-53
lines changed

be/src/agent/be_exec_version_manager.cpp

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -110,10 +110,22 @@ void BeExecVersionManager::check_function_compatibility(int current_be_exec_vers
110110
*
111111
* 7: start from doris 3.0.2
112112
* a. window funnel logic change
113-
* b. support const column in serialize/deserialize function: PR #41175
113+
* b. support const column in serialize/deserialize function: PR #41175
114114
*/
115115

116-
const int BeExecVersionManager::max_be_exec_version = 8;
116+
// /////////////////////////////////////////////////////////////////////////////
117+
// ATTN: !!! BE EXEC VERSION IS A VERY SENSITIVE COMPATIBILITY FIELD !!!
118+
// 1. We should avoid abusing be_exec_version, especially not using it to handle
119+
// compatibility issues of functions (use function aliases for that instead).
120+
// 2. Do not fork versions in past releases; all new be exec versions should
121+
// first go into master before entering new release versions.
122+
// !!! DO NOT CHANGE IT UNLESS YOU ARE 100% SURE WHAT YOU ARE DOING !!!
123+
// /////////////////////////////////////////////////////////////////////////////
124+
125+
// 10: start from doris 4.0.3
126+
// a. use new fixed object serialization way.
127+
128+
const int BeExecVersionManager::max_be_exec_version = 10;
117129
const int BeExecVersionManager::min_be_exec_version = 0;
118130
std::map<std::string, std::set<int>> BeExecVersionManager::_function_change_map {};
119131
std::set<std::string> BeExecVersionManager::_function_restrict_map;

be/src/agent/be_exec_version_manager.h

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -25,13 +25,7 @@
2525

2626
namespace doris {
2727

28-
constexpr inline int USE_NEW_SERDE = 4; // release on DORIS version 2.1
29-
constexpr inline int AGG_FUNCTION_NULLABLE = 5; // change some agg nullable property: PR #37215
30-
constexpr inline int VARIANT_SERDE = 6; // change variant serde to fix PR #38413
31-
constexpr inline int AGGREGATION_2_1_VERSION =
32-
6; // some aggregation changed the data format after this version
33-
constexpr inline int USE_CONST_SERDE =
34-
8; // support const column in serialize/deserialize function: PR #41175
28+
constexpr inline int USE_NEW_FIXED_OBJECT_SERIALIZATION_VERSION = 10;
3529

3630
class BeExecVersionManager {
3731
public:

be/src/vec/aggregate_functions/aggregate_function_count.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,8 @@ class AggregateFunctionCount final
6161

6262
DataTypePtr get_return_type() const override { return std::make_shared<DataTypeInt64>(); }
6363

64+
bool is_trivial() const override { return true; }
65+
6466
void add(AggregateDataPtr __restrict place, const IColumn**, ssize_t, Arena&) const override {
6567
++data(place).count;
6668
}
@@ -207,6 +209,8 @@ class AggregateFunctionCountNotNullUnary final
207209

208210
DataTypePtr get_return_type() const override { return std::make_shared<DataTypeInt64>(); }
209211

212+
bool is_trivial() const override { return true; }
213+
210214
void add(AggregateDataPtr __restrict place, const IColumn** columns, ssize_t row_num,
211215
Arena&) const override {
212216
data(place).count +=

be/src/vec/data_types/data_type_fixed_length_object.cpp

Lines changed: 82 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
#include "vec/data_types/data_type_fixed_length_object.h"
1919

2020
#include <glog/logging.h>
21+
#include <streamvbyte.h>
2122
#include <string.h>
2223

2324
#include <ostream>
@@ -33,12 +34,43 @@ namespace doris::vectorized {
3334

3435
char* DataTypeFixedLengthObject::serialize(const IColumn& column, char* buf,
3536
int be_exec_version) const {
36-
// const flag
37+
if (be_exec_version >= USE_NEW_FIXED_OBJECT_SERIALIZATION_VERSION) {
38+
// New serialization with streamvbyte encoding for large data
39+
const auto* data_column = &column;
40+
size_t real_need_copy_num = 0;
41+
buf = serialize_const_flag_and_row_num(&data_column, buf, &real_need_copy_num);
42+
43+
const auto& src_col = assert_cast<const ColumnType&>(*data_column);
44+
DCHECK(src_col.item_size() > 0)
45+
<< "[serialize]item size of DataTypeFixedLengthObject should be greater than 0";
46+
47+
// item size
48+
unaligned_store<size_t>(buf, src_col.item_size());
49+
buf += sizeof(size_t);
50+
51+
auto mem_size = real_need_copy_num * src_col.item_size();
52+
const auto* origin_data = src_col.get_data().data();
53+
54+
// column data
55+
if (mem_size <= SERIALIZED_MEM_SIZE_LIMIT) {
56+
memcpy(buf, origin_data, mem_size);
57+
return buf + mem_size;
58+
} else {
59+
// Throw exception if mem_size is large than UINT32_MAX
60+
auto encode_size = streamvbyte_encode(reinterpret_cast<const uint32_t*>(origin_data),
61+
cast_set<UInt32>(upper_int32(mem_size)),
62+
(uint8_t*)(buf + sizeof(size_t)));
63+
unaligned_store<size_t>(buf, encode_size);
64+
buf += sizeof(size_t);
65+
return buf + encode_size;
66+
}
67+
}
68+
69+
// Old serialization: const flag | row num | item size | data (memcpy)
3770
bool is_const_column = is_column_const(column);
3871
unaligned_store<bool>(buf, is_const_column);
3972
buf += sizeof(bool);
4073

41-
// row num
4274
const auto row_num = column.size();
4375
unaligned_store<size_t>(buf, row_num);
4476
buf += sizeof(size_t);
@@ -53,26 +85,49 @@ char* DataTypeFixedLengthObject::serialize(const IColumn& column, char* buf,
5385
DCHECK(src_col.item_size() > 0)
5486
<< "[serialize]item size of DataTypeFixedLengthObject should be greater than 0";
5587

56-
// item size
5788
unaligned_store<size_t>(buf, src_col.item_size());
5889
buf += sizeof(size_t);
59-
// column data
90+
6091
const auto* origin_data = src_col.get_data().data();
6192
memcpy(buf, origin_data, real_need_copy_num * src_col.item_size());
6293
buf += real_need_copy_num * src_col.item_size();
63-
6494
return buf;
6595
}
6696

6797
const char* DataTypeFixedLengthObject::deserialize(const char* buf, MutableColumnPtr* column,
6898
int be_exec_version) const {
69-
//const flag
99+
if (be_exec_version >= USE_NEW_FIXED_OBJECT_SERIALIZATION_VERSION) {
100+
// New deserialization with streamvbyte decoding for large data
101+
size_t real_have_saved_num = 0;
102+
buf = deserialize_const_flag_and_row_num(buf, column, &real_have_saved_num);
103+
104+
auto& dst_col = assert_cast<ColumnType&>(*(column->get()));
105+
auto item_size = unaligned_load<size_t>(buf);
106+
buf += sizeof(size_t);
107+
dst_col.set_item_size(item_size);
108+
109+
auto mem_size = real_have_saved_num * item_size;
110+
dst_col.resize(real_have_saved_num);
111+
if (mem_size <= SERIALIZED_MEM_SIZE_LIMIT) {
112+
memcpy(dst_col.get_data().data(), buf, mem_size);
113+
buf = buf + mem_size;
114+
} else {
115+
auto encode_size = unaligned_load<size_t>(buf);
116+
buf += sizeof(size_t);
117+
streamvbyte_decode((const uint8_t*)buf, (uint32_t*)(dst_col.get_data().data()),
118+
cast_set<UInt32>(upper_int32(mem_size)));
119+
buf = buf + encode_size;
120+
}
121+
return buf;
122+
}
123+
124+
// Old deserialization
70125
bool is_const_column = unaligned_load<bool>(buf);
71126
buf += sizeof(bool);
72-
//row num
127+
73128
size_t row_num = unaligned_load<size_t>(buf);
74129
buf += sizeof(size_t);
75-
//item size
130+
76131
size_t item_size = unaligned_load<size_t>(buf);
77132
buf += sizeof(size_t);
78133

@@ -81,11 +136,12 @@ const char* DataTypeFixedLengthObject::deserialize(const char* buf, MutableColum
81136

82137
auto& dst_col = static_cast<ColumnType&>(*(column->get()));
83138
dst_col.set_item_size(item_size);
84-
// column data
139+
85140
auto real_copy_num = is_const_column ? 1 : row_num;
86141
dst_col.resize(real_copy_num);
87142
memcpy(dst_col.get_data().data(), buf, real_copy_num * item_size);
88143
buf += real_copy_num * item_size;
144+
89145
if (is_const_column) {
90146
auto const_column = ColumnConst::create((*column)->get_ptr(), row_num);
91147
*column = const_column->get_ptr();
@@ -97,6 +153,23 @@ const char* DataTypeFixedLengthObject::deserialize(const char* buf, MutableColum
97153
// data : item data1 | item data2...
98154
int64_t DataTypeFixedLengthObject::get_uncompressed_serialized_bytes(const IColumn& column,
99155
int be_exec_version) const {
156+
if (be_exec_version >= USE_NEW_FIXED_OBJECT_SERIALIZATION_VERSION) {
157+
// New format size calculation with streamvbyte
158+
auto size = sizeof(bool) + sizeof(size_t) + sizeof(size_t) + sizeof(size_t);
159+
auto real_need_copy_num = is_column_const(column) ? 1 : column.size();
160+
const auto& src_col = assert_cast<const ColumnType&>(column);
161+
auto mem_size = src_col.item_size() * real_need_copy_num;
162+
if (mem_size <= SERIALIZED_MEM_SIZE_LIMIT) {
163+
return size + mem_size;
164+
} else {
165+
// Throw exception if mem_size is large than UINT32_MAX
166+
return size + sizeof(size_t) +
167+
std::max(mem_size, streamvbyte_max_compressedbytes(
168+
cast_set<UInt32>(upper_int32(mem_size))));
169+
}
170+
}
171+
172+
// Old format size calculation
100173
auto size = sizeof(bool) + sizeof(size_t) + sizeof(size_t);
101174
const IColumn* data_column = &column;
102175
if (is_column_const(column)) {

be/test/vec/data_types/data_type_agg_state_test.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@ TEST_P(DataTypeAggStateTest, CreateColumnTest) {
111111
// get_uncompressed_serialized_bytes
112112
ASSERT_EQ(datatype_agg_state_count->get_uncompressed_serialized_bytes(
113113
*column, BeExecVersionManager::get_newest_version()),
114-
25);
114+
33);
115115
}
116116

117117
void insert_data_agg_state(MutableColumns* agg_state_cols, DataTypePtr datatype_agg_state,

be/test/vec/data_types/data_type_datetime_v1_test.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -434,9 +434,9 @@ TEST_F(DataTypeDateTimeV1Test, ser_deser) {
434434
}
435435
}
436436
};
437-
test_func(dt_date, *column_date, USE_CONST_SERDE);
437+
test_func(dt_date, *column_date, BeExecVersionManager::max_be_exec_version);
438438

439-
test_func(dt_datetime, *column_datetime, USE_CONST_SERDE);
439+
test_func(dt_datetime, *column_datetime, BeExecVersionManager::max_be_exec_version);
440440
}
441441
TEST_F(DataTypeDateTimeV1Test, to_string) {
442442
auto test_func = [](auto& dt, const auto& source_column) {

be/test/vec/data_types/data_type_datetime_v2_test.cpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -759,9 +759,9 @@ TEST_F(DataTypeDateTimeV2Test, ser_deser) {
759759
}
760760
}
761761
};
762-
test_func(dt_datetime_v2_0, *column_datetime_v2_0, USE_CONST_SERDE);
763-
test_func(dt_datetime_v2_5, *column_datetime_v2_5, USE_CONST_SERDE);
764-
test_func(dt_datetime_v2_6, *column_datetime_v2_6, USE_CONST_SERDE);
762+
test_func(dt_datetime_v2_0, *column_datetime_v2_0, BeExecVersionManager::max_be_exec_version);
763+
test_func(dt_datetime_v2_5, *column_datetime_v2_5, BeExecVersionManager::max_be_exec_version);
764+
test_func(dt_datetime_v2_6, *column_datetime_v2_6, BeExecVersionManager::max_be_exec_version);
765765
}
766766
TEST_F(DataTypeDateTimeV2Test, to_string) {
767767
auto test_func = [](auto& dt, const auto& source_column) {

be/test/vec/data_types/data_type_decimal_test.cpp

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -430,21 +430,21 @@ TEST_F(DataTypeDecimalTest, ser_deser) {
430430
}
431431
}
432432
};
433-
test_func(dt_decimal32_1, *column_decimal32_1, USE_CONST_SERDE);
434-
test_func(dt_decimal32_2, *column_decimal32_2, USE_CONST_SERDE);
435-
test_func(dt_decimal32_3, *column_decimal32_3, USE_CONST_SERDE);
436-
test_func(dt_decimal32_4, *column_decimal32_4, USE_CONST_SERDE);
437-
test_func(dt_decimal32_5, *column_decimal32_5, USE_CONST_SERDE);
438-
test_func(dt_decimal64_1, *column_decimal64_1, USE_CONST_SERDE);
439-
test_func(dt_decimal64_2, *column_decimal64_2, USE_CONST_SERDE);
440-
test_func(dt_decimal64_3, *column_decimal64_3, USE_CONST_SERDE);
441-
test_func(dt_decimal128v2, *column_decimal128_v2, USE_CONST_SERDE);
442-
test_func(dt_decimal128v3_1, *column_decimal128v3_1, USE_CONST_SERDE);
443-
test_func(dt_decimal128v3_2, *column_decimal128v3_2, USE_CONST_SERDE);
444-
test_func(dt_decimal128v3_3, *column_decimal128v3_3, USE_CONST_SERDE);
445-
test_func(dt_decimal256_1, *column_decimal256_1, USE_CONST_SERDE);
446-
test_func(dt_decimal256_2, *column_decimal256_2, USE_CONST_SERDE);
447-
test_func(dt_decimal256_3, *column_decimal256_3, USE_CONST_SERDE);
433+
test_func(dt_decimal32_1, *column_decimal32_1, BeExecVersionManager::max_be_exec_version);
434+
test_func(dt_decimal32_2, *column_decimal32_2, BeExecVersionManager::max_be_exec_version);
435+
test_func(dt_decimal32_3, *column_decimal32_3, BeExecVersionManager::max_be_exec_version);
436+
test_func(dt_decimal32_4, *column_decimal32_4, BeExecVersionManager::max_be_exec_version);
437+
test_func(dt_decimal32_5, *column_decimal32_5, BeExecVersionManager::max_be_exec_version);
438+
test_func(dt_decimal64_1, *column_decimal64_1, BeExecVersionManager::max_be_exec_version);
439+
test_func(dt_decimal64_2, *column_decimal64_2, BeExecVersionManager::max_be_exec_version);
440+
test_func(dt_decimal64_3, *column_decimal64_3, BeExecVersionManager::max_be_exec_version);
441+
test_func(dt_decimal128v2, *column_decimal128_v2, BeExecVersionManager::max_be_exec_version);
442+
test_func(dt_decimal128v3_1, *column_decimal128v3_1, BeExecVersionManager::max_be_exec_version);
443+
test_func(dt_decimal128v3_2, *column_decimal128v3_2, BeExecVersionManager::max_be_exec_version);
444+
test_func(dt_decimal128v3_3, *column_decimal128v3_3, BeExecVersionManager::max_be_exec_version);
445+
test_func(dt_decimal256_1, *column_decimal256_1, BeExecVersionManager::max_be_exec_version);
446+
test_func(dt_decimal256_2, *column_decimal256_2, BeExecVersionManager::max_be_exec_version);
447+
test_func(dt_decimal256_3, *column_decimal256_3, BeExecVersionManager::max_be_exec_version);
448448
}
449449
TEST_F(DataTypeDecimalTest, to_pb_column_meta) {
450450
auto test_func = [](auto dt, PGenericType_TypeId expected_type) {

be/test/vec/data_types/data_type_fixed_length_object_test.cpp

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ TEST_P(DataTypeFixedLengthObjectTest, CreateColumnTest) {
103103
// get_uncompressed_serialized_bytes
104104
ASSERT_EQ(datatype_fixed_length->get_uncompressed_serialized_bytes(
105105
*column, BeExecVersionManager::get_newest_version()),
106-
17);
106+
25);
107107
}
108108

109109
void insert_data_fixed_length_data(MutableColumns* fixed_length_cols,
@@ -132,9 +132,8 @@ TEST_P(DataTypeFixedLengthObjectTest, SerializeDeserializeTest) {
132132
auto size = datatype_fixed_length->get_uncompressed_serialized_bytes(
133133
*column, BeExecVersionManager::get_newest_version());
134134
std::unique_ptr<char[]> buf = std::make_unique<char[]>(size);
135-
auto* result = datatype_fixed_length->serialize(*column, buf.get(),
136-
BeExecVersionManager::get_newest_version());
137-
ASSERT_EQ(result, buf.get() + size);
135+
datatype_fixed_length->serialize(*column, buf.get(),
136+
BeExecVersionManager::get_newest_version());
138137

139138
auto column2 = datatype_fixed_length->create_column();
140139
datatype_fixed_length->deserialize(buf.get(), &column2,

be/test/vec/data_types/data_type_jsonb_test.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -230,7 +230,7 @@ TEST_F(DataTypeJsonbTest, ser_deser) {
230230
}
231231
}
232232
};
233-
test_func(dt_jsonb, *column_jsonb, USE_CONST_SERDE);
233+
test_func(dt_jsonb, *column_jsonb, BeExecVersionManager::max_be_exec_version);
234234
}
235235

236236
TEST_F(DataTypeJsonbTest, simple_func_test) {

0 commit comments

Comments
 (0)