Skip to content

Commit 0da3ff4

Browse files
committed
support multi parition columns
1 parent 7e169ef commit 0da3ff4

File tree

3 files changed

+79
-53
lines changed

3 files changed

+79
-53
lines changed

utils/local-engine/Operator/PartitionColumnFillingTransform.cpp

Lines changed: 51 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,16 @@
66
#include <IO/ReadBufferFromString.h>
77
#include <IO/ReadHelpers.h>
88
#include <Common/StringUtils.h>
9+
#include "Processors/Chunk.h"
910
#include <Columns/IColumn.h>
1011
#include <DataTypes/DataTypeNullable.h>
1112
#include <DataTypes/Serializations/ISerialization.h>
1213
#include <Interpreters/DatabaseAndTableWithAlias.h>
1314
#include <base/DayNum.h>
1415

16+
#include <Poco/Logger.h>
17+
#include <base/logger_useful.h>
18+
1519

1620
using namespace DB;
1721

@@ -26,40 +30,37 @@ namespace ErrorCodes
2630
namespace local_engine
2731
{
2832
template <typename Type>
29-
requires(
30-
std::is_same_v<Type, Int8> || std::is_same_v<Type, UInt16> || std::is_same_v<Type, Int16> || std::is_same_v<Type, Int32> || std::is_same_v<Type, Int64>)
31-
ColumnPtr createIntPartitionColumn(DataTypePtr column_type, std::string partition_value)
33+
requires(std::is_same_v<Type, Int8> || std::is_same_v<Type, UInt16> || std::is_same_v<Type, Int16> || std::is_same_v<Type, Int32> || std::is_same_v<Type, Int64>)
34+
ColumnPtr createIntPartitionColumn(DataTypePtr column_type, std::string partition_value, size_t rows)
3235
{
3336
Type value;
3437
auto value_buffer = ReadBufferFromString(partition_value);
3538
readIntText(value, value_buffer);
36-
return column_type->createColumnConst(1, value);
39+
return column_type->createColumnConst(rows, value);
3740
}
3841

3942
template <typename Type>
40-
requires(std::is_same_v<Type, Float32> || std::is_same_v<Type, Float64>) ColumnPtr
41-
createFloatPartitionColumn(DataTypePtr column_type, std::string partition_value)
43+
requires(std::is_same_v<Type, Float32> || std::is_same_v<Type, Float64>)
44+
ColumnPtr createFloatPartitionColumn(DataTypePtr column_type, std::string partition_value, size_t rows)
4245
{
4346
Type value;
4447
auto value_buffer = ReadBufferFromString(partition_value);
4548
readFloatText(value, value_buffer);
46-
return column_type->createColumnConst(1, value);
49+
return column_type->createColumnConst(rows, value);
4750
}
4851

49-
//template <>
50-
//ColumnPtr createFloatPartitionColumn<Float32>(DataTypePtr column_type, std::string partition_value);
51-
//template <>
52-
//ColumnPtr createFloatPartitionColumn<Float64>(DataTypePtr column_type, std::string partition_value);
5352

5453
PartitionColumnFillingTransform::PartitionColumnFillingTransform(
55-
const DB::Block & input_, const DB::Block & output_, const String & partition_col_name_, const String & partition_col_value_)
56-
: ISimpleTransform(input_, output_, true), partition_col_name(partition_col_name_), partition_col_value(partition_col_value_)
54+
const DB::Block & input_, const DB::Block & output_, const PartitionValues & partition_columns_)
55+
: ISimpleTransform(input_, output_, true), partition_column_values(partition_columns_)
5756
{
58-
partition_col_type = output_.getByName(partition_col_name_).type;
59-
partition_column = createPartitionColumn();
57+
for (const auto & value : partition_column_values)
58+
{
59+
partition_columns[value.first] = value.second;
60+
}
6061
}
6162

