Skip to content

Commit e6e712d

Browse files
authored
Merge pull request ClickHouse#92491 from ClickHouse/backport/25.8/88860
Backport ClickHouse#88860 to 25.8: Fix premature TTL column removal causing merge failures and wrong defaults
2 parents ca226db + 34891ff commit e6e712d

16 files changed

+315
-60
lines changed

src/Processors/Transforms/TTLTransform.cpp

Lines changed: 77 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -31,16 +31,36 @@ static TTLExpressions getExpressions(const TTLDescription & ttl_descr, PreparedS
3131
return {expr.expression, where_expr.expression};
3232
}
3333

34+
SharedHeader TTLTransform::addExpiredColumnsToBlock(const SharedHeader & header, const NamesAndTypesList & expired_columns_)
35+
{
36+
if (expired_columns_.empty())
37+
return header;
38+
39+
auto output_block = *header;
40+
41+
for (const auto & col : expired_columns_)
42+
{
43+
if (output_block.has(col.name))
44+
continue;
45+
46+
output_block.insert({col.type->createColumn(), col.type, col.name});
47+
}
48+
49+
return std::make_shared<const Block>(std::move(output_block));
50+
}
51+
3452
TTLTransform::TTLTransform(
3553
const ContextPtr & context,
3654
SharedHeader header_,
3755
const MergeTreeData & storage_,
3856
const StorageMetadataPtr & metadata_snapshot_,
3957
const MergeTreeData::MutableDataPartPtr & data_part_,
58+
const NamesAndTypesList & expired_columns_,
4059
time_t current_time_,
4160
bool force_)
42-
: IAccumulatingTransform(header_, header_)
61+
: IAccumulatingTransform(header_, addExpiredColumnsToBlock(header_, expired_columns_))
4362
, data_part(data_part_)
63+
, expired_columns(expired_columns_)
4464
, log(getLogger(storage_.getLogName() + " (TTLTransform)"))
4565
{
4666
auto old_ttl_infos = data_part->ttl_infos;
@@ -71,32 +91,49 @@ TTLTransform::TTLTransform(
7191
old_ttl_infos.group_by_ttl[group_by_ttl.result_column], current_time_, force_,
7292
getInputPort().getHeader(), storage_));
7393

74-
if (metadata_snapshot_->hasAnyColumnTTL())
94+
const auto & storage_columns = metadata_snapshot_->getColumns();
95+
const auto & column_defaults = storage_columns.getDefaults();
96+
97+
auto build_default_expr = [&](const String & name)
7598
{
76-
const auto & storage_columns = metadata_snapshot_->getColumns();
77-
const auto & column_defaults = storage_columns.getDefaults();
99+
using Result = std::pair<ExpressionActionsPtr, String>;
100+
auto it = column_defaults.find(name);
101+
if (it == column_defaults.end())
102+
return Result{};
103+
const auto & column = storage_columns.get(name);
104+
auto default_ast = it->second.expression->clone();
105+
default_ast = addTypeConversionToAST(std::move(default_ast), column.type->getName());
106+
auto syntax_result = TreeRewriter(storage_.getContext()).analyze(default_ast, storage_columns.getAll());
107+
auto actions = ExpressionAnalyzer{default_ast, syntax_result, storage_.getContext()}.getActions(true);
108+
return Result{actions, default_ast->getColumnName()};
109+
};
110+
111+
for (const auto & expired_column : expired_columns)
112+
{
113+
auto [default_expression, default_column_name] = build_default_expr(expired_column.name);
114+
expired_columns_data.emplace(
115+
expired_column.name, ExpiredColumnData{expired_column.type, std::move(default_expression), std::move(default_column_name)});
116+
}
78117

