Skip to content

Commit ee2abd0

Browse files
committed
implement ttl that depends on cleanup, try to make tests more stable
1 parent fb2d7f7 commit ee2abd0

File tree

7 files changed

+166
-66
lines changed

7 files changed

+166
-66
lines changed

src/Core/Settings.cpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6879,6 +6879,10 @@ Ignore existing partition export and overwrite the zookeeper entry
68796879
)", 0) \
68806880
DECLARE(UInt64, export_merge_tree_partition_max_retries, 3, R"(
68816881
Maximum number of retries for exporting a merge tree part in an export partition task
6882+
)", 0) \
6883+
DECLARE(UInt64, export_merge_tree_partition_manifest_ttl, 180, R"(
6884+
Determines how long the manifest will live in ZooKeeper. It prevents the same partition from being exported twice to the same destination.
6885+
This setting does not affect / delete in progress tasks. It'll only cleanup the completed ones.
68826886
)", 0) \
68836887
\
68846888
/* ####################################################### */ \

src/Storages/ExportReplicatedMergeTreePartitionManifest.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ struct ExportReplicatedMergeTreePartitionManifest
2020
std::vector<String> parts;
2121
time_t create_time;
2222
size_t max_retries;
23+
size_t ttl_seconds;
2324

2425
std::string toJsonString() const
2526
{
@@ -38,6 +39,7 @@ struct ExportReplicatedMergeTreePartitionManifest
3839

3940
json.set("create_time", create_time);
4041
json.set("max_retries", max_retries);
42+
json.set("ttl_seconds", ttl_seconds);
4143
std::ostringstream oss; // STYLE_CHECK_ALLOW_STD_STRING_STREAM
4244
oss.exceptions(std::ios::failbit);
4345
Poco::JSON::Stringifier::stringify(json, oss);
@@ -63,6 +65,7 @@ struct ExportReplicatedMergeTreePartitionManifest
6365
manifest.parts.push_back(parts_array->getElement<String>(static_cast<unsigned int>(i)));
6466

6567
manifest.create_time = json->getValue<time_t>("create_time");
68+
manifest.ttl_seconds = json->getValue<size_t>("ttl_seconds");
6669
return manifest;
6770
}
6871
};

src/Storages/MergeTree/ExportPartitionManifestUpdatingTask.cpp

Lines changed: 44 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,35 @@ table_path/
5555
This is the overall idea, but please read the code to get a better understanding
5656
*/
5757

