@@ -134,6 +134,32 @@ TAutoPtr<TEvPersQueue::TEvHasDataInfoResponse> TPartition::MakeHasDataInfoRespon
134134 return res;
135135}
136136
137+ bool TPartition::ProcessHasDataRequest (const THasDataReq& request, const TActorContext& ctx) {
138+ auto sendResponse = [&](ui64 lagSize, bool readingFinished) {
139+ auto response = MakeHasDataInfoResponse (lagSize, request.Cookie , readingFinished);
140+ ctx.Send (request.Sender , response.Release ());
141+ };
142+
143+ if (!IsActive ()) {
144+ if (request.Offset < EndOffset && (!request.ReadTimestamp || *request.ReadTimestamp <= EndWriteTimestamp)) {
145+ sendResponse (GetSizeLag (request.Offset ), false );
146+ } else {
147+ sendResponse (0 , true );
148+
149+ auto now = ctx.Now ();
150+ auto & userInfo = UsersInfoStorage->GetOrCreate (request.ClientId , ctx);
151+ userInfo.UpdateReadOffset ((i64 )EndOffset - 1 , now, now, now, true );
152+ }
153+ } else if (request.Offset < EndOffset) {
154+ sendResponse (GetSizeLag (request.Offset ), false );
155+ } else {
156+ return false ;
157+ }
158+
159+ return true ;
160+ }
161+
162+
137163void TPartition::ProcessHasDataRequests (const TActorContext& ctx) {
138164 if (!InitDone) {
139165 return ;
@@ -148,13 +174,7 @@ void TPartition::ProcessHasDataRequests(const TActorContext& ctx) {
148174 };
149175
150176 for (auto request = HasDataRequests.begin (); request != HasDataRequests.end ();) {
151- if (request->Offset < EndOffset && (IsActive () || !request->ReadTimestamp || *request->ReadTimestamp < EndWriteTimestamp)) {
152- auto response = MakeHasDataInfoResponse (GetSizeLag (request->Offset ), request->Cookie );
153- ctx.Send (request->Sender , response.Release ());
154- } else if (!IsActive ()) {
155- auto response = MakeHasDataInfoResponse (0 , request->Cookie , true );
156- ctx.Send (request->Sender , response.Release ());
157- } else {
177+ if (!ProcessHasDataRequest (*request, ctx)) {
158178 break ;
159179 }
160180
@@ -183,32 +203,22 @@ void TPartition::Handle(TEvPersQueue::TEvHasDataInfo::TPtr& ev, const TActorCont
183203 auto & record = ev->Get ()->Record ;
184204 Y_ABORT_UNLESS (record.HasSender ());
185205
206+ auto now = ctx.Now ();
207+
186208 auto cookie = record.HasCookie () ? TMaybe<ui64>(record.GetCookie ()) : TMaybe<ui64>();
187209 auto readTimestamp = GetReadFrom (record.GetMaxTimeLagMs (), record.GetReadTimestampMs (), TInstant::Zero (), ctx);
188-
189210 TActorId sender = ActorIdFromProto (record.GetSender ());
190- if (InitDone && EndOffset > (ui64)record.GetOffset () && (!readTimestamp || EndWriteTimestamp >= *readTimestamp)) { // already has data, answer right now
191- auto response = MakeHasDataInfoResponse (GetSizeLag (record.GetOffset ()), cookie);
192- ctx.Send (sender, response.Release ());
193- } else if (InitDone && !IsActive ()) {
194- auto now = ctx.Now ();
195211
196- auto & userInfo = UsersInfoStorage-> GetOrCreate ( record.GetClientId (), ctx);
197- userInfo. UpdateReadOffset (( i64 )EndOffset - 1 , now, now, now, true ) ;
212+ THasDataReq req{++HasDataReqNum, (ui64) record.GetOffset (), sender, cookie,
213+ record. HasClientId () && InitDone ? record. GetClientId () : " " , readTimestamp} ;
198214
199- auto response = MakeHasDataInfoResponse (0 , cookie, true );
200- ctx.Send (sender, response.Release ());
201- } else {
202- THasDataReq req{++HasDataReqNum, (ui64)record.GetOffset (), sender, cookie,
203- record.HasClientId () && InitDone ? record.GetClientId () : " " , readTimestamp};
215+ if (!InitDone || !ProcessHasDataRequest (req, ctx)) {
204216 THasDataDeadline dl{TInstant::MilliSeconds (record.GetDeadline ()), req};
205- auto res = HasDataRequests.insert (req);
217+ auto res = HasDataRequests.insert (std::move ( req) );
206218 HasDataDeadlines.insert (dl);
207219 Y_ABORT_UNLESS (res.second );
208220
209221 if (InitDone && record.HasClientId () && !record.GetClientId ().empty ()) {
210- auto now = ctx.Now ();
211-
212222 auto & userInfo = UsersInfoStorage->GetOrCreate (record.GetClientId (), ctx);
213223 ++userInfo.Subscriptions ;
214224 userInfo.UpdateReadOffset ((i64 )EndOffset - 1 , now, now, now);
0 commit comments