
Commit 2df8357

Revert "Merge pull request ClickHouse#79369 from ilejn/ignore_error_distributed_ddl_queue"
This reverts commit 0492f67.
1 parent: 0492f67

2 files changed: +24, -130 lines

src/Storages/System/StorageSystemDDLWorkerQueue.cpp (3 additions, 23 deletions)
@@ -29,11 +29,6 @@ namespace Setting
     extern const SettingsUInt64 max_query_size;
 }
 
-namespace ErrorCodes
-{
-    extern const int SYNTAX_ERROR;
-}
-
 enum class Status : uint8_t
 {
     INACTIVE,
@@ -67,7 +62,7 @@ ColumnsDescription StorageSystemDDLWorkerQueue::getColumnsDescription()
         {"entry_version", std::make_shared<DataTypeNullable>(std::make_shared<DataTypeUInt8>()), "Version of the entry."},
         {"initiator_host", std::make_shared<DataTypeNullable>(std::make_shared<DataTypeString>()), "Host that initiated the DDL operation."},
         {"initiator_port", std::make_shared<DataTypeNullable>(std::make_shared<DataTypeUInt16>()), "Port used by the initiator."},
-        {"cluster", std::make_shared<DataTypeString>(), "Cluster name, empty if not determined."},
+        {"cluster", std::make_shared<DataTypeString>(), "Cluster name."},
         {"query", std::make_shared<DataTypeString>(), "Query executed."},
         {"settings", std::make_shared<DataTypeMap>(std::make_shared<DataTypeString>(), std::make_shared<DataTypeString>()), "Settings used in the DDL operation."},
         {"query_create_time", std::make_shared<DataTypeDateTime>(), "Query created time."},
@@ -90,23 +85,8 @@ static String clusterNameFromDDLQuery(ContextPtr context, const DDLTask & task)
 
     String description = fmt::format("from {}", task.entry_path);
     ParserQuery parser_query(end, settings[Setting::allow_settings_after_format_in_insert]);
-    ASTPtr query;
-
-    try
-    {
-        query = parseQuery(
-            parser_query, begin, end, description, settings[Setting::max_query_size], settings[Setting::max_parser_depth], settings[Setting::max_parser_backtracks]);
-    }
-    catch (const Exception & e)
-    {
-        LOG_INFO(getLogger("StorageSystemDDLWorkerQueue"), "Failed to determine cluster");
-        if (e.code() == ErrorCodes::SYNTAX_ERROR)
-        {
-            /// ignore parse error and present available information
-            return "";
-        }
-        throw;
-    }
+    ASTPtr query = parseQuery(
+        parser_query, begin, end, description, settings[Setting::max_query_size], settings[Setting::max_parser_depth], settings[Setting::max_parser_backtracks]);
 
     String cluster_name;
     if (const auto * query_on_cluster = dynamic_cast<const ASTQueryWithOnCluster *>(query.get()))
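
The deleted try/catch is the substance of the revert: with it gone, a syntax error raised by parseQuery() once again propagates out of clusterNameFromDDLQuery(), so a single malformed entry in the DDL task queue can fail a SELECT from system.distributed_ddl_queue; the reverted code logged the failure and reported an empty cluster name instead. Below is a minimal standalone sketch of that tolerant-parse pattern; ParseException, parseClusterName, and clusterNameOrEmpty are illustrative stand-ins, not ClickHouse's actual DB::Exception/parseQuery API, and the error code value is an assumption.

#include <iostream>
#include <stdexcept>
#include <string>

/// Illustrative stand-in for DB::Exception carrying an error code.
struct ParseException : std::runtime_error
{
    int code;
    ParseException(int code_, const std::string & msg) : std::runtime_error(msg), code(code_) {}
};

constexpr int SYNTAX_ERROR = 62; /// assumption: mirrors ErrorCodes::SYNTAX_ERROR

/// Crude stand-in for parseQuery(): rejects anything that is not ALTER TABLE,
/// then pulls out the identifier that follows ON CLUSTER.
std::string parseClusterName(const std::string & query)
{
    if (query.rfind("ALTER TABLE ", 0) != 0)
        throw ParseException(SYNTAX_ERROR, "Syntax error near: " + query.substr(0, 12));
    auto pos = query.find("ON CLUSTER ");
    if (pos == std::string::npos)
        return "";
    auto start = pos + 11;
    auto end = query.find(' ', start);
    return query.substr(start, end == std::string::npos ? std::string::npos : end - start);
}

/// The reverted behaviour: swallow only syntax errors and degrade to an empty
/// cluster name so the rest of the row can still be shown; rethrow anything else.
std::string clusterNameOrEmpty(const std::string & query)
{
    try
    {
        return parseClusterName(query);
    }
    catch (const ParseException & e)
    {
        if (e.code == SYNTAX_ERROR)
            return "";
        throw;
    }
}

int main()
{
    /// Well-formed entry: the cluster name is recovered.
    std::cout << clusterNameOrEmpty("ALTER TABLE t ON CLUSTER test_cluster ADD COLUMN c UInt8") << '\n';
    /// Corrupted entry (TABLE -> TUBLE, as in the removed test): empty cluster, no exception.
    std::cout << '[' << clusterNameOrEmpty("ALTER TUBLE t ON CLUSTER test_cluster ADD COLUMN c UInt8") << "]\n";
}

The second file below removes the integration test that exercised this path by copying a queue entry in ZooKeeper under a new name with deliberately corrupted query text.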
Integration test (21 additions, 107 deletions)
@@ -1,5 +1,4 @@
 import pytest
-import time
 
 from helpers.cluster import ClickHouseCluster
 
@@ -26,131 +25,46 @@ def started_cluster():
     try:
         cluster.start()
 
+        for i, node in enumerate([node1, node2]):
+            node.query("CREATE DATABASE testdb")
+            node.query(
+                """CREATE TABLE testdb.test_table(id UInt32, val String) ENGINE = ReplicatedMergeTree('/clickhouse/test/test_table1', '{}') ORDER BY id;""".format(
+                    i
+                )
+            )
+        for i, node in enumerate([node3, node4]):
+            node.query("CREATE DATABASE testdb")
+            node.query(
+                """CREATE TABLE testdb.test_table(id UInt32, val String) ENGINE = ReplicatedMergeTree('/clickhouse/test/test_table2', '{}') ORDER BY id;""".format(
+                    i
+                )
+            )
         yield cluster
 
     finally:
         cluster.shutdown()
 
 
-def maintain_test_table(test_table):
-    tmark = time.time()  # to guarantee ZK path uniqueness
-
-    for i, node in enumerate([node1, node2]):
-        node.query(f"DROP TABLE IF EXISTS testdb.{test_table} SYNC")
-        node.query("DROP DATABASE IF EXISTS testdb")
-
-        node.query("CREATE DATABASE testdb")
-        node.query(
-            f"CREATE TABLE testdb.{test_table}(id UInt32, val String) ENGINE = ReplicatedMergeTree('/clickhouse/test/{test_table}1-{tmark}', '{i}') ORDER BY id;"
-        )
-    for i, node in enumerate([node3, node4]):
-        node.query(f"DROP TABLE IF EXISTS testdb.{test_table} SYNC")
-        node.query("DROP DATABASE IF EXISTS testdb")
-
-        node.query("CREATE DATABASE testdb")
-        node.query(
-            f"CREATE TABLE testdb.{test_table}(id UInt32, val String) ENGINE = ReplicatedMergeTree('/clickhouse/test/{test_table}2-{tmark}', '{i}') ORDER BY id;"
-        )
-
-
 def test_distributed_ddl_queue(started_cluster):
-    test_table = "test_table"
-    maintain_test_table(test_table)
     node1.query(
-        f"INSERT INTO testdb.{test_table} SELECT number, toString(number) FROM numbers(100)"
+        "INSERT INTO testdb.test_table SELECT number, toString(number) FROM numbers(100)"
     )
     node3.query(
-        f"INSERT INTO testdb.{test_table} SELECT number, toString(number) FROM numbers(100)"
+        "INSERT INTO testdb.test_table SELECT number, toString(number) FROM numbers(100)"
     )
-    node2.query(f"SYSTEM SYNC REPLICA testdb.{test_table}")
-    node4.query(f"SYSTEM SYNC REPLICA testdb.{test_table}")
+    node2.query("SYSTEM SYNC REPLICA testdb.test_table")
+    node4.query("SYSTEM SYNC REPLICA testdb.test_table")
 
     node1.query(
-        f"ALTER TABLE testdb.{test_table} ON CLUSTER test_cluster ADD COLUMN somecolumn UInt8 AFTER val",
+        "ALTER TABLE testdb.test_table ON CLUSTER test_cluster ADD COLUMN somecolumn UInt8 AFTER val",
         settings={"replication_alter_partitions_sync": "2"},
     )
     for node in nodes:
-        node.query(f"SYSTEM SYNC REPLICA testdb.{test_table}")
-        assert (
-            node.query(f"SELECT somecolumn FROM testdb.{test_table} LIMIT 1") == "0\n"
-        )
+        node.query("SYSTEM SYNC REPLICA testdb.test_table")
+        assert node.query("SELECT somecolumn FROM testdb.test_table LIMIT 1") == "0\n"
         assert (
            node.query(
                "SELECT If((SELECT count(*) FROM system.distributed_ddl_queue WHERE cluster='test_cluster' AND entry='query-0000000000') > 0, 'ok', 'fail')"
            )
            == "ok\n"
        )
-
-    node1.query(
-        f"ALTER TABLE testdb.{test_table} ON CLUSTER test_cluster DROP COLUMN somecolumn",
-        settings={"replication_alter_partitions_sync": "2"},
-    )
-
-
-def test_distributed_ddl_rubbish(started_cluster):
-    test_table = "test_table_rubbish"
-    maintain_test_table(test_table)
-    node1.query(
-        f"ALTER TABLE testdb.{test_table} ON CLUSTER test_cluster ADD COLUMN somenewcolumn UInt8 AFTER val",
-        settings={"replication_alter_partitions_sync": "2"},
-    )
-
-    zk_content = node1.query(
-        "SELECT name, value, path FROM system.zookeeper WHERE path LIKE '/clickhouse/task_queue/ddl%' SETTINGS allow_unrestricted_reads_from_keeper=true",
-        parse=True,
-    ).to_dict("records")
-
-    original_query = ""
-    new_query = "query-artificial-" + str(time.monotonic_ns())
-
-    # Copy information about query (one that added 'somenewcolumn') with new query ID
-    # and broken query text (TABLE => TUBLE)
-    for row in zk_content:
-        if row["value"].find("somenewcolumn") >= 0:
-            original_query = row["name"]
-            break
-
-    rows_to_insert = []
-
-    for row in zk_content:
-        if row["name"] == original_query:
-            rows_to_insert.append(
-                {
-                    "name": new_query,
-                    "path": row["path"],
-                    "value": row["value"].replace("TABLE", "TUBLE"),
-                }
-            )
-            continue
-        pos = row["path"].find(original_query)
-        if pos >= 0:
-            rows_to_insert.append(
-                {
-                    "name": row["name"],
-                    "path": row["path"].replace(original_query, new_query),
-                    "value": row["value"],
-                }
-            )
-
-    # Ingest it to ZK
-    for row in rows_to_insert:
-        node1.query(
-            "insert into system.zookeeper (name, path, value) values ('{}', '{}', '{}')".format(
-                f'{row["name"]}', f'{row["path"]}', f'{row["value"]}'
-            )
-        )
-
-    # Ensure that data is visible via system.distributed_ddl_queue
-    assert (
-        int(
-            node1.query(
-                f"SELECT count(1) FROM system.distributed_ddl_queue WHERE entry='{new_query}' AND cluster=''"
-            )
-        )
-        == 4
-    )
-
-    node1.query(
-        f"ALTER TABLE testdb.{test_table} ON CLUSTER test_cluster DROP COLUMN somenewcolumn",
-        settings={"replication_alter_partitions_sync": "2"},
-    )
