@@ -1139,7 +1139,6 @@ bool DDLWorker::initializeMainThread()
11391139 auto zookeeper = getAndSetZooKeeper ();
11401140 zookeeper->createAncestors (fs::path (queue_dir) / " " );
11411141 initializeReplication ();
1142- markReplicasActive (true );
11431142 initialized = true ;
11441143 return true ;
11451144 }
@@ -1212,6 +1211,14 @@ void DDLWorker::runMainThread()
12121211 }
12131212
12141213 cleanup_event->set ();
1214+ try
1215+ {
1216+ markReplicasActive (reinitialized);
1217+ }
1218+ catch (...)
1219+ {
1220+ tryLogCurrentException (log, " An error occurred when markReplicasActive: " );
1221+ }
12151222 scheduleTasks (reinitialized);
12161223 subsequent_errors_count = 0 ;
12171224
@@ -1290,20 +1297,23 @@ void DDLWorker::createReplicaDirs(const ZooKeeperPtr & zookeeper, const NameSet
12901297 zookeeper->createAncestors (fs::path (replicas_dir) / host_id / " " );
12911298}
12921299
1293- void DDLWorker::markReplicasActive (bool /* reinitialized*/ )
1300+ void DDLWorker::markReplicasActive (bool reinitialized)
12941301{
12951302 auto zookeeper = getZooKeeper ();
12961303
1297- // Reset all active_node_holders
1298- for (auto & it : active_node_holders)
1304+ if (reinitialized)
12991305 {
1300- auto & active_node_holder = it.second .second ;
1301- if (active_node_holder)
1302- active_node_holder->setAlreadyRemoved ();
1303- active_node_holder.reset ();
1304- }
1306+ // Reset all active_node_holders
1307+ for (auto & it : active_node_holders)
1308+ {
1309+ auto & active_node_holder = it.second .second ;
1310+ if (active_node_holder)
1311+ active_node_holder->setAlreadyRemoved ();
1312+ active_node_holder.reset ();
1313+ }
13051314
1306- active_node_holders.clear ();
1315+ active_node_holders.clear ();
1316+ }
13071317
13081318 for (auto it = active_node_holders.begin (); it != active_node_holders.end ();)
13091319 {
@@ -1384,12 +1394,7 @@ void DDLWorker::markReplicasActive(bool /*reinitialized*/)
13841394 {
13851395 zookeeper->deleteEphemeralNodeIfContentMatches (active_path, active_id);
13861396 }
1387- Coordination::Requests ops;
1388- ops.emplace_back (zkutil::makeCreateRequest (active_path, active_id, zkutil::CreateMode::Ephemeral));
1389- // / To bump node mtime
1390- ops.emplace_back (zkutil::makeSetRequest (fs::path (replicas_dir) / host_id, " " , -1 ));
1391- zookeeper->multi (ops);
1392-
1397+ zookeeper->create (active_path, active_id, zkutil::CreateMode::Ephemeral);
13931398 auto active_node_holder_zookeeper = zookeeper;
13941399 auto active_node_holder = zkutil::EphemeralNodeHolder::existing (active_path, *active_node_holder_zookeeper);
13951400 active_node_holders[host_id] = {active_node_holder_zookeeper, active_node_holder};
0 commit comments