Skip to content

Commit bf4a033

Browse files
authored
Merge pull request ClickHouse#78515 from CurtizJ/alter-conversions-refactoring
Minor refactoring near `AlterConversions`
2 parents f6b6495 + bab469d commit bf4a033

16 files changed

+241
-251
lines changed

src/Processors/QueryPlan/ReadFromMergeTree.cpp

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2279,12 +2279,7 @@ void ReadFromMergeTree::initializePipeline(QueryPipelineBuilder & pipeline, cons
22792279
{
22802280
for (const auto & ranges_in_data_part : result.parts_with_ranges)
22812281
{
2282-
auto alter_conversions = MergeTreeData::getAlterConversionsForPart(
2283-
ranges_in_data_part.data_part,
2284-
mutations_snapshot,
2285-
storage_snapshot->metadata,
2286-
getContext());
2287-
2282+
auto alter_conversions = MergeTreeData::getAlterConversionsForPart(ranges_in_data_part.data_part, mutations_snapshot, getContext());
22882283
auto part_info = std::make_shared<LoadedMergeTreeDataPartInfoForReader>(ranges_in_data_part.data_part, std::move(alter_conversions));
22892284
lazily_read_info->data_part_infos->emplace(ranges_in_data_part.part_index_in_query, std::move(part_info));
22902285
}

src/Storages/MergeTree/AlterConversions.cpp

Lines changed: 55 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -64,18 +64,38 @@ static MutationCommand createCommandWithUpdatedColumns(
6464
return res;
6565
}
6666

67+
AlterConversions::AlterConversions(
68+
const MutationCommands & mutation_commands_,
69+
const ContextPtr & context)
70+
{
71+
for (const auto & command : mutation_commands_)
72+
addMutationCommand(command, context);
73+
74+
/// Do not throw if there are no mutations or patches.
75+
if (number_of_alter_mutations > 1)
76+
{
77+
if (!mutation_commands.empty())
78+
throw Exception(ErrorCodes::NOT_IMPLEMENTED,
79+
"Applying mutations on-fly is not supported with more than one ALTER MODIFY");
80+
}
81+
}
82+
6783
bool AlterConversions::isSupportedDataMutation(MutationCommand::Type type)
6884
{
69-
using enum MutationCommand::Type;
70-
return type == READ_COLUMN || type == UPDATE || type == DELETE;
85+
return type == MutationCommand::UPDATE || type == MutationCommand::DELETE;
86+
}
87+
88+
bool AlterConversions::isSupportedAlterMutation(MutationCommand::Type type)
89+
{
90+
return type == MutationCommand::READ_COLUMN;
7191
}
7292

7393
bool AlterConversions::isSupportedMetadataMutation(MutationCommand::Type type)
7494
{
7595
return type == MutationCommand::Type::RENAME_COLUMN;
7696
}
7797

78-
void AlterConversions::addMutationCommand(const MutationCommand & command)
98+
void AlterConversions::addMutationCommand(const MutationCommand & command, const ContextPtr & context)
7999
{
80100
using enum MutationCommand::Type;
81101

@@ -85,12 +105,12 @@ void AlterConversions::addMutationCommand(const MutationCommand & command)
85105
}
86106
else if (command.type == READ_COLUMN)
87107
{
88-
++number_of_alter_conversions;
108+
++number_of_alter_mutations;
89109
position_of_alter_conversion = mutation_commands.size();
90110
}
91111
else if (command.type == UPDATE || command.type == DELETE)
92112
{
93-
const auto result = findFirstNonDeterministicFunction(command, getContext());
113+
const auto result = findFirstNonDeterministicFunction(command, context);
94114
if (result.subquery)
95115
throw Exception(ErrorCodes::BAD_ARGUMENTS,
96116
"ALTER UPDATE/ALTER DELETE statement with subquery may be nondeterministic and cannot be applied on fly");
@@ -105,11 +125,6 @@ void AlterConversions::addMutationCommand(const MutationCommand & command)
105125

106126
mutation_commands.push_back(command);
107127
}
108-
109-
/// Do not throw if there are no mutations.
110-
if (number_of_alter_conversions > 1 && !mutation_commands.empty())
111-
throw Exception(ErrorCodes::NOT_IMPLEMENTED,
112-
"Applying mutations on-fly is not supported with more than one ALTER MODIFY");
113128
}
114129

115130
bool AlterConversions::columnHasNewName(const std::string & old_name) const
@@ -157,10 +172,12 @@ std::string AlterConversions::getColumnOldName(const std::string & new_name) con
157172

158173
PrewhereExprSteps AlterConversions::getMutationSteps(
159174
const IMergeTreeDataPartInfoForReader & part_info,
160-
const NamesAndTypesList & read_columns) const
175+
const NamesAndTypesList & read_columns,
176+
const StorageMetadataPtr & metadata_snapshot,
177+
const ContextPtr & context) const
161178
{
162-
auto actions_chain = getMutationActions(part_info, read_columns, true);
163-
auto settings = ExpressionActionsSettings(getContext());
179+
auto actions_chain = getMutationActions(part_info, read_columns, metadata_snapshot, context);
180+
auto settings = ExpressionActionsSettings(context);
164181

165182
PrewhereExprSteps steps;
166183
for (size_t i = 0; i < actions_chain.size(); ++i)
@@ -191,7 +208,8 @@ PrewhereExprSteps AlterConversions::getMutationSteps(
191208
std::vector<MutationActions> AlterConversions::getMutationActions(
192209
const IMergeTreeDataPartInfoForReader & part_info,
193210
const NamesAndTypesList & read_columns,
194-
bool can_execute) const
211+
const StorageMetadataPtr & metadata_snapshot,
212+
const ContextPtr & context) const
195213
{
196214
if (mutation_commands.empty())
197215
return {};
@@ -211,19 +229,16 @@ std::vector<MutationActions> AlterConversions::getMutationActions(
211229
storage_read_columns.emplace_back(name_in_storage);
212230
}
213231

214-
addColumnsRequiredForMaterialized(storage_read_columns, storage_read_columns_set);
232+
addColumnsRequiredForMaterialized(storage_read_columns, storage_read_columns_set, metadata_snapshot, context);
215233
auto filtered_commands = filterMutationCommands(storage_read_columns, std::move(storage_read_columns_set));
216234

217235
if (filtered_commands.empty())
218236
return {};
219237

220-
if (can_execute)
221-
{
222-
ProfileEvents::increment(ProfileEvents::ReadTasksWithAppliedMutationsOnFly);
223-
ProfileEvents::increment(ProfileEvents::MutationsAppliedOnFlyInAllReadTasks, filtered_commands.size());
224-
}
238+
ProfileEvents::increment(ProfileEvents::ReadTasksWithAppliedMutationsOnFly);
239+
ProfileEvents::increment(ProfileEvents::MutationsAppliedOnFlyInAllReadTasks, filtered_commands.size());
225240

226-
MutationsInterpreter::Settings settings(can_execute);
241+
MutationsInterpreter::Settings settings(true);
227242
settings.return_all_columns = true;
228243
settings.recalculate_dependencies_of_updated_columns = false;
229244

@@ -237,13 +252,17 @@ std::vector<MutationActions> AlterConversions::getMutationActions(
237252
metadata_snapshot,
238253
std::move(filtered_commands),
239254
std::move(storage_read_columns),
240-
getContext(),
255+
context,
241256
settings);
242257

243258
return interpreter.getMutationActions();
244259
}
245260

246-
void AlterConversions::addColumnsRequiredForMaterialized(Names & read_columns, NameSet & read_columns_set) const
261+
void AlterConversions::addColumnsRequiredForMaterialized(
262+
Names & read_columns,
263+
NameSet & read_columns_set,
264+
const StorageMetadataPtr & metadata_snapshot,
265+
const ContextPtr & context) const
247266
{
248267
NameSet required_source_columns;
249268
const auto & columns_desc = metadata_snapshot->getColumns();
@@ -255,7 +274,7 @@ void AlterConversions::addColumnsRequiredForMaterialized(Names & read_columns, N
255274
if (default_desc && default_desc->kind == ColumnDefaultKind::Materialized)
256275
{
257276
auto query = default_desc->expression->clone();
258-
auto syntax_result = TreeRewriter(getContext()).analyze(query, source_columns);
277+
auto syntax_result = TreeRewriter(context).analyze(query, source_columns);
259278

260279
for (const auto & dependency : syntax_result->requiredSourceColumns())
261280
{
@@ -324,4 +343,16 @@ MutationCommands AlterConversions::filterMutationCommands(Names & read_columns,
324343
return filtered_commands;
325344
}
326345

346+
void MutationCounters::assertNotNegative() const
347+
{
348+
if (num_data < 0)
349+
throw Exception(ErrorCodes::LOGICAL_ERROR, "On-fly data mutations counter is negative ({})", num_data);
350+
351+
if (num_alter < 0)
352+
throw Exception(ErrorCodes::LOGICAL_ERROR, "On-fly alter mutations counter is negative ({})", num_alter);
353+
354+
if (num_metadata < 0)
355+
throw Exception(ErrorCodes::LOGICAL_ERROR, "On-fly metadata mutations counter is negative ({})", num_metadata);
356+
}
357+
327358
}

src/Storages/MergeTree/AlterConversions.h

Lines changed: 28 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -8,31 +8,27 @@ namespace DB
88
{
99

1010
struct StorageInMemoryMetadata;
11-
using StorageMetadataPtr = std::shared_ptr<const StorageInMemoryMetadata>;
12-
1311
class IMergeTreeDataPartInfoForReader;
12+
using StorageMetadataPtr = std::shared_ptr<const StorageInMemoryMetadata>;
1413

1514
/// Alter conversions which should be applied on-fly for part.
1615
/// Built from of the most recent mutation commands for part.
1716
/// Now ALTER RENAME COLUMN, ALTER UPDATE and ALTER DELETE are applied.
18-
class AlterConversions : private WithContext, boost::noncopyable
17+
class AlterConversions : private boost::noncopyable
1918
{
2019
public:
2120
AlterConversions() = default;
2221

23-
AlterConversions(StorageMetadataPtr metadata_snapshot_, ContextPtr context_)
24-
: WithContext(context_)
25-
, metadata_snapshot(std::move(metadata_snapshot_))
26-
{
27-
}
22+
AlterConversions(
23+
const MutationCommands & mutation_commands_,
24+
const ContextPtr & context);
2825

2926
struct RenamePair
3027
{
3128
std::string rename_to;
3229
std::string rename_from;
3330
};
3431

35-
void addMutationCommand(const MutationCommand & command);
3632
const std::vector<RenamePair> & getRenameMap() const { return rename_map; }
3733

3834
/// Column was renamed (lookup by value in rename_map)
@@ -45,6 +41,7 @@ class AlterConversions : private WithContext, boost::noncopyable
4541
std::string getColumnOldName(const std::string & new_name) const;
4642

4743
static bool isSupportedDataMutation(MutationCommand::Type type);
44+
static bool isSupportedAlterMutation(MutationCommand::Type type);
4845
static bool isSupportedMetadataMutation(MutationCommand::Type type);
4946

5047
const NameSet & getAllUpdatedColumns() const { return all_updated_columns; }
@@ -54,25 +51,32 @@ class AlterConversions : private WithContext, boost::noncopyable
5451
/// mutations that affect columns from @read_columns.
5552
PrewhereExprSteps getMutationSteps(
5653
const IMergeTreeDataPartInfoForReader & part_info,
57-
const NamesAndTypesList & read_columns) const;
54+
const NamesAndTypesList & read_columns,
55+
const StorageMetadataPtr & metadata_snapshot,
56+
const ContextPtr & context) const;
5857

5958
private:
59+
void addMutationCommand(const MutationCommand & command, const ContextPtr & context);
60+
6061
/// Returns a chain of actions that can be
6162
/// applied to block to execute mutation commands
6263
/// that affect columns from @read_columns.
6364
std::vector<MutationActions> getMutationActions(
6465
const IMergeTreeDataPartInfoForReader & part_info,
6566
const NamesAndTypesList & read_columns,
66-
bool can_execute) const;
67+
const StorageMetadataPtr & metadata_snapshot,
68+
const ContextPtr & context) const;
6769

6870
/// Adds source columns of expressions of MATERIALIZED columns from @read_columns if any.
69-
void addColumnsRequiredForMaterialized(Names & read_columns, NameSet & read_columns_set) const;
71+
void addColumnsRequiredForMaterialized(
72+
Names & read_columns,
73+
NameSet & read_columns_set,
74+
const StorageMetadataPtr & metadata_snapshot,
75+
const ContextPtr & context) const;
7076

7177
/// Returns only mutations commands that affect columns from set.
7278
MutationCommands filterMutationCommands(Names & read_columns, NameSet read_columns_set) const;
7379

74-
StorageMetadataPtr metadata_snapshot;
75-
7680
/// Rename map new_name -> old_name.
7781
std::vector<RenamePair> rename_map;
7882

@@ -85,12 +89,21 @@ class AlterConversions : private WithContext, boost::noncopyable
8589
/// The number of ALTER MODIFY COLUMN commands.
8690
/// Applying on-fly mutations is not supported
8791
/// in presence of more than one of such commands.
88-
size_t number_of_alter_conversions = 0;
92+
size_t number_of_alter_mutations = 0;
8993

9094
/// Names of columns which are updated by mutation commands.
9195
NameSet all_updated_columns;
9296
};
9397

9498
using AlterConversionsPtr = std::shared_ptr<const AlterConversions>;
9599

100+
struct MutationCounters
101+
{
102+
Int64 num_data = 0;
103+
Int64 num_alter = 0;
104+
Int64 num_metadata = 0;
105+
106+
void assertNotNegative() const;
107+
};
108+
96109
}

src/Storages/MergeTree/MergeTask.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -487,7 +487,7 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::prepare() const
487487
infos.add(part_infos);
488488
}
489489

490-
global_ctx->alter_conversions.push_back(MergeTreeData::getAlterConversionsForPart(part, mutations_snapshot, global_ctx->metadata_snapshot, global_ctx->context));
490+
global_ctx->alter_conversions.push_back(MergeTreeData::getAlterConversionsForPart(part, mutations_snapshot, global_ctx->context));
491491
}
492492

493493
const auto & local_part_min_ttl = global_ctx->new_data_part->ttl_infos.part_min_ttl;

0 commit comments

Comments
 (0)