@@ -83,14 +83,19 @@ class TRunScriptActorMock : public NActors::TActorBootstrapped<TRunScriptActorMo
8383 }
8484
8585 void Handle (NKikimr::NKqp::TEvKqpExecuter::TEvExecuterProgress::TPtr& ev) {
86- if (ProgressCallback_) {
87- ProgressCallback_ (QueryId_, ev->Get ()->Record );
86+ try {
87+ if (ProgressCallback_) {
88+ ProgressCallback_ (QueryId_, ev->Get ()->Record );
89+ }
90+ } catch (...) {
91+ Cerr << CerrColors_.Red () << " Got unexpected exception during progress callback: " << CurrentExceptionMessage () << CerrColors_.Default () << Endl;
8892 }
8993 }
9094
9195private:
9296 const ui32 TargetNode_ = 0 ;
9397 const size_t QueryId_ = 0 ;
98+ const NColorizer::TColors CerrColors_ = NColorizer::AutoColors(Cerr);
9499
95100 std::unique_ptr<NKikimr::NKqp::TEvKqp::TEvQueryRequest> Request_;
96101 NThreading::TPromise<TQueryResponse> Promise_;
@@ -229,48 +234,61 @@ class TAsyncQueryRunnerActor : public NActors::TActor<TAsyncQueryRunnerActor> {
229234};
230235
231236class TResourcesWaiterActor : public NActors ::TActorBootstrapped<TResourcesWaiterActor> {
237+ using IRetryPolicy = IRetryPolicy<bool >;
238+ using EVerbose = TYdbSetupSettings::EVerbose;
239+ using EHealthCheck = TYdbSetupSettings::EHealthCheck;
240+
232241 static constexpr TDuration REFRESH_PERIOD = TDuration::MilliSeconds(10 );
233242
234243public:
235244 TResourcesWaiterActor (NThreading::TPromise<void > promise, const TWaitResourcesSettings& settings)
236245 : Settings_(settings)
246+ , RetryPolicy_(IRetryPolicy::GetExponentialBackoffPolicy(
247+ &TResourcesWaiterActor::Retryable, REFRESH_PERIOD,
248+ TDuration::MilliSeconds (100 ), TDuration::Seconds(1 ),
249+ std::numeric_limits<size_t>::max(), std::max(2 * REFRESH_PERIOD, Settings_.HealthCheckTimeout)
250+ ))
237251 , Promise_(promise)
238252 {}
239253
240254 void Bootstrap () {
241- if (Settings_.HealthCheckLevel < 1 ) {
255+ Become (&TResourcesWaiterActor::StateFunc);
256+
257+ HealthCheckStage_ = EHealthCheck::NodesCount;
258+ DoHealthCheck ();
259+ }
260+
261+ void DoHealthCheck () {
262+ if (Settings_.HealthCheckLevel < HealthCheckStage_) {
242263 Finish ();
243264 return ;
244265 }
245266
246- Become (&TResourcesWaiterActor::StateWaitNodeCont);
247- CheckResourcesPublish ();
248- }
267+ switch (HealthCheckStage_) {
268+ case TYdbSetupSettings::EHealthCheck::NodesCount:
269+ CheckResourcesPublish ();
270+ break ;
249271
250- void HandleWaitNodeCountWakeup () {
251- CheckResourcesPublish ();
252- }
272+ case TYdbSetupSettings::EHealthCheck::ScriptRequest:
273+ StartScriptQuery ();
274+ break ;
253275
254- void Handle (TEvPrivate::TEvResourcesInfo::TPtr& ev) {
255- const auto nodeCont = ev->Get ()->NodeCount ;
256- if (nodeCont == Settings_.ExpectedNodeCount ) {
257- if (Settings_.HealthCheckLevel < 2 ) {
276+ case TYdbSetupSettings::EHealthCheck::None:
277+ case TYdbSetupSettings::EHealthCheck::Max:
258278 Finish ();
259- } else {
260- Become (&TResourcesWaiterActor::StateWaitScript);
261- StartScriptQuery ();
262- }
263- return ;
279+ break ;
264280 }
281+ }
265282
266- if (Settings_.VerboseLevel >= 2 ) {
267- Cout << CoutColors_.Cyan () << " Retry invalid node count, got " << nodeCont << " , expected " << Settings_.ExpectedNodeCount << CoutColors_.Default () << Endl;
283+ void Handle (TEvPrivate::TEvResourcesInfo::TPtr& ev) {
284+ const auto nodeCount = ev->Get ()->NodeCount ;
285+ if (nodeCount == Settings_.ExpectedNodeCount ) {
286+ HealthCheckStage_ = EHealthCheck::ScriptRequest;
287+ DoHealthCheck ();
288+ return ;
268289 }
269- Schedule (REFRESH_PERIOD, new NActors::TEvents::TEvWakeup ());
270- }
271290
272- void HandleWaitScriptWakeup () {
273- StartScriptQuery ();
291+ Retry (TStringBuilder () << " invalid node count, got " << nodeCount << " , expected " << Settings_.ExpectedNodeCount , true );
274292 }
275293
276294 void Handle (NKikimr::NKqp::TEvKqp::TEvScriptResponse::TPtr& ev) {
@@ -280,45 +298,26 @@ class TResourcesWaiterActor : public NActors::TActorBootstrapped<TResourcesWaite
280298 return ;
281299 }
282300
283- if (Settings_.VerboseLevel >= 2 ) {
284- Cout << CoutColors_.Cyan () << " Retry script creation fail with status " << status << " , reason:\n " << CoutColors_.Default () << ev->Get ()->Issues .ToString () << Endl;
285- }
286- Schedule (REFRESH_PERIOD, new NActors::TEvents::TEvWakeup ());
301+ Retry (TStringBuilder () << " script creation fail with status " << status << " , reason:\n " << CoutColors_.Default () << ev->Get ()->Issues .ToString (), true );
287302 }
288303
289- STRICT_STFUNC (StateWaitNodeCont ,
290- sFunc (NActors::TEvents::TEvWakeup, HandleWaitNodeCountWakeup );
304+ STRICT_STFUNC (StateFunc ,
305+ sFunc (NActors::TEvents::TEvWakeup, DoHealthCheck );
291306 hFunc (TEvPrivate::TEvResourcesInfo, Handle);
292- )
293-
294- STRICT_STFUNC(StateWaitScript,
295- sFunc (NActors::TEvents::TEvWakeup, HandleWaitScriptWakeup);
296307 hFunc (NKikimr::NKqp::TEvKqp::TEvScriptResponse, Handle);
297308 )
298309
299310private:
300311 void CheckResourcesPublish() {
301- GetResourceManager ();
302-
303312 if (!ResourceManager_) {
304- if (Settings_.VerboseLevel >= 2 ) {
305- Cout << CoutColors_.Cyan () << " Retry uninitialized resource manager" << CoutColors_.Default () << Endl;
306- }
307- Schedule (REFRESH_PERIOD, new NActors::TEvents::TEvWakeup ());
308- return ;
313+ ResourceManager_ = NKikimr::NKqp::TryGetKqpResourceManager (SelfId ().NodeId ());
309314 }
310315
311- UpdateResourcesInfo ();
312- }
313-
314- void GetResourceManager () {
315- if (ResourceManager_) {
316+ if (!ResourceManager_) {
317+ Retry (" uninitialized resource manager" , true );
316318 return ;
317319 }
318- ResourceManager_ = NKikimr::NKqp::TryGetKqpResourceManager (SelfId ().NodeId ());
319- }
320320
321- void UpdateResourcesInfo () const {
322321 ResourceManager_->RequestClusterResourcesInfo (
323322 [selfId = SelfId (), actorContext = ActorContext ()](TVector<NKikimrKqp::TKqpNodeResources>&& resources) {
324323 actorContext.Send (selfId, new TEvPrivate::TEvResourcesInfo (resources.size ()));
@@ -338,20 +337,49 @@ class TResourcesWaiterActor : public NActors::TActorBootstrapped<TResourcesWaite
338337 Send (NKikimr::NKqp::MakeKqpProxyID (SelfId ().NodeId ()), event.Release ());
339338 }
340339
340+ void Retry (const TString& message, bool shortRetry) {
341+ if (RetryState_ == nullptr ) {
342+ RetryState_ = RetryPolicy_->CreateRetryState ();
343+ }
344+
345+ if (auto delay = RetryState_->GetNextRetryDelay (shortRetry)) {
346+ if (Settings_.VerboseLevel >= EVerbose::InitLogs) {
347+ Cout << CoutColors_.Cyan () << " Retry in " << *delay << " " << message << CoutColors_.Default () << Endl;
348+ }
349+ Schedule (*delay, new NActors::TEvents::TEvWakeup ());
350+ } else {
351+ Fail (TStringBuilder () << " Health check timeout " << Settings_.HealthCheckTimeout << " exceeded, use --health-check-timeout for increasing it or check out health check logs by using --verbose " << static_cast <ui32>(EVerbose::InitLogs));
352+ }
353+ }
354+
341355 void Finish () {
342356 Promise_.SetValue ();
343357 PassAway ();
344358 }
345359
360+ void Fail (const TString& error) {
361+ Promise_.SetException (error);
362+ PassAway ();
363+ }
364+
365+ static ERetryErrorClass Retryable (bool shortRetry) {
366+ return shortRetry ? ERetryErrorClass::ShortRetry : ERetryErrorClass::LongRetry;
367+ }
368+
346369private:
347370 const TWaitResourcesSettings Settings_;
348371 const NColorizer::TColors CoutColors_ = NColorizer::AutoColors(Cout);
372+ const IRetryPolicy::TPtr RetryPolicy_;
373+ IRetryPolicy::IRetryState::TPtr RetryState_ = nullptr ;
349374 NThreading::TPromise<void > Promise_;
350375
376+ EHealthCheck HealthCheckStage_ = EHealthCheck::None;
351377 std::shared_ptr<NKikimr::NKqp::NRm::IKqpResourceManager> ResourceManager_;
352378};
353379
354380class TSessionHolderActor : public NActors ::TActorBootstrapped<TSessionHolderActor> {
381+ using EVerbose = TYdbSetupSettings::EVerbose;
382+
355383public:
356384 TSessionHolderActor (TCreateSessionRequest request, NThreading::TPromise<TString> openPromise, NThreading::TPromise<void > closePromise)
357385 : TargetNode_(request.TargetNode)
@@ -375,7 +403,7 @@ class TSessionHolderActor : public NActors::TActorBootstrapped<TSessionHolderAct
375403 }
376404
377405 SessionId_ = response.GetResponse ().GetSessionId ();
378- if (VerboseLevel_ >= 1 ) {
406+ if (VerboseLevel_ >= EVerbose::Info ) {
379407 Cout << CoutColors_.Cyan () << " Created new session on node " << TargetNode_ << " with id " << SessionId_ << " \n " ;
380408 }
381409
@@ -453,7 +481,7 @@ class TSessionHolderActor : public NActors::TActorBootstrapped<TSessionHolderAct
453481private:
454482 const ui32 TargetNode_;
455483 const TString TraceId_;
456- const ui8 VerboseLevel_;
484+ const EVerbose VerboseLevel_;
457485 const NColorizer::TColors CoutColors_ = NColorizer::AutoColors(Cout);
458486
459487 std::unique_ptr<NKikimr::NKqp::TEvKqp::TEvCreateSessionRequest> Request_;
0 commit comments