Skip to content

Commit 7f77cc1

Browse files
committed
update.
1 parent f84fe0d commit 7f77cc1

File tree

89 files changed

+4276
-3417
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

89 files changed

+4276
-3417
lines changed

be/src/exec/exchange/exchange_writer.cpp

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,6 @@ Status ExchangeOlapWriter::_write_impl(RuntimeState* state, Block* block, bool e
128128
}
129129

130130
Status ExchangeTrivialWriter::write(RuntimeState* state, Block* block, bool eos) {
131-
auto rows = block->rows();
132131
{
133132
SCOPED_TIMER(_local_state.split_block_hash_compute_timer());
134133
RETURN_IF_ERROR(_partitioner->do_partitioning(state, block));

be/src/exec/operator/exchange_sink_operator.cpp

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -41,11 +41,10 @@
4141
#include "exec/sink/scale_writer_partitioning_exchanger.hpp"
4242
#include "exec/sink/tablet_sink_hash_partitioner.h"
4343
#include "exprs/vexpr.h"
44+
#include "format/transformer/merge_partitioner.h"
4445
#include "runtime/runtime_profile.h"
4546
#include "util/uid_util.h"
4647

47-
48-
4948
namespace doris {
5049
#include "common/compile_check_begin.h"
5150
bool ExchangeSinkLocalState::transfer_large_data_by_brpc() const {
@@ -185,8 +184,8 @@ Status ExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& inf
185184
const bool use_new_shuffle_hash_method =
186185
_state->query_options().__isset.enable_new_shuffle_hash_method &&
187186
_state->query_options().enable_new_shuffle_hash_method;
188-
_partitioner = std::make_unique<vectorized::MergePartitioner>(
189-
_partition_count, p._merge_partition_info, use_new_shuffle_hash_method);
187+
_partitioner = std::make_unique<MergePartitioner>(_partition_count, p._merge_partition_info,
188+
use_new_shuffle_hash_method);
190189
RETURN_IF_ERROR(_partitioner->init({}));
191190
RETURN_IF_ERROR(_partitioner->prepare(state, p._row_desc));
192191
custom_profile()->add_info_string("Partitioner",

be/src/pipeline/exec/iceberg_delete_sink_operator.cpp renamed to be/src/exec/operator/iceberg_delete_sink_operator.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919

2020
#include "common/status.h"
2121

22-
namespace doris::pipeline {
22+
namespace doris {
2323
#include "common/compile_check_begin.h"
2424
Status IcebergDeleteSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) {
2525
RETURN_IF_ERROR(Base::init(state, info));
@@ -30,4 +30,4 @@ Status IcebergDeleteSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo
3030
return Status::OK();
3131
}
3232

33-
} // namespace doris::pipeline
33+
} // namespace doris

be/src/pipeline/exec/iceberg_delete_sink_operator.h renamed to be/src/exec/operator/iceberg_delete_sink_operator.h

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -17,18 +17,18 @@
1717

1818
#pragma once
1919

20+
#include "exec/sink/viceberg_delete_sink.h"
2021
#include "operator.h"
21-
#include "vec/sink/viceberg_delete_sink.h"
2222

23-
namespace doris::pipeline {
23+
namespace doris {
2424
#include "common/compile_check_begin.h"
2525

2626
class IcebergDeleteSinkOperatorX;
2727

2828
class IcebergDeleteSinkLocalState final
29-
: public AsyncWriterSink<vectorized::VIcebergDeleteSink, IcebergDeleteSinkOperatorX> {
29+
: public AsyncWriterSink<VIcebergDeleteSink, IcebergDeleteSinkOperatorX> {
3030
public:
31-
using Base = AsyncWriterSink<vectorized::VIcebergDeleteSink, IcebergDeleteSinkOperatorX>;
31+
using Base = AsyncWriterSink<VIcebergDeleteSink, IcebergDeleteSinkOperatorX>;
3232
using Parent = IcebergDeleteSinkOperatorX;
3333
ENABLE_FACTORY_CREATOR(IcebergDeleteSinkLocalState);
3434
IcebergDeleteSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state)
@@ -55,17 +55,17 @@ class IcebergDeleteSinkOperatorX final : public DataSinkOperatorX<IcebergDeleteS
5555
Status init(const TDataSink& thrift_sink) override {
5656
RETURN_IF_ERROR(Base::init(thrift_sink));
5757
// From the thrift expressions create the real exprs.
58-
RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(_t_output_expr, _output_vexpr_ctxs));
58+
RETURN_IF_ERROR(VExpr::create_expr_trees(_t_output_expr, _output_vexpr_ctxs));
5959
return Status::OK();
6060
}
6161

6262
Status prepare(RuntimeState* state) override {
6363
RETURN_IF_ERROR(Base::prepare(state));
64-
RETURN_IF_ERROR(vectorized::VExpr::prepare(_output_vexpr_ctxs, state, _row_desc));
65-
return vectorized::VExpr::open(_output_vexpr_ctxs, state);
64+
RETURN_IF_ERROR(VExpr::prepare(_output_vexpr_ctxs, state, _row_desc));
65+
return VExpr::open(_output_vexpr_ctxs, state);
6666
}
6767

68-
Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos) override {
68+
Status sink(RuntimeState* state, Block* in_block, bool eos) override {
6969
auto& local_state = get_local_state(state);
7070
SCOPED_TIMER(local_state.exec_time_counter());
7171
COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)in_block->rows());
@@ -75,13 +75,13 @@ class IcebergDeleteSinkOperatorX final : public DataSinkOperatorX<IcebergDeleteS
7575
private:
7676
friend class IcebergDeleteSinkLocalState;
7777
template <typename Writer, typename Parent>
78-
requires(std::is_base_of_v<vectorized::AsyncResultWriter, Writer>)
78+
requires(std::is_base_of_v<AsyncResultWriter, Writer>)
7979
friend class AsyncWriterSink;
8080
const RowDescriptor& _row_desc;
81-
vectorized::VExprContextSPtrs _output_vexpr_ctxs;
81+
VExprContextSPtrs _output_vexpr_ctxs;
8282
const std::vector<TExpr>& _t_output_expr;
8383
ObjectPool* _pool = nullptr;
8484
};
8585

8686
#include "common/compile_check_end.h"
87-
} // namespace doris::pipeline
87+
} // namespace doris

be/src/pipeline/exec/iceberg_merge_sink_operator.cpp renamed to be/src/exec/operator/iceberg_merge_sink_operator.cpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,17 +19,17 @@
1919

2020
#include "common/status.h"
2121

22-
namespace doris::pipeline {
22+
namespace doris {
2323
#include "common/compile_check_begin.h"
2424

2525
Status IcebergMergeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) {
2626
RETURN_IF_ERROR(Base::init(state, info));
2727
SCOPED_TIMER(exec_time_counter());
2828
SCOPED_TIMER(_init_timer);
2929
auto& p = _parent->cast<Parent>();
30-
RETURN_IF_ERROR(_writer->init_properties(p._pool));
30+
RETURN_IF_ERROR(_writer->init_properties(p._pool, p._row_desc));
3131
return Status::OK();
3232
}
3333

3434
#include "common/compile_check_end.h"
35-
} // namespace doris::pipeline
35+
} // namespace doris

be/src/pipeline/exec/iceberg_merge_sink_operator.h renamed to be/src/exec/operator/iceberg_merge_sink_operator.h

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -17,18 +17,18 @@
1717

1818
#pragma once
1919

20+
#include "exec/sink/viceberg_merge_sink.h"
2021
#include "operator.h"
21-
#include "vec/sink/viceberg_merge_sink.h"
2222

23-
namespace doris::pipeline {
23+
namespace doris {
2424
#include "common/compile_check_begin.h"
2525

2626
class IcebergMergeSinkOperatorX;
2727

2828
class IcebergMergeSinkLocalState final
29-
: public AsyncWriterSink<vectorized::VIcebergMergeSink, IcebergMergeSinkOperatorX> {
29+
: public AsyncWriterSink<VIcebergMergeSink, IcebergMergeSinkOperatorX> {
3030
public:
31-
using Base = AsyncWriterSink<vectorized::VIcebergMergeSink, IcebergMergeSinkOperatorX>;
31+
using Base = AsyncWriterSink<VIcebergMergeSink, IcebergMergeSinkOperatorX>;
3232
using Parent = IcebergMergeSinkOperatorX;
3333
ENABLE_FACTORY_CREATOR(IcebergMergeSinkLocalState);
3434
IcebergMergeSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state)
@@ -54,17 +54,17 @@ class IcebergMergeSinkOperatorX final : public DataSinkOperatorX<IcebergMergeSin
5454

5555
Status init(const TDataSink& thrift_sink) override {
5656
RETURN_IF_ERROR(Base::init(thrift_sink));
57-
RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(_t_output_expr, _output_vexpr_ctxs));
57+
RETURN_IF_ERROR(VExpr::create_expr_trees(_t_output_expr, _output_vexpr_ctxs));
5858
return Status::OK();
5959
}
6060

6161
Status prepare(RuntimeState* state) override {
6262
RETURN_IF_ERROR(Base::prepare(state));
63-
RETURN_IF_ERROR(vectorized::VExpr::prepare(_output_vexpr_ctxs, state, _row_desc));
64-
return vectorized::VExpr::open(_output_vexpr_ctxs, state);
63+
RETURN_IF_ERROR(VExpr::prepare(_output_vexpr_ctxs, state, _row_desc));
64+
return VExpr::open(_output_vexpr_ctxs, state);
6565
}
6666

67-
Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos) override {
67+
Status sink(RuntimeState* state, Block* in_block, bool eos) override {
6868
auto& local_state = get_local_state(state);
6969
SCOPED_TIMER(local_state.exec_time_counter());
7070
COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)in_block->rows());
@@ -74,13 +74,13 @@ class IcebergMergeSinkOperatorX final : public DataSinkOperatorX<IcebergMergeSin
7474
private:
7575
friend class IcebergMergeSinkLocalState;
7676
template <typename Writer, typename Parent>
77-
requires(std::is_base_of_v<vectorized::AsyncResultWriter, Writer>)
77+
requires(std::is_base_of_v<AsyncResultWriter, Writer>)
7878
friend class AsyncWriterSink;
7979
const RowDescriptor& _row_desc;
80-
vectorized::VExprContextSPtrs _output_vexpr_ctxs;
80+
VExprContextSPtrs _output_vexpr_ctxs;
8181
const std::vector<TExpr>& _t_output_expr;
8282
ObjectPool* _pool = nullptr;
8383
};
8484

8585
#include "common/compile_check_end.h"
86-
} // namespace doris::pipeline
86+
} // namespace doris

