Skip to content

Commit a4d894e

Browse files
authored
Merge pull request ClickHouse#80038 from aalexfvk/reset_metadata_version_on_attach
Fix metadata version on attach part
2 parents 7cfa1f9 + 9a6d286 commit a4d894e

File tree

6 files changed

+93
-21
lines changed

6 files changed

+93
-21
lines changed

src/Storages/MergeTree/IMergeTreeDataPart.cpp

Lines changed: 26 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -828,7 +828,7 @@ ColumnsStatistics IMergeTreeDataPart::loadStatistics() const
828828
return result;
829829
}
830830

831-
void IMergeTreeDataPart::loadColumnsChecksumsIndexes(bool require_columns_checksums, bool check_consistency)
831+
void IMergeTreeDataPart::loadColumnsChecksumsIndexes(bool require_columns_checksums, bool check_consistency, bool load_metadata_version)
832832
{
833833
/// Memory should not be limited during ATTACH TABLE query.
834834
/// This is already true at the server startup but must be also ensured for manual table ATTACH.
@@ -839,7 +839,7 @@ void IMergeTreeDataPart::loadColumnsChecksumsIndexes(bool require_columns_checks
839839
{
840840
if (!isStoredOnReadonlyDisk())
841841
loadUUID();
842-
loadColumns(require_columns_checksums);
842+
loadColumns(require_columns_checksums, load_metadata_version);
843843
loadColumnsSubstreams();
844844
loadChecksums(require_columns_checksums);
845845

@@ -1228,6 +1228,19 @@ void IMergeTreeDataPart::writeVersionMetadata(const VersionMetadata & version_,
12281228
}
12291229
}
12301230

1231+
void IMergeTreeDataPart::writeMetadataVersion(ContextPtr context, int32_t metadata_version_, bool sync)
1232+
{
1233+
removeMetadataVersion();
1234+
{
1235+
auto out_metadata = getDataPartStorage().writeFile(METADATA_VERSION_FILE_NAME, 4096, context->getWriteSettings());
1236+
writeText(metadata_version_, *out_metadata);
1237+
out_metadata->finalize();
1238+
if (sync)
1239+
out_metadata->sync();
1240+
}
1241+
old_part_with_no_metadata_version_on_disk = false;
1242+
}
1243+
12311244
void IMergeTreeDataPart::removeDeleteOnDestroyMarker()
12321245
{
12331246
getDataPartStorage().removeFileIfExists(DELETE_ON_DESTROY_MARKER_FILE_NAME_DEPRECATED);
@@ -1612,7 +1625,7 @@ void IMergeTreeDataPart::loadUUID()
16121625
}
16131626
}
16141627

1615-
void IMergeTreeDataPart::loadColumns(bool require)
1628+
void IMergeTreeDataPart::loadColumns(bool require, bool load_metadata_version)
16161629
{
16171630
String path = fs::path(getDataPartStorage().getRelativePath()) / "columns.txt";
16181631

@@ -1656,20 +1669,24 @@ void IMergeTreeDataPart::loadColumns(bool require)
16561669
if (auto in = readFileIfExists(SERIALIZATION_FILE_NAME))
16571670
infos = SerializationInfoByName::readJSON(loaded_columns, settings, *in);
16581671

1659-
int32_t loaded_metadata_version;
1660-
if (auto in = readFileIfExists(METADATA_VERSION_FILE_NAME))
1672+
std::optional<int32_t> loaded_metadata_version;
1673+
if (load_metadata_version)
16611674
{
1662-
readIntText(loaded_metadata_version, *in);
1675+
if (auto in = readFileIfExists(METADATA_VERSION_FILE_NAME))
1676+
{
1677+
readIntText(loaded_metadata_version.emplace(), *in);
1678+
}
16631679
}
1664-
else
1680+
1681+
if (!loaded_metadata_version)
16651682
{
16661683
auto storage_metdata_snapshot = storage.getInMemoryMetadataPtr();
16671684
loaded_metadata_version = storage_metdata_snapshot->getMetadataVersion();
16681685
old_part_with_no_metadata_version_on_disk = true;
16691686
}
16701687

1671-
LOG_DEBUG(storage.log, "Loaded metadata version {}", loaded_metadata_version);
1672-
setColumns(loaded_columns, infos, loaded_metadata_version);
1688+
LOG_DEBUG(storage.log, "Loaded metadata version {}", *loaded_metadata_version);
1689+
setColumns(loaded_columns, infos, *loaded_metadata_version);
16731690
}
16741691

16751692
void IMergeTreeDataPart::loadColumnsSubstreams()

src/Storages/MergeTree/IMergeTreeDataPart.h

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,8 @@ class IMergeTreeDataPart : public std::enable_shared_from_this<IMergeTreeDataPar
137137

138138
/// Version of metadata for part (columns, pk and so on)
139139
int32_t getMetadataVersion() const { return metadata_version; }
140+
void setMetadataVersion(int32_t metadata_version_) noexcept { metadata_version = metadata_version_; }
141+
void writeMetadataVersion(ContextPtr local_context, int32_t metadata_version, bool sync);
140142

141143
const NamesAndTypesList & getColumns() const { return columns; }
142144
const ColumnsDescription & getColumnsDescription() const { return columns_description; }
@@ -164,7 +166,7 @@ class IMergeTreeDataPart : public std::enable_shared_from_this<IMergeTreeDataPar
164166

165167
/// Initialize columns (from columns.txt if exists, or create from column files if not).
166168
/// Load various metadata into memory: checksums from checksums.txt, index if required, etc.
167-
void loadColumnsChecksumsIndexes(bool require_columns_checksums, bool check_consistency);
169+
void loadColumnsChecksumsIndexes(bool require_columns_checksums, bool check_consistency, bool load_metadata_version = true);
168170

169171
void loadRowsCountFileForUnexpectedPart();
170172

@@ -701,7 +703,7 @@ class IMergeTreeDataPart : public std::enable_shared_from_this<IMergeTreeDataPar
701703

702704

703705
/// Reads columns names and types from columns.txt
704-
void loadColumns(bool require);
706+
void loadColumns(bool require, bool load_metadata_version);
705707

706708
/// Reads columns substreams from columns_substreams.txt (only in Compact parts).
707709
void loadColumnsSubstreams();

src/Storages/MergeTree/MergeTreeData.cpp

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -5473,14 +5473,7 @@ static void loadPartAndFixMetadataImpl(MergeTreeData::MutableDataPartPtr part, C
54735473
/// Remove metadata version file and take it from table.
54745474
/// Currently we cannot attach parts with different schema, so
54755475
/// we can assume that it's equal to table's current schema.
5476-
part->removeMetadataVersion();
5477-
{
5478-
auto out_metadata = part->getDataPartStorage().writeFile(IMergeTreeDataPart::METADATA_VERSION_FILE_NAME, 4096, local_context->getWriteSettings());
5479-
writeText(metadata_version, *out_metadata);
5480-
out_metadata->finalize();
5481-
if (sync)
5482-
out_metadata->sync();
5483-
}
5476+
part->writeMetadataVersion(local_context, metadata_version, sync);
54845477

54855478
part->loadColumnsChecksumsIndexes(false, true);
54865479
part->modification_time = part->getDataPartStorage().getLastModified().epochTime();

src/Storages/StorageReplicatedMergeTree.cpp

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -193,6 +193,7 @@ namespace MergeTreeSetting
193193
extern const MergeTreeSettingsBool enable_the_endpoint_id_with_zookeeper_name_prefix;
194194
extern const MergeTreeSettingsFloat fault_probability_after_part_commit;
195195
extern const MergeTreeSettingsFloat fault_probability_before_part_commit;
196+
extern const MergeTreeSettingsBool fsync_after_insert;
196197
extern const MergeTreeSettingsUInt64 index_granularity_bytes;
197198
extern const MergeTreeSettingsSeconds lock_acquire_timeout_for_background_operations;
198199
extern const MergeTreeSettingsUInt64 max_bytes_to_merge_at_max_space_in_pool;
@@ -2236,7 +2237,8 @@ MergeTreeData::MutableDataPartPtr StorageReplicatedMergeTree::attachPartHelperFo
22362237

22372238
try
22382239
{
2239-
part->loadColumnsChecksumsIndexes(true, true);
2240+
part->loadColumnsChecksumsIndexes(
2241+
/* require_columns_checksums = */ true, /* check_consistency = */ true, /* load_metadata_version = */ false);
22402242
}
22412243
catch (const Exception&)
22422244
{
@@ -2247,6 +2249,9 @@ MergeTreeData::MutableDataPartPtr StorageReplicatedMergeTree::attachPartHelperFo
22472249

22482250
if (entry.part_checksum == part->checksums.getTotalChecksumHex())
22492251
{
2252+
auto metadata_version = getInMemoryMetadataPtr()->getMetadataVersion();
2253+
part->writeMetadataVersion(getContext(), metadata_version, (*getSettings())[MergeTreeSetting::fsync_after_insert]);
2254+
part->setMetadataVersion(metadata_version);
22502255
part->modification_time = part->getDataPartStorage().getLastModified().epochTime();
22512256
return part;
22522257
}
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
<clickhouse>
2+
<background_processing_pool_thread_sleep_seconds>1</background_processing_pool_thread_sleep_seconds>
3+
<background_processing_pool_thread_sleep_seconds_random_part>0</background_processing_pool_thread_sleep_seconds_random_part>
4+
<background_processing_pool_thread_sleep_seconds_if_nothing_to_do>0.0</background_processing_pool_thread_sleep_seconds_if_nothing_to_do>
5+
<background_processing_pool_task_sleep_seconds_when_no_work_min>0</background_processing_pool_task_sleep_seconds_when_no_work_min>
6+
<background_processing_pool_task_sleep_seconds_when_no_work_max>1</background_processing_pool_task_sleep_seconds_when_no_work_max>
7+
<background_processing_pool_task_sleep_seconds_when_no_work_multiplier>1</background_processing_pool_task_sleep_seconds_when_no_work_multiplier>
8+
<background_processing_pool_task_sleep_seconds_when_no_work_random_part>0</background_processing_pool_task_sleep_seconds_when_no_work_random_part>
9+
</clickhouse>

tests/integration/test_restore_replica/test.py

Lines changed: 47 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ def fill_nodes(nodes):
2323

2424

2525
cluster = ClickHouseCluster(__file__)
26-
configs = ["configs/remote_servers.xml"]
26+
configs = ["configs/remote_servers.xml", "configs/fast_background_pool.xml"]
2727

2828
node_1 = cluster.add_instance("replica1", with_zookeeper=True, main_configs=configs)
2929
node_2 = cluster.add_instance("replica2", with_zookeeper=True, main_configs=configs)
@@ -181,3 +181,49 @@ def test_restore_replica_alive_replicas(start_cluster):
181181
node_3.query("SYSTEM SYNC REPLICA test")
182182

183183
check_after_restoration()
184+
185+
186+
def test_fix_metadata_version_on_attach_part_after_restore(start_cluster):
187+
zk = cluster.get_kazoo_client("zoo1")
188+
189+
node_1.query("DROP TABLE IF EXISTS test_ttl ON CLUSTER 'test_cluster' SYNC")
190+
for node in [node_1, node_2]:
191+
node.query(
192+
"""
193+
CREATE TABLE test_ttl(n UInt32, d DateTime)
194+
ENGINE = ReplicatedMergeTree('/clickhouse/tables/test_ttl/', '{replica}')
195+
TTL d + INTERVAL 5 SECOND DELETE
196+
ORDER BY n PARTITION BY n % 10
197+
SETTINGS merge_with_ttl_timeout = 0, number_of_free_entries_in_pool_to_execute_mutation = 0;
198+
""".format(
199+
replica=node.name
200+
)
201+
)
202+
203+
# Alter metadata to increment metadata version of the table
204+
node_1.query("ALTER TABLE test_ttl ADD COLUMN Added1 UInt32")
205+
node_1.query("ALTER TABLE test_ttl DROP COLUMN Added1")
206+
207+
node_1.query("SYSTEM STOP TTL MERGES test_ttl")
208+
209+
# Create a part
210+
node_1.query("INSERT INTO test_ttl VALUES (1, now())")
211+
212+
# Delete root zk metadata path for the table
213+
zk_rmr_with_retries(zk, "/clickhouse/tables/test_ttl")
214+
215+
# Restore replicas
216+
node_1.query("SYSTEM RESTART REPLICA test_ttl")
217+
node_1.query("SYSTEM RESTORE REPLICA test_ttl")
218+
# Disable TTL merges on node_1. We expect it to be done on node_2
219+
node_1.query("SYSTEM STOP TTL MERGES test_ttl")
220+
221+
node_2.query("SYSTEM RESTART REPLICA test_ttl")
222+
node_2.query("SYSTEM RESTORE REPLICA test_ttl")
223+
224+
# TTL merge should work.
225+
# Before fix it is failed due to metadata version discrepancy and we will have in the log:
226+
# "Source part metadata version 2 is newer then the table metadata version 0. ALTER_METADATA is still in progress"
227+
assert_eq_with_retry(
228+
node_2, "SELECT count() FROM test_ttl", "0\n", retry_count=60, sleep_time=1
229+
)

0 commit comments

Comments
 (0)