Skip to content

Commit a1b36e3

Browse files
authored
Merge pull request ClickHouse#79281 from CurtizJ/lwu-on-fly
Support applying lightweight deletes on the fly
2 parents 823f596 + 64ec300 commit a1b36e3

File tree

3 files changed

+84
-1
lines changed

3 files changed

+84
-1
lines changed

src/Storages/MergeTree/AlterConversions.cpp

Lines changed: 51 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,13 @@
11
#include <Storages/MergeTree/AlterConversions.h>
22
#include <Storages/MergeTree/LoadedMergeTreeDataPartInfoForReader.h>
33
#include <Storages/MergeTree/MergeTreeRangeReader.h>
4+
#include <Storages/MergeTree/MergeTreeVirtualColumns.h>
45
#include <Storages/MutationCommands.h>
56
#include <Interpreters/MutationsInterpreter.h>
67
#include <Interpreters/MutationsNonDeterministicHelpers.h>
78
#include <Parsers/ASTAlterQuery.h>
89
#include <Parsers/ASTAssignment.h>
10+
#include <Parsers/ASTLiteral.h>
911
#include <Common/ProfileEvents.h>
1012
#include <ranges>
1113

@@ -64,6 +66,41 @@ static MutationCommand createCommandWithUpdatedColumns(
6466
return res;
6567
}
6668

69+
static bool isLightweightDeleteCommand(const String & column_name, const ASTPtr & ast)
70+
{
71+
if (column_name != RowExistsColumn::name)
72+
return false;
73+
74+
const auto * literal = ast->as<ASTLiteral>();
75+
if (!literal)
76+
return false;
77+
78+
if (literal->value.getType() != Field::Types::UInt64)
79+
return false;
80+
81+
return literal->value.safeGet<UInt64>() == 0;
82+
}
83+
84+
static MutationCommand createLightweightDeleteCommand(const MutationCommand & command)
85+
{
86+
chassert(command.type == MutationCommand::Type::UPDATE);
87+
chassert(command.predicate != nullptr);
88+
89+
auto alter_command = std::make_shared<ASTAlterCommand>();
90+
alter_command->type = ASTAlterCommand::DELETE;
91+
92+
if (command.partition)
93+
alter_command->partition = alter_command->children.emplace_back(command.partition->clone()).get();
94+
95+
alter_command->predicate = alter_command->children.emplace_back(command.predicate->clone()).get();
96+
auto mutation_command = MutationCommand::parse(alter_command.get());
97+
98+
if (!mutation_command)
99+
throw Exception(ErrorCodes::LOGICAL_ERROR, "Failed to parse command {}", alter_command->formatForErrorMessage());
100+
101+
return *mutation_command;
102+
}
103+
67104
AlterConversions::AlterConversions(
68105
const MutationCommands & mutation_commands_,
69106
const ContextPtr & context)
@@ -308,16 +345,29 @@ MutationCommands AlterConversions::filterMutationCommands(Names & read_columns,
308345
}
309346
else if (command.type == MutationCommand::Type::UPDATE)
310347
{
348+
bool has_lightweight_delete = false;
311349
std::unordered_map<String, ASTPtr> new_updated_columns;
350+
312351
for (const auto & [column, ast] : command.column_to_update_expression)
313352
{
314-
if (read_columns_set.contains(column))
353+
if (isLightweightDeleteCommand(column, ast))
354+
{
355+
has_lightweight_delete = true;
356+
}
357+
else if (read_columns_set.contains(column))
315358
{
316359
ast->collectIdentifierNames(source_columns);
317360
new_updated_columns.emplace(column, ast->clone());
318361
}
319362
}
320363

364+
if (has_lightweight_delete)
365+
{
366+
auto new_command = createLightweightDeleteCommand(command);
367+
new_command.predicate->collectIdentifierNames(source_columns);
368+
filtered_commands.push_back(std::move(new_command));
369+
}
370+
321371
if (!new_updated_columns.empty())
322372
{
323373
auto new_command = createCommandWithUpdatedColumns(command, std::move(new_updated_columns));
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
3
2+
9850
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
DROP TABLE IF EXISTS test_deletes;
2+
3+
CREATE TABLE test_deletes (a UInt64) ENGINE = MergeTree ORDER BY a;
4+
5+
INSERT INTO test_deletes VALUES (1) (2) (3);
6+
7+
SYSTEM STOP MERGES test_deletes;
8+
SET mutations_sync = 0;
9+
SET lightweight_deletes_sync = 0;
10+
11+
ALTER TABLE test_deletes DELETE WHERE a = 1 SETTINGS mutations_sync = 0;
12+
DELETE FROM test_deletes WHERE a = 2 SETTINGS lightweight_deletes_sync = 0;
13+
14+
SELECT a FROM test_deletes SETTINGS apply_mutations_on_fly = 1;
15+
16+
DROP TABLE test_deletes;
17+
18+
CREATE TABLE test_deletes (a UInt64, b UInt64) ENGINE = MergeTree ORDER BY a;
19+
20+
INSERT INTO test_deletes SELECT number, 0 FROM numbers(10000);
21+
22+
DELETE FROM test_deletes WHERE a >= 100 AND a < 200 SETTINGS lightweight_deletes_sync = 1;
23+
24+
SYSTEM STOP MERGES test_deletes;
25+
26+
ALTER TABLE test_deletes UPDATE b = 1 WHERE a >= 150 AND a < 250 SETTINGS mutations_sync = 0;
27+
DELETE FROM test_deletes WHERE b = 1 SETTINGS lightweight_deletes_sync = 0;
28+
29+
SELECT count() FROM test_deletes SETTINGS apply_mutations_on_fly = 1;
30+
31+
DROP TABLE test_deletes;

0 commit comments

Comments
 (0)