be/src/exec/operator/operator.cpp

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,9 +42,9 @@
4242
#include "exec/operator/hashjoin_build_sink.h"
4343
#include "exec/operator/hashjoin_probe_operator.h"
4444
#include "exec/operator/hive_table_sink_operator.h"
45+
#include "exec/operator/iceberg_delete_sink_operator.h"
46+
#include "exec/operator/iceberg_merge_sink_operator.h"
4547
#include "exec/operator/iceberg_table_sink_operator.h"
46-
#include "pipeline/exec/iceberg_delete_sink_operator.h"
47-
#include "pipeline/exec/iceberg_merge_sink_operator.h"
4848
#include "exec/operator/jdbc_scan_operator.h"
4949
#include "exec/operator/jdbc_table_sink_operator.h"
5050
#include "exec/operator/local_merge_sort_source_operator.h"
@@ -816,6 +816,8 @@ DECLARE_OPERATOR(HiveTableSinkLocalState)
816816
DECLARE_OPERATOR(TVFTableSinkLocalState)
817817
DECLARE_OPERATOR(IcebergTableSinkLocalState)
818818
DECLARE_OPERATOR(SpillIcebergTableSinkLocalState)
819+
DECLARE_OPERATOR(IcebergDeleteSinkLocalState)
820+
DECLARE_OPERATOR(IcebergMergeSinkLocalState)
819821
DECLARE_OPERATOR(MCTableSinkLocalState)
820822
DECLARE_OPERATOR(AnalyticSinkLocalState)
821823
DECLARE_OPERATOR(BlackholeSinkLocalState)
@@ -936,6 +938,8 @@ template class AsyncWriterSink<doris::VTabletWriterV2, OlapTableSinkV2OperatorX>
936938
template class AsyncWriterSink<doris::VHiveTableWriter, HiveTableSinkOperatorX>;
937939
template class AsyncWriterSink<doris::VIcebergTableWriter, IcebergTableSinkOperatorX>;
938940
template class AsyncWriterSink<doris::VIcebergTableWriter, SpillIcebergTableSinkOperatorX>;
941+
template class AsyncWriterSink<doris::VIcebergDeleteSink, IcebergDeleteSinkOperatorX>;
942+
template class AsyncWriterSink<doris::VIcebergMergeSink, IcebergMergeSinkOperatorX>;
939943
template class AsyncWriterSink<doris::VMCTableWriter, MCTableSinkOperatorX>;
940944
template class AsyncWriterSink<doris::VTVFTableWriter, TVFTableSinkOperatorX>;
941945

