diff --git a/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp b/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp index c30530481678..1f91791abbfa 100644 --- a/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp +++ b/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp @@ -181,6 +181,24 @@ void TKqpComputeActor::CheckRunStatus() { TBase::CheckRunStatus(); } +ui64 TKqpComputeActor::GetSourcesState() { + return ScanData ? CalculateFreeSpace() : 0; +} + +void TKqpComputeActor::PollSources(ui64 prevFreeSpace) { + if (!ScanData || ScanData->IsFinished()) { + return; + } + + const auto freeSpace = CalculateFreeSpace(); + if (freeSpace <= prevFreeSpace && ScanData->GetStoredBytes()) { + return; + } + + CA_LOG_D("Poll sources, free space: " << freeSpace); + Send(SysViewActorId, new TEvKqpCompute::TEvScanDataAck(freeSpace)); +} + void TKqpComputeActor::FillExtraStats(NDqProto::TDqComputeActorStats* dst, bool last) { if (last && SysViewActorId && ScanData && dst->TasksSize() > 0) { YQL_ENSURE(dst->TasksSize() == 1); @@ -278,11 +296,7 @@ void TKqpComputeActor::HandleExecute(TEvKqpCompute::TEvScanData::TPtr& ev) { } } - ui64 freeSpace = GetMemoryLimits().ChannelBufferSize > ScanData->GetStoredBytes() - ? GetMemoryLimits().ChannelBufferSize - ScanData->GetStoredBytes() - : 0; - - if (freeSpace > 0) { + if (const auto freeSpace = CalculateFreeSpace(); freeSpace > 0) { CA_LOG_D("Send scan data ack, freeSpace: " << freeSpace); Send(SysViewActorId, new TEvKqpCompute::TEvScanDataAck(freeSpace)); @@ -302,6 +316,13 @@ void TKqpComputeActor::HandleExecute(TEvKqpCompute::TEvScanError::TPtr& ev) { ReportStateAndMaybeDie(YdbStatusToDqStatus(status, EStatusCompatibilityLevel::WithUnauthorized), issues); } +ui64 TKqpComputeActor::CalculateFreeSpace() const { + YQL_ENSURE(ScanData); + const auto storedBytes = ScanData->GetStoredBytes(); + const auto channelBufferSize = GetMemoryLimits().ChannelBufferSize; + return channelBufferSize > storedBytes ? channelBufferSize - storedBytes : 0; +} + IActor* CreateKqpComputeActor(const TActorId& executerId, ui64 txId, NDqProto::TDqTask* task, IDqAsyncIoFactory::TPtr asyncIoFactory, const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits, diff --git a/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.h b/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.h index e937f92011d4..e1b9171cfda4 100644 --- a/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.h +++ b/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.h @@ -39,6 +39,10 @@ class TKqpComputeActor : public NScheduler::TSchedulableComputeActorBase Meta; diff --git a/ydb/core/sys_view/auth/auth_scan_base.h b/ydb/core/sys_view/auth/auth_scan_base.h index 1b0f386d05eb..73494a0566cc 100644 --- a/ydb/core/sys_view/auth/auth_scan_base.h +++ b/ydb/core/sys_view/auth/auth_scan_base.h @@ -116,6 +116,10 @@ class TAuthScanBase : public TScanActorBase { } void ContinueScan() { + if (IsNavigatePathInProgress) { + return; + } + while (DeepFirstSearchStack) { auto& last = DeepFirstSearchStack.back(); @@ -155,6 +159,7 @@ class TAuthScanBase : public TScanActorBase { } void Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev, const TActorContext& ctx) { + IsNavigatePathInProgress = false; THolder request(ev->Get()->Request.Release()); Y_ABORT_UNLESS(request->ResultSet.size() == 1); @@ -197,6 +202,8 @@ class TAuthScanBase : public TScanActorBase { } void NavigatePath(TPath path) { + IsNavigatePathInProgress = true; + auto request = MakeHolder(); auto& entry = request->ResultSet.emplace_back(); @@ -260,6 +267,7 @@ class TAuthScanBase : public TScanActorBase { bool RequireUserAdministratorAccess; std::optional PathFrom, PathTo; TVector DeepFirstSearchStack; + bool IsNavigatePathInProgress = false; }; } diff --git a/ydb/core/sys_view/auth/users.cpp b/ydb/core/sys_view/auth/users.cpp index d3fc381e715a..02caa38cdc77 100644 --- a/ydb/core/sys_view/auth/users.cpp +++ b/ydb/core/sys_view/auth/users.cpp @@ -15,9 +15,9 @@ namespace NKikimr::NSysView::NAuth { using namespace NSchemeShard; using namespace NActors; -class TUsersScan : public TScanActorBase { +class TUsersScan : public TScanActorWithoutBackPressure { public: - using TBase = TScanActorBase; + using TBase = TScanActorWithoutBackPressure; TUsersScan(const NActors::TActorId& ownerId, ui32 scanId, const NKikimrSysView::TSysViewDescription& sysViewInfo, @@ -31,7 +31,7 @@ class TUsersScan : public TScanActorBase { STFUNC(StateScan) { switch (ev->GetTypeRewrite()) { HFunc(TEvSchemeShard::TEvListUsersResult, Handle); - hFunc(NKqp::TEvKqpCompute::TEvScanDataAck, Handle); + sFunc(NKqp::TEvKqpCompute::TEvScanDataAck, HandleAck); hFunc(TEvPipeCache::TEvDeliveryProblem, Handle); hFunc(NKqp::TEvKqp::TEvAbortExecution, TBase::HandleAbortExecution); cFunc(TEvents::TEvWakeup::EventType, TBase::HandleTimeout); @@ -43,24 +43,12 @@ class TUsersScan : public TScanActorBase { } protected: - void ProceedToScan() override { - TBase::Become(&TUsersScan::StateScan); - + void StartScan() final { //NOTE: here is the earliest point when Base::DatabaseOwner is already set bool isClusterAdmin = IsAdministrator(AppData(), UserToken.Get()); bool isDatabaseAdmin = (AppData()->FeatureFlags.GetEnableDatabaseAdmin() && IsDatabaseAdministrator(UserToken.Get(), TBase::DatabaseOwner)); IsAdmin = isClusterAdmin || isDatabaseAdmin; - if (TBase::AckReceived) { - StartScan(); - } - } - - void Handle(NKqp::TEvKqpCompute::TEvScanDataAck::TPtr&) { - StartScan(); - } - - void StartScan() { auto request = MakeHolder(); LOG_TRACE_S(TlsActivationContext->AsActorContext(), NKikimrServices::SYSTEM_VIEWS, diff --git a/ydb/core/sys_view/common/scan_actor_base_impl.h b/ydb/core/sys_view/common/scan_actor_base_impl.h index e57199c63dc4..cfda79c6f222 100644 --- a/ydb/core/sys_view/common/scan_actor_base_impl.h +++ b/ydb/core/sys_view/common/scan_actor_base_impl.h @@ -328,6 +328,7 @@ class TScanActorBase : public TActorBootstrapped { switch (ev->GetTypeRewrite()) { hFunc(NKqp::TEvKqpCompute::TEvScanDataAck, HandleScanAck); hFunc(TEvSysView::TEvGetScanLimiterResult, HandleLimiter); + hFunc(NKqp::TEvKqp::TEvAbortExecution, HandleAbortExecution); cFunc(TEvents::TEvWakeup::EventType, HandleTimeout); cFunc(TEvents::TEvPoison::EventType, this->PassAway); default: @@ -394,6 +395,40 @@ class TScanActorBase : public TActorBootstrapped { } FailState = OK; }; +template +class TScanActorWithoutBackPressure : public TScanActorBase { + using TBase = TScanActorBase; + +public: + using TBase::TBase; + +protected: + // Should scan all data inside call + virtual void StartScan() = 0; + + void HandleAck() { + TBase::AckReceived = true; + DoScan(); + } + + void ProceedToScan() final { + TBase::Become(&TDerived::StateScan); + if (TBase::AckReceived) { + DoScan(); + } + } + +private: + void DoScan() { + if (!ScanStarted) { + ScanStarted = true; + StartScan(); + } + } + +private: + bool ScanStarted = false; +}; } // NSysView } // NKikimr diff --git a/ydb/core/sys_view/compile_cache/compile_cache.cpp b/ydb/core/sys_view/compile_cache/compile_cache.cpp index 08d6549202a5..c0b3713bd10e 100644 --- a/ydb/core/sys_view/compile_cache/compile_cache.cpp +++ b/ydb/core/sys_view/compile_cache/compile_cache.cpp @@ -144,6 +144,12 @@ class TCompileCacheQueriesScan : public NKikimr::NSysView::TScanActorBaseFeatureFlags.GetEnableCompileCacheView()) { + PendingNodesInitialized = true; + PendingNodes.emplace_back(SelfId().NodeId()); + } + if (AckReceived) { StartScan(); } @@ -154,13 +160,9 @@ class TCompileCacheQueriesScan : public NKikimr::NSysView::TScanActorBaseFeatureFlags.GetEnableCompileCacheView()) { - PendingNodesInitialized = true; - PendingNodes.emplace_back(SelfId().NodeId()); - } - if (!PendingNodesInitialized) { + if (!PendingNodesInitialized && !PendingRequest) { + PendingRequest = true; Send(NKqp::MakeKqpProxyID(SelfId().NodeId()), new NKikimr::NKqp::TEvKqp::TEvListProxyNodesRequest()); return; } @@ -196,6 +198,7 @@ class TCompileCacheQueriesScan : public NKikimr::NSysView::TScanActorBaseFeatureFlags.GetEnableCompileCacheView()) { auto& proxies = ev->Get()->ProxyNodes; std::sort(proxies.begin(), proxies.end()); diff --git a/ydb/core/sys_view/nodes/nodes.cpp b/ydb/core/sys_view/nodes/nodes.cpp index 1c9599a7bf56..7d3c0561cfda 100644 --- a/ydb/core/sys_view/nodes/nodes.cpp +++ b/ydb/core/sys_view/nodes/nodes.cpp @@ -17,9 +17,9 @@ namespace NSysView { using namespace NActors; using namespace NNodeWhiteboard; -class TNodesScan : public TScanActorBase { +class TNodesScan : public TScanActorWithoutBackPressure { public: - using TBase = TScanActorBase; + using TBase = TScanActorWithoutBackPressure; static constexpr auto ActorActivityType() { return NKikimrServices::TActivity::KQP_SYSTEM_VIEW_SCAN; @@ -57,7 +57,7 @@ class TNodesScan : public TScanActorBase { STFUNC(StateScan) { switch (ev->GetTypeRewrite()) { - hFunc(NKqp::TEvKqpCompute::TEvScanDataAck, Handle); + sFunc(NKqp::TEvKqpCompute::TEvScanDataAck, HandleAck); hFunc(TEvInterconnect::TEvNodesInfo, Handle); hFunc(TEvWhiteboard::TEvSystemStateResponse, Handle); hFunc(TEvents::TEvUndelivered, Undelivered); @@ -73,14 +73,7 @@ class TNodesScan : public TScanActorBase { } private: - void ProceedToScan() override { - Become(&TNodesScan::StateScan); - if (AckReceived) { - StartScan(); - } - } - - void StartScan() { + void StartScan() final { if (IsEmptyRange || TenantNodes.empty()) { ReplyEmptyAndDie(); return; @@ -98,10 +91,6 @@ class TNodesScan : public TScanActorBase { } } - void Handle(NKqp::TEvKqpCompute::TEvScanDataAck::TPtr&) { - StartScan(); - } - void Handle(TEvInterconnect::TEvNodesInfo::TPtr& ev) { THolder nodesInfo = ev->Release(); diff --git a/ydb/core/sys_view/query_stats/query_stats.cpp b/ydb/core/sys_view/query_stats/query_stats.cpp index e0871a3d0fd4..d49040702cb8 100644 --- a/ydb/core/sys_view/query_stats/query_stats.cpp +++ b/ydb/core/sys_view/query_stats/query_stats.cpp @@ -196,6 +196,12 @@ class TQueryStatsScan : public TScanActorBase> { return; } + if (OldScanStarted) { + return; + } + + OldScanStarted = true; + NodesToRequest.reserve(this->TenantNodes.size()); for (const auto& nodeId : this->TenantNodes) { Nodes[nodeId] = TRetryState{false, 0, StartRetryInterval}; @@ -504,6 +510,7 @@ class TQueryStatsScan : public TScanActorBase> { THolder> History; + bool OldScanStarted = false; bool UseProcessor = false; NKikimrSysView::TEvGetQueryMetricsRequest Request; }; diff --git a/ydb/core/sys_view/resource_pool_classifiers/resource_pool_classifiers.cpp b/ydb/core/sys_view/resource_pool_classifiers/resource_pool_classifiers.cpp index b61849d81bdb..53b63930e90e 100644 --- a/ydb/core/sys_view/resource_pool_classifiers/resource_pool_classifiers.cpp +++ b/ydb/core/sys_view/resource_pool_classifiers/resource_pool_classifiers.cpp @@ -24,9 +24,9 @@ namespace NSysView { using namespace NActors; using namespace NNodeWhiteboard; -class TResourcePoolClassifiersScan : public TScanActorBase { +class TResourcePoolClassifiersScan : public TScanActorWithoutBackPressure { public: - using TBase = TScanActorBase; + using TBase = TScanActorWithoutBackPressure; static constexpr auto ActorActivityType() { return NKikimrServices::TActivity::KQP_SYSTEM_VIEW_SCAN; @@ -45,7 +45,7 @@ class TResourcePoolClassifiersScan : public TScanActorBaseGetTypeRewrite()) { - hFunc(NKqp::TEvKqpCompute::TEvScanDataAck, Handle); + sFunc(NKqp::TEvKqpCompute::TEvScanDataAck, HandleAck); hFunc(NMetadata::NProvider::TEvRefreshSubscriberData, Handle) hFunc(NKqp::NWorkload::TEvFetchDatabaseResponse, Handle); hFunc(NKqp::TEvKqp::TEvAbortExecution, HandleAbortExecution); @@ -63,24 +63,13 @@ class TResourcePoolClassifiersScan : public TScanActorBaseGet(); if (event.Status != Ydb::StatusIds::SUCCESS) { diff --git a/ydb/core/sys_view/resource_pools/resource_pools.cpp b/ydb/core/sys_view/resource_pools/resource_pools.cpp index 760e5dbc118a..426ba55b58f7 100644 --- a/ydb/core/sys_view/resource_pools/resource_pools.cpp +++ b/ydb/core/sys_view/resource_pools/resource_pools.cpp @@ -23,9 +23,9 @@ namespace NSysView { using namespace NActors; -class TResourcePoolsScan : public TScanActorBase { +class TResourcePoolsScan : public TScanActorWithoutBackPressure { public: - using TBase = TScanActorBase; + using TBase = TScanActorWithoutBackPressure; enum class EState { LIST_RESOURCE_POOLS, @@ -49,7 +49,7 @@ class TResourcePoolsScan : public TScanActorBase { STFUNC(StateScan) { switch (ev->GetTypeRewrite()) { - hFunc(NKqp::TEvKqpCompute::TEvScanDataAck, Handle); + sFunc(NKqp::TEvKqpCompute::TEvScanDataAck, HandleAck); hFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, Handle); hFunc(NKqp::TEvKqp::TEvAbortExecution, HandleAbortExecution); cFunc(TEvents::TEvWakeup::EventType, HandleTimeout); @@ -61,14 +61,7 @@ class TResourcePoolsScan : public TScanActorBase { } private: - void ProceedToScan() override { - Become(&TResourcePoolsScan::StateScan); - if (AckReceived) { - StartScan(); - } - } - - void StartScan() { + void StartScan() final { SendRequestToSchemeCache({{".metadata/workload_manager", "pools"}}, NSchemeCache::TSchemeCacheNavigate::OpList); } @@ -238,10 +231,6 @@ class TResourcePoolsScan : public TScanActorBase { SendBatch(std::move(batch)); } - void Handle(NKqp::TEvKqpCompute::TEvScanDataAck::TPtr&) { - StartScan(); - } - private: EState State = EState::LIST_RESOURCE_POOLS; const TIntrusiveConstPtr UserToken; diff --git a/ydb/core/sys_view/show_create/show_create.cpp b/ydb/core/sys_view/show_create/show_create.cpp index d1a70f25a306..64899b9186b6 100644 --- a/ydb/core/sys_view/show_create/show_create.cpp +++ b/ydb/core/sys_view/show_create/show_create.cpp @@ -50,9 +50,9 @@ bool RewriteTemporaryTablePath(const TString& database, TString& tablePath, TStr return true; } -class TShowCreate : public TScanActorBase { +class TShowCreate : public TScanActorWithoutBackPressure { public: - using TBase = TScanActorBase; + using TBase = TScanActorWithoutBackPressure; static constexpr auto ActorActivityType() { return NKikimrServices::TActivity::KQP_SYSTEM_VIEW_SCAN; @@ -68,10 +68,10 @@ class TShowCreate : public TScanActorBase { { } - STFUNC(StateWork) { + STFUNC(StateScan) { switch (ev->GetTypeRewrite()) { hFunc(NSchemeShard::TEvSchemeShard::TEvDescribeSchemeResult, Handle); - hFunc(NKqp::TEvKqpCompute::TEvScanDataAck, Handle); + sFunc(NKqp::TEvKqpCompute::TEvScanDataAck, HandleAck); default: LOG_CRIT(*TlsActivationContext, NKikimrServices::SYSTEM_VIEWS, "NSysView::TScanActorBase: unexpected event 0x%08" PRIx32, ev->GetTypeRewrite()); @@ -80,11 +80,12 @@ class TShowCreate : public TScanActorBase { STFUNC(StateCollectTableSettings) { switch (ev->GetTypeRewrite()) { - hFunc(NSchemeShard::TEvSchemeShard::TEvDescribeSchemeResult, HandleCollectTableSettings); - hFunc(NSequenceProxy::TEvSequenceProxy::TEvGetSequenceResult, Handle); - default: - LOG_CRIT(*TlsActivationContext, NKikimrServices::SYSTEM_VIEWS, - "NSysView::TScanActorBase: unexpected event 0x%08" PRIx32, ev->GetTypeRewrite()); + hFunc(NSchemeShard::TEvSchemeShard::TEvDescribeSchemeResult, HandleCollectTableSettings); + hFunc(NSequenceProxy::TEvSequenceProxy::TEvGetSequenceResult, Handle); + sFunc(NKqp::TEvKqpCompute::TEvScanDataAck, HandleAck); + default: + LOG_CRIT(*TlsActivationContext, NKikimrServices::SYSTEM_VIEWS, + "NSysView::TScanActorBase: unexpected event 0x%08" PRIx32, ev->GetTypeRewrite()); } } @@ -110,7 +111,7 @@ class TShowCreate : public TScanActorBase { return; } - void StartScan() { + void StartScan() final { if (!AppData()->FeatureFlags.GetEnableShowCreate()) { ReplyErrorAndDie(Ydb::StatusIds::SCHEME_ERROR, TStringBuilder() << "Sys view 'show_create' is not supported"); @@ -165,17 +166,6 @@ class TShowCreate : public TScanActorBase { Send(MakeTxProxyID(), navigateRequest.release()); } - void Handle(NKqp::TEvKqpCompute::TEvScanDataAck::TPtr&) { - StartScan(); - } - - void ProceedToScan() override { - Become(&TShowCreate::StateWork); - if (AckReceived) { - StartScan(); - } - } - bool NeedToCollectTableSettings(const NKikimrSchemeOp::TTableDescription& tableDesc) { return !tableDesc.GetCdcStreams().empty() || !tableDesc.GetSequences().empty(); } diff --git a/ydb/core/sys_view/storage/base.h b/ydb/core/sys_view/storage/base.h index e36940c8301b..13168baab7fd 100644 --- a/ydb/core/sys_view/storage/base.h +++ b/ydb/core/sys_view/storage/base.h @@ -15,14 +15,14 @@ namespace NKikimr::NSysView { template - class TStorageScanBase : public TScanActorBase> { - using TBase = TScanActorBase>; + class TStorageScanBase : public TScanActorWithoutBackPressure> { + using TBase = TScanActorWithoutBackPressure>; public: - using TScanActorBase::TScanActorBase; + using TBase::TBase; STRICT_STFUNC(StateScan, - hFunc(NKqp::TEvKqpCompute::TEvScanDataAck, Handle); + sFunc(NKqp::TEvKqpCompute::TEvScanDataAck, TBase::HandleAck); hFunc(TEvResponse, Handle); hFunc(TEvPipeCache::TEvDeliveryProblem, Handle); hFunc(NKqp::TEvKqp::TEvAbortExecution, TBase::HandleAbortExecution); @@ -34,23 +34,12 @@ namespace NKikimr::NSysView { using TFieldMap = std::unordered_map>; private: - void ProceedToScan() override { - TBase::Become(&TStorageScanBase::StateScan); - if (TBase::AckReceived) { - StartScan(); - } - } - - void StartScan() { + void StartScan() final { auto pipeCache = MakePipePerNodeCacheID(false); TBase::Send(pipeCache, new TEvPipeCache::TEvForward(static_cast(*this).CreateQuery(), MakeBSControllerID(), true), IEventHandle::FlagTrackDelivery); } - void Handle(NKqp::TEvKqpCompute::TEvScanDataAck::TPtr&) { - StartScan(); - } - void Handle(TEvPipeCache::TEvDeliveryProblem::TPtr&) { TBase::ReplyErrorAndDie(Ydb::StatusIds::UNAVAILABLE, TStringBuilder() << "Delivery problem in NSysView::" << static_cast(*this).GetName()); diff --git a/ydb/core/sys_view/tablets/tablets.cpp b/ydb/core/sys_view/tablets/tablets.cpp index a337d58b31cd..6ab5fcfe80a7 100644 --- a/ydb/core/sys_view/tablets/tablets.cpp +++ b/ydb/core/sys_view/tablets/tablets.cpp @@ -165,6 +165,12 @@ class TTabletsScan : public TScanActorBase { } void RequestTabletIds() { + if (TabletIdsRequested) { + return; + } + + TabletIdsRequested = true; + auto request = MakeHolder(); if (!CalculateRangeFrom() || !CalculateRangeTo()) { @@ -350,6 +356,7 @@ class TTabletsScan : public TScanActorBase { TVector TabletIds; TVector::const_iterator FromIterator; + bool TabletIdsRequested = false; bool BatchRequested = false; };