@@ -1140,7 +1140,6 @@ bool DDLWorker::initializeMainThread()
11401140 auto zookeeper = getAndSetZooKeeper ();
11411141 zookeeper->createAncestors (fs::path (queue_dir) / " " );
11421142 initializeReplication ();
1143- markReplicasActive (true );
11441143 initialized = true ;
11451144 return true ;
11461145 }
@@ -1213,6 +1212,14 @@ void DDLWorker::runMainThread()
12131212 }
12141213
12151214 cleanup_event->set ();
1215+ try
1216+ {
1217+ markReplicasActive (reinitialized);
1218+ }
1219+ catch (...)
1220+ {
1221+ tryLogCurrentException (log, " An error occurred when markReplicasActive: " );
1222+ }
12161223 scheduleTasks (reinitialized);
12171224 subsequent_errors_count = 0 ;
12181225
@@ -1291,20 +1298,23 @@ void DDLWorker::createReplicaDirs(const ZooKeeperPtr & zookeeper, const NameSet
12911298 zookeeper->createAncestors (fs::path (replicas_dir) / host_id / " " );
12921299}
12931300
1294- void DDLWorker::markReplicasActive (bool /* reinitialized*/ )
1301+ void DDLWorker::markReplicasActive (bool reinitialized)
12951302{
12961303 auto zookeeper = getZooKeeper ();
12971304
1298- // Reset all active_node_holders
1299- for (auto & it : active_node_holders)
1305+ if (reinitialized)
13001306 {
1301- auto & active_node_holder = it.second .second ;
1302- if (active_node_holder)
1303- active_node_holder->setAlreadyRemoved ();
1304- active_node_holder.reset ();
1305- }
1307+ // Reset all active_node_holders
1308+ for (auto & it : active_node_holders)
1309+ {
1310+ auto & active_node_holder = it.second .second ;
1311+ if (active_node_holder)
1312+ active_node_holder->setAlreadyRemoved ();
1313+ active_node_holder.reset ();
1314+ }
13061315
1307- active_node_holders.clear ();
1316+ active_node_holders.clear ();
1317+ }
13081318
13091319 for (auto it = active_node_holders.begin (); it != active_node_holders.end ();)
13101320 {
@@ -1385,12 +1395,7 @@ void DDLWorker::markReplicasActive(bool /*reinitialized*/)
13851395 {
13861396 zookeeper->deleteEphemeralNodeIfContentMatches (active_path, active_id);
13871397 }
1388- Coordination::Requests ops;
1389- ops.emplace_back (zkutil::makeCreateRequest (active_path, active_id, zkutil::CreateMode::Ephemeral));
1390- // / To bump node mtime
1391- ops.emplace_back (zkutil::makeSetRequest (fs::path (replicas_dir) / host_id, " " , -1 ));
1392- zookeeper->multi (ops);
1393-
1398+ zookeeper->create (active_path, active_id, zkutil::CreateMode::Ephemeral);
13941399 auto active_node_holder_zookeeper = zookeeper;
13951400 auto active_node_holder = zkutil::EphemeralNodeHolder::existing (active_path, *active_node_holder_zookeeper);
13961401 active_node_holders[host_id] = {active_node_holder_zookeeper, active_node_holder};
0 commit comments