Skip to content

Commit 84c3a76

Browse files
authored
Merge pull request #889 from Altinity/backports/25.3.5/79369
25.3.5 Backport of ClickHouse#79369 -- Ignore parse error in system.distributed_ddl_queue
2 parents acc3993 + f6b8404 commit 84c3a76

File tree

3 files changed: +131 additions, -24 deletions

src/Storages/System/StorageSystemDDLWorkerQueue.cpp

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,11 @@ namespace Setting
2929
extern const SettingsUInt64 max_query_size;
3030
}
3131

32+
namespace ErrorCodes
33+
{
34+
extern const int SYNTAX_ERROR;
35+
}
36+
3237
enum class Status : uint8_t
3338
{
3439
INACTIVE,
@@ -62,7 +67,7 @@ ColumnsDescription StorageSystemDDLWorkerQueue::getColumnsDescription()
6267
{"entry_version", std::make_shared<DataTypeNullable>(std::make_shared<DataTypeUInt8>()), "Version of the entry."},
6368
{"initiator_host", std::make_shared<DataTypeNullable>(std::make_shared<DataTypeString>()), "Host that initiated the DDL operation."},
6469
{"initiator_port", std::make_shared<DataTypeNullable>(std::make_shared<DataTypeUInt16>()), "Port used by the initiator."},
65-
{"cluster", std::make_shared<DataTypeString>(), "Cluster name."},
70+
{"cluster", std::make_shared<DataTypeString>(), "Cluster name, empty if not determined."},
6671
{"query", std::make_shared<DataTypeString>(), "Query executed."},
6772
{"settings", std::make_shared<DataTypeMap>(std::make_shared<DataTypeString>(), std::make_shared<DataTypeString>()), "Settings used in the DDL operation."},
6873
{"query_create_time", std::make_shared<DataTypeDateTime>(), "Query created time."},
@@ -85,8 +90,23 @@ static String clusterNameFromDDLQuery(ContextPtr context, const DDLTask & task)
8590

8691
String description = fmt::format("from {}", task.entry_path);
8792
ParserQuery parser_query(end, settings[Setting::allow_settings_after_format_in_insert]);
88-
ASTPtr query = parseQuery(
89-
parser_query, begin, end, description, settings[Setting::max_query_size], settings[Setting::max_parser_depth], settings[Setting::max_parser_backtracks]);
93+
ASTPtr query;
94+
95+
try
96+
{
97+
query = parseQuery(
98+
parser_query, begin, end, description, settings[Setting::max_query_size], settings[Setting::max_parser_depth], settings[Setting::max_parser_backtracks]);
99+
}
100+
catch (const Exception & e)
101+
{
102+
LOG_INFO(getLogger("StorageSystemDDLWorkerQueue"), "Failed to determine cluster");
103+
if (e.code() == ErrorCodes::SYNTAX_ERROR)
104+
{
105+
/// ignore parse error and present available information
106+
return "";
107+
}
108+
throw;
109+
}
90110

91111
String cluster_name;
92112
if (const auto * query_on_cluster = dynamic_cast<const ASTQueryWithOnCluster *>(query.get()))

tests/integration/test_system_ddl_worker_queue/configs/remote_servers.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,4 +25,5 @@
2525
</shard>
2626
</test_cluster>
2727
</remote_servers>
28+
<allow_zookeeper_write>1</allow_zookeeper_write>
2829
</clickhouse>
tests/integration/test_system_ddl_worker_queue/test.py

Lines changed: 107 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import pytest
2+
import time
23

34
from helpers.cluster import ClickHouseCluster
45

@@ -25,46 +26,131 @@ def started_cluster():
2526
try:
2627
cluster.start()
2728

28-
for i, node in enumerate([node1, node2]):
29-
node.query("CREATE DATABASE testdb")
30-
node.query(
31-
"""CREATE TABLE testdb.test_table(id UInt32, val String) ENGINE = ReplicatedMergeTree('/clickhouse/test/test_table1', '{}') ORDER BY id;""".format(
32-
i
33-
)
34-
)
35-
for i, node in enumerate([node3, node4]):
36-
node.query("CREATE DATABASE testdb")
37-
node.query(
38-
"""CREATE TABLE testdb.test_table(id UInt32, val String) ENGINE = ReplicatedMergeTree('/clickhouse/test/test_table2', '{}') ORDER BY id;""".format(
39-
i
40-
)
41-
)
4229
yield cluster
4330

4431
finally:
4532
cluster.shutdown()
4633

4734

35+
def maintain_test_table(test_table):
36+
tmark = time.time() # to guarantee ZK path uniqueness
37+
38+
for i, node in enumerate([node1, node2]):
39+
node.query(f"DROP TABLE IF EXISTS testdb.{test_table} SYNC")
40+
node.query("DROP DATABASE IF EXISTS testdb")
41+
42+
node.query("CREATE DATABASE testdb")
43+
node.query(
44+
f"CREATE TABLE testdb.{test_table}(id UInt32, val String) ENGINE = ReplicatedMergeTree('/clickhouse/test/{test_table}1-{tmark}', '{i}') ORDER BY id;"
45+
)
46+
for i, node in enumerate([node3, node4]):
47+
node.query(f"DROP TABLE IF EXISTS testdb.{test_table} SYNC")
48+
node.query("DROP DATABASE IF EXISTS testdb")
49+
50+
node.query("CREATE DATABASE testdb")
51+
node.query(
52+
f"CREATE TABLE testdb.{test_table}(id UInt32, val String) ENGINE = ReplicatedMergeTree('/clickhouse/test/{test_table}2-{tmark}', '{i}') ORDER BY id;"
53+
)
54+
55+
4856
def test_distributed_ddl_queue(started_cluster):
57+
test_table = "test_table"
58+
maintain_test_table(test_table)
4959
node1.query(
50-
"INSERT INTO testdb.test_table SELECT number, toString(number) FROM numbers(100)"
60+
f"INSERT INTO testdb.{test_table} SELECT number, toString(number) FROM numbers(100)"
5161
)
5262
node3.query(
53-
"INSERT INTO testdb.test_table SELECT number, toString(number) FROM numbers(100)"
63+
f"INSERT INTO testdb.{test_table} SELECT number, toString(number) FROM numbers(100)"
5464
)
55-
node2.query("SYSTEM SYNC REPLICA testdb.test_table")
56-
node4.query("SYSTEM SYNC REPLICA testdb.test_table")
65+
node2.query(f"SYSTEM SYNC REPLICA testdb.{test_table}")
66+
node4.query(f"SYSTEM SYNC REPLICA testdb.{test_table}")
5767

5868
node1.query(
59-
"ALTER TABLE testdb.test_table ON CLUSTER test_cluster ADD COLUMN somecolumn UInt8 AFTER val",
69+
f"ALTER TABLE testdb.{test_table} ON CLUSTER test_cluster ADD COLUMN somecolumn UInt8 AFTER val",
6070
settings={"replication_alter_partitions_sync": "2"},
6171
)
6272
for node in nodes:
63-
node.query("SYSTEM SYNC REPLICA testdb.test_table")
64-
assert node.query("SELECT somecolumn FROM testdb.test_table LIMIT 1") == "0\n"
73+
node.query(f"SYSTEM SYNC REPLICA testdb.{test_table}")
74+
assert (
75+
node.query(f"SELECT somecolumn FROM testdb.{test_table} LIMIT 1") == "0\n"
76+
)
6577
assert (
6678
node.query(
6779
"SELECT If((SELECT count(*) FROM system.distributed_ddl_queue WHERE cluster='test_cluster' AND entry='query-0000000000') > 0, 'ok', 'fail')"
6880
)
6981
== "ok\n"
7082
)
83+
84+
node1.query(
85+
f"ALTER TABLE testdb.{test_table} ON CLUSTER test_cluster DROP COLUMN somecolumn",
86+
settings={"replication_alter_partitions_sync": "2"},
87+
)
88+
89+
90+
def test_distributed_ddl_rubbish(started_cluster):
91+
test_table = "test_table_rubbish"
92+
maintain_test_table(test_table)
93+
node1.query(
94+
f"ALTER TABLE testdb.{test_table} ON CLUSTER test_cluster ADD COLUMN somenewcolumn UInt8 AFTER val",
95+
settings={"replication_alter_partitions_sync": "2"},
96+
)
97+
98+
zk_content = node1.query(
99+
"SELECT name, value, path FROM system.zookeeper WHERE path LIKE '/clickhouse/task_queue/ddl%' SETTINGS allow_unrestricted_reads_from_keeper=true",
100+
parse=True,
101+
).to_dict("records")
102+
103+
original_query = ""
104+
new_query = "query-artificial-" + str(time.monotonic_ns())
105+
106+
# Copy information about query (one that added 'somenewcolumn') with new query ID
107+
# and broken query text (TABLE => TUBLE)
108+
for row in zk_content:
109+
if row["value"].find("somenewcolumn") >= 0:
110+
original_query = row["name"]
111+
break
112+
113+
rows_to_insert = []
114+
115+
for row in zk_content:
116+
if row["name"] == original_query:
117+
rows_to_insert.append(
118+
{
119+
"name": new_query,
120+
"path": row["path"],
121+
"value": row["value"].replace("TABLE", "TUBLE"),
122+
}
123+
)
124+
continue
125+
pos = row["path"].find(original_query)
126+
if pos >= 0:
127+
rows_to_insert.append(
128+
{
129+
"name": row["name"],
130+
"path": row["path"].replace(original_query, new_query),
131+
"value": row["value"],
132+
}
133+
)
134+
135+
# Ingest it to ZK
136+
for row in rows_to_insert:
137+
node1.query(
138+
"insert into system.zookeeper (name, path, value) values ('{}', '{}', '{}')".format(
139+
f'{row["name"]}', f'{row["path"]}', f'{row["value"]}'
140+
)
141+
)
142+
143+
# Ensure that data is visible via system.distributed_ddl_queue
144+
assert (
145+
int(
146+
node1.query(
147+
f"SELECT count(1) FROM system.distributed_ddl_queue WHERE entry='{new_query}' AND cluster=''"
148+
)
149+
)
150+
== 4
151+
)
152+
153+
node1.query(
154+
f"ALTER TABLE testdb.{test_table} ON CLUSTER test_cluster DROP COLUMN somenewcolumn",
155+
settings={"replication_alter_partitions_sync": "2"},
156+
)

0 commit comments

Comments
 (0)