Skip to content

Commit f1c4c24

Browse files
Merge pull request ClickHouse#87537 from ClickHouse/backport/25.8/87347
Backport ClickHouse#87347 to 25.8: Fix applying patches to columns with default expression
2 parents 41e0212 + c8aa339 commit f1c4c24

File tree

6 files changed

+230
-61
lines changed

6 files changed

+230
-61
lines changed

src/Storages/MergeTree/MergeTreeReadersChain.cpp

Lines changed: 98 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -152,32 +152,33 @@ void MergeTreeReadersChain::executeActionsBeforePrewhere(
152152
}
153153

154154
auto patch_max_version = getMaxPatchVersionForStep(range_reader);
155+
const auto & result_header = range_reader.getReadSampleBlock();
156+
auto columns_for_patches = getColumnsForPatches(result_header, read_columns);
155157

156-
/// Apply patches without min version for new columns because
157-
/// if column is read on this step, it is not used by previous steps.
158-
applyPatches(
159-
range_reader.getReadSampleBlock(),
160-
read_columns,
161-
result.patch_versions_block,
162-
/*min_version=*/ {},
163-
patch_max_version,
164-
/*after_conversions=*/ false,
165-
result.columns_for_patches);
158+
auto apply_patches = [&](ColumnForPatch::Order order)
159+
{
160+
/// Apply patches without min version for new columns because
161+
/// if column is read on this step, it is not used by previous steps.
162+
applyPatches(
163+
result_header,
164+
read_columns,
165+
result.patch_versions_block,
166+
/*min_version=*/ {},
167+
patch_max_version,
168+
columns_for_patches,
169+
{order},
170+
result.columns_for_patches);
171+
};
172+
173+
apply_patches(ColumnForPatch::Order::BeforeConversions);
166174

167175
/// If columns not empty, then apply on-fly alter conversions if any required
168176
if (!prewhere_info || prewhere_info->perform_alter_conversions)
169177
{
170178
merge_tree_reader->performRequiredConversions(read_columns);
171179
}
172180

173-
applyPatches(
174-
range_reader.getReadSampleBlock(),
175-
read_columns,
176-
result.patch_versions_block,
177-
/*min_version=*/ {},
178-
patch_max_version,
179-
/*after_conversions=*/ true,
180-
result.columns_for_patches);
181+
apply_patches(ColumnForPatch::Order::AfterConversions);
181182

182183
/// If some columns absent in part, then evaluate default values
183184
if (should_evaluate_missing_defaults)
@@ -192,6 +193,8 @@ void MergeTreeReadersChain::executeActionsBeforePrewhere(
192193
addDummyColumnWithRowCount(additional_columns, result.num_rows);
193194
merge_tree_reader->evaluateMissingDefaults(additional_columns, read_columns);
194195
}
196+
197+
apply_patches(ColumnForPatch::Order::AfterEvaluatingDefaults);
195198
}
196199

197200
void MergeTreeReadersChain::executePrewhereActions(MergeTreeRangeReader & reader, ReadResult & result, const Block & previous_header, bool is_last_reader)
@@ -258,8 +261,57 @@ void MergeTreeReadersChain::readPatches(const Block & result_header, std::vector
258261
}
259262
}
260263