62-
/// In the case that a partition column is wrapper by nullable and LowCardinality, we need to keep the data type same
63+
/// In the case that a partition column is wrapper by nullable or LowCardinality, we need to keep the data type same
6364
/// as input.
6465
ColumnPtr PartitionColumnFillingTransform::tryWrapPartitionColumn(const ColumnPtr & nested_col, DataTypePtr original_data_type)
6566
{
@@ -71,9 +72,10 @@ ColumnPtr PartitionColumnFillingTransform::tryWrapPartitionColumn(const ColumnPt
7172
return result;
7273
}
7374

74-
ColumnPtr PartitionColumnFillingTransform::createPartitionColumn()
75+
ColumnPtr PartitionColumnFillingTransform::createPartitionColumn(const String & parition_col, const String & partition_col_value, size_t rows)
7576
{
7677
ColumnPtr result;
78+
auto partition_col_type = output.getHeader().getByName(parition_col).type;
7779
DataTypePtr nested_type = partition_col_type;
7880
if (const DataTypeNullable * nullable_type = checkAndGetDataType<DataTypeNullable>(partition_col_type.get()))
7981
{
@@ -86,45 +88,45 @@ ColumnPtr PartitionColumnFillingTransform::createPartitionColumn()
8688
WhichDataType which(nested_type);
8789
if (which.isInt8())
8890
{
89-
result = createIntPartitionColumn<Int8>(nested_type, partition_col_value);
91+
result = createIntPartitionColumn<Int8>(nested_type, partition_col_value, rows);
9092
}
9193
else if (which.isInt16())
9294
{
93-
result = createIntPartitionColumn<Int16>(nested_type, partition_col_value);
95+
result = createIntPartitionColumn<Int16>(nested_type, partition_col_value, rows);
9496
}
9597
else if (which.isInt32())
9698
{
97-
result = createIntPartitionColumn<Int32>(nested_type, partition_col_value);
99+
result = createIntPartitionColumn<Int32>(nested_type, partition_col_value, rows);
98100
}
99101
else if (which.isInt64())
100102
{
101-
result = createIntPartitionColumn<Int64>(nested_type, partition_col_value);
103+
result = createIntPartitionColumn<Int64>(nested_type, partition_col_value, rows);
102104
}
103105
else if (which.isFloat32())
104106
{
105-
result = createFloatPartitionColumn<Float32>(nested_type, partition_col_value);
107+
result = createFloatPartitionColumn<Float32>(nested_type, partition_col_value, rows);
106108
}
107109
else if (which.isFloat64())
108110
{
109-
result = createFloatPartitionColumn<Float64>(nested_type, partition_col_value);
111+
result = createFloatPartitionColumn<Float64>(nested_type, partition_col_value, rows);
110112
}
111113
else if (which.isDate())
112114
{
113115
DayNum value;
114116
auto value_buffer = ReadBufferFromString(partition_col_value);
115117
readDateText(value, value_buffer);
116-
result = nested_type->createColumnConst(1, value);
118+
result = nested_type->createColumnConst(rows, value);
117119
}
118120
else if (which.isDate32())
119121
{
120122
ExtendedDayNum value;
121123
auto value_buffer = ReadBufferFromString(partition_col_value);
122124
readDateText(value, value_buffer);
123-
result = nested_type->createColumnConst(1, value.toUnderType());
125+
result = nested_type->createColumnConst(rows, value.toUnderType());
124126
}
125127
else if (which.isString())
126128
{
127-
result = nested_type->createColumnConst(1, partition_col_value);
129+
result = nested_type->createColumnConst(rows, partition_col_value);
128130
}
129131
else
130132
{
@@ -136,14 +138,30 @@ ColumnPtr PartitionColumnFillingTransform::createPartitionColumn()
136138

137139
void PartitionColumnFillingTransform::transform(DB::Chunk & chunk)
138140
{
139-
size_t partition_column_position = output.getHeader().getPositionByName(partition_col_name);
140-
if (partition_column_position == input.getHeader().columns())
141-
{
142-
chunk.addColumn(partition_column->cloneResized(chunk.getNumRows()));
143-
}
144-
else
141+
auto rows = chunk.getNumRows();
142+
auto input_cols = chunk.detachColumns();
143+
Columns result_cols;
144+
auto input_header = input.getHeader();
145+
for (const auto & output_col : output.getHeader())
145146
{
146-
chunk.addColumn(partition_column_position, partition_column->cloneResized(chunk.getNumRows()));
147+
if (input_header.has(output_col.name))
148+
{
149+
size_t pos = input_header.getPositionByName(output_col.name);
150+
result_cols.push_back(input_cols[pos]);
151+
}
152+
else
153+
{
154+
// it's a partition column
155+
auto it = partition_columns.find(output_col.name);
156+
if (it == partition_columns.end())
157+
{
158+
throw Exception(ErrorCodes::LOGICAL_ERROR, "Not found column({}) in parition columns", output_col.name);
159+
}
160+
result_cols.emplace_back(createPartitionColumn(it->first, it->second, rows));
161+
162+
}
163+
147164
}
165+
chunk = DB::Chunk(std::move(result_cols), rows);
148166
}
149167
}

utils/local-engine/Operator/PartitionColumnFillingTransform.h

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,10 @@
11
#pragma once
22

33
#include <Processors/ISimpleTransform.h>
4+
#include "Common/StringUtils.h"
5+
#include "Columns/IColumn.h"
6+
#include "Core/Block.h"
7+
#include "DataTypes/Serializations/ISerialization.h"
48

59
namespace local_engine
610
{
@@ -10,22 +14,19 @@ class PartitionColumnFillingTransform : public DB::ISimpleTransform
1014
PartitionColumnFillingTransform(
1115
const DB::Block & input_,
1216
const DB::Block & output_,
13-
const String & partition_col_name_,
14-
const String & partition_col_value_);
17+
const PartitionValues & partition_columns_);
1518
void transform(DB::Chunk & chunk) override;
1619
String getName() const override
1720
{
1821
return "PartitionColumnFillingTransform";
1922
}
2023

2124
private:
22-
DB::ColumnPtr createPartitionColumn();
25+
DB::ColumnPtr createPartitionColumn(const String & parition_col, const String & partition_col_value, size_t row);
2326
static DB::ColumnPtr tryWrapPartitionColumn(const DB::ColumnPtr & nested_col, DB::DataTypePtr original_data_type);
2427

25-
DB::DataTypePtr partition_col_type;
26-
String partition_col_name;
27-
String partition_col_value;
28-
DB::ColumnPtr partition_column;
28+
PartitionValues partition_column_values;
29+
std::map<String, String> partition_columns;
2930
};
3031

3132
}

utils/local-engine/Parser/SerializedPlanParser.cpp

Lines changed: 20 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
1-
#include <base/logger_useful.h>
1+
#include "SerializedPlanParser.h"
2+
#include <memory>
23
#include <AggregateFunctions/AggregateFunctionFactory.h>
34
#include <AggregateFunctions/registerAggregateFunctions.h>
45
#include <Builder/BroadCastJoinBuilder.h>
@@ -41,7 +42,7 @@
4142
#include <Common/MergeTreeTool.h>
4243
#include <Common/StringUtils.h>
4344

44-
#include "SerializedPlanParser.h"
45+
#include <google/protobuf/util/json_util.h>
4546

4647
namespace DB
4748
{
@@ -197,19 +198,14 @@ QueryPlanPtr SerializedPlanParser::parseReadRealWithLocalFile(const substrait::R
197198
}
198199
auto header = parseNameStruct(rel.base_schema());
199200
PartitionValues partition_values = StringUtils::parsePartitionTablePath(files_info->files[0]);
200-
if (partition_values.size() > 1)
201-
{
202-
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "doesn't support multiple level partition.");
203-
}
204-
ProcessorPtr partition_transform;
205-
if (!partition_values.empty())
201+
202+
auto origin_header = header.cloneEmpty();
203+
for (const auto & partition_value : partition_values)
206204
{
207-
auto origin_header = header.cloneEmpty();
208-
PartitionValue partition_value = partition_values[0];
209205
header.erase(partition_value.first);
210-
partition_transform
211-
= std::make_shared<PartitionColumnFillingTransform>(header, origin_header, partition_value.first, partition_value.second);
212206
}
207+
ProcessorPtr partition_transform = std::make_shared<PartitionColumnFillingTransform>(header, origin_header, partition_values);
208+
213209
auto query_plan = std::make_unique<QueryPlan>();
214210
std::shared_ptr<IProcessor> source = std::make_shared<BatchParquetFileSource>(files_info, header, context);
215211
auto source_pipe = Pipe(source);
@@ -1281,7 +1277,18 @@ QueryPlanPtr SerializedPlanParser::parse(std::string & plan)
12811277
{
12821278
auto plan_ptr = std::make_unique<substrait::Plan>();
12831279
plan_ptr->ParseFromString(plan);
1284-
LOG_DEBUG(&Poco::Logger::get("SerializedPlanParser"), "parse plan \n{}", plan_ptr->DebugString());
1280+
1281+
auto printPlan = [](const std::string & plan_raw){
1282+
substrait::Plan plan;
1283+
plan.ParseFromString(plan_raw);
1284+
std::string json_ret;
1285+
google::protobuf::util::JsonPrintOptions json_opt;
1286+
json_opt.add_whitespace = true;
1287+
google::protobuf::util::MessageToJsonString(plan, &json_ret, json_opt);
1288+
return json_ret;
1289+
};
1290+
1291+
LOG_DEBUG(&Poco::Logger::get("SerializedPlanParser"), "parse plan \n{}", printPlan(plan));
12851292
return parse(std::move(plan_ptr));
12861293
}
12871294
void SerializedPlanParser::initFunctionEnv()

0 commit comments

Comments
 (0)