99
1010namespace DB
1111{
12-
13- struct CleanupLockRAII
14- {
15- CleanupLockRAII (const zkutil::ZooKeeperPtr & zk_, const std::string & cleanup_lock_path_, const std::string & replica_name_, const LoggerPtr & log_)
16- : cleanup_lock_path(cleanup_lock_path_), zk(zk_), replica_name(replica_name_), log(log_)
17- {
18- is_locked = zk->tryCreate (cleanup_lock_path, replica_name, ::zkutil::CreateMode::Ephemeral) == Coordination::Error::ZOK;
19-
20- if (is_locked)
21- {
22- LOG_INFO (log, " ExportPartition Manifest Updating Task: Cleanup lock acquired, will remove stale entries" );
23- }
24- }
25-
26- ~CleanupLockRAII ()
27- {
28- if (is_locked)
29- {
30- LOG_INFO (log, " ExportPartition Manifest Updating Task: Releasing cleanup lock" );
31- zk->tryRemove (cleanup_lock_path);
32- }
33- }
34-
35- bool is_locked;
36- std::string cleanup_lock_path;
37- zkutil::ZooKeeperPtr zk;
38- std::string replica_name;
39- LoggerPtr log;
40- };
41-
4212namespace
4313{
4414 /*
@@ -120,7 +90,11 @@ void ExportPartitionManifestUpdatingTask::poll()
12090 const std::string exports_path = fs::path (storage.zookeeper_path ) / " exports" ;
12191 const std::string cleanup_lock_path = fs::path (storage.zookeeper_path ) / " exports_cleanup_lock" ;
12292
123- CleanupLockRAII cleanup_lock (zk, cleanup_lock_path, storage.replica_name , storage.log .load ());
93+ auto cleanup_lock = zkutil::EphemeralNodeHolder::tryCreate (cleanup_lock_path, *zk, storage.replica_name );
94+ if (cleanup_lock)
95+ {
96+ LOG_INFO (storage.log , " ExportPartition Manifest Updating Task: Cleanup lock acquired, will remove stale entries" );
97+ }
12498
12599 Coordination::Stat stat;
126100 const auto children = zk->getChildrenWatch (exports_path, &stat, storage.export_merge_tree_partition_watch_callback );
@@ -154,7 +128,7 @@ void ExportPartitionManifestUpdatingTask::poll()
154128 && local_entry->manifest .transaction_id == metadata.transaction_id ;
155129
156130 // / If the entry is up to date and we don't have the cleanup lock, early exit, nothing to be done.
157- if (!cleanup_lock. is_locked && has_local_entry_and_is_up_to_date)
131+ if (!cleanup_lock && has_local_entry_and_is_up_to_date)
158132 continue ;
159133
160134 auto status_watch_callback = std::make_shared<Coordination::WatchCallback>([this , key](const Coordination::WatchResponse &) {
@@ -173,7 +147,7 @@ void ExportPartitionManifestUpdatingTask::poll()
173147
174148 // / if we have the cleanup lock, try to cleanup
175149 // / if we successfully cleaned it up, early exit
176- if (cleanup_lock. is_locked )
150+ if (cleanup_lock)
177151 {
178152 bool cleanup_successful = tryCleanup (
179153 zk,
0 commit comments