Skip to content

Commit faf7be5

Browse files
authored
Merge pull request ClickHouse#87222 from ClickHouse/backport/25.8/86479
Backport ClickHouse#86479 to 25.8: Handling loopback host ID properly to avoid collision when processing DDL tasks:
2 parents 7347e86 + aa15491 commit faf7be5

File tree

6 files changed

+236
-35
lines changed

6 files changed

+236
-35
lines changed

src/Interpreters/DDLTask.cpp

Lines changed: 74 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,26 @@
1-
#include <Interpreters/DDLTask.h>
2-
#include <base/sort.h>
3-
#include <Common/DNSResolver.h>
4-
#include <Common/OpenTelemetryTraceContext.h>
5-
#include <Common/isLocalAddress.h>
6-
#include <Common/ZooKeeper/ZooKeeper.h>
1+
#include <Core/ServerSettings.h>
2+
#include <Core/ServerUUID.h>
73
#include <Core/Settings.h>
84
#include <Databases/DatabaseReplicated.h>
9-
#include <Interpreters/DatabaseCatalog.h>
10-
#include <Interpreters/Context.h>
11-
#include <IO/WriteHelpers.h>
12-
#include <IO/ReadHelpers.h>
135
#include <IO/Operators.h>
146
#include <IO/ReadBufferFromString.h>
15-
#include <Poco/Net/NetException.h>
16-
#include <Common/logger_useful.h>
7+
#include <IO/ReadHelpers.h>
8+
#include <IO/WriteHelpers.h>
9+
#include <Interpreters/Context.h>
10+
#include <Interpreters/DDLTask.h>
11+
#include <Interpreters/DDLWorker.h>
12+
#include <Interpreters/DatabaseCatalog.h>
1713
#include <Parsers/ASTQueryWithOnCluster.h>
14+
#include <Parsers/ASTQueryWithTableAndOutput.h>
1815
#include <Parsers/ParserQuery.h>
1916
#include <Parsers/parseQuery.h>
20-
#include <Parsers/ASTQueryWithTableAndOutput.h>
21-
17+
#include <base/sort.h>
18+
#include <Poco/Net/NetException.h>
19+
#include <Common/DNSResolver.h>
20+
#include <Common/OpenTelemetryTraceContext.h>
21+
#include <Common/ZooKeeper/ZooKeeper.h>
22+
#include <Common/isLocalAddress.h>
23+
#include <Common/logger_useful.h>
2224

2325
namespace DB
2426
{
@@ -31,7 +33,7 @@ namespace Setting
3133
extern const SettingsUInt64 max_parser_depth;
3234
extern const SettingsUInt64 max_parser_backtracks;
3335
extern const SettingsUInt64 max_query_size;
34-
}
36+
}
3537

3638
namespace ErrorCodes
3739
{
@@ -55,7 +57,27 @@ bool HostID::isLocalAddress(UInt16 clickhouse_port) const
5557
{
5658
try
5759
{
58-
return DB::isLocalAddress(DNSResolver::instance().resolveAddress(host_name, port), clickhouse_port);
60+
auto address = DNSResolver::instance().resolveAddress(host_name, port);
61+
return DB::isLocalAddress(address, clickhouse_port);
62+
}
63+
catch (const DB::NetException &)
64+
{
65+
/// Avoid "Host not found" exceptions
66+
return false;
67+
}
68+
catch (const Poco::Net::NetException &)
69+
{
70+
/// Avoid "Host not found" exceptions
71+
return false;
72+
}
73+
}
74+
75+
bool HostID::isLoopbackHost() const
76+
{
77+
try
78+
{
79+
auto address = DNSResolver::instance().resolveAddress(host_name, port);
80+
return address.host().isLoopback();
5981
}
6082
catch (const DB::NetException &)
6183
{
@@ -270,10 +292,7 @@ bool DDLTask::findCurrentHostID(ContextPtr global_context, LoggerPtr log, const
270292

271293
if (config_host_name)
272294
{
273-
bool is_local_port = (maybe_secure_port && HostID(*config_host_name, *maybe_secure_port).isLocalAddress(*maybe_secure_port)) ||
274-
HostID(*config_host_name, port).isLocalAddress(port);
275-
276-
if (!is_local_port)
295+
if (!IsSelfHostname(*config_host_name, maybe_secure_port, port))
277296
throw Exception(
278297
ErrorCodes::DNS_ERROR,
279298
"{} is not a local address. Check parameter 'host_name' in the configuration",
@@ -298,12 +317,28 @@ bool DDLTask::findCurrentHostID(ContextPtr global_context, LoggerPtr log, const
298317

299318
try
300319
{
301-
/// The port is considered local if it matches TCP or TCP secure port that the server is listening.
302-
bool is_local_port
303-
= (maybe_secure_port && host.isLocalAddress(*maybe_secure_port)) || host.isLocalAddress(port);
304-
305-
if (!is_local_port)
320+
if (!IsSelfHostID(host, maybe_secure_port, port))
306321
continue;
322+
323+
if (host.isLoopbackHost())
324+
{
325+
String current_host_id_str = host.toString();
326+
String active_id = toString(ServerUUID::get());
327+
String active_path = fs::path(global_context->getDDLWorker().getReplicasDir()) / current_host_id_str / "active";
328+
String content;
329+
Coordination::Stat stat;
330+
if (!zookeeper->tryGet(active_path, content, &stat))
331+
{
332+
LOG_TRACE(log, "HostID {} is a loopback host which has not been claimed by any replica", current_host_id_str);
333+
continue;
334+
}
335+
336+
if (content != active_id)
337+
{
338+
LOG_TRACE(log, "HostID {} is a loopback host which is claimed by another replica {}", current_host_id_str, content);
339+
continue;
340+
}
341+
}
307342
}
308343
catch (const Exception & e)
309344
{
@@ -506,6 +541,20 @@ String DDLTask::getShardID() const
506541
return res;
507542
}
508543

544+
bool DDLTask::IsSelfHostID(const HostID & checking_host_id, std::optional<UInt16> maybe_self_secure_port, UInt16 self_port)
545+
{
546+
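// NOTE: this only checks that checking_host_id resolves to a local address on one of our ports; loopback hosts are not filtered out here.
547+
// Every replica may match a loopback host, so callers must additionally check HostID::isLoopbackHost() and which replica owns the "active" node.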
548+
return (maybe_self_secure_port && checking_host_id.isLocalAddress(*maybe_self_secure_port))
549+
|| checking_host_id.isLocalAddress(self_port);
550+
}
551+
552+
bool DDLTask::IsSelfHostname(const String & checking_host_name, std::optional<UInt16> maybe_self_secure_port, UInt16 self_port)
553+
{
554+
return (maybe_self_secure_port && HostID(checking_host_name, *maybe_self_secure_port).isLocalAddress(*maybe_self_secure_port))
555+
|| HostID(checking_host_name, self_port).isLocalAddress(self_port);
556+
}
557+
509558
DatabaseReplicatedTask::DatabaseReplicatedTask(const String & name, const String & path, DatabaseReplicated * database_)
510559
: DDLTaskBase(name, path)
511560
, database(database_)

src/Interpreters/DDLTask.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ struct HostID
6161
}
6262

6363
bool isLocalAddress(UInt16 clickhouse_port) const;
64+
bool isLoopbackHost() const;
6465

6566
static String applyToString(const HostID & host_id)
6667
{
@@ -157,6 +158,9 @@ struct DDLTask : public DDLTaskBase
157158

158159
String getShardID() const override;
159160

161+
static bool IsSelfHostID(const HostID & checking_host_id, std::optional<UInt16> maybe_self_secure_port, UInt16 self_port);
162+
static bool IsSelfHostname(const String & checking_host_name, std::optional<UInt16> maybe_self_secure_port, UInt16 self_port);
163+
160164
private:
161165
bool tryFindHostInCluster();
162166
bool tryFindHostInClusterViaResolving(ContextPtr context);

src/Interpreters/DDLWorker.cpp

Lines changed: 27 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,16 @@
11

2+
#include <Core/ServerSettings.h>
23
#include <Core/ServerUUID.h>
34
#include <Core/Settings.h>
5+
#include <IO/NullWriteBuffer.h>
46
#include <IO/ReadBufferFromString.h>
57
#include <IO/ReadHelpers.h>
68
#include <IO/WriteHelpers.h>
79
#include <Interpreters/Cluster.h>
810
#include <Interpreters/Context.h>
9-
#include <Interpreters/DatabaseCatalog.h>
1011
#include <Interpreters/DDLTask.h>
1112
#include <Interpreters/DDLWorker.h>
13+
#include <Interpreters/DatabaseCatalog.h>
1214
#include <Interpreters/ZooKeeperLog.h>
1315
#include <Interpreters/executeQuery.h>
1416
#include <Parsers/ASTAlterQuery.h>
@@ -19,7 +21,6 @@
1921
#include <Parsers/ASTQueryWithOnCluster.h>
2022
#include <Parsers/ASTQueryWithTableAndOutput.h>
2123
#include <Parsers/ParserQuery.h>
22-
#include <IO/NullWriteBuffer.h>
2324
#include <Storages/IStorage.h>
2425
#include <Storages/StorageReplicatedMergeTree.h>
2526
#include <Poco/Timestamp.h>
@@ -1349,10 +1350,7 @@ void DDLWorker::markReplicasActive(bool reinitialized)
13491350
try
13501351
{
13511352
HostID host = HostID::fromString(host_id);
1352-
/// The port is considered local if it matches TCP or TCP secure port that the server is listening.
1353-
bool is_local_host = (maybe_secure_port && host.isLocalAddress(*maybe_secure_port)) || host.isLocalAddress(port);
1354-
1355-
if (is_local_host)
1353+
if (DDLTask::IsSelfHostID(host, maybe_secure_port, port))
13561354
local_host_ids.emplace(host_id);
13571355
}
13581356
catch (const Exception & e)
@@ -1366,17 +1364,36 @@ void DDLWorker::markReplicasActive(bool reinitialized)
13661364
{
13671365
auto it = active_node_holders.find(host_id);
13681366
if (it != active_node_holders.end())
1369-
{
13701367
continue;
1371-
}
13721368

13731369
String active_path = fs::path(replicas_dir) / host_id / "active";
13741370
String active_id = toString(ServerUUID::get());
13751371

13761372
LOG_TRACE(log, "Trying to mark a replica active: active_path={}, active_id={}", active_path, active_id);
1373+
if (HostID::fromString(host_id).isLoopbackHost())
1374+
{
1375+
String content;
1376+
Coordination::Stat stat;
1377+
if (zookeeper->tryGet(active_path, content, &stat))
1378+
{
1379+
// For a loopback host, several replicas may try to claim it as their own host.
1380+
// If it is already claimed by another replica (the active node holds a different server UUID), we skip it.
1381+
// Loopback hosts are expected to be used only in test environments.
1382+
if (content != active_id)
1383+
{
1384+
LOG_TRACE(log, "HostID {} is a loopback host which is claimed by another replica {}", host_id, content);
1385+
continue;
1386+
}
13771387

1378-
zookeeper->deleteEphemeralNodeIfContentMatches(active_path, active_id);
1379-
1388+
auto code = zookeeper->tryRemove(active_path, stat.version);
1389+
if (code != Coordination::Error::ZOK && code != Coordination::Error::ZNONODE)
1390+
throw Coordination::Exception::fromPath(code, active_path);
1391+
}
1392+
}
1393+
else
1394+
{
1395+
zookeeper->deleteEphemeralNodeIfContentMatches(active_path, active_id);
1396+
}
13801397
zookeeper->create(active_path, active_id, zkutil::CreateMode::Ephemeral);
13811398
auto active_node_holder_zookeeper = zookeeper;
13821399
auto active_node_holder = zkutil::EphemeralNodeHolder::existing(active_path, *active_node_holder_zookeeper);

tests/integration/test_ddl_worker_with_loopback_hosts/__init__.py

Whitespace-only changes.
tests/integration/test_ddl_worker_with_loopback_hosts/configs/remote_servers.xml

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
<clickhouse>
2+
<remote_servers>
3+
<test_cluster>
4+
<shard>
5+
<internal_replication>true</internal_replication>
6+
<replica>
7+
<host>node1</host>
8+
<port>9000</port>
9+
</replica>
10+
<replica>
11+
<host>node2</host>
12+
<port>9000</port>
13+
</replica>
14+
</shard>
15+
</test_cluster>
16+
<test_loopback_cluster1>
17+
<shard>
18+
<replica>
19+
<host>127.0.0.1</host>
20+
<port>9000</port>
21+
</replica>
22+
</shard>
23+
</test_loopback_cluster1>
24+
<test_loopback_cluster2>
25+
<shard>
26+
<replica>
27+
<host>localhost</host>
28+
<port>9000</port>
29+
</replica>
30+
</shard>
31+
</test_loopback_cluster2>
32+
</remote_servers>
33+
</clickhouse>
Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
import time
2+
3+
import pytest
4+
5+
from helpers.client import QueryRuntimeException
6+
from helpers.cluster import ClickHouseCluster
7+
from helpers.network import PartitionManager
8+
9+
cluster = ClickHouseCluster(__file__)
10+
node1 = cluster.add_instance(
11+
"node1",
12+
main_configs=[
13+
"configs/remote_servers.xml",
14+
],
15+
with_zookeeper=True,
16+
)
17+
node2 = cluster.add_instance(
18+
"node2",
19+
main_configs=[
20+
"configs/remote_servers.xml",
21+
],
22+
with_zookeeper=True,
23+
)
24+
25+
26+
@pytest.fixture(scope="module")
27+
def started_cluster():
28+
try:
29+
cluster.start()
30+
31+
yield cluster
32+
finally:
33+
cluster.shutdown()
34+
35+
36+
def count_table(node, table_name):
37+
return int(
38+
node.query(
39+
f"SELECT count() FROM system.tables WHERE name='{table_name}'"
40+
).strip()
41+
)
42+
43+
44+
def test_ddl_worker_with_loopback_hosts(
45+
started_cluster,
46+
):
47+
node1.query("DROP TABLE IF EXISTS t1 SYNC")
48+
node2.query("DROP TABLE IF EXISTS t1 SYNC")
49+
node1.query("DROP TABLE IF EXISTS t2 SYNC")
50+
node2.query("DROP TABLE IF EXISTS t2 SYNC")
51+
node1.query("DROP TABLE IF EXISTS t3 SYNC")
52+
node2.query("DROP TABLE IF EXISTS t3 SYNC")
53+
node1.query("DROP TABLE IF EXISTS t4 SYNC")
54+
node2.query("DROP TABLE IF EXISTS t4 SYNC")
55+
56+
node1.query(
57+
"CREATE TABLE t1 ON CLUSTER 'test_cluster' (x INT) ENGINE=MergeTree() ORDER BY x",
58+
settings={
59+
"distributed_ddl_task_timeout": 10,
60+
},
61+
)
62+
63+
node2.query(
64+
"CREATE TABLE t2 ON CLUSTER 'test_cluster' (x INT) ENGINE=MergeTree() ORDER BY x",
65+
settings={
66+
"distributed_ddl_task_timeout": 10,
67+
},
68+
)
69+
70+
assert count_table(node1, "t2") == 1
71+
assert count_table(node2, "t1") == 1
72+
73+
node1.query(
74+
"CREATE TABLE t3 ON CLUSTER 'test_loopback_cluster1' (x INT) ENGINE=MergeTree() ORDER BY x",
75+
settings={
76+
"distributed_ddl_task_timeout": 10,
77+
},
78+
)
79+
# test_loopback_cluster1 has a loopback host, so only one replica is expected to process the query; the assertion below only checks that at least one of them created the table
80+
assert count_table(node1, "t3") == 1 or count_table(node2, "t3") == 1
81+
82+
node2.query(
83+
"CREATE TABLE t4 ON CLUSTER 'test_loopback_cluster2' (x INT) ENGINE=MergeTree() ORDER BY x",
84+
settings={
85+
"distributed_ddl_task_timeout": 10,
86+
},
87+
)
88+
# test_loopback_cluster2 has a loopback host, so only one replica is expected to process the query; the assertion below only checks that at least one of them created the table
89+
assert count_table(node1, "t4") == 1 or count_table(node2, "t4") == 1
90+
91+
node1.query("DROP TABLE IF EXISTS t1 SYNC")
92+
node2.query("DROP TABLE IF EXISTS t1 SYNC")
93+
node1.query("DROP TABLE IF EXISTS t2 SYNC")
94+
node2.query("DROP TABLE IF EXISTS t2 SYNC")
95+
node1.query("DROP TABLE IF EXISTS t3 SYNC")
96+
node2.query("DROP TABLE IF EXISTS t3 SYNC")
97+
node1.query("DROP TABLE IF EXISTS t4 SYNC")
98+
node2.query("DROP TABLE IF EXISTS t4 SYNC")

0 commit comments

Comments
 (0)