Skip to content

Commit 3d79cdb

Browse files
authored
Merge pull request #843 from Altinity/backports/24.8.14/79369
24.8.14 Backport of ClickHouse#79369 -- Ignore parse error in system.distributed_ddl_queue
2 parents f45fd21 + ef784ea commit 3d79cdb

File tree

3 files changed

+139
-26
lines changed

3 files changed

+139
-26
lines changed

src/Storages/System/StorageSystemDDLWorkerQueue.cpp

Lines changed: 25 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,11 @@ namespace fs = std::filesystem;
2121
namespace DB
2222
{
2323

24+
namespace ErrorCodes
25+
{
26+
extern const int SYNTAX_ERROR;
27+
}
28+
2429
enum class Status : uint8_t
2530
{
2631
INACTIVE,
@@ -54,7 +59,7 @@ ColumnsDescription StorageSystemDDLWorkerQueue::getColumnsDescription()
5459
{"entry_version", std::make_shared<DataTypeNullable>(std::make_shared<DataTypeUInt8>()), "Version of the entry."},
5560
{"initiator_host", std::make_shared<DataTypeNullable>(std::make_shared<DataTypeString>()), "Host that initiated the DDL operation."},
5661
{"initiator_port", std::make_shared<DataTypeNullable>(std::make_shared<DataTypeUInt16>()), "Port used by the initiator."},
57-
{"cluster", std::make_shared<DataTypeString>(), "Cluster name."},
62+
{"cluster", std::make_shared<DataTypeString>(), "Cluster name, empty if not determined."},
5863
{"query", std::make_shared<DataTypeString>(), "Query executed."},
5964
{"settings", std::make_shared<DataTypeMap>(std::make_shared<DataTypeString>(), std::make_shared<DataTypeString>()), "Settings used in the DDL operation."},
6065
{"query_create_time", std::make_shared<DataTypeDateTime>(), "Query created time."},
@@ -77,10 +82,25 @@ static String clusterNameFromDDLQuery(ContextPtr context, const DDLTask & task)
7782

7883
String description = fmt::format("from {}", task.entry_path);
7984
ParserQuery parser_query(end, settings.allow_settings_after_format_in_insert);
80-
ASTPtr query = parseQuery(parser_query, begin, end, description,
81-
settings.max_query_size,
82-
settings.max_parser_depth,
83-
settings.max_parser_backtracks);
85+
ASTPtr query;
86+
87+
try
88+
{
89+
query = parseQuery(parser_query, begin, end, description,
90+
settings.max_query_size,
91+
settings.max_parser_depth,
92+
settings.max_parser_backtracks);
93+
}
94+
catch (const Exception & e)
95+
{
96+
LOG_INFO(getLogger("StorageSystemDDLWorkerQueue"), "Failed to determine cluster");
97+
if (e.code() == ErrorCodes::SYNTAX_ERROR)
98+
{
99+
/// ignore parse error and present available information
100+
return "";
101+
}
102+
throw;
103+
}
84104

85105
String cluster_name;
86106
if (const auto * query_on_cluster = dynamic_cast<const ASTQueryWithOnCluster *>(query.get()))

tests/integration/test_system_ddl_worker_queue/configs/remote_servers.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,4 +25,5 @@
2525
</shard>
2626
</test_cluster>
2727
</remote_servers>
28+
<allow_zookeeper_write>1</allow_zookeeper_write>
2829
</clickhouse>
Lines changed: 113 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,8 @@
11
import pytest
2+
import time
3+
from io import StringIO
4+
import csv
5+
import logging
26

37
from helpers.cluster import ClickHouseCluster
48

@@ -25,46 +29,134 @@ def started_cluster():
2529
try:
2630
cluster.start()
2731

28-
for i, node in enumerate([node1, node2]):
29-
node.query("CREATE DATABASE testdb")
30-
node.query(
31-
"""CREATE TABLE testdb.test_table(id UInt32, val String) ENGINE = ReplicatedMergeTree('/clickhouse/test/test_table1', '{}') ORDER BY id;""".format(
32-
i
33-
)
34-
)
35-
for i, node in enumerate([node3, node4]):
36-
node.query("CREATE DATABASE testdb")
37-
node.query(
38-
"""CREATE TABLE testdb.test_table(id UInt32, val String) ENGINE = ReplicatedMergeTree('/clickhouse/test/test_table2', '{}') ORDER BY id;""".format(
39-
i
40-
)
41-
)
4232
yield cluster
4333

4434
finally:
4535
cluster.shutdown()
4636

4737

38+
def maintain_test_table(test_table):
    """(Re)create testdb.<test_table> as a ReplicatedMergeTree on all four nodes.

    Nodes 1/2 form one replica pair (ZK path suffix 1) and nodes 3/4 the
    other (suffix 2).  A wall-clock timestamp is embedded in the ZooKeeper
    path so repeated test runs never collide with stale replica metadata.
    """
    tmark = time.time()  # to guarantee ZK path uniqueness across runs

    # The two replica pairs differ only in their shard suffix; iterate
    # instead of duplicating the loop body per pair.
    for shard, replicas in enumerate([(node1, node2), (node3, node4)], start=1):
        for i, node in enumerate(replicas):
            # Drop leftovers from a previous run before recreating.
            node.query(f"DROP TABLE IF EXISTS testdb.{test_table} SYNC")
            node.query("DROP DATABASE IF EXISTS testdb")

            node.query("CREATE DATABASE testdb")
            node.query(
                f"CREATE TABLE testdb.{test_table}(id UInt32, val String) "
                f"ENGINE = ReplicatedMergeTree('/clickhouse/test/{test_table}{shard}-{tmark}', '{i}') "
                "ORDER BY id;"
            )
4859
def test_distributed_ddl_queue(started_cluster):
    """Smoke-test system.distributed_ddl_queue via an ON CLUSTER ALTER.

    Populates both shards, runs a distributed ALTER, then checks on every
    node that the new column is readable and that the DDL entry shows up
    in system.distributed_ddl_queue for 'test_cluster'.
    """
    test_table = "test_table"
    maintain_test_table(test_table)

    insert_sql = f"INSERT INTO testdb.{test_table} SELECT number, toString(number) FROM numbers(100)"
    sync_sql = f"SYSTEM SYNC REPLICA testdb.{test_table}"

    # Write through one replica of each shard, then sync its sibling.
    for writer, reader in ((node1, node2), (node3, node4)):
        writer.query(insert_sql)
        reader.query(sync_sql)

    node1.query(
        f"ALTER TABLE testdb.{test_table} ON CLUSTER test_cluster ADD COLUMN somecolumn UInt8 AFTER val",
        settings={"replication_alter_partitions_sync": "2"},
    )

    for node in nodes:
        node.query(sync_sql)
        assert (
            node.query(f"SELECT somecolumn FROM testdb.{test_table} LIMIT 1") == "0\n"
        )
        assert (
            node.query(
                "SELECT If((SELECT count(*) FROM system.distributed_ddl_queue WHERE cluster='test_cluster' AND entry='query-0000000000') > 0, 'ok', 'fail')"
            )
            == "ok\n"
        )

    # Revert the schema change so later tests see the original table shape.
    node1.query(
        f"ALTER TABLE testdb.{test_table} ON CLUSTER test_cluster DROP COLUMN somecolumn",
        settings={"replication_alter_partitions_sync": "2"},
    )
def test_distributed_ddl_rubbish(started_cluster):
    """Unparseable DDL queue entries must not break system.distributed_ddl_queue.

    Clones a real DDL entry in ZooKeeper under a new query ID with its
    statement corrupted (TABLE -> TUBLE), then verifies the entry is still
    listed — with an empty cluster name — instead of raising a parse error.
    """
    test_table = "test_table_rubbish"
    maintain_test_table(test_table)
    node1.query(
        f"ALTER TABLE testdb.{test_table} ON CLUSTER test_cluster ADD COLUMN somenewcolumn UInt8 AFTER val",
        settings={"replication_alter_partitions_sync": "2"},
    )

    # Dump the raw DDL task queue from ZooKeeper as TSV and parse into dicts.
    zk_content_raw = node1.query(
        "SELECT name, value, path FROM system.zookeeper WHERE path LIKE '/clickhouse/task_queue/ddl%' SETTINGS allow_unrestricted_reads_from_keeper=true FORMAT TabSeparatedWithNames"
    )
    zk_content = list(csv.DictReader(StringIO(zk_content_raw), delimiter="\t"))

    new_query = "query-artificial-" + str(time.monotonic_ns())

    # Find the queue entry for the ALTER above (it mentions 'somenewcolumn').
    original_query = next(
        (row["name"] for row in zk_content if "somenewcolumn" in row["value"]), ""
    )
    # Guard against an empty name: "".find-style matching would otherwise
    # "match" every path below and insert garbage copies.
    assert original_query, "DDL entry for the ALTER was not found in ZooKeeper"

    # Copy the entry (and its subtree) under the new query ID, corrupting
    # the query text (TABLE => TUBLE) so it can no longer be parsed.
    rows_to_insert = []
    for row in zk_content:
        if row["name"] == original_query:
            rows_to_insert.append(
                {
                    "name": new_query,
                    "path": row["path"],
                    "value": row["value"].replace("TABLE", "TUBLE"),
                }
            )
        elif original_query in row["path"]:
            rows_to_insert.append(
                {
                    "name": row["name"],
                    "path": row["path"].replace(original_query, new_query),
                    "value": row["value"],
                }
            )

    # Ingest the corrupted copy back into ZooKeeper.
    for row in rows_to_insert:
        node1.query(
            "insert into system.zookeeper (name, path, value) "
            f"values ('{row['name']}', '{row['path']}', '{row['value']}')"
        )

    # The corrupted entry must still be visible with an empty cluster name:
    # one row per host in the 2x2 cluster.
    assert (
        int(
            node1.query(
                f"SELECT count(1) FROM system.distributed_ddl_queue WHERE entry='{new_query}' AND cluster=''"
            )
        )
        == 4
    )

0 commit comments

Comments
 (0)