264+
/// For every patch reader, collects the columns of the given block that are present in the patch part
/// and tags each of them with the stage (ColumnForPatch::Order) at which the patch should be applied.
/// `header` provides the column names, `columns` the corresponding (possibly not yet created) columns.
/// Returns one ColumnsForPatch entry per element of `patch_readers`, in the same order.
ColumnsForPatches MergeTreeReadersChain::getColumnsForPatches(const Block & header, const Columns & columns) const
265+
{
266+
ColumnsForPatches res;
267+
auto block = header.cloneWithColumns(columns);
268+
using enum ColumnForPatch::Order;
269+
270+
for (const auto & patch_reader : patch_readers)
271+
{
272+
const auto & patch = patch_reader->getPatchPart();
273+
const auto & patch_columns = patch.part->getColumnsDescription();
274+
const auto & alter_conversions = patch.part->getAlterConversions();
275+
/// An entry is added for every patch reader, even if it ends up empty,
/// so that the result can be indexed by the position of the patch reader.
auto & columns_for_patch = res.emplace_back();
276+
277+
for (const auto & column : block)
278+
{
279+
if (isPatchPartSystemColumn(column.name))
280+
continue;
281+
282+
/// If the column was renamed, look it up in the patch under its old name.
String column_name_in_patch = column.name;
283+
if (alter_conversions && alter_conversions->isColumnRenamed(column.name))
284+
column_name_in_patch = alter_conversions->getColumnOldName(column.name);
285+
286+
if (!patch_columns.hasColumnOrSubcolumn(GetColumnsOptions::All, column_name_in_patch))
287+
continue;
288+
289+
/// Add the requested column name, not the column name in the patch, so that patches are applied correctly.
290+
/// The requested column name is translated to the column name in the patch in the MergeTree reader.
291+
/// If the column is not created at this stage, it will be filled with defaults at the last stage of reading.
292+
if (!column.column)
293+
{
294+
columns_for_patch.emplace_back(column.name, AfterEvaluatingDefaults);
295+
}
296+
else if (patch.perform_alter_conversions)
297+
{
298+
columns_for_patch.emplace_back(column.name, AfterConversions);
299+
}
300+
else
301+
{
302+
columns_for_patch.emplace_back(column.name, BeforeConversions);
303+
}
304+
}
305+
}
306+
307+
return res;
308+
}
309+
261310
void MergeTreeReadersChain::applyPatchesAfterReader(ReadResult & result, size_t reader_index)
262311
{
312+
if (patch_readers.empty() || result.num_rows == 0 || result.columns.empty())
313+
return;
314+
263315
auto & current_reader = range_readers.at(reader_index);
264316

265317
std::optional<UInt64> min_version = getMaxPatchVersionForStep(current_reader);
@@ -277,53 +329,42 @@ void MergeTreeReadersChain::applyPatchesAfterReader(ReadResult & result, size_t
277329
column.column = ColumnUInt64::create(result.patch_versions_block.rows(), *min_version);
278330
}
279331

332+
const auto & result_header = current_reader.getSampleBlock();
333+
auto columns_for_patches = getColumnsForPatches(result_header, result.columns);
334+
335+
using enum ColumnForPatch::Order;
336+
std::set<ColumnForPatch::Order> suitable_orders = {AfterConversions, AfterEvaluatingDefaults};
337+
280338
applyPatches(
281-
current_reader.getSampleBlock(),
339+
result_header,
282340
result.columns,
283341
result.patch_versions_block,
284342
min_version,
285343
max_version,
286-
/*after_conversions=*/ true,
344+
columns_for_patches,
345+
suitable_orders,
287346
result.columns_for_patches);
288347
}
289348

290-
static UInt128 getColumnsHash(const Names & column_names)
291-
{
292-
SipHash hash;
293-
for (const auto & column_name : column_names)
294-
hash.update(column_name);
295-
return hash.get128();
296-
}
297-
298-
Names getUpdatedColumns(const Block & block, const IMergeTreeDataPartInfoForReader & patch)
349+
/// Hash functor for a list of column names: feeds every name into a SipHash and returns hash.get64().
/// Used as the hasher of the unordered_map keyed by Names that groups patches in applyPatches.
struct NamesHash
299350
{
300-
Names res;
301-
const auto & patch_columns = patch.getColumnsDescription();
302-
const auto & alter_conversions = patch.getAlterConversions();
303-
304-
for (const auto & column : block)
351+
size_t operator()(const Names & column_names) const
305352
{
306-
if (isPatchPartSystemColumn(column.name))
307-
continue;
308-
309-
String column_name = column.name;
310-
if (alter_conversions && alter_conversions->isColumnRenamed(column_name))
311-
column_name = alter_conversions->getColumnOldName(column_name);
312-
313-
if (patch_columns.hasColumnOrSubcolumn(GetColumnsOptions::All, column_name))
314-
res.push_back(std::move(column_name));
353+
SipHash hash;
354+
for (const auto & column_name : column_names)
355+
hash.update(column_name);
356+
return hash.get64();
315357
}
316-
317-
return res;
318-
}
358+
};

320360
void MergeTreeReadersChain::applyPatches(
321361
const Block & result_header,
322362
Columns & result_columns,
323363
Block & versions_block,
324364
std::optional<UInt64> min_version,
325365
std::optional<UInt64> max_version,
326-
bool after_conversions,
366+
const ColumnsForPatches & columns_for_patches,
367+
const std::set<ColumnForPatch::Order> & suitable_orders,
327368
const Block & additional_columns) const
328369
{
329370
if (patch_readers.empty() || result_columns.empty())
@@ -333,7 +374,7 @@ void MergeTreeReadersChain::applyPatches(
333374
addPatchVirtuals(result_block, additional_columns);
334375

335376
/// Combine patches with the same structure.
336-
std::unordered_map<UInt128, PatchesToApply> patches_to_apply;
377+
std::unordered_map<Names, PatchesToApply, NamesHash> patches_to_apply;
337378
UInt64 source_data_version = patch_readers.front()->getPatchPart().source_data_version;
338379

339380
for (size_t i = 0; i < patch_readers.size(); ++i)
@@ -354,15 +395,17 @@ void MergeTreeReadersChain::applyPatches(
354395
if (max_version.has_value() && patchHasHigherDataVersion(*patch.part, *max_version))
355396
continue;
356397

357-
if (after_conversions != patch.perform_alter_conversions)
358-
continue;
398+
Names updated_columns;
399+
for (const auto & columns_for_patch : columns_for_patches[i])
400+
{
401+
if (suitable_orders.contains(columns_for_patch.order))
402+
updated_columns.push_back(columns_for_patch.column_name);
403+
}
359404

360-
auto updated_columns = getUpdatedColumns(result_block, *patch.part);
361405
if (updated_columns.empty())
362406
continue;
363407

364408
std::sort(updated_columns.begin(), updated_columns.end());
365-
auto columns_hash = getColumnsHash(updated_columns);
366409

367410
for (const auto & patch_result : patch_results)
368411
{
@@ -372,16 +415,16 @@ void MergeTreeReadersChain::applyPatches(
372415
for (auto & patch_to_apply : patches)
373416
{
374417
if (!patch_to_apply->empty())
375-
patches_to_apply[columns_hash].push_back(std::move(patch_to_apply));
418+
patches_to_apply[updated_columns].push_back(std::move(patch_to_apply));
376419
}
377420
}
378421
}
379422

380423
if (min_version.has_value())
381424
source_data_version = std::max(source_data_version, *min_version);
382425

383-
for (const auto & [_, patches] : patches_to_apply)
384-
applyPatchesToBlock(result_block, versions_block, patches, source_data_version);
426+
for (const auto & [updated_columns, patches] : patches_to_apply)
427+
applyPatchesToBlock(result_block, versions_block, patches, updated_columns, source_data_version);
385428

386429
result_columns = result_block.getColumns();
387430
result_columns.resize(result_header.columns());

src/Storages/MergeTree/MergeTreeReadersChain.h

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,27 @@ namespace DB
77

88
using RangeReaders = std::vector<MergeTreeRangeReader>;
99

10+
/// Describes one column that a patch updates, together with the stage of reading
/// at which the patch must be applied to that column.
struct ColumnForPatch
11+
{
12+
enum class Order
13+
{
14+
/// Apply patch before converting the column to actual type.
15+
BeforeConversions,
16+
/// Apply patch after converting the column to actual type.
17+
AfterConversions,
18+
/// Apply patch after evaluating missing defaults for the column.
19+
AfterEvaluatingDefaults,
20+
};
21+
22+
/// Stores the given column name and order as is.
ColumnForPatch(const String & column_name_, Order order_) : column_name(column_name_), order(order_) {}
23+
24+
/// Name of the column to update.
String column_name;
25+
/// Stage at which the patch is applied to the column.
Order order;
26+
};
27+
28+
/// Columns updated by a single patch.
using ColumnsForPatch = std::vector<ColumnForPatch>;
29+
/// Columns updated by each patch; one ColumnsForPatch per patch.
using ColumnsForPatches = std::vector<ColumnsForPatch>;
30+
1031
class MergeTreeReadersChain
1132
{
1233
public:
@@ -44,14 +65,16 @@ class MergeTreeReadersChain
4465
void addPatchVirtuals(Block & to, const Block & from) const;
4566
void addPatchVirtuals(ReadResult & result, const Block & header) const;
4667
void applyPatchesAfterReader(ReadResult & result, size_t reader_index);
68+
ColumnsForPatches getColumnsForPatches(const Block & header, const Columns & columns) const;
4769

4870
void applyPatches(
4971
const Block & result_header,
5072
Columns & result_columns,
5173
Block & versions_block,
5274
std::optional<UInt64> min_version,
5375
std::optional<UInt64> max_version,
54-
bool after_conversions,
76+
const ColumnsForPatches & columns_for_patches,
77+
const std::set<ColumnForPatch::Order> & suitable_orders,
5578
const Block & additional_columns) const;
5679

5780
RangeReaders range_readers;

src/Storages/MergeTree/PatchParts/applyPatches.cpp

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -249,7 +249,7 @@ IColumn::Patch CombinedPatchBuilder::createPatchForColumn(const String & column_
249249
};
250250
}
251251

252-
Block getUpdatedHeader(const PatchesToApply & patches, const Block & result_block)
252+
Block getUpdatedHeader(const PatchesToApply & patches, const NameSet & updated_columns)
253253
{
254254
std::vector<Block> headers;
255255

@@ -266,9 +266,8 @@ Block getUpdatedHeader(const PatchesToApply & patches, const Block & result_bloc
266266

267267
for (const auto & column : patch->patch_blocks[0])
268268
{
269-
/// System columns may differ in patches because we allow to apply combined patches
270-
/// with different modes. Ignore columns that are not present in result block.
271-
if (isPatchPartSystemColumn(column.name) || !result_block.has(column.name))
269+
/// Ignore columns that are not updated.
270+
if (!updated_columns.contains(column.name))
272271
header.erase(column.name);
273272
}
274273

@@ -522,13 +521,15 @@ void applyPatchesToBlock(
522521
Block & result_block,
523522
Block & versions_block,
524523
const PatchesToApply & patches,
524+
const Names & updated_columns,
525525
UInt64 source_data_version)
526526
{
527527
if (patches.empty())
528528
return;
529529

530530
ProfileEventTimeIncrement<Microseconds> watch(ProfileEvents::ApplyPatchesMicroseconds);
531-
auto updated_header = getUpdatedHeader(patches, result_block);
531+
NameSet updated_columns_set(updated_columns.begin(), updated_columns.end());
532+
auto updated_header = getUpdatedHeader(patches, updated_columns_set);
532533

533534
if (canApplyPatchesRaw(patches))
534535
applyPatchesToBlockRaw(result_block, versions_block, patches, updated_header, source_data_version);

src/Storages/MergeTree/PatchParts/applyPatches.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ void applyPatchesToBlock(
6666
Block & result_block,
6767
Block & versions_block,
6868
const PatchesToApply & patches,
69+
const Names & updated_columns,
6970
UInt64 source_data_version);
7071

7172
}
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
0 999
2+
1 1000
3+
2 1000
4+
3 1000
5+
4 1000
6+
5 1000
7+
6 1000
8+
7 1000
9+
8 1000
10+
9 1000
11+
0 999
12+
1 1000
13+
2 1000
14+
3 1000
15+
4 1000
16+
5 1000
17+
6 1000
18+
7 1000
19+
8 1000
20+
9 1000
21+
110 999
22+
111 1000
23+
112 1000
24+
113 1000
25+
114 1000
26+
115 1000
27+
116 1000
28+
117 1000
29+
118 1000
30+
119 1000
31+
110 999
32+
111 1000
33+
112 1000
34+
113 1000
35+
114 1000
36+
115 1000
37+
116 1000
38+
117 1000
39+
118 1000
40+
119 1000

0 commit comments

Comments
 (0)