58+
struct CleanupLockRAII
59+
{
60+
CleanupLockRAII(const zkutil::ZooKeeperPtr & zk_, const std::string & cleanup_lock_path_, const std::string & replica_name_, const LoggerPtr & log_)
61+
: cleanup_lock_path(cleanup_lock_path_), zk(zk_), replica_name(replica_name_), log(log_)
62+
{
63+
is_locked = zk->tryCreate(cleanup_lock_path, replica_name, ::zkutil::CreateMode::Ephemeral) == Coordination::Error::ZOK;
64+
65+
if (is_locked)
66+
{
67+
LOG_INFO(log, "ExportPartition Manifest Updating Task: Cleanup lock acquired, will remove stale entries");
68+
}
69+
}
70+
71+
~CleanupLockRAII()
72+
{
73+
if (is_locked)
74+
{
75+
LOG_INFO(log, "ExportPartition Manifest Updating Task: Releasing cleanup lock");
76+
zk->tryRemove(cleanup_lock_path);
77+
}
78+
}
79+
80+
bool is_locked;
81+
std::string cleanup_lock_path;
82+
zkutil::ZooKeeperPtr zk;
83+
std::string replica_name;
84+
LoggerPtr log;
85+
};
86+
5887
ExportPartitionManifestUpdatingTask::ExportPartitionManifestUpdatingTask(StorageReplicatedMergeTree & storage_)
5988
: storage(storage_)
6089
{
@@ -69,12 +98,7 @@ void ExportPartitionManifestUpdatingTask::run()
6998
const std::string exports_path = fs::path(storage.zookeeper_path) / "exports";
7099
const std::string cleanup_lock_path = fs::path(storage.zookeeper_path) / "exports_cleanup_lock";
71100

72-
bool cleanup_lock_acquired = zk->tryCreate(cleanup_lock_path, "", ::zkutil::CreateMode::Ephemeral) == Coordination::Error::ZOK;
73-
74-
if (cleanup_lock_acquired)
75-
{
76-
LOG_INFO(storage.log, "ExportPartition: Cleanup lock acquired, will remove stale entries");
77-
}
101+
CleanupLockRAII cleanup_lock(zk, cleanup_lock_path, storage.replica_name, storage.log.load());
78102

79103
Coordination::Stat stat;
80104
const auto children = zk->getChildrenWatch(exports_path, &stat, storage.export_merge_tree_partition_watch_callback);
@@ -92,7 +116,7 @@ void ExportPartitionManifestUpdatingTask::run()
92116
std::string metadata_json;
93117
if (!zk->tryGet(fs::path(entry_path) / "metadata.json", metadata_json))
94118
{
95-
LOG_INFO(storage.log, "ExportPartition: Skipping {}: missing metadata.json", key);
119+
LOG_INFO(storage.log, "ExportPartition Manifest Updating Task: Skipping {}: missing metadata.json", key);
96120
continue;
97121
}
98122

@@ -107,58 +131,58 @@ void ExportPartitionManifestUpdatingTask::run()
107131
&& local_entry->manifest.transaction_id == metadata.transaction_id;
108132

109133
/// If the entry is up to date and we don't have the cleanup lock, early exit, nothing to be done.
110-
if (!cleanup_lock_acquired && has_local_entry_and_is_up_to_date)
134+
if (!cleanup_lock.is_locked && has_local_entry_and_is_up_to_date)
111135
continue;
112136

113137
std::string status;
114138
if (!zk->tryGet(fs::path(entry_path) / "status", status))
115139
{
116-
LOG_INFO(storage.log, "ExportPartition: Skipping {}: missing status", key);
140+
LOG_INFO(storage.log, "ExportPartition Manifest Updating Task: Skipping {}: missing status", key);
117141
continue;
118142
}
119143

120144
bool is_not_pending = status != "PENDING";
121145

122-
if (cleanup_lock_acquired)
146+
if (cleanup_lock.is_locked)
123147
{
124-
bool has_expired = metadata.create_time < now - 180;
148+
bool has_expired = metadata.create_time < now - static_cast<time_t>(metadata.ttl_seconds);
125149

126150
if (has_expired && is_not_pending)
127151
{
128152
zk->tryRemoveRecursive(fs::path(entry_path));
129153
auto it = entries_by_key.find(key);
130154
if (it != entries_by_key.end())
131155
entries_by_key.erase(it);
132-
LOG_INFO(storage.log, "ExportPartition: Removed {}: expired", key);
156+
LOG_INFO(storage.log, "ExportPartition Manifest Updating Task: Removed {}: expired", key);
133157
continue;
134158
}
135159
}
136160

137161
if (is_not_pending)
138162
{
139-
LOG_INFO(storage.log, "ExportPartition: Skipping {}: status is not PENDING", key);
163+
LOG_INFO(storage.log, "ExportPartition Manifest Updating Task: Skipping {}: status is not PENDING", key);
140164
continue;
141165
}
142166

143167

144-
if (cleanup_lock_acquired)
168+
if (cleanup_lock.is_locked)
145169
{
146170
std::vector<std::string> parts_in_processing_or_pending;
147171
if (Coordination::Error::ZOK != zk->tryGetChildren(fs::path(entry_path) / "processing", parts_in_processing_or_pending))
148172
{
149-
LOG_INFO(storage.log, "ExportPartition: Failed to get parts in processing or pending, skipping");
173+
LOG_INFO(storage.log, "ExportPartition Manifest Updating Task: Failed to get parts in processing or pending, skipping");
150174
continue;
151175
}
152176

153177
if (parts_in_processing_or_pending.empty())
154178
{
155-
LOG_INFO(storage.log, "ExportPartition: Cleanup found PENDING for {} with all parts exported, try to fix it by committing the export", entry_path);
179+
LOG_INFO(storage.log, "ExportPartition Manifest Updating Task: Cleanup found PENDING for {} with all parts exported, try to fix it by committing the export", entry_path);
156180

157181
const auto destination_storage_id = StorageID(QualifiedTableName {metadata.destination_database, metadata.destination_table});
158182
const auto destination_storage = DatabaseCatalog::instance().tryGetTable(destination_storage_id, storage.getContext());
159183
if (!destination_storage)
160184
{
161-
LOG_INFO(storage.log, "ExportPartition: Failed to reconstruct destination storage: {}, skipping", destination_storage_id.getNameForLogs());
185+
LOG_INFO(storage.log, "ExportPartition Manifest Updating Task: Failed to reconstruct destination storage: {}, skipping", destination_storage_id.getNameForLogs());
162186
continue;
163187
}
164188

@@ -169,7 +193,7 @@ void ExportPartitionManifestUpdatingTask::run()
169193

170194
if (has_local_entry_and_is_up_to_date)
171195
{
172-
LOG_INFO(storage.log, "ExportPartition: Skipping {}: already exists", key);
196+
LOG_INFO(storage.log, "ExportPartition Manifest Updating Task: Skipping {}: already exists", key);
173197
continue;
174198
}
175199

@@ -204,7 +228,7 @@ void ExportPartitionManifestUpdatingTask::run()
204228
}
205229

206230
const auto & transaction_id = it->manifest.transaction_id;
207-
LOG_INFO(storage.log, "ExportPartition: Export task {} was deleted, calling killExportPartition for transaction {}", key, transaction_id);
231+
LOG_INFO(storage.log, "ExportPartition Manifest Updating Task: Export task {} was deleted, calling killExportPartition for transaction {}", key, transaction_id);
208232

209233
try
210234
{
@@ -218,7 +242,7 @@ void ExportPartitionManifestUpdatingTask::run()
218242
it = entries_by_key.erase(it);
219243
}
220244

221-
if (cleanup_lock_acquired)
245+
if (cleanup_lock.is_locked)
222246
{
223247
zk->tryRemove(cleanup_lock_path);
224248
}

src/Storages/MergeTree/ExportPartitionTaskScheduler.cpp

Lines changed: 21 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -34,42 +34,42 @@ void ExportPartitionTaskScheduler::run()
3434

3535
if (!destination_storage)
3636
{
37-
LOG_INFO(storage.log, "ExportPartition: Failed to reconstruct destination storage: {}, skipping", destination_storage_id.getNameForLogs());
37+
LOG_INFO(storage.log, "ExportPartition scheduler task: Failed to reconstruct destination storage: {}, skipping", destination_storage_id.getNameForLogs());
3838
continue;
3939
}
4040

4141
std::string status;
4242
if (!zk->tryGet(fs::path(storage.zookeeper_path) / "exports" / key / "status", status))
4343
{
44-
LOG_INFO(storage.log, "ExportPartition: Failed to get status, skipping");
44+
LOG_INFO(storage.log, "ExportPartition scheduler task: Failed to get status, skipping");
4545
continue;
4646
}
4747

4848
if (status != "PENDING")
4949
{
50-
LOG_INFO(storage.log, "ExportPartition: Skipping... Status is not PENDING");
50+
LOG_INFO(storage.log, "ExportPartition scheduler task: Skipping... Status is not PENDING");
5151
continue;
5252
}
5353

5454
std::vector<std::string> parts_in_processing_or_pending;
5555

5656
if (Coordination::Error::ZOK != zk->tryGetChildren(fs::path(storage.zookeeper_path) / "exports" / key / "processing", parts_in_processing_or_pending))
5757
{
58-
LOG_INFO(storage.log, "ExportPartition: Failed to get parts in processing or pending, skipping");
58+
LOG_INFO(storage.log, "ExportPartition scheduler task: Failed to get parts in processing or pending, skipping");
5959
continue;
6060
}
6161

6262
if (parts_in_processing_or_pending.empty())
6363
{
64-
LOG_INFO(storage.log, "ExportPartition: No parts in processing or pending, skipping");
64+
LOG_INFO(storage.log, "ExportPartition scheduler task: No parts in processing or pending, skipping");
6565
continue;
6666
}
6767

6868
std::vector<std::string> locked_parts;
6969

7070
if (Coordination::Error::ZOK != zk->tryGetChildren(fs::path(storage.zookeeper_path) / "exports" / key / "locks", locked_parts))
7171
{
72-
LOG_INFO(storage.log, "ExportPartition: Failed to get locked parts, skipping");
72+
LOG_INFO(storage.log, "ExportPartition scheduler task: Failed to get locked parts, skipping");
7373
continue;
7474
}
7575

@@ -79,20 +79,20 @@ void ExportPartitionTaskScheduler::run()
7979
{
8080
if (locked_parts_set.contains(zk_part_name))
8181
{
82-
LOG_INFO(storage.log, "ExportPartition: Part {} is locked, skipping", zk_part_name);
82+
LOG_INFO(storage.log, "ExportPartition scheduler task: Part {} is locked, skipping", zk_part_name);
8383
continue;
8484
}
8585

8686
const auto part = storage.getPartIfExists(zk_part_name, {MergeTreeDataPartState::Active, MergeTreeDataPartState::Outdated});
8787
if (!part)
8888
{
89-
LOG_INFO(storage.log, "ExportPartition: Part {} not found locally, skipping", zk_part_name);
89+
LOG_INFO(storage.log, "ExportPartition scheduler task: Part {} not found locally, skipping", zk_part_name);
9090
continue;
9191
}
9292

9393
if (Coordination::Error::ZOK != zk->tryCreate(fs::path(storage.zookeeper_path) / "exports" / key / "locks" / zk_part_name, storage.replica_name, zkutil::CreateMode::Ephemeral))
9494
{
95-
LOG_INFO(storage.log, "ExportPartition: Failed to lock part {}, skipping", zk_part_name);
95+
LOG_INFO(storage.log, "ExportPartition scheduler task: Failed to lock part {}, skipping", zk_part_name);
9696
continue;
9797
}
9898

@@ -155,22 +155,22 @@ void ExportPartitionTaskScheduler::handlePartExportSuccess(
155155
const String & relative_path_in_destination_storage
156156
)
157157
{
158-
LOG_INFO(storage.log, "ExportPartition: Part {} exported successfully", relative_path_in_destination_storage);
158+
LOG_INFO(storage.log, "ExportPartition scheduler task: Part {} exported successfully", relative_path_in_destination_storage);
159159

160160
Coordination::Stat locked_by_stat;
161161
std::string locked_by;
162162

163163
if (!zk->tryGet(export_path / "locks" / part_name, locked_by, &locked_by_stat))
164164
{
165-
LOG_INFO(storage.log, "ExportPartition: Part {} is not locked by any replica, will not commit or set it as completed", part_name);
165+
LOG_INFO(storage.log, "ExportPartition scheduler task: Part {} is not locked by any replica, will not commit or set it as completed", part_name);
166166
return;
167167
}
168168

169169
/// Is this a good idea? what if the file we just pushed to s3 ends up triggering an exception in the replica that actually locks the part and it does not commit?
170170
/// I guess we should not throw if file already exists for export partition, hard coded.
171171
if (locked_by != storage.replica_name)
172172
{
173-
LOG_INFO(storage.log, "ExportPartition: Part {} is locked by another replica, will not commit or set it as completed", part_name);
173+
LOG_INFO(storage.log, "ExportPartition scheduler task: Part {} is locked by another replica, will not commit or set it as completed", part_name);
174174
return;
175175
}
176176

@@ -199,26 +199,26 @@ void ExportPartitionTaskScheduler::handlePartExportSuccess(
199199
if (Coordination::Error::ZOK != zk->tryMulti(requests, responses))
200200
{
201201
/// todo arthur remember what to do here
202-
LOG_INFO(storage.log, "ExportPartition: Failed to update export path, skipping");
202+
LOG_INFO(storage.log, "ExportPartition scheduler task: Failed to update export path, skipping");
203203
return;
204204
}
205205

206-
LOG_INFO(storage.log, "ExportPartition: Marked part export {} as completed", part_name);
206+
LOG_INFO(storage.log, "ExportPartition scheduler task: Marked part export {} as completed", part_name);
207207

208208
Strings parts_in_processing_or_pending;
209209
if (Coordination::Error::ZOK != zk->tryGetChildren(export_path / "processing", parts_in_processing_or_pending))
210210
{
211-
LOG_INFO(storage.log, "ExportPartition: Failed to get parts in processing or pending, will not try to commit export partition");
211+
LOG_INFO(storage.log, "ExportPartition scheduler task: Failed to get parts in processing or pending, will not try to commit export partition");
212212
return;
213213
}
214214

215215
if (!parts_in_processing_or_pending.empty())
216216
{
217-
LOG_INFO(storage.log, "ExportPartition: There are still parts in processing or pending, will not try to commit export partition");
217+
LOG_INFO(storage.log, "ExportPartition scheduler task: There are still parts in processing or pending, will not try to commit export partition");
218218
return;
219219
}
220220

221-
LOG_INFO(storage.log, "ExportPartition: All parts are processed, will try to commit export partition");
221+
LOG_INFO(storage.log, "ExportPartition scheduler task: All parts are processed, will try to commit export partition");
222222

223223
ExportPartitionUtils::commit(manifest, destination_storage, zk, storage.log.load(), export_path, storage.getContext());
224224
}
@@ -239,13 +239,13 @@ void ExportPartitionTaskScheduler::handlePartExportFailure(
239239

240240
if (!zk->tryGet(export_path / "locks" / part_name, locked_by, &locked_by_stat))
241241
{
242-
LOG_INFO(storage.log, "ExportPartition: Part {} is not locked by any replica, will not increment error counts", part_name);
242+
LOG_INFO(storage.log, "ExportPartition scheduler task: Part {} is not locked by any replica, will not increment error counts", part_name);
243243
return;
244244
}
245245

246246
if (locked_by != storage.replica_name)
247247
{
248-
LOG_INFO(storage.log, "ExportPartition: Part {} is locked by another replica, will not increment error counts", part_name);
248+
LOG_INFO(storage.log, "ExportPartition scheduler task: Part {} is locked by another replica, will not increment error counts", part_name);
249249
return;
250250
}
251251

@@ -267,7 +267,7 @@ void ExportPartitionTaskScheduler::handlePartExportFailure(
267267
ops.emplace_back(zkutil::makeCreateRequest(processing_part_path / "finished_by", storage.replica_name, zkutil::CreateMode::Persistent));
268268
ops.emplace_back(zkutil::makeSetRequest(export_path / "status", "FAILED", -1));
269269

270-
LOG_INFO(storage.log, "ExportPartition: Retry count limit exceeded for part {}, will try to fail the entire task", part_name);
270+
LOG_INFO(storage.log, "ExportPartition scheduler task: Retry count limit exceeded for part {}, will try to fail the entire task", part_name);
271271
}
272272

273273
std::size_t num_exceptions = 0;
@@ -300,7 +300,7 @@ void ExportPartitionTaskScheduler::handlePartExportFailure(
300300
Coordination::Responses responses;
301301
if (Coordination::Error::ZOK != zk->tryMulti(ops, responses))
302302
{
303-
LOG_INFO(storage.log, "ExportPartition: All failure mechanism failed, will not try to update it");
303+
LOG_INFO(storage.log, "ExportPartition scheduler task: All failure mechanism failed, will not try to update it");
304304
return;
305305
}
306306
}

src/Storages/MergeTree/tests/gtest_export_partition_ordering.cpp

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44
namespace DB
55
{
66

7-
// Test fixture that uses the shared container definition
87
class ExportPartitionOrderingTest : public ::testing::Test
98
{
109
protected:
@@ -21,7 +20,6 @@ class ExportPartitionOrderingTest : public ::testing::Test
2120

2221
TEST_F(ExportPartitionOrderingTest, IterationOrderMatchesCreateTime)
2322
{
24-
// Create entries with different create_times (in reverse order)
2523
time_t base_time = 1000;
2624

2725
ExportReplicatedMergeTreePartitionManifest manifest1;

src/Storages/StorageReplicatedMergeTree.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -194,6 +194,7 @@ namespace Setting
194194
extern const SettingsBool allow_experimental_export_merge_tree_part;
195195
extern const SettingsBool export_merge_tree_partition_force_export;
196196
extern const SettingsUInt64 export_merge_tree_partition_max_retries;
197+
extern const SettingsUInt64 export_merge_tree_partition_manifest_ttl;
197198
}
198199

199200
namespace MergeTreeSetting
@@ -8128,6 +8129,7 @@ void StorageReplicatedMergeTree::exportPartitionToTable(const PartitionCommand &
81288129
manifest.parts = part_names;
81298130
manifest.create_time = time(nullptr);
81308131
manifest.max_retries = query_context->getSettingsRef()[Setting::export_merge_tree_partition_max_retries];
8132+
manifest.ttl_seconds = query_context->getSettingsRef()[Setting::export_merge_tree_partition_manifest_ttl];
81318133

81328134
ops.emplace_back(zkutil::makeCreateRequest(
81338135
fs::path(partition_exports_path) / "metadata.json",

0 commit comments

Comments
 (0)