118+
if (metadata_snapshot_->hasAnyColumnTTL())
119+
{
120+
auto expired_columns_map = expired_columns.getNameToTypeMap();
79121
for (const auto & [name, description] : metadata_snapshot_->getColumnTTLs())
80122
{
81-
ExpressionActionsPtr default_expression;
82-
String default_column_name;
83-
auto it = column_defaults.find(name);
84-
if (it != column_defaults.end())
123+
if (!expired_columns_map.contains(name))
85124
{
86-
const auto & column = storage_columns.get(name);
87-
auto default_ast = it->second.expression->clone();
88-
default_ast = addTypeConversionToAST(std::move(default_ast), column.type->getName());
89-
90-
auto syntax_result
91-
= TreeRewriter(storage_.getContext()).analyze(default_ast, metadata_snapshot_->getColumns().getAllPhysical());
92-
default_expression = ExpressionAnalyzer{default_ast, syntax_result, storage_.getContext()}.getActions(true);
93-
default_column_name = default_ast->getColumnName();
125+
auto [default_expression, default_column_name] = build_default_expr(name);
126+
algorithms.emplace_back(std::make_unique<TTLColumnAlgorithm>(
127+
getExpressions(description, subqueries_for_sets, context),
128+
description,
129+
old_ttl_infos.columns_ttl[name],
130+
current_time_,
131+
force_,
132+
name,
133+
default_expression,
134+
default_column_name,
135+
isCompactPart(data_part)));
94136
}
95-
96-
algorithms.emplace_back(std::make_unique<TTLColumnAlgorithm>(
97-
getExpressions(description, subqueries_for_sets, context), description,
98-
old_ttl_infos.columns_ttl[name], current_time_,
99-
force_, name, default_expression, default_column_name, isCompactPart(data_part)));
100137
}
101138
}
102139

@@ -131,6 +168,25 @@ void TTLTransform::consume(Chunk chunk)
131168
convertToFullIfSparse(chunk);
132169
auto block = getInputPort().getHeader().cloneWithColumns(chunk.detachColumns());
133170

171+
/// Fill expired columns with default values which will later be handled in TTLColumnAlgorithm
172+
for (const auto & [column, data] : expired_columns_data)
173+
{
174+
auto default_column
175+
= ITTLAlgorithm::executeExpressionAndGetColumn(data.default_expression, block, data.default_column_name);
176+
if (default_column)
177+
default_column = default_column->convertToFullColumnIfConst();
178+
else
179+
default_column = data.type->createColumnConstWithDefaultValue(block.rows())->convertToFullColumnIfConst();
180+
181+
/// Expired column may pre-exist (e.g. from customized merges like ReplacingMergeTree with version key), so
182+
/// replace it with default instead of inserting a new one.
183+
auto * c = block.findByName(column);
184+
if (c)
185+
c->column = default_column;
186+
else
187+
block.insert(ColumnWithTypeAndName(default_column, data.type, column));
188+
}
189+
134190
for (const auto & algorithm : algorithms)
135191
algorithm->execute(block);
136192

src/Processors/Transforms/TTLTransform.h

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ class TTLTransform : public IAccumulatingTransform
2020
const MergeTreeData & storage_,
2121
const StorageMetadataPtr & metadata_snapshot_,
2222
const MergeTreeData::MutableDataPartPtr & data_part_,
23+
const NamesAndTypesList & expired_columns_,
2324
time_t current_time,
2425
bool force_
2526
);
@@ -30,6 +31,8 @@ class TTLTransform : public IAccumulatingTransform
3031

3132
PreparedSets::Subqueries getSubqueries() { return std::move(subqueries_for_sets); }
3233

34+
static SharedHeader addExpiredColumnsToBlock(const SharedHeader & header, const NamesAndTypesList & expired_columns_);
35+
3336
protected:
3437
void consume(Chunk chunk) override;
3538
Chunk generate() override;
@@ -46,6 +49,16 @@ class TTLTransform : public IAccumulatingTransform
4649

4750
/// ttl_infos and empty_columns are updating while reading
4851
const MergeTreeData::MutableDataPartPtr & data_part;
52+
53+
NamesAndTypesList expired_columns;
54+
55+
struct ExpiredColumnData
56+
{
57+
DataTypePtr type;
58+
ExpressionActionsPtr default_expression;
59+
String default_column_name;
60+
};
61+
std::unordered_map<String, ExpiredColumnData> expired_columns_data;
4962
LoggerPtr log;
5063
};
5164

