Skip to content

Commit 5eaa932

Browse files
Merge pull request ClickHouse#79566 from jkartseva/part-operations-plain-disks
Allow partition operations for tables on different plain_rewritable disks
2 parents 9a2019a + 8fb5dda commit 5eaa932

File tree

8 files changed

+126
-27
lines changed

8 files changed

+126
-27
lines changed

src/Disks/IDisk.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -442,6 +442,9 @@ class IDisk : public Space
442442

443443
virtual bool isReadOnly() const { return false; }
444444

445+
/// If the disk is plain object storage.
446+
virtual bool isPlain() const { return false; }
447+
445448
virtual bool isWriteOnce() const { return false; }
446449

447450
virtual bool supportsHardLinks() const { return true; }

src/Disks/IStoragePolicy.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,8 @@ class IStoragePolicy
5959
VolumePtr getVolumeByName(const String & volume_name) const;
6060
/// Checks if storage policy can be replaced by another one.
6161
virtual void checkCompatibleWith(const StoragePolicyPtr & new_storage_policy) const = 0;
62+
/// If the policy allows table partition operations (move, replace) with the other storage policy.
63+
virtual bool isCompatibleForPartitionOps(const StoragePolicyPtr & other) const = 0;
6264
/// Finds a volume index, which contains disk
6365
virtual std::optional<size_t> tryGetVolumeIndexByDiskName(const String & disk_name) const = 0;
6466
size_t getVolumeIndexByDiskName(const String & disk_name) const;

src/Disks/ObjectStorages/DiskObjectStorage.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -205,7 +205,7 @@ friend class DiskObjectStorageRemoteMetadataRestoreHelper;
205205
/// with static files, so only read-only operations are allowed for this storage.
206206
bool isReadOnly() const override;
207207

208-
bool isPlain() const;
208+
bool isPlain() const override;
209209

210210
/// Is object write-once?
211211
/// For example: S3PlainObjectStorage is write once, this means that it

src/Disks/StoragePolicy.cpp

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
#include <Disks/VolumeJBOD.h>
1313

1414
#include <algorithm>
15+
#include <ranges>
1516
#include <set>
1617

1718

@@ -392,6 +393,22 @@ void StoragePolicy::checkCompatibleWith(const StoragePolicyPtr & new_storage_pol
392393
}
393394
}
394395

396+
bool StoragePolicy::isCompatibleForPartitionOps(const StoragePolicyPtr & other) const
397+
{
398+
if (getName() == other->getName())
399+
return true;
400+
401+
constexpr auto is_compatible = [](const auto & disk) -> bool
402+
{
403+
if (disk->isPlain())
404+
return true;
405+
if (auto delegate = disk->getDelegateDiskIfExists())
406+
return delegate->isPlain();
407+
return false;
408+
};
409+
410+
return std::ranges::all_of(getDisks(), is_compatible) && std::ranges::all_of(other->getDisks(), is_compatible);
411+
}
395412