be/src/exec/partitioner/partitioner.h

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,34 @@ class PartitionerBase {
5656
const HashValType _partition_count;
5757
};
5858

59+
class PartitionFunction {
60+
public:
61+
using HashValType = PartitionerBase::HashValType;
62+
63+
virtual ~PartitionFunction() = default;
64+
65+
virtual Status init(const std::vector<TExpr>& texprs) = 0;
66+
67+
virtual Status prepare(RuntimeState* state, const RowDescriptor& row_desc) = 0;
68+
69+
virtual Status open(RuntimeState* state) = 0;
70+
71+
virtual Status close(RuntimeState* state) = 0;
72+
73+
virtual Status get_partitions(RuntimeState* state, Block* block, size_t partition_count,
74+
std::vector<HashValType>& partitions) const = 0;
75+
76+
virtual HashValType partition_count() const = 0;
77+
78+
virtual Status clone(RuntimeState* state,
79+
std::unique_ptr<PartitionFunction>& function) const = 0;
80+
};
81+
82+
enum class ShuffleHashMethod {
83+
CRC32,
84+
CRC32C,
85+
};
86+
5987
template <typename ChannelIds>
6088
class Crc32HashPartitioner : public PartitionerBase {
6189
public:

be/src/exec/pipeline/pipeline_fragment_context.cpp

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,9 +64,9 @@
6464
#include "exec/operator/hashjoin_build_sink.h"
6565
#include "exec/operator/hashjoin_probe_operator.h"
6666
#include "exec/operator/hive_table_sink_operator.h"
67+
#include "exec/operator/iceberg_delete_sink_operator.h"
68+
#include "exec/operator/iceberg_merge_sink_operator.h"
6769
#include "exec/operator/iceberg_table_sink_operator.h"
68-
#include "pipeline/exec/iceberg_delete_sink_operator.h"
69-
#include "pipeline/exec/iceberg_merge_sink_operator.h"
7070
#include "exec/operator/jdbc_scan_operator.h"
7171
#include "exec/operator/jdbc_table_sink_operator.h"
7272
#include "exec/operator/local_merge_sort_source_operator.h"
@@ -1096,6 +1096,22 @@ Status PipelineFragmentContext::_create_data_sink(ObjectPool* pool, const TDataS
10961096
}
10971097
break;
10981098
}
1099+
case TDataSinkType::ICEBERG_DELETE_SINK: {
1100+
if (!thrift_sink.__isset.iceberg_delete_sink) {
1101+
return Status::InternalError("Missing iceberg delete sink.");
1102+
}
1103+
_sink = std::make_shared<IcebergDeleteSinkOperatorX>(pool, next_sink_operator_id(),
1104+
row_desc, output_exprs);
1105+
break;
1106+
}
1107+
case TDataSinkType::ICEBERG_MERGE_SINK: {
1108+
if (!thrift_sink.__isset.iceberg_merge_sink) {
1109+
return Status::InternalError("Missing iceberg merge sink.");
1110+
}
1111+
_sink = std::make_shared<IcebergMergeSinkOperatorX>(pool, next_sink_operator_id(), row_desc,
1112+
output_exprs);
1113+
break;
1114+
}
10991115
case TDataSinkType::MAXCOMPUTE_TABLE_SINK: {
11001116
if (!thrift_sink.__isset.max_compute_table_sink) {
11011117
return Status::InternalError("Missing max compute table sink.");

0 commit comments

Comments
 (0)