@@ -1396,6 +1396,32 @@ void DDLWorker::markReplicasActive(bool /*reinitialized*/)
13961396 }
13971397}
13981398
1399+ void DDLWorker::cleanupStaleReplicas (Int64 current_time_seconds, const ZooKeeperPtr & zookeeper)
1400+ {
1401+ auto replicas = zookeeper->getChildren (replicas_dir);
1402+ static constexpr Int64 REPLICA_MAX_INACTIVE_SECONDS = 86400 ;
1403+ for (const auto & replica : replicas)
1404+ {
1405+ auto replica_path = fs::path (replicas_dir) / replica;
1406+ auto responses = zookeeper->tryGet ({replica_path, fs::path (replica_path) / " active" });
1407+ // / Replica not active
1408+ if (responses[1 ].error == Coordination::Error::ZNONODE)
1409+ {
1410+ auto stat = responses[0 ].stat ;
1411+ // / Replica was not active for too long, let's cleanup to avoid polluting Keeper with
1412+ // / removed replicas
1413+ if (stat.mtime / 1000 + REPLICA_MAX_INACTIVE_SECONDS < current_time_seconds)
1414+ {
1415+ LOG_INFO (log, " Replica {} is stale, removing it" , replica);
1416+ auto code = zookeeper->tryRemove (replica_path, -1 );
1417+ if (code != Coordination::Error::ZOK)
1418+ LOG_WARNING (log, " Cannot remove stale replica {}, code {}" , replica, Coordination::errorMessage (code));
1419+ }
1420+ }
1421+ }
1422+
1423+ }
1424+
13991425void DDLWorker::runCleanupThread ()
14001426{
14011427 setThreadName (" DDLWorkerClnr" );
@@ -1423,6 +1449,7 @@ void DDLWorker::runCleanupThread()
14231449 continue ;
14241450
14251451 cleanupQueue (current_time_seconds, zookeeper);
1452+ cleanupStaleReplicas (current_time_seconds, zookeeper);
14261453 last_cleanup_time_seconds = current_time_seconds;
14271454 }
14281455 catch (...)
0 commit comments