396413
std::optional<size_t> StoragePolicy::tryGetVolumeIndexByDiskName(const String & disk_name) const
397414
{

src/Disks/StoragePolicy.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,9 @@ class StoragePolicy : public IStoragePolicy
8282
/// Checks if storage policy can be replaced by another one.
8383
void checkCompatibleWith(const StoragePolicyPtr & new_storage_policy) const override;
8484

85+
/// If the policy allows table partition operations (move, replace) with the other storage policy.
86+
bool isCompatibleForPartitionOps(const StoragePolicyPtr & other) const override;
87+
8588
/// Check if we have any volume with stopped merges
8689
bool hasAnyVolumeWithDisabledMerges() const override;
8790

src/Storages/StorageMergeTree.cpp

Lines changed: 16 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2268,6 +2268,7 @@ void StorageMergeTree::replacePartitionFrom(const StoragePtr & source_table, con
22682268

22692269
static const String TMP_PREFIX = "tmp_replace_from_";
22702270

2271+
bool are_policies_partition_op_compatible = getStoragePolicy()->isCompatibleForPartitionOps(source_table->getStoragePolicy());
22712272
for (const DataPartPtr & src_part : src_parts)
22722273
{
22732274
if (is_all)
@@ -2294,7 +2295,7 @@ void StorageMergeTree::replacePartitionFrom(const StoragePtr & source_table, con
22942295
clone_params,
22952296
local_context->getReadSettings(),
22962297
local_context->getWriteSettings(),
2297-
true/*must_on_same_disk*/);
2298+
!are_policies_partition_op_compatible /*must_on_same_disk*/);
22982299
dst_parts.emplace_back(std::move(dst_part));
22992300
dst_parts_locks.emplace_back(std::move(part_lock));
23002301
}
@@ -2371,13 +2372,19 @@ void StorageMergeTree::movePartitionToTable(const StoragePtr & dest_table, const
23712372
throw Exception(ErrorCodes::NOT_IMPLEMENTED,
23722373
"Table {} supports movePartitionToTable only for MergeTree family of table engines. Got {}",
23732374
getStorageID().getNameForLogs(), dest_table->getName());
2374-
if (dest_table_storage->getStoragePolicy() != this->getStoragePolicy())
2375-
throw Exception(ErrorCodes::UNKNOWN_POLICY,
2376-
"Destination table {} should have the same storage policy of source table {}. {}: {}, {}: {}",
2377-
dest_table_storage->getStorageID().getNameForLogs(),
2378-
getStorageID().getNameForLogs(), getStorageID().getNameForLogs(),
2379-
this->getStoragePolicy()->getName(), dest_table_storage->getStorageID().getNameForLogs(),
2380-
dest_table_storage->getStoragePolicy()->getName());
2375+
bool are_policies_partition_op_compatible = getStoragePolicy()->isCompatibleForPartitionOps(dest_table_storage->getStoragePolicy());
2376+
2377+
if (!are_policies_partition_op_compatible)
2378+
throw Exception(
2379+
ErrorCodes::UNKNOWN_POLICY,
2380+
"Destination table {} should have the same storage policy of source table, or the policies must be compatible for partition "
2381+
"operations {}. {}: {}, {}: {}",
2382+
dest_table_storage->getStorageID().getNameForLogs(),
2383+
getStorageID().getNameForLogs(),
2384+
getStorageID().getNameForLogs(),
2385+
this->getStoragePolicy()->getName(),
2386+
dest_table_storage->getStorageID().getNameForLogs(),
2387+
dest_table_storage->getStoragePolicy()->getName());
23812388

23822389
// Use the same back-pressure (delay/throw) logic as for INSERTs to be consistent and avoid possibility of exceeding part limits using MOVE PARTITION queries
23832390
dest_table_storage->delayInsertOrThrowIfNeeded(nullptr, local_context, true);
@@ -2437,7 +2444,7 @@ void StorageMergeTree::movePartitionToTable(const StoragePtr & dest_table, const
24372444
clone_params,
24382445
local_context->getReadSettings(),
24392446
local_context->getWriteSettings(),
2440-
true/*must_on_same_disk*/
2447+
!are_policies_partition_op_compatible /*must_on_same_disk*/
24412448
);
24422449

24432450
dst_parts.emplace_back(std::move(dst_part));

tests/integration/test_s3_plain_rewritable_rotate_tables/configs/storage_conf.xml

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,19 @@
1010
<access_key_id>minio</access_key_id>
1111
<secret_access_key>ClickHouse_Minio_P@ssw0rd</secret_access_key>
1212
</disk_s3_plain_rewritable>
13+
<disk_cache_s3_plain_rewritable>
14+
<type>cache</type>
15+
<disk>disk_s3_plain_rewritable</disk>
16+
<path>disks/cache/</path>
17+
<max_size>1000000000</max_size>
18+
<cache_on_write_operations>1</cache_on_write_operations>
19+
</disk_cache_s3_plain_rewritable>
20+
<disk_encrypted>
21+
<type>encrypted</type>
22+
<disk>disk_cache_s3_plain_rewritable</disk>
23+
<key>1234567812345678</key>
24+
<path>disks/encrypted/</path>
25+
</disk_encrypted>
1326
</disks>
1427
<policies>
1528
<s3_plain_rewritable>
@@ -19,6 +32,20 @@
1932
</main>
2033
</volumes>
2134
</s3_plain_rewritable>
35+
<cache>
36+
<volumes>
37+
<main>
38+
<disk>disk_cache_s3_plain_rewritable</disk>
39+
</main>
40+
</volumes>
41+
</cache>
42+
<encrypted>
43+
<volumes>
44+
<main>
45+
<disk>disk_encrypted</disk>
46+
</main>
47+
</volumes>
48+
</encrypted>
2249
</policies>
2350
</storage_configuration>
2451
</clickhouse>

tests/integration/test_s3_plain_rewritable_rotate_tables/test.py

Lines changed: 57 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,37 @@ def randomize_name(table_name, random_suffix_length=8):
2323
return f"{table_name}_{''.join(random.choice(letters) for _ in range(random_suffix_length))}"
2424

2525

26+
s3 = """disk(
27+
name={},
28+
type='s3_plain_rewritable',
29+
endpoint='http://minio1:9001/root/data/node1',
30+
access_key_id='minio',
31+
secret_access_key='ClickHouse_Minio_P@ssw0rd')"""
32+
33+
cache = """disk(
34+
type='cache',
35+
disk={},
36+
path='disks/cache/',
37+
cache_on_write_operations=1,
38+
max_size='1Gi')""".format(
39+
s3
40+
)
41+
42+
encrypted = """disk(
43+
type='encrypted',
44+
disk={},
45+
path='disks/encrypted/',
46+
key='1234567812345678')""".format(
47+
cache
48+
)
49+
50+
disk_defs = {
51+
"s3_plain_rewritable": s3,
52+
"cache": cache,
53+
"encrypted": encrypted,
54+
}
55+
56+
2657
@pytest.fixture(scope="module", autouse=True)
2758
def start_cluster():
2859
cluster.add_instance(
@@ -51,7 +82,16 @@ def start_cluster():
5182
cluster.shutdown()
5283

5384

54-
def test_alter_partition_after_table_rotation():
85+
@pytest.mark.parametrize(
86+
"storage_policy",
87+
[
88+
"s3_plain_rewritable",
89+
"cache",
90+
"encrypted",
91+
],
92+
)
93+
@pytest.mark.parametrize("partition_ops_on_same_disk", [1, 0])
94+
def test_alter_partition_after_table_rotation(start_cluster, storage_policy, partition_ops_on_same_disk):
5595
node1 = cluster.instances["node1"]
5696

5797
def create_insert(node, table_name, insert_values):
@@ -63,7 +103,7 @@ def create_insert(node, table_name, insert_values):
63103
) ENGINE=MergeTree()
64104
ORDER BY id
65105
PARTITION BY id%10
66-
SETTINGS storage_policy='s3_plain_rewritable'
106+
SETTINGS storage_policy='{storage_policy}'
67107
"""
68108
)
69109

@@ -80,44 +120,43 @@ def create_insert(node, table_name, insert_values):
80120

81121
node1.query(f"DETACH TABLE {table1} PERMANENTLY SYNC")
82122

83-
disk_name = randomize_name("disk")
84123
node2 = cluster.instances["node2"]
85124
table2 = randomize_name("table2")
86125
node2.query(f"DROP TABLE IF EXISTS {table2} SYNC")
126+
127+
disk_name = randomize_name("disk")
128+
disk_def = disk_defs[storage_policy].format(disk_name)
129+
policy_def = (
130+
f"disk={disk_def}"
131+
if partition_ops_on_same_disk
132+
else f"storage_policy='{storage_policy}'"
133+
)
134+
87135
node2.query(
88136
f"""CREATE TABLE {table2} (id Int64, data String)
89137
ENGINE=MergeTree()
90138
ORDER BY id
91139
PARTITION BY id%10
92-
SETTINGS disk=disk(
93-
name={disk_name},
94-
type='s3_plain_rewritable',
95-
endpoint='http://minio1:9001/root/data/node1',
96-
access_key_id='minio',
97-
secret_access_key='ClickHouse_Minio_P@ssw0rd')
140+
SETTINGS {policy_def}
98141
"""
99142
)
100143

101144
rotated_table = f"{table1}_rotated"
102145
node2.query(f"""DROP TABLE IF EXISTS {rotated_table} SYNC""")
146+
103147
node2.query(
104148
f"""ATTACH TABLE {rotated_table} UUID '{uuid1}' (id Int64, data String)
105149
ENGINE=MergeTree()
106150
ORDER BY id
107151
PARTITION BY id%10
108-
SETTINGS disk=disk(
109-
name={disk_name},
110-
type='s3_plain_rewritable',
111-
endpoint='http://minio1:9001/root/data/node1',
112-
access_key_id='minio',
113-
secret_access_key='ClickHouse_Minio_P@ssw0rd')
152+
SETTINGS disk={disk_def}
114153
"""
115154
)
116155

117156
assert (
118157
int(
119158
node2.query(
120-
f"SELECT count(*) FROM {rotated_table} WHERE _partition_id = '0'"
159+
f"SELECT count(*) FROM {rotated_table} WHERE _partition_id= '0'"
121160
)
122161
)
123162
== 100
@@ -126,9 +165,10 @@ def create_insert(node, table_name, insert_values):
126165
assert int(node2.query(f"SELECT count(*) FROM {rotated_table}")) == 1000
127166

128167
node2.query(f"""ALTER TABLE {rotated_table} MOVE PARTITION '0' TO TABLE {table2}""")
168+
node2.query(f"""ALTER TABLE {table2} REPLACE PARTITION '1' FROM {rotated_table}""")
129169

130170
assert int(node2.query(f"SELECT count(*) FROM {rotated_table}")) == 900
131-
assert int(node2.query(f"SELECT count(*) FROM {table2}")) == 100
171+
assert int(node2.query(f"SELECT count(*) FROM {table2}")) == 200
132172

133173
node2.query(f"DROP TABLE {rotated_table} SYNC")
134174
node2.query(f"DROP TABLE {table2} SYNC")

0 commit comments

Comments
 (0)