src/Storages/MergeTree/IMergedBlockOutputStream.cpp

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,11 +38,10 @@ IMergedBlockOutputStream::IMergedBlockOutputStream(
3838
NameSet IMergedBlockOutputStream::removeEmptyColumnsFromPart(
3939
const MergeTreeDataPartPtr & data_part,
4040
NamesAndTypesList & columns,
41+
const NameSet & empty_columns,
4142
SerializationInfoByName & serialization_infos,
4243
MergeTreeData::DataPart::Checksums & checksums)
4344
{
44-
const NameSet & empty_columns = data_part->expired_columns;
45-
4645
/// For compact part we have to override whole file with data, it's not
4746
/// worth it
4847
if (empty_columns.empty() || isCompactPart(data_part))

src/Storages/MergeTree/IMergedBlockOutputStream.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,11 +46,12 @@ class IMergedBlockOutputStream
4646
}
4747

4848
protected:
49-
/// Remove all columns marked expired in data_part. Also, clears checksums
49+
/// Remove all columns in @empty_columns. Also, clears checksums
5050
/// and columns array. Return set of removed files names.
5151
NameSet removeEmptyColumnsFromPart(
5252
const MergeTreeDataPartPtr & data_part,
5353
NamesAndTypesList & columns,
54+
const NameSet & empty_columns,
5455
SerializationInfoByName & serialization_infos,
5556
MergeTreeData::DataPart::Checksums & checksums);
5657

src/Storages/MergeTree/MergeTask.cpp

Lines changed: 78 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -266,7 +266,7 @@ void MergeTask::ExecuteAndFinalizeHorizontalPart::extractMergingAndGatheringColu
266266
Names sort_key_columns_vec = sorting_key_expr->getRequiredColumns();
267267

268268
/// Collect columns used in the sorting key expressions.
269-
std::set<String> key_columns;
269+
NameSet key_columns;
270270
auto storage_columns = global_ctx->storage_columns.getNameSet();
271271
for (const auto & name : sort_key_columns_vec)
272272
{
@@ -292,6 +292,15 @@ void MergeTask::ExecuteAndFinalizeHorizontalPart::extractMergingAndGatheringColu
292292
if (global_ctx->merging_params.mode == MergeTreeData::MergingParams::VersionedCollapsing)
293293
key_columns.emplace(global_ctx->merging_params.sign_column);
294294

295+
/// Force all columns params of Graphite mode
296+
if (global_ctx->merging_params.mode == MergeTreeData::MergingParams::Graphite)
297+
{
298+
key_columns.emplace(global_ctx->merging_params.graphite_params.path_column_name);
299+
key_columns.emplace(global_ctx->merging_params.graphite_params.time_column_name);
300+
key_columns.emplace(global_ctx->merging_params.graphite_params.value_column_name);
301+
key_columns.emplace(global_ctx->merging_params.graphite_params.version_column_name);
302+
}
303+
295304
/// Force to merge at least one column in case of empty key
296305
if (key_columns.empty())
297306
key_columns.emplace(global_ctx->storage_columns.front().name);
@@ -303,6 +312,11 @@ void MergeTask::ExecuteAndFinalizeHorizontalPart::extractMergingAndGatheringColu
303312
key_columns.insert(minmax_columns.begin(), minmax_columns.end());
304313
}
305314

315+
key_columns.insert(global_ctx->deduplicate_by_columns.begin(), global_ctx->deduplicate_by_columns.end());
316+
317+
/// Key columns required for merge, must not be expired early.
318+
global_ctx->merge_required_key_columns = key_columns;
319+
306320
const auto & skip_indexes = global_ctx->metadata_snapshot->getSecondaryIndices();
307321

308322
for (const auto & index : skip_indexes)
@@ -468,23 +482,37 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::prepare() const
468482

469483
const auto & patch_parts = global_ctx->future_part->patch_parts;
470484

471-
/// Skip fully expired columns manually, since in case of
472-
/// need_remove_expired_values is not set, TTLTransform will not be used,
473-
/// and columns that had been removed by TTL (via TTLColumnAlgorithm) will
474-
/// be added again with default values.
485+
/// Determine columns that are absent in all source parts—either fully expired or never written—and mark them as
486+
/// expired to avoid unnecessary reads or writes during merges.
487+
///
488+
/// NOTE:
489+
/// Handling missing columns that have default expressions is non-trivial and currently unresolved
490+
/// (see https://github.com/ClickHouse/ClickHouse/issues/91127).
491+
/// For now, we conservatively avoid expiring such columns.
475492
///
476-
/// Also note, that it is better to do this here, since in other places it
477-
/// will be too late (i.e. they will be written, and we will burn CPU/disk
478-
/// resources for this).
479-
if (!ctx->need_remove_expired_values)
493+
/// The main challenges include:
494+
/// 1. A default expression may depend on other columns, which themselves may be missing or expired,
495+
/// making it unclear whether the default should be materialized or recomputed.
496+
/// 2. Default expressions may introduce semantic changes if re-evaluated during merges, leading to
497+
/// non-deterministic results across parts.
480498
{
481-
for (auto & [column_name, ttl] : global_ctx->new_data_part->ttl_infos.columns_ttl)
499+
NameSet columns_present_in_parts;
500+
columns_present_in_parts.reserve(global_ctx->storage_columns.size());
501+
502+
/// Collect all column names that actually exist in the source parts
503+
for (const auto & part : global_ctx->future_part->parts)
482504
{
483-
if (ttl.finished())
484-
{
485-
global_ctx->new_data_part->expired_columns.insert(column_name);
486-
LOG_TRACE(ctx->log, "Adding expired column {} for part {}", column_name, global_ctx->new_data_part->name);
487-
}
505+
for (const auto & col : part->getColumns())
506+
columns_present_in_parts.emplace(col.name);
507+
}
508+
509+
const auto & columns_desc = global_ctx->metadata_snapshot->getColumns();
510+
511+
/// Any storage column not present in any part and without a default expression is considered expired
512+
for (const auto & storage_column : global_ctx->storage_columns)
513+
{
514+
if (!columns_present_in_parts.contains(storage_column.name) && !columns_desc.getDefault(storage_column.name))
515+
global_ctx->new_data_part->expired_columns.emplace(storage_column.name);
488516
}
489517
}
490518

@@ -510,8 +538,27 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::prepare() const
510538
if (!expired_columns.empty())
511539
{
512540
global_ctx->gathering_columns = global_ctx->gathering_columns.eraseNames(expired_columns);
513-
global_ctx->merging_columns = global_ctx->merging_columns.eraseNames(expired_columns);
514-
global_ctx->storage_columns = global_ctx->storage_columns.eraseNames(expired_columns);
541+
542+
auto filter_columns = [&](const NamesAndTypesList & input, NamesAndTypesList & expired_out)
543+
{
544+
NamesAndTypesList result;
545+
for (const auto & column : input)
546+
{
547+
bool is_expired = expired_columns.contains(column.name);
548+
bool is_required_for_merge = global_ctx->merge_required_key_columns.contains(column.name);
549+
550+
if (is_expired)
551+
expired_out.push_back(column);
552+
553+
if (!is_expired || is_required_for_merge)
554+
result.push_back(column);
555+
}
556+
557+
return result;
558+
};
559+
560+
global_ctx->merging_columns = filter_columns(global_ctx->merging_columns, global_ctx->merging_columns_expired_by_ttl);
561+
global_ctx->storage_columns = filter_columns(global_ctx->storage_columns, global_ctx->storage_columns_expired_by_ttl);
515562
}
516563

517564
global_ctx->new_data_part->uuid = global_ctx->future_part->uuid;
@@ -612,6 +659,7 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::prepare() const
612659
case MergeAlgorithm::Horizontal:
613660
{
614661
global_ctx->merging_columns = global_ctx->storage_columns;
662+
global_ctx->merging_columns_expired_by_ttl = global_ctx->storage_columns_expired_by_ttl;
615663
global_ctx->merging_skip_indexes = global_ctx->metadata_snapshot->getSecondaryIndices();
616664
global_ctx->gathering_columns.clear();
617665
global_ctx->skip_indexes_by_column.clear();
@@ -1786,11 +1834,13 @@ class TTLStep : public ITransformingStep
17861834
const MergeTreeData & storage_,
17871835
const StorageMetadataPtr & metadata_snapshot_,
17881836
const MergeTreeData::MutableDataPartPtr & data_part_,
1837+
const NamesAndTypesList & expired_columns_,
17891838
time_t current_time,
17901839
bool force_)
1791-
: ITransformingStep(input_header_, input_header_, getTraits())
1840+
: ITransformingStep(input_header_, TTLTransform::addExpiredColumnsToBlock(input_header_, expired_columns_), getTraits())
17921841
{
1793-
transform = std::make_shared<TTLTransform>(context_, input_header_, storage_, metadata_snapshot_, data_part_, current_time, force_);
1842+
transform = std::make_shared<TTLTransform>(
1843+
context_, input_header_, storage_, metadata_snapshot_, data_part_, expired_columns_, current_time, force_);
17941844
subqueries_for_sets = transform->getSubqueries();
17951845
}
17961846

@@ -2014,10 +2064,17 @@ void MergeTask::ExecuteAndFinalizeHorizontalPart::createMergedStream() const
20142064
PreparedSets::Subqueries subqueries;
20152065

20162066
/// TTL step
2017-
if (ctx->need_remove_expired_values)
2067+
if (ctx->need_remove_expired_values || !global_ctx->merging_columns_expired_by_ttl.empty())
20182068
{
20192069
auto ttl_step = std::make_unique<TTLStep>(
2020-
merge_parts_query_plan.getCurrentHeader(), global_ctx->context, *global_ctx->data, global_ctx->metadata_snapshot, global_ctx->new_data_part, global_ctx->time_of_merge, ctx->force_ttl);
2070+
merge_parts_query_plan.getCurrentHeader(),
2071+
global_ctx->context,
2072+
*global_ctx->data,
2073+
global_ctx->metadata_snapshot,
2074+
global_ctx->new_data_part,
2075+
global_ctx->merging_columns_expired_by_ttl,
2076+
global_ctx->time_of_merge,
2077+
ctx->force_ttl);
20212078
subqueries = ttl_step->getSubqueries();
20222079
ttl_step->setStepDescription("TTL step");
20232080
merge_parts_query_plan.addStep(std::move(ttl_step));

src/Storages/MergeTree/MergeTask.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -197,8 +197,11 @@ class MergeTask
197197
bool cleanup{false};
198198

199199
NamesAndTypesList gathering_columns{};
200+
NameSet merge_required_key_columns{};
200201
NamesAndTypesList merging_columns{};
202+
NamesAndTypesList merging_columns_expired_by_ttl{};
201203
NamesAndTypesList storage_columns{};
204+
NamesAndTypesList storage_columns_expired_by_ttl{};
202205
MergeTreeData::DataPart::Checksums checksums_gathered_columns{};
203206
ColumnsSubstreams gathered_columns_substreams{};
204207

src/Storages/MergeTree/MergedBlockOutputStream.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -231,7 +231,8 @@ MergedBlockOutputStream::Finalizer MergedBlockOutputStream::finalizePartAsync(
231231
auto serialization_infos = new_part->getSerializationInfos();
232232

233233
serialization_infos.replaceData(new_serialization_infos);
234-
files_to_remove_after_sync = removeEmptyColumnsFromPart(new_part, part_columns, serialization_infos, checksums);
234+
files_to_remove_after_sync
235+
= removeEmptyColumnsFromPart(new_part, part_columns, new_part->expired_columns, serialization_infos, checksums);
235236

236237
new_part->setColumns(part_columns, serialization_infos, metadata_snapshot->getMetadataVersion());
237238
}

src/Storages/MergeTree/MergedColumnOnlyOutputStream.cpp

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,13 @@ MergedColumnOnlyOutputStream::fillChecksums(
9797
auto serialization_infos = new_part->getSerializationInfos();
9898
serialization_infos.replaceData(new_serialization_infos);
9999

100-
auto removed_files = removeEmptyColumnsFromPart(new_part, columns, serialization_infos, checksums);
100+
NameSet empty_columns;
101+
for (const auto & column : writer->getColumnsSample())
102+
{
103+
if (new_part->expired_columns.contains(column.name))
104+
empty_columns.emplace(column.name);
105+
}
106+
auto removed_files = removeEmptyColumnsFromPart(new_part, columns, empty_columns, serialization_infos, checksums);
101107

102108
for (const String & removed_file : removed_files)
103109
{

0 commit comments

Comments
 (0)