Skip to content

Commit c31d2ec

Browse files
committed
[feature](iceberg) Implements iceberg update & delete & merge into.
1 parent a676ae4 commit c31d2ec

File tree

143 files changed

+15353
-3240
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

143 files changed

+15353
-3240
lines changed

be/CMakeLists.txt

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -778,9 +778,11 @@ endif ()
778778

779779
if (MAKE_TEST)
780780
add_compile_options(-DGTEST_USE_OWN_TR1_TUPLE=0)
781-
# Only add GCC-style coverage when using GCC compiler
782-
# to avoid duplicate symbol errors (e.g., __gcov_fork, __gcov_reset)
783-
# between libgcov.a and libclang_rt.profile-x86_64.a
781+
# Only add GCC-style coverage when using GCC. When using Clang, do NOT add
782+
# -fprofile-arcs -ftest-coverage at all, otherwise both libgcov.a (from
783+
# toolchain or thirdparty) and libclang_rt.profile (from Clang's -fprofile-*)
784+
# get linked and cause duplicate symbol errors (__gcov_fork, __gcov_reset).
785+
# For Clang + MAKE_TEST, use -DENABLE_CLANG_COVERAGE=ON for coverage.
784786
if (COMPILER_GCC)
785787
add_compile_options(-fprofile-arcs -ftest-coverage)
786788
set(CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -fprofile-arcs -ftest-coverage")

be/src/common/consts.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ const std::string ROW_STORE_COL = "__DORIS_ROW_STORE_COL__";
3030
const std::string DYNAMIC_COLUMN_NAME = "__DORIS_DYNAMIC_COL__";
3131
const std::string PARTIAL_UPDATE_AUTO_INC_COL = "__PARTIAL_UPDATE_AUTO_INC_COLUMN__";
3232
const std::string VIRTUAL_COLUMN_PREFIX = "__DORIS_VIRTUAL_COL__";
33+
const std::string ICEBERG_ROWID_COL = "__DORIS_ICEBERG_ROWID_COL__";
3334

3435
/// The maximum precision representable by a 4-byte decimal (Decimal4Value)
3536
constexpr int MAX_DECIMAL32_PRECISION = 9;

be/src/exec/exchange/exchange_writer.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -128,12 +128,12 @@ Status ExchangeOlapWriter::_write_impl(RuntimeState* state, Block* block, bool e
128128
}
129129

130130
Status ExchangeTrivialWriter::write(RuntimeState* state, Block* block, bool eos) {
131-
auto rows = block->rows();
132131
{
133132
SCOPED_TIMER(_local_state.split_block_hash_compute_timer());
134133
RETURN_IF_ERROR(_partitioner->do_partitioning(state, block));
135134
}
136135
{
136+
auto rows = block->rows();
137137
SCOPED_TIMER(_local_state.distribute_rows_into_channels_timer());
138138
const auto& channel_ids = _partitioner->get_channel_ids();
139139

be/src/exec/operator/exchange_sink_operator.cpp

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
#include "exec/sink/scale_writer_partitioning_exchanger.hpp"
4242
#include "exec/sink/tablet_sink_hash_partitioner.h"
4343
#include "exprs/vexpr.h"
44+
#include "format/transformer/merge_partitioner.h"
4445
#include "runtime/runtime_profile.h"
4546
#include "util/uid_util.h"
4647

@@ -175,6 +176,20 @@ Status ExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& inf
175176
RETURN_IF_ERROR(_partitioner->prepare(state, p._row_desc));
176177
custom_profile()->add_info_string(
177178
"Partitioner", fmt::format("ScaleWriterPartitioner({})", _partition_count));
179+
} else if (_part_type == TPartitionType::MERGE_PARTITIONED) {
180+
if (!p._has_merge_partition_info) {
181+
return Status::InternalError("Merge partition info is missing");
182+
}
183+
_partition_count = channels.size();
184+
const bool use_new_shuffle_hash_method =
185+
_state->query_options().__isset.enable_new_shuffle_hash_method &&
186+
_state->query_options().enable_new_shuffle_hash_method;
187+
_partitioner = std::make_unique<MergePartitioner>(_partition_count, p._merge_partition_info,
188+
use_new_shuffle_hash_method);
189+
RETURN_IF_ERROR(_partitioner->init({}));
190+
RETURN_IF_ERROR(_partitioner->prepare(state, p._row_desc));
191+
custom_profile()->add_info_string("Partitioner",
192+
fmt::format("MergePartitioner({})", _partition_count));
178193
}
179194

180195
return Status::OK();
@@ -258,7 +273,8 @@ Status ExchangeSinkLocalState::open(RuntimeState* state) {
258273
if (_part_type == TPartitionType::HASH_PARTITIONED ||
259274
_part_type == TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED ||
260275
_part_type == TPartitionType::HIVE_TABLE_SINK_HASH_PARTITIONED ||
261-
_part_type == TPartitionType::OLAP_TABLE_SINK_HASH_PARTITIONED) {
276+
_part_type == TPartitionType::OLAP_TABLE_SINK_HASH_PARTITIONED ||
277+
_part_type == TPartitionType::MERGE_PARTITIONED) {
262278
RETURN_IF_ERROR(_partitioner->open(state));
263279
}
264280
return Status::OK();
@@ -301,13 +317,18 @@ ExchangeSinkOperatorX::ExchangeSinkOperatorX(
301317
sink.output_partition.type == TPartitionType::OLAP_TABLE_SINK_HASH_PARTITIONED ||
302318
sink.output_partition.type == TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED ||
303319
sink.output_partition.type == TPartitionType::HIVE_TABLE_SINK_HASH_PARTITIONED ||
304-
sink.output_partition.type == TPartitionType::HIVE_TABLE_SINK_UNPARTITIONED);
320+
sink.output_partition.type == TPartitionType::HIVE_TABLE_SINK_UNPARTITIONED ||
321+
sink.output_partition.type == TPartitionType::MERGE_PARTITIONED);
305322
#endif
306323
_name = "ExchangeSinkOperatorX";
307324
_pool = std::make_shared<ObjectPool>();
308325
if (sink.__isset.output_tuple_id) {
309326
_output_tuple_id = sink.output_tuple_id;
310327
}
328+
if (sink.output_partition.__isset.merge_partition_info) {
329+
_merge_partition_info = sink.output_partition.merge_partition_info;
330+
_has_merge_partition_info = true;
331+
}
311332

312333
if (_part_type != TPartitionType::UNPARTITIONED) {
313334
// if the destinations only one dest, we need to use broadcast
@@ -507,8 +528,9 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, Block* block, bool eos)
507528
(local_state.current_channel_idx + 1) % local_state.channels.size();
508529
} else if (_part_type == TPartitionType::HASH_PARTITIONED ||
509530
_part_type == TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED ||
531+
_part_type == TPartitionType::OLAP_TABLE_SINK_HASH_PARTITIONED ||
510532
_part_type == TPartitionType::HIVE_TABLE_SINK_HASH_PARTITIONED ||
511-
_part_type == TPartitionType::OLAP_TABLE_SINK_HASH_PARTITIONED) {
533+
_part_type == TPartitionType::MERGE_PARTITIONED) {
512534
RETURN_IF_ERROR(local_state._writer->write(state, block, eos));
513535
} else if (_part_type == TPartitionType::HIVE_TABLE_SINK_UNPARTITIONED) {
514536
// Control the number of channels according to the flow, thereby controlling the number of table sink writers.

be/src/exec/operator/exchange_sink_operator.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -244,6 +244,8 @@ class ExchangeSinkOperatorX MOCK_REMOVE(final) : public DataSinkOperatorX<Exchan
244244
RuntimeState* _state = nullptr;
245245

246246
const std::vector<TExpr> _texprs;
247+
TMergePartitionInfo _merge_partition_info;
248+
bool _has_merge_partition_info = false;
247249

248250
const RowDescriptor& _row_desc;
249251
TTupleId _output_tuple_id = -1;
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
#include "iceberg_delete_sink_operator.h"
19+
20+
#include "common/status.h"
21+
22+
namespace doris {
23+
#include "common/compile_check_begin.h"
24+
// Initializes the per-instance state of the Iceberg delete sink.
//
// Steps, in order:
//   1. Base::init(state, info) -- any non-OK Status is returned immediately.
//   2. Two SCOPED_TIMER scopes (exec_time_counter() and _init_timer) are opened
//      only after Base::init() has returned.
//      NOTE(review): presumably because the counters are created by Base::init();
//      confirm before reordering.
//   3. The parent operator's ObjectPool (p._pool) is handed to the writer through
//      _writer->init_properties(); its Status is propagated via RETURN_IF_ERROR.
//
// Returns Status::OK() when every step succeeds.
Status IcebergDeleteSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) {
25+
RETURN_IF_ERROR(Base::init(state, info));
26+
SCOPED_TIMER(exec_time_counter());
27+
SCOPED_TIMER(_init_timer);
28+
// _parent is cast to the concrete operator type to reach its private _pool.
auto& p = _parent->cast<Parent>();
29+
RETURN_IF_ERROR(_writer->init_properties(p._pool));
30+
return Status::OK();
31+
}
32+
// Close the compile-check region opened by compile_check_begin.h above, so this
// translation unit pairs begin/end the same way iceberg_merge_sink_operator.cpp
// and both sink-operator headers do.
#include "common/compile_check_end.h"
33+
} // namespace doris
Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
#pragma once
19+
20+
#include "exec/sink/viceberg_delete_sink.h"
21+
#include "operator.h"
22+
23+
namespace doris {
24+
#include "common/compile_check_begin.h"
25+
26+
class IcebergDeleteSinkOperatorX;
27+
28+
// Local (per-instance) sink state used by IcebergDeleteSinkOperatorX.
// It is an AsyncWriterSink specialised on the VIcebergDeleteSink writer and on
// IcebergDeleteSinkOperatorX as the parent operator type.
class IcebergDeleteSinkLocalState final
29+
: public AsyncWriterSink<VIcebergDeleteSink, IcebergDeleteSinkOperatorX> {
30+
public:
31+
// Short aliases for the base sink type and the owning operator type.
using Base = AsyncWriterSink<VIcebergDeleteSink, IcebergDeleteSinkOperatorX>;
32+
using Parent = IcebergDeleteSinkOperatorX;
33+
ENABLE_FACTORY_CREATOR(IcebergDeleteSinkLocalState);
34+
// Forwards the parent operator and runtime state straight to the base class.
IcebergDeleteSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state)
35+
: Base(parent, state) {};
36+
// Declared here and defined out of line (IcebergDeleteSinkLocalState::init).
Status init(RuntimeState* state, LocalSinkStateInfo& info) override;
37+
// Delegates to Base::open(), with the call wrapped in SCOPED_TIMER scopes for
// exec_time_counter() and _open_timer.
Status open(RuntimeState* state) override {
38+
SCOPED_TIMER(exec_time_counter());
39+
SCOPED_TIMER(_open_timer);
40+
return Base::open(state);
41+
}
42+
// The operator is a friend so it can access this state's non-public members.
friend class IcebergDeleteSinkOperatorX;
43+
};
44+
45+
// Sink operator whose per-instance state is IcebergDeleteSinkLocalState.
// It owns the output expression contexts built from the thrift expressions and
// forwards incoming blocks to the local state.
class IcebergDeleteSinkOperatorX final : public DataSinkOperatorX<IcebergDeleteSinkLocalState> {
46+
public:
47+
using Base = DataSinkOperatorX<IcebergDeleteSinkLocalState>;
48+
// Stores pool, row_desc and t_output_expr for later use.
// row_desc and t_output_expr are bound to reference members (not copied).
// NOTE(review): the meaning of the two literal 0 arguments passed to Base is
// defined by DataSinkOperatorX, which is not visible here -- confirm.
IcebergDeleteSinkOperatorX(ObjectPool* pool, int operator_id, const RowDescriptor& row_desc,
49+
const std::vector<TExpr>& t_output_expr)
50+
: Base(operator_id, 0, 0),
51+
_row_desc(row_desc),
52+
_t_output_expr(t_output_expr),
53+
_pool(pool) {};
54+
55+
// Runs Base::init(), then builds _output_vexpr_ctxs from _t_output_expr.
// Any non-OK Status is returned through RETURN_IF_ERROR.
Status init(const TDataSink& thrift_sink) override {
56+
RETURN_IF_ERROR(Base::init(thrift_sink));
57+
// From the thrift expressions create the real exprs.
58+
RETURN_IF_ERROR(VExpr::create_expr_trees(_t_output_expr, _output_vexpr_ctxs));
59+
return Status::OK();
60+
}
61+
62+
// Runs Base::prepare(), prepares the output expression contexts against
// _row_desc, and returns the Status of opening them.
Status prepare(RuntimeState* state) override {
63+
RETURN_IF_ERROR(Base::prepare(state));
64+
RETURN_IF_ERROR(VExpr::prepare(_output_vexpr_ctxs, state, _row_desc));
65+
return VExpr::open(_output_vexpr_ctxs, state);
66+
}
67+
68+
// Adds in_block->rows() to the local state's rows-input counter and forwards
// the block (and the eos flag) to the local state's sink(). The whole call is
// timed under the local state's exec_time_counter().
Status sink(RuntimeState* state, Block* in_block, bool eos) override {
69+
auto& local_state = get_local_state(state);
70+
SCOPED_TIMER(local_state.exec_time_counter());
71+
COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)in_block->rows());
72+
return local_state.sink(state, in_block, eos);
73+
}
74+
75+
private:
76+
// The local state reads the private members below (e.g. _pool in its init()).
friend class IcebergDeleteSinkLocalState;
77+
// Template friend: grants AsyncWriterSink specialisations access to the
// private members below.
template <typename Writer, typename Parent>
78+
requires(std::is_base_of_v<AsyncResultWriter, Writer>)
79+
friend class AsyncWriterSink;
80+
// Held by reference. NOTE(review): the referenced RowDescriptor must outlive
// this operator -- confirm against the code that constructs it.
const RowDescriptor& _row_desc;
81+
// Expression contexts created in init() and prepared/opened in prepare().
VExprContextSPtrs _output_vexpr_ctxs;
82+
// Held by reference and read in init(). NOTE(review): the referenced vector
// must stay alive at least until init() has run -- confirm against the caller.
const std::vector<TExpr>& _t_output_expr;
83+
// Non-owning pointer supplied by the constructor.
ObjectPool* _pool = nullptr;
84+
};
85+
86+
#include "common/compile_check_end.h"
87+
} // namespace doris
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
#include "iceberg_merge_sink_operator.h"
19+
20+
#include "common/status.h"
21+
22+
namespace doris {
23+
#include "common/compile_check_begin.h"
24+
25+
// Initializes the per-instance state of the Iceberg merge sink.
//
// Runs Base::init() first and returns its error, if any. The two SCOPED_TIMER
// scopes (exec_time_counter() and _init_timer) are opened only afterwards.
// NOTE(review): presumably because the counters are created by Base::init();
// confirm before reordering.
// Finally the parent operator's ObjectPool and RowDescriptor are passed to the
// writer through _writer->init_properties(); its Status is propagated.
Status IcebergMergeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) {
26+
RETURN_IF_ERROR(Base::init(state, info));
27+
SCOPED_TIMER(exec_time_counter());
28+
SCOPED_TIMER(_init_timer);
29+
// _parent is cast to the concrete operator type to reach its private members.
auto& p = _parent->cast<Parent>();
30+
RETURN_IF_ERROR(_writer->init_properties(p._pool, p._row_desc));
31+
return Status::OK();
32+
}
33+
34+
#include "common/compile_check_end.h"
35+
} // namespace doris
Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
#pragma once
19+
20+
#include "exec/sink/viceberg_merge_sink.h"
21+
#include "operator.h"
22+
23+
namespace doris {
24+
#include "common/compile_check_begin.h"
25+
26+
class IcebergMergeSinkOperatorX;
27+
28+
// Local (per-instance) sink state used by IcebergMergeSinkOperatorX.
// It is an AsyncWriterSink specialised on the VIcebergMergeSink writer and on
// IcebergMergeSinkOperatorX as the parent operator type.
class IcebergMergeSinkLocalState final
29+
: public AsyncWriterSink<VIcebergMergeSink, IcebergMergeSinkOperatorX> {
30+
public:
31+
// Short aliases for the base sink type and the owning operator type.
using Base = AsyncWriterSink<VIcebergMergeSink, IcebergMergeSinkOperatorX>;
32+
using Parent = IcebergMergeSinkOperatorX;
33+
ENABLE_FACTORY_CREATOR(IcebergMergeSinkLocalState);
34+
// Forwards the parent operator and runtime state straight to the base class.
IcebergMergeSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state)
35+
: Base(parent, state) {};
36+
// Declared here and defined out of line (IcebergMergeSinkLocalState::init).
Status init(RuntimeState* state, LocalSinkStateInfo& info) override;
37+
// Delegates to Base::open(), with the call wrapped in SCOPED_TIMER scopes for
// exec_time_counter() and _open_timer.
Status open(RuntimeState* state) override {
38+
SCOPED_TIMER(exec_time_counter());
39+
SCOPED_TIMER(_open_timer);
40+
return Base::open(state);
41+
}
42+
// The operator is a friend so it can access this state's non-public members.
friend class IcebergMergeSinkOperatorX;
43+
};
44+
45+
// Sink operator whose per-instance state is IcebergMergeSinkLocalState.
// It owns the output expression contexts built from the thrift expressions and
// forwards incoming blocks to the local state.
class IcebergMergeSinkOperatorX final : public DataSinkOperatorX<IcebergMergeSinkLocalState> {
46+
public:
47+
using Base = DataSinkOperatorX<IcebergMergeSinkLocalState>;
48+
// Stores pool, row_desc and t_output_expr for later use.
// row_desc and t_output_expr are bound to reference members (not copied).
// NOTE(review): the meaning of the two literal 0 arguments passed to Base is
// defined by DataSinkOperatorX, which is not visible here -- confirm.
IcebergMergeSinkOperatorX(ObjectPool* pool, int operator_id, const RowDescriptor& row_desc,
49+
const std::vector<TExpr>& t_output_expr)
50+
: Base(operator_id, 0, 0),
51+
_row_desc(row_desc),
52+
_t_output_expr(t_output_expr),
53+
_pool(pool) {};
54+
55+
// Runs Base::init(), then builds _output_vexpr_ctxs from the thrift
// expressions in _t_output_expr. Any non-OK Status is returned through
// RETURN_IF_ERROR.
Status init(const TDataSink& thrift_sink) override {
56+
RETURN_IF_ERROR(Base::init(thrift_sink));
57+
RETURN_IF_ERROR(VExpr::create_expr_trees(_t_output_expr, _output_vexpr_ctxs));
58+
return Status::OK();
59+
}
60+
61+
// Runs Base::prepare(), prepares the output expression contexts against
// _row_desc, and returns the Status of opening them.
Status prepare(RuntimeState* state) override {
62+
RETURN_IF_ERROR(Base::prepare(state));
63+
RETURN_IF_ERROR(VExpr::prepare(_output_vexpr_ctxs, state, _row_desc));
64+
return VExpr::open(_output_vexpr_ctxs, state);
65+
}
66+
67+
// Adds in_block->rows() to the local state's rows-input counter and forwards
// the block (and the eos flag) to the local state's sink(). The whole call is
// timed under the local state's exec_time_counter().
Status sink(RuntimeState* state, Block* in_block, bool eos) override {
68+
auto& local_state = get_local_state(state);
69+
SCOPED_TIMER(local_state.exec_time_counter());
70+
COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)in_block->rows());
71+
return local_state.sink(state, in_block, eos);
72+
}
73+
74+
private:
75+
// The local state reads the private members below (e.g. _pool and _row_desc
// in its init()).
friend class IcebergMergeSinkLocalState;
76+
// Template friend: grants AsyncWriterSink specialisations access to the
// private members below.
template <typename Writer, typename Parent>
77+
requires(std::is_base_of_v<AsyncResultWriter, Writer>)
78+
friend class AsyncWriterSink;
79+
// Held by reference. NOTE(review): the referenced RowDescriptor must outlive
// this operator -- confirm against the code that constructs it.
const RowDescriptor& _row_desc;
80+
// Expression contexts created in init() and prepared/opened in prepare().
VExprContextSPtrs _output_vexpr_ctxs;
81+
// Held by reference and read in init(). NOTE(review): the referenced vector
// must stay alive at least until init() has run -- confirm against the caller.
const std::vector<TExpr>& _t_output_expr;
82+
// Non-owning pointer supplied by the constructor.
ObjectPool* _pool = nullptr;
83+
};
84+
85+
#include "common/compile_check_end.h"
86+
} // namespace doris

be/src/exec/operator/operator.cpp

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,8 @@
4242
#include "exec/operator/hashjoin_build_sink.h"
4343
#include "exec/operator/hashjoin_probe_operator.h"
4444
#include "exec/operator/hive_table_sink_operator.h"
45+
#include "exec/operator/iceberg_delete_sink_operator.h"
46+
#include "exec/operator/iceberg_merge_sink_operator.h"
4547
#include "exec/operator/iceberg_table_sink_operator.h"
4648
#include "exec/operator/jdbc_scan_operator.h"
4749
#include "exec/operator/jdbc_table_sink_operator.h"
@@ -814,6 +816,8 @@ DECLARE_OPERATOR(HiveTableSinkLocalState)
814816
DECLARE_OPERATOR(TVFTableSinkLocalState)
815817
DECLARE_OPERATOR(IcebergTableSinkLocalState)
816818
DECLARE_OPERATOR(SpillIcebergTableSinkLocalState)
819+
DECLARE_OPERATOR(IcebergDeleteSinkLocalState)
820+
DECLARE_OPERATOR(IcebergMergeSinkLocalState)
817821
DECLARE_OPERATOR(MCTableSinkLocalState)
818822
DECLARE_OPERATOR(AnalyticSinkLocalState)
819823
DECLARE_OPERATOR(BlackholeSinkLocalState)
@@ -934,6 +938,8 @@ template class AsyncWriterSink<doris::VTabletWriterV2, OlapTableSinkV2OperatorX>
934938
template class AsyncWriterSink<doris::VHiveTableWriter, HiveTableSinkOperatorX>;
935939
template class AsyncWriterSink<doris::VIcebergTableWriter, IcebergTableSinkOperatorX>;
936940
template class AsyncWriterSink<doris::VIcebergTableWriter, SpillIcebergTableSinkOperatorX>;
941+
template class AsyncWriterSink<doris::VIcebergDeleteSink, IcebergDeleteSinkOperatorX>;
942+
template class AsyncWriterSink<doris::VIcebergMergeSink, IcebergMergeSinkOperatorX>;
937943
template class AsyncWriterSink<doris::VMCTableWriter, MCTableSinkOperatorX>;
938944
template class AsyncWriterSink<doris::VTVFTableWriter, TVFTableSinkOperatorX>;
939945

0 commit comments

Comments
 (0)