Skip to content

Commit c48244a

Browse files
authored
[feat](seq mapping) uniq table supports multi-stream merging through sequence mapping (#54936)
In some business scenarios, business operations need to update different columns in the same wide table through two or more data streams. For example, one data stream will write in real time to update some fields of this table; another data stream will perform imports on demand to update other columns of this table. During the update process, both data stream jobs need to ensure the order of replacement; and during queries, data from all columns should be queryable. ```shell CREATE TABLE `upsert_test` ( `a` bigint(20) NULL COMMENT "", `b` int(11) NULL COMMENT "", `c` int(11) NULL COMMENT "", `d` int(11) NULL COMMENT "", `e` int(11) NULL COMMENT "", `s1` int(11) NULL COMMENT "", `s2` int(11) NULL COMMENT "" ) ENGINE=OLAP UNIQUE KEY(`a`, `b`) COMMENT "OLAP" DISTRIBUTED BY HASH(`a`, `b`) BUCKETS 1 PROPERTIES ( "replication_num" = "1", "sequence_mapping.s1" = "c,d", "sequence_mapping.s2" = "e" ); MySQL [test]> insert into upsert_test(a, b, c, d, s1) values (1,1,2,2,2); Query OK, 1 row affected (0.15 sec) {'label':'insert_fadb7c3fe6041c6-a51094f635017b12', 'status':'VISIBLE', 'txnId':'2'} MySQL [test]> insert into upsert_test(a, b, c, d, s1) values (1,1,1,1,1); Query OK, 1 row affected (0.07 sec) {'label':'insert_1f3d1d5eb28447fe-889427c9da075c11', 'status':'VISIBLE', 'txnId':'3'} MySQL [test]> select * from upsert_test; +------+------+------+------+------+------+------+ | a | b | c | d | e | s1 | s2 | +------+------+------+------+------+------+------+ | 1 | 1 | 2 | 2 | NULL | 2 | NULL | +------+------+------+------+------+------+------+ MySQL [test]> insert into upsert_test(a, b, e, s2) values (1,1,2,2); Query OK, 1 row affected (0.05 sec) {'label':'insert_3b9614ce9a4e4dc3-97bbbb9b881c24aa', 'status':'VISIBLE', 'txnId':'4'} MySQL [test]> select * from upsert_test; +------+------+------+------+------+------+------+ | a | b | c | d | e | s1 | s2 | +------+------+------+------+------+------+------+ | 1 | 1 | 2 | 2 | 2 | 2 | 2 | +------+------+------+------+------+------+------+ 1 row in set (0.01 sec) ``` documentation: apache/doris-website#2954
1 parent 13ea84d commit c48244a

File tree

53 files changed

+2825
-27
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

53 files changed

+2825
-27
lines changed

be/src/cloud/pb_convert.cpp

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -384,6 +384,9 @@ void doris_tablet_schema_to_cloud(TabletSchemaCloudPB* out, const TabletSchemaPB
384384
if (in.has_binary_plain_encoding_default_impl()) {
385385
out->set_binary_plain_encoding_default_impl(in.binary_plain_encoding_default_impl());
386386
}
387+
if (in.has_seq_map()) {
388+
out->mutable_seq_map()->CopyFrom(in.seq_map());
389+
}
387390

388391
if (in.has___split_schema()) {
389392
out->mutable___split_schema()->CopyFrom(in.__split_schema());
@@ -428,6 +431,9 @@ void doris_tablet_schema_to_cloud(TabletSchemaCloudPB* out, TabletSchemaPB&& in)
428431
if (in.has_binary_plain_encoding_default_impl()) {
429432
out->set_binary_plain_encoding_default_impl(in.binary_plain_encoding_default_impl());
430433
}
434+
if (in.has_seq_map()) {
435+
out->mutable_seq_map()->CopyFrom(in.seq_map());
436+
}
431437

432438
if (in.has___split_schema()) {
433439
out->mutable___split_schema()->CopyFrom(in.__split_schema());
@@ -485,6 +491,9 @@ void cloud_tablet_schema_to_doris(TabletSchemaPB* out, const TabletSchemaCloudPB
485491
if (in.has_binary_plain_encoding_default_impl()) {
486492
out->set_binary_plain_encoding_default_impl(in.binary_plain_encoding_default_impl());
487493
}
494+
if (in.has_seq_map()) {
495+
out->mutable_seq_map()->CopyFrom(in.seq_map());
496+
}
488497

489498
if (in.has___split_schema()) {
490499
out->mutable___split_schema()->CopyFrom(in.__split_schema());
@@ -530,6 +539,9 @@ void cloud_tablet_schema_to_doris(TabletSchemaPB* out, TabletSchemaCloudPB&& in)
530539
if (in.has_binary_plain_encoding_default_impl()) {
531540
out->set_binary_plain_encoding_default_impl(in.binary_plain_encoding_default_impl());
532541
}
542+
if (in.has_seq_map()) {
543+
out->mutable_seq_map()->CopyFrom(in.seq_map());
544+
}
533545

534546
if (in.has___split_schema()) {
535547
out->mutable___split_schema()->CopyFrom(in.__split_schema());

be/src/olap/compaction.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -204,7 +204,8 @@ Status Compaction::merge_input_rowsets() {
204204
{
205205
SCOPED_TIMER(_merge_rowsets_latency_timer);
206206
// 1. Merge segment files and write bkd inverted index
207-
if (_is_vertical) {
207+
// TODO implement vertical compaction for seq map
208+
if (_is_vertical && !_tablet->tablet_schema()->has_seq_map()) {
208209
if (!_tablet->tablet_schema()->cluster_key_uids().empty()) {
209210
RETURN_IF_ERROR(update_delete_bitmap());
210211
}

be/src/olap/memtable.cpp

Lines changed: 46 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -249,6 +249,45 @@ Status MemTable::insert(const vectorized::Block* input_block,
249249
return Status::OK();
250250
}
251251

252+
void MemTable::_aggregate_two_row_with_sequence_map(vectorized::MutableBlock& mutable_block,
253+
RowInBlock* src_row, RowInBlock* dst_row) {
254+
// for each mapping replace value columns according to the sequence column compare result
255+
// for example: a b c d s1 s2 (key:a , s1=>[b,c], s2=>[d])
256+
// src row: 1 4 5 6 8 9
257+
// dst row: 1 2 3 4 7 10
258+
// after aggregate
259+
// dst row: 1 4 5 4 8 10 (b,c,s1 will be replaced, d,s2)
260+
const auto& seq_map = _tablet_schema->seq_col_idx_to_value_cols_idx();
261+
for (const auto& it : seq_map) {
262+
auto sequence = it.first;
263+
auto* sequence_col_ptr = mutable_block.mutable_columns()[sequence].get();
264+
auto res = sequence_col_ptr->compare_at(dst_row->_row_pos, src_row->_row_pos,
265+
*sequence_col_ptr, -1);
266+
if (res > 0) {
267+
continue;
268+
}
269+
for (auto cid : it.second) {
270+
if (cid < _num_columns) {
271+
auto* col_ptr = mutable_block.mutable_columns()[cid].get();
272+
_agg_functions[cid]->add(dst_row->agg_places(cid),
273+
const_cast<const doris::vectorized::IColumn**>(&col_ptr),
274+
src_row->_row_pos, _arena);
275+
}
276+
}
277+
if (sequence < _num_columns) {
278+
_agg_functions[sequence]->add(
279+
dst_row->agg_places(sequence),
280+
const_cast<const doris::vectorized::IColumn**>(&sequence_col_ptr),
281+
src_row->_row_pos, _arena);
282+
// must use replace column instead of update row_pos
283+
// because one row may have multi sequence column
284+
// and agg function add method won't change the real column value
285+
sequence_col_ptr->replace_column_data(*sequence_col_ptr, src_row->_row_pos,
286+
dst_row->_row_pos);
287+
}
288+
}
289+
}
290+
252291
template <bool has_skip_bitmap_col>
253292
void MemTable::_aggregate_two_row_in_block(vectorized::MutableBlock& mutable_block,
254293
RowInBlock* src_row, RowInBlock* dst_row) {
@@ -509,7 +548,13 @@ void MemTable::_aggregate() {
509548
_init_row_for_agg(prev_row, mutable_block);
510549
}
511550
_stat.merged_rows++;
512-
_aggregate_two_row_in_block<has_skip_bitmap_col>(mutable_block, cur_row, prev_row);
551+
if (_tablet_schema->has_seq_map()) {
552+
_aggregate_two_row_with_sequence_map(mutable_block, cur_row, prev_row);
553+
} else {
554+
_aggregate_two_row_in_block<has_skip_bitmap_col>(mutable_block, cur_row,
555+
prev_row);
556+
}
557+
513558
// Clean up aggregation state of the merged row to avoid memory leak
514559
if (cur_row) {
515560
_clear_row_agg(cur_row);

be/src/olap/memtable.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -213,6 +213,9 @@ class MemTable {
213213
void _aggregate_two_row_in_block(vectorized::MutableBlock& mutable_block, RowInBlock* new_row,
214214
RowInBlock* row_in_skiplist);
215215

216+
void _aggregate_two_row_with_sequence_map(vectorized::MutableBlock& mutable_block,
217+
RowInBlock* new_row, RowInBlock* row_in_skiplist);
218+
216219
// Used to wrapped by to_block to do exception handle logic
217220
Status _to_block(std::unique_ptr<vectorized::Block>* res);
218221

be/src/olap/rowset/beta_rowset_reader.cpp

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -186,7 +186,9 @@ Status BetaRowsetReader::get_segment_iterators(RowsetReaderContext* read_context
186186
}
187187

188188
if (_should_push_down_value_predicates()) {
189-
if (_read_context->value_predicates != nullptr) {
189+
// sequence mapping currently only support merge on read, so can not push down value predicates
190+
if (_read_context->value_predicates != nullptr &&
191+
!read_context->tablet_schema->has_seq_map()) {
190192
_read_options.column_predicates.insert(_read_options.column_predicates.end(),
191193
_read_context->value_predicates->begin(),
192194
_read_context->value_predicates->end());

be/src/olap/schema.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ class Schema {
8080
columns.push_back(std::make_shared<TabletColumn>(column));
8181
}
8282
_delete_sign_idx = tablet_schema->delete_sign_idx();
83-
if (tablet_schema->has_sequence_col()) {
83+
if (tablet_schema->has_sequence_col() || tablet_schema->has_seq_map()) {
8484
_has_sequence_col = true;
8585
}
8686
_init(columns, col_ids, num_key_columns);

be/src/olap/tablet_meta.cpp

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -191,6 +191,16 @@ TabletMeta::TabletMeta(int64_t table_id, int64_t partition_id, int64_t tablet_id
191191
schema->set_num_short_key_columns(tablet_schema.short_key_column_count);
192192
schema->set_num_rows_per_row_block(config::default_num_rows_per_column_file_block);
193193
schema->set_sequence_col_idx(tablet_schema.sequence_col_idx);
194+
auto p_seq_map = schema->mutable_seq_map(); // ColumnGroupsPB
195+
196+
for (auto& it : tablet_schema.seq_map) { // std::vector< ::doris::TColumnGroup>
197+
uint32_t key = it.sequence_column;
198+
ColumnGroupPB* cg_pb = p_seq_map->add_cg(); // ColumnGroupPB {key: {v1, v2, v3}}
199+
cg_pb->set_sequence_column(key);
200+
for (auto v : it.columns_in_group) {
201+
cg_pb->add_columns_in_group(v);
202+
}
203+
}
194204
switch (tablet_schema.keys_type) {
195205
case TKeysType::DUP_KEYS:
196206
schema->set_keys_type(KeysType::DUP_KEYS);

be/src/olap/tablet_schema.cpp

Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -999,6 +999,35 @@ void TabletSchema::append_column(TabletColumn column, ColumnType col_type) {
999999
}
10001000
_num_columns++;
10011001
_num_virtual_columns = _vir_col_idx_to_unique_id.size();
1002+
// generate column index mapping for seq map
1003+
if (_seq_col_uid_to_value_cols_uid.contains(column.unique_id())) {
1004+
const auto seq_idx = _field_uniqueid_to_index[column.unique_id()];
1005+
if (!_seq_col_idx_to_value_cols_idx.contains(seq_idx)) {
1006+
_seq_col_idx_to_value_cols_idx[seq_idx] = {};
1007+
}
1008+
}
1009+
if (_value_col_uid_to_seq_col_uid.contains(column.unique_id())) {
1010+
const auto seq_uid = _value_col_uid_to_seq_col_uid[column.unique_id()];
1011+
if (_field_uniqueid_to_index.contains(seq_uid)) {
1012+
bool all_uid_index_found = true;
1013+
std::vector<int32_t> value_cols_index;
1014+
for (const auto value_col_uid : _seq_col_uid_to_value_cols_uid[seq_uid]) {
1015+
if (!_field_uniqueid_to_index.contains(value_col_uid)) {
1016+
all_uid_index_found = false;
1017+
break;
1018+
}
1019+
value_cols_index.push_back(_field_uniqueid_to_index[value_col_uid]);
1020+
}
1021+
if (all_uid_index_found) {
1022+
const auto seq_idx = _field_uniqueid_to_index[seq_uid];
1023+
for (const auto col_idx : value_cols_index) {
1024+
_seq_col_idx_to_value_cols_idx[seq_idx].push_back(col_idx);
1025+
_value_col_idx_to_seq_col_idx[col_idx] = seq_idx;
1026+
}
1027+
_value_col_idx_to_seq_col_idx[seq_idx] = seq_idx;
1028+
}
1029+
}
1030+
}
10021031
}
10031032

10041033
void TabletSchema::append_index(TabletIndex&& index) {
@@ -1060,6 +1089,8 @@ void TabletSchema::clear_columns() {
10601089
_num_variant_columns = 0;
10611090
_num_null_columns = 0;
10621091
_num_key_columns = 0;
1092+
_seq_col_idx_to_value_cols_idx.clear();
1093+
_value_col_idx_to_seq_col_idx.clear();
10631094
_cols.clear();
10641095
}
10651096

@@ -1167,6 +1198,64 @@ void TabletSchema::init_from_pb(const TabletSchemaPB& schema, bool ignore_extrac
11671198
_storage_page_size = schema.storage_page_size();
11681199
_storage_dict_page_size = schema.storage_dict_page_size();
11691200
_schema_version = schema.schema_version();
1201+
if (schema.has_seq_map()) {
1202+
auto column_groups_pb = schema.seq_map();
1203+
_seq_col_uid_to_value_cols_uid.clear();
1204+
_value_col_uid_to_seq_col_uid.clear();
1205+
_seq_col_idx_to_value_cols_idx.clear();
1206+
_value_col_idx_to_seq_col_idx.clear();
1207+
/*
1208+
* ColumnGroupsPB is a list of cg_pb, and
1209+
* ColumnGroupsPB do not have begin() or end() method.
1210+
* we must use for(i=0;i<xx;i++) loop
1211+
*/
1212+
for (int i = 0; i < column_groups_pb.cg_size(); i++) {
1213+
ColumnGroupPB cg_pb = column_groups_pb.cg(i);
1214+
uint32_t key_uid = cg_pb.sequence_column();
1215+
auto found = _field_uniqueid_to_index.find(key_uid);
1216+
DCHECK(found != _field_uniqueid_to_index.end())
1217+
<< "could not find sequence col with unique id = " << key_uid
1218+
<< " table_id=" << _table_id;
1219+
int32_t seq_index = found->second;
1220+
_seq_col_uid_to_value_cols_uid[key_uid] = {};
1221+
_seq_col_idx_to_value_cols_idx[seq_index] = {};
1222+
for (auto val_uid : cg_pb.columns_in_group()) {
1223+
_seq_col_uid_to_value_cols_uid[key_uid].push_back(val_uid);
1224+
found = _field_uniqueid_to_index.find(val_uid);
1225+
DCHECK(found != _field_uniqueid_to_index.end())
1226+
<< "could not find value col with unique id = " << key_uid
1227+
<< " table_id=" << _table_id;
1228+
int32_t val_index = found->second;
1229+
_seq_col_idx_to_value_cols_idx[seq_index].push_back(val_index);
1230+
}
1231+
}
1232+
1233+
if (!_seq_col_uid_to_value_cols_uid.empty()) {
1234+
/*
1235+
|** KEY **| ** VALUE ** |
1236+
------------------------------------
1237+
|** KEY **| CDE is value| sequence|
1238+
|----|----|----|----|----|----|----|
1239+
A B C D E S1 S2
1240+
0 1 2 3 4 5 6
1241+
for example: _seq_map is {5:{2,3}, 6:{4}}
1242+
then, _value_to_seq = {2:5,3:5,5:5,4:6,6:6}
1243+
*/
1244+
for (auto& [seq_uid, cols_uid] : _seq_col_uid_to_value_cols_uid) {
1245+
for (auto col_uid : cols_uid) {
1246+
_value_col_uid_to_seq_col_uid[col_uid] = seq_uid;
1247+
}
1248+
_value_col_uid_to_seq_col_uid[seq_uid] = seq_uid;
1249+
}
1250+
1251+
for (auto& [seq_idx, value_cols_idx] : _seq_col_idx_to_value_cols_idx) {
1252+
for (auto col_idx : value_cols_idx) {
1253+
_value_col_idx_to_seq_col_idx[col_idx] = seq_idx;
1254+
}
1255+
_value_col_idx_to_seq_col_idx[seq_idx] = seq_idx;
1256+
}
1257+
}
1258+
}
11701259
// Default to V1 inverted index storage format for backward compatibility if not specified in schema.
11711260
if (!schema.has_inverted_index_storage_format()) {
11721261
_inverted_index_storage_format = InvertedIndexStorageFormatPB::V1;
@@ -1463,6 +1552,15 @@ void TabletSchema::to_schema_pb(TabletSchemaPB* tablet_schema_pb) const {
14631552
tablet_schema_pb->set_integer_type_default_use_plain_encoding(
14641553
_integer_type_default_use_plain_encoding);
14651554
tablet_schema_pb->set_binary_plain_encoding_default_impl(_binary_plain_encoding_default_impl);
1555+
auto column_groups_pb = tablet_schema_pb->mutable_seq_map();
1556+
for (const auto& it : _seq_col_uid_to_value_cols_uid) {
1557+
uint32_t key = it.first;
1558+
ColumnGroupPB* cg_pb = column_groups_pb->add_cg(); // ColumnGroupPB {key: {v1, v2, v3}}
1559+
cg_pb->set_sequence_column(key);
1560+
for (auto v : it.second) {
1561+
cg_pb->add_columns_in_group(v);
1562+
}
1563+
}
14661564
}
14671565

14681566
size_t TabletSchema::row_size() const {

be/src/olap/tablet_schema.h

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -678,6 +678,16 @@ class TabletSchema : public MetadataAdder<TabletSchema> {
678678
}
679679
return 0;
680680
}
681+
const std::unordered_map<uint32_t, std::vector<uint32_t>>& seq_col_idx_to_value_cols_idx()
682+
const {
683+
return _seq_col_idx_to_value_cols_idx;
684+
}
685+
686+
bool has_seq_map() const { return !_seq_col_idx_to_value_cols_idx.empty(); }
687+
688+
const std::unordered_map<uint32_t, uint32_t>& value_col_idx_to_seq_col_idx() const {
689+
return _value_col_idx_to_seq_col_idx;
690+
}
681691

682692
void add_pruned_columns_data_type(int32_t col_unique_id, vectorized::DataTypePtr data_type) {
683693
_pruned_columns_data_type[col_unique_id] = std::move(data_type);
@@ -798,6 +808,14 @@ class TabletSchema : public MetadataAdder<TabletSchema> {
798808
bool _integer_type_default_use_plain_encoding {false};
799809
BinaryPlainEncodingTypePB _binary_plain_encoding_default_impl {
800810
BinaryPlainEncodingTypePB::BINARY_PLAIN_ENCODING_V1};
811+
// Sequence column unique id mapping to value columns unique id
812+
std::unordered_map<uint32_t, std::vector<uint32_t>> _seq_col_uid_to_value_cols_uid;
813+
// Value column unique id mapping to sequence column unique id(also map sequence column it self)
814+
std::unordered_map<uint32_t, uint32_t> _value_col_uid_to_seq_col_uid;
815+
// Sequence column index mapping to value column index
816+
std::unordered_map<uint32_t, std::vector<uint32_t>> _seq_col_idx_to_value_cols_idx;
817+
// Value column index mapping to sequence column index(also map sequence column it self)
818+
std::unordered_map<uint32_t, uint32_t> _value_col_idx_to_seq_col_idx;
801819
};
802820

803821
bool operator==(const TabletSchema& a, const TabletSchema& b);

be/src/vec/exec/scan/olap_scanner.cpp

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -424,7 +424,7 @@ Status OlapScanner::_init_tablet_reader_params(
424424
_tablet_reader_params.return_columns.push_back(index);
425425
}
426426
// expand the sequence column
427-
if (tablet_schema->has_sequence_col()) {
427+
if (tablet_schema->has_sequence_col() || tablet_schema->has_seq_map()) {
428428
bool has_replace_col = false;
429429
for (auto col : _return_columns) {
430430
if (tablet_schema->column(col).aggregation() ==
@@ -434,10 +434,32 @@ Status OlapScanner::_init_tablet_reader_params(
434434
}
435435
}
436436
if (auto sequence_col_idx = tablet_schema->sequence_col_idx();
437-
has_replace_col && std::find(_return_columns.begin(), _return_columns.end(),
438-
sequence_col_idx) == _return_columns.end()) {
437+
has_replace_col && tablet_schema->has_sequence_col() &&
438+
std::find(_return_columns.begin(), _return_columns.end(), sequence_col_idx) ==
439+
_return_columns.end()) {
439440
_tablet_reader_params.return_columns.push_back(sequence_col_idx);
440441
}
442+
if (has_replace_col) {
443+
const auto& val_to_seq = tablet_schema->value_col_idx_to_seq_col_idx();
444+
std::set<uint32_t> return_seq_columns;
445+
446+
for (auto col : _tablet_reader_params.return_columns) {
447+
// we need to add the necessary sequence column in _return_columns, and
448+
// Avoid adding the same seq column twice
449+
const auto val_iter = val_to_seq.find(col);
450+
if (val_iter != val_to_seq.end()) {
451+
auto seq = val_iter->second;
452+
if (std::find(_tablet_reader_params.return_columns.begin(),
453+
_tablet_reader_params.return_columns.end(),
454+
seq) == _tablet_reader_params.return_columns.end()) {
455+
return_seq_columns.insert(seq);
456+
}
457+
}
458+
}
459+
_tablet_reader_params.return_columns.insert(
460+
std::end(_tablet_reader_params.return_columns),
461+
std::begin(return_seq_columns), std::end(return_seq_columns));
462+
}
441463
}
442464
}
443465

0 commit comments

Comments
 (0)