Skip to content

Commit 721e598

Browse files
Backport ClickHouse#87928 to 25.8: Fix merge with projections when the last block is empty
1 parent c513304 commit 721e598

File tree

5 files changed

+46
-16
lines changed

5 files changed

+46
-16
lines changed

src/Storages/MergeTree/MergeProjectionPartsTask.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,10 @@ class MergeProjectionPartsTask : public IExecutableTask
5050
}
5151

5252
void onCompleted() override { throw Exception(ErrorCodes::LOGICAL_ERROR, "Not implemented"); }
53-
void cancel() noexcept override { chassert(false, "Not implemented"); }
53+
void cancel() noexcept override
54+
{
55+
/// FIXME: See `executeHere` from MergeTask.h called in executeStep
56+
}
5457
StorageID getStorageID() const override { throw Exception(ErrorCodes::LOGICAL_ERROR, "Not implemented"); }
5558
Priority getPriority() const override { throw Exception(ErrorCodes::LOGICAL_ERROR, "Not implemented"); }
5659
String getQueryId() const override { throw Exception(ErrorCodes::LOGICAL_ERROR, "Not implemented"); }

src/Storages/MergeTree/MergeTask.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -852,6 +852,9 @@ void MergeTask::ExecuteAndFinalizeHorizontalPart::calculateProjections(const Blo
852852
{
853853
const auto & projection = *global_ctx->projections_to_rebuild[i];
854854
Block block_to_squash = projection.calculate(block, global_ctx->context);
855+
/// Avoid replacing the projection squash header if nothing was generated (it used to return an empty block)
856+
if (block_to_squash.rows() == 0)
857+
return;
855858
auto & projection_squash_plan = ctx->projection_squashes[i];
856859
projection_squash_plan.setHeader(block_to_squash.cloneEmpty());
857860
Chunk squashed_chunk = Squashing::squash(projection_squash_plan.add({block_to_squash.getColumns(), block_to_squash.rows()}));

src/Storages/ProjectionsDescription.cpp

Lines changed: 13 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -503,22 +503,20 @@ Block ProjectionDescription::calculate(const Block & block, ContextPtr context,
503503
pipeline.complete(sink);
504504
CompletedPipelineExecutor executor(pipeline);
505505
executor.execute();
506-
Block projection_block;
507-
if (sink->isAccumulatedSomething())
506+
507+
/// Always return the proper header, even if nothing was accumulated, in case the caller needs to use it
508+
Block projection_block = sink->isAccumulatedSomething() ? sink->getPort().getHeader().cloneWithColumns(sink->detachAccumulatedColumns())
509+
: sink->getPort().getHeader().cloneEmpty();
510+
/// Rename parent _part_offset to _parent_part_offset column
511+
if (with_parent_part_offset)
508512
{
509-
projection_block = sink->getPort().getHeader().cloneWithColumns(sink->detachAccumulatedColumns());
510-
511-
/// Rename parent _part_offset to _parent_part_offset column
512-
if (with_parent_part_offset)
513-
{
514-
chassert(projection_block.has("_part_offset"));
515-
chassert(!projection_block.has("_parent_part_offset"));
516-
517-
auto new_column = projection_block.getByName("_part_offset");
518-
new_column.name = "_parent_part_offset";
519-
projection_block.erase("_part_offset");
520-
projection_block.insert(std::move(new_column));
521-
}
513+
chassert(projection_block.has("_part_offset"));
514+
chassert(!projection_block.has("_parent_part_offset"));
515+
516+
auto new_column = projection_block.getByName("_part_offset");
517+
new_column.name = "_parent_part_offset";
518+
projection_block.erase("_part_offset");
519+
projection_block.insert(std::move(new_column));
522520
}
523521

524522
return projection_block;

tests/queries/0_stateless/03636_empty_projection_block.reference

Whitespace-only changes.
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
-- This test would hit a LOGICAL_ERROR during merge
2+
CREATE TABLE post_state
3+
(
4+
`ts` DateTime,
5+
`id` Int64,
6+
`state` Nullable(UInt8) TTL ts + INTERVAL 1 MONTH,
7+
PROJECTION p_digest_posts_state
8+
(
9+
SELECT
10+
id,
11+
argMax(state, ts) AS state
12+
GROUP BY id
13+
)
14+
)
15+
ENGINE = MergeTree()
16+
ORDER BY id
17+
TTL ts + toIntervalSecond(0) WHERE state IS NULL
18+
SETTINGS index_granularity = 8192, deduplicate_merge_projection_mode='rebuild';
19+
20+
SYSTEM STOP MERGES post_state;
21+
INSERT INTO post_state VALUES ('2024-01-01 00:00:00', 1, NULL);
22+
INSERT INTO post_state VALUES ('2024-01-01 00:00:00', 1, NULL);
23+
INSERT INTO post_state VALUES ('2024-01-01 00:00:00', 1, 1);
24+
INSERT INTO post_state VALUES ('2024-01-01 00:00:00', 1, NULL);
25+
SYSTEM START MERGES post_state;
26+
OPTIMIZE TABLE post_state;

0 commit comments

Comments
 (0)