@@ -639,7 +639,7 @@ namespace NKikimr {
639639 TBridgeInfo::TPtr BridgeInfo;
640640
641641 std::deque<std::unique_ptr<IEventHandle>> PendingQ;
642- std::map<ui32, std::deque< std::unique_ptr<IEventHandle>>> PendingByGeneration ;
642+ std::deque< std::tuple<TMonotonic, std::unique_ptr<IEventHandle>>> PendingForNextGeneration ;
643643
644644 public:
645645 TBridgedBlobStorageProxyActor (TIntrusivePtr<TBlobStorageGroupInfo> info)
@@ -650,6 +650,32 @@ namespace NKikimr {
650650 void Bootstrap () {
651651 Send (MakeBlobStorageNodeWardenID (SelfId ().NodeId ()), new TEvNodeWardenQueryStorageConfig (/* subscribe=*/ true ));
652652 Become (&TThis::StateWaitBridgeInfo);
653+ HandleWakeup ();
654+ }
655+
656+ void HandleWakeup () {
657+ TMonotonic dropBefore = TActivationContext::Monotonic () - TDuration::Seconds (2 );
658+ while (!PendingForNextGeneration.empty ()) {
659+ if (auto & [timestamp, ev] = PendingForNextGeneration.front (); timestamp < dropBefore) {
660+ switch (ev->GetTypeRewrite ()) {
661+ #define MAKE_ERROR (TYPE ) \
662+ case TYPE::EventType: \
663+ Send (ev->Sender , static_cast <TYPE*>(ev->GetBase ())->MakeErrorResponse (NKikimrProto::ERROR, \
664+ " bridge request timed out" , GroupId), 0 , ev->Cookie ); \
665+ break ;
666+
667+ DSPROXY_ENUM_EVENTS (MAKE_ERROR)
668+ #undef MAKE_ERROR
669+ default :
670+ Y_ABORT ();
671+ }
672+ PendingForNextGeneration.pop_front ();
673+ } else {
674+ break ;
675+ }
676+ }
677+ TActivationContext::Schedule (TDuration::Seconds (1 ), new IEventHandle (TEvents::TSystem::Wakeup, 0 , SelfId (),
678+ {}, nullptr , 0 ));
653679 }
654680
655681 void PassAway () override {
@@ -815,7 +841,7 @@ namespace NKikimr {
815841 const ui32 myGeneration = bridgeGroupState.GetPile (pile.BridgePileId .GetPileIndex ()).GetGroupGeneration ();
816842
817843 if (myGeneration < msg->RacingGeneration ) {
818- PendingByGeneration[Info-> GroupGeneration + 1 ]. push_back ( std::move (handle));
844+ PendingForNextGeneration. emplace_back ( TActivationContext::Monotonic (), std::move (handle));
819845 } else if (msg->RacingGeneration < myGeneration) {
820846 // our generation is higher than the recipient's; we have to route this message through node warden
821847 // to ensure proxy's configuration gets in place
@@ -879,17 +905,13 @@ namespace NKikimr {
879905 }
880906
881907 void Handle (TEvBlobStorage::TEvConfigureProxy::TPtr ev) {
882- Info = std::move (ev->Get ()->Info );
883- while (!PendingByGeneration.empty ()) {
884- auto it = PendingByGeneration.begin ();
885- auto & [requiredGeneration, events] = *it;
886- if (Info->GroupGeneration < requiredGeneration) {
887- break ;
888- }
889- for (auto & ev : events) {
908+ auto prevInfo = std::exchange (Info, std::move (ev->Get ()->Info ));
909+ Y_ABORT_UNLESS (prevInfo);
910+ Y_ABORT_UNLESS (Info);
911+ if (prevInfo->GroupGeneration < Info->GroupGeneration ) {
912+ for (auto & [timestamp, ev] : std::exchange (PendingForNextGeneration, {})) {
890913 TActivationContext::Send (ev.release ());
891914 }
892- PendingByGeneration.erase (it);
893915 }
894916 }
895917
@@ -923,6 +945,7 @@ namespace NKikimr {
923945 hFunc(TEvBlobStorage::TEvConfigureProxy, Handle)
924946 hFunc(TEvNodeWardenStorageConfig, Handle)
925947 cFunc(TEvents::TSystem::Poison, PassAway)
948+ cFunc(TEvents::TSystem::Wakeup, HandleWakeup)
926949 )
927950
928951#undef HANDLE_RESULT
0 commit comments