@@ -586,6 +586,36 @@ void TNodeBroker::PrepareEpochCache()
586586 TabletCounters->Simple ()[COUNTER_EPOCH_DELTAS_SIZE_BYTES].Set (EpochDeltasCache.size ());
587587}
588588
589+ void TNodeBroker::PrepareUpdateNodesLog ()
590+ {
591+ LOG_DEBUG_S (TActorContext::AsActorContext (), NKikimrServices::NODE_BROKER,
592+ " Preparing update nodes log for epoch #" << Committed.Epoch .ToString ()
593+ << " nodes=" << Committed.Nodes .size ()
594+ << " expired=" << Committed.ExpiredNodes .size ()
595+ << " removed=" << Committed.RemovedNodes .size ());
596+
597+ UpdateNodesLog.clear ();
598+ UpdateNodesLogVersions.clear ();
599+
600+ TVector<TVersionedNodeID> nodeIdsSortedByVersion;
601+ for (auto &entry : Committed.Nodes ) {
602+ nodeIdsSortedByVersion.emplace_back (entry.second .NodeId , entry.second .Version );
603+ }
604+ for (auto &entry : Committed.ExpiredNodes ) {
605+ nodeIdsSortedByVersion.emplace_back (entry.second .NodeId , entry.second .Version );
606+ }
607+ for (auto &entry : Committed.RemovedNodes ) {
608+ nodeIdsSortedByVersion.emplace_back (entry.second .NodeId , entry.second .Version );
609+ }
610+ std::sort (nodeIdsSortedByVersion.begin (), nodeIdsSortedByVersion.end (), TVersionedNodeID::TCmpByVersion ());
611+
612+ for (const auto &id : nodeIdsSortedByVersion) {
613+ const auto & node = *Committed.FindNode (id.NodeId );
614+ AddNodeToUpdateNodesLog (node);
615+ }
616+ TabletCounters->Simple ()[COUNTER_UPDATE_NODES_LOG_SIZE_BYTES].Set (UpdateNodesLog.size ());
617+ }
618+
589619void TNodeBroker::AddNodeToEpochCache (const TNodeInfo &node)
590620{
591621 LOG_DEBUG_S (TActorContext::AsActorContext (), NKikimrServices::NODE_BROKER,
@@ -615,13 +645,132 @@ void TNodeBroker::AddDeltaToEpochDeltasCache(const TString &delta, ui64 version)
615645 TabletCounters->Simple ()[COUNTER_EPOCH_DELTAS_SIZE_BYTES].Set (EpochDeltasCache.size ());
616646}
617647
648+ void TNodeBroker::AddNodeToUpdateNodesLog (const TNodeInfo &node)
649+ {
650+ LOG_DEBUG_S (TActorContext::AsActorContext (), NKikimrServices::NODE_BROKER,
651+ " Add node " << node.IdShortString () << " to update nodes log" );
652+
653+ NKikimrNodeBroker::TUpdateNodes updateNodes;
654+
655+ switch (node.State ) {
656+ case ENodeState::Active:
657+ FillNodeInfo (node, *updateNodes.AddUpdates ()->MutableNode ());
658+ break ;
659+ case ENodeState::Expired:
660+ updateNodes.AddUpdates ()->SetExpiredNode (node.NodeId );
661+ break ;
662+ case ENodeState::Removed:
663+ updateNodes.AddUpdates ()->SetRemovedNode (node.NodeId );
664+ break ;
665+ }
666+
667+ TString delta;
668+ Y_PROTOBUF_SUPPRESS_NODISCARD updateNodes.SerializeToString (&delta);
669+
670+ Y_ENSURE (UpdateNodesLogVersions.empty () || UpdateNodesLogVersions.back ().Version <= node.Version );
671+ if (!UpdateNodesLogVersions.empty () && UpdateNodesLogVersions.back ().Version == node.Version ) {
672+ UpdateNodesLog += delta;
673+ UpdateNodesLogVersions.back ().CacheEndOffset = UpdateNodesLog.size ();
674+ } else {
675+ UpdateNodesLog += delta;
676+ UpdateNodesLogVersions.emplace_back (node.Version , UpdateNodesLog.size ());
677+ }
678+ TabletCounters->Simple ()[COUNTER_UPDATE_NODES_LOG_SIZE_BYTES].Set (UpdateNodesLog.size ());
679+ }
680+
618681void TNodeBroker::SubscribeForConfigUpdates (const TActorContext &ctx)
619682{
620683 ui32 nodeBrokerItem = (ui32)NKikimrConsole::TConfigItem::NodeBrokerConfigItem;
621684 ui32 featureFlagsItem = (ui32)NKikimrConsole::TConfigItem::FeatureFlagsItem;
622685 NConsole::SubscribeViaConfigDispatcher (ctx, {nodeBrokerItem, featureFlagsItem}, ctx.SelfID );
623686}
624687
688+ void TNodeBroker::SendToSubscriber (const TSubscriberInfo &subscriber, IEventBase* event, const TActorContext &ctx) const
689+ {
690+ SendToSubscriber (subscriber, event, 0 , ctx);
691+ }
692+
693+ void TNodeBroker::SendToSubscriber (const TSubscriberInfo &subscriber, IEventBase* event, ui64 cookie, const TActorContext &ctx) const
694+ {
695+ THolder<IEventHandle> ev = MakeHolder<IEventHandle>(subscriber.Id , ctx.SelfID , event, 0 , cookie);
696+ if (subscriber.PipeServerInfo ->IcSession ) {
697+ ev->Rewrite (TEvInterconnect::EvForward, subscriber.PipeServerInfo ->IcSession );
698+ }
699+ ctx.Send (ev.Release ());
700+ }
701+
702+ void TNodeBroker::SendUpdateNodes (const TActorContext &ctx)
703+ {
704+ if (SentVersion >= Committed.Epoch .Version ) {
705+ return ;
706+ }
707+
708+ for (const auto & [_, subscriber] : Subscribers) {
709+ SendUpdateNodes (subscriber, SentVersion, ctx);
710+ }
711+ SentVersion = Committed.Epoch .Version ;
712+ }
713+
714+ void TNodeBroker::SendUpdateNodes (const TSubscriberInfo &subscriber, ui64 version, const TActorContext &ctx)
715+ {
716+ NKikimrNodeBroker::TUpdateNodes record;
717+ record.SetSeqNo (subscriber.SeqNo );
718+ Committed.Epoch .Serialize (*record.MutableEpoch ());
719+ auto response = MakeHolder<TEvNodeBroker::TEvUpdateNodes>(record);
720+
721+ auto it = std::lower_bound (UpdateNodesLogVersions.begin (), UpdateNodesLogVersions.end (), version + 1 );
722+ if (it != UpdateNodesLogVersions.begin ()) {
723+ response->PreSerializedData = UpdateNodesLog.substr (std::prev (it)->CacheEndOffset );
724+ } else {
725+ response->PreSerializedData = UpdateNodesLog;
726+ }
727+
728+ TabletCounters->Percentile ()[COUNTER_UPDATE_NODES_BYTES].IncrementFor (response->GetCachedByteSize ());
729+ LOG_TRACE_S (ctx, NKikimrServices::NODE_BROKER,
730+ " Send TEvUpdateNodes v" << version << " -> v" << Committed.Epoch .Version
731+ << " to " << subscriber.Id );
732+ SendToSubscriber (subscriber, response.Release (), ctx);
733+ }
734+
735+ TNodeBroker::TSubscriberInfo& TNodeBroker::AddSubscriber (TActorId subscriberId,
736+ TActorId pipeServerId,
737+ ui64 seqNo,
738+ const TActorContext &ctx)
739+ {
740+ LOG_DEBUG_S (ctx, NKikimrServices::NODE_BROKER,
741+ " New subscriber " << subscriberId
742+ << " , seqNo: " << seqNo
743+ << " , server pipe id: " << pipeServerId);
744+
745+ auto & pipeServer = PipeServers.at (pipeServerId);
746+ auto res = Subscribers.emplace (subscriberId, TSubscriberInfo (subscriberId, seqNo, &pipeServer));
747+ Y_ENSURE (res.second , " Subscription already exists for " << subscriberId);
748+ pipeServer.Subscribers .insert (subscriberId);
749+ return res.first ->second ;
750+ }
751+
752+ void TNodeBroker::RemoveSubscriber (TActorId subscriber, const TActorContext &ctx)
753+ {
754+ auto it = Subscribers.find (subscriber);
755+ Y_ENSURE (it != Subscribers.end (), " No subscription for " << subscriber);
756+
757+ LOG_DEBUG_S (ctx, NKikimrServices::NODE_BROKER,
758+ " Unsubscribed " << subscriber
759+ << " , seqNo: " << it->second .SeqNo
760+ << " , server pipe id: " << it->second .PipeServerInfo ->Id );
761+
762+ it->second .PipeServerInfo ->Subscribers .erase (subscriber);
763+ Subscribers.erase (it);
764+ }
765+
766+ bool TNodeBroker::HasOutdatedSubscription (TActorId subscriber, ui64 newSeqNo) const
767+ {
768+ if (auto it = Subscribers.find (subscriber); it != Subscribers.end ()) {
769+ return it->second .SeqNo < newSeqNo;
770+ }
771+ return false ;
772+ }
773+
625774void TNodeBroker::TState::LoadConfigFromProto (const NKikimrNodeBroker::TConfig &config)
626775{
627776 Config = config;
@@ -1452,6 +1601,53 @@ void TNodeBroker::Handle(TEvNodeBroker::TEvSetConfigRequest::TPtr &ev,
14521601 Execute (CreateTxUpdateConfig (ev), ctx);
14531602}
14541603
1604+ void TNodeBroker::Handle (TEvNodeBroker::TEvSubscribeNodesRequest::TPtr &ev,
1605+ const TActorContext &ctx)
1606+ {
1607+ TabletCounters->Cumulative ()[COUNTER_SUBSCRIBE_NODES_REQUESTS].Increment (1 );
1608+
1609+ auto seqNo = ev->Get ()->Record .GetSeqNo ();
1610+ if (HasOutdatedSubscription (ev->Sender , seqNo)) {
1611+ RemoveSubscriber (ev->Sender , ctx);
1612+ }
1613+
1614+ if (!Subscribers.contains (ev->Sender )) {
1615+ const auto & subscriber = AddSubscriber (ev->Sender , ev->Recipient , seqNo, ctx);
1616+ SendUpdateNodes (subscriber, ev->Get ()->Record .GetCachedVersion (), ctx);
1617+ }
1618+ }
1619+
1620+ void TNodeBroker::Handle (TEvNodeBroker::TEvSyncNodesRequest::TPtr &ev,
1621+ const TActorContext &ctx)
1622+ {
1623+ TabletCounters->Cumulative ()[COUNTER_SYNC_NODES_REQUESTS].Increment (1 );
1624+
1625+ if (auto it = Subscribers.find (ev->Sender ); it != Subscribers.end ()) {
1626+ if (it->second .SeqNo == ev->Get ()->Record .GetSeqNo ()) {
1627+ auto response = MakeHolder<TEvNodeBroker::TEvSyncNodesResponse>();
1628+ response->Record .SetSeqNo (it->second .SeqNo );
1629+ SendToSubscriber (it->second , response.Release (), ev->Cookie , ctx);
1630+ }
1631+ }
1632+ }
1633+
1634+ void TNodeBroker::Handle (TEvTabletPipe::TEvServerConnected::TPtr &ev)
1635+ {
1636+ auto res = PipeServers.emplace (ev->Get ()->ServerId , TPipeServerInfo (ev->Get ()->ServerId , ev->Get ()->InterconnectSession ));
1637+ Y_ENSURE (res.second , " Unexpected TEvServerConnected for " << ev->Get ()->ServerId );
1638+ }
1639+
1640+ void TNodeBroker::Handle (TEvTabletPipe::TEvServerDisconnected::TPtr &ev,
1641+ const TActorContext &ctx)
1642+ {
1643+ auto it = PipeServers.find (ev->Get ()->ServerId );
1644+ Y_ENSURE (it != PipeServers.end (), " Unexpected TEvServerDisconnected for " << ev->Get ()->ServerId );
1645+ while (!it->second .Subscribers .empty ()) {
1646+ RemoveSubscriber (*it->second .Subscribers .begin (), ctx);
1647+ }
1648+ PipeServers.erase (it);
1649+ }
1650+
14551651void TNodeBroker::Handle (TEvPrivate::TEvUpdateEpoch::TPtr &ev,
14561652 const TActorContext &ctx)
14571653{
0 commit comments