Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 26 additions & 5 deletions ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,24 @@ void TKqpComputeActor::CheckRunStatus() {
TBase::CheckRunStatus();
}

ui64 TKqpComputeActor::GetSourcesState() {
return ScanData ? CalculateFreeSpace() : 0;
}

void TKqpComputeActor::PollSources(ui64 prevFreeSpace) {
if (!ScanData || ScanData->IsFinished()) {
return;
}

const auto freeSpace = CalculateFreeSpace();
if (freeSpace <= prevFreeSpace && ScanData->GetStoredBytes()) {
return;
}

CA_LOG_D("Pool sources, free space: " << freeSpace);
Send(SysViewActorId, new TEvKqpCompute::TEvScanDataAck(freeSpace));
}

void TKqpComputeActor::FillExtraStats(NDqProto::TDqComputeActorStats* dst, bool last) {
if (last && SysViewActorId && ScanData && dst->TasksSize() > 0) {
YQL_ENSURE(dst->TasksSize() == 1);
Expand Down Expand Up @@ -278,11 +296,7 @@ void TKqpComputeActor::HandleExecute(TEvKqpCompute::TEvScanData::TPtr& ev) {
}
}

ui64 freeSpace = GetMemoryLimits().ChannelBufferSize > ScanData->GetStoredBytes()
? GetMemoryLimits().ChannelBufferSize - ScanData->GetStoredBytes()
: 0;

if (freeSpace > 0) {
if (const auto freeSpace = CalculateFreeSpace(); freeSpace > 0) {
CA_LOG_D("Send scan data ack, freeSpace: " << freeSpace);

Send(SysViewActorId, new TEvKqpCompute::TEvScanDataAck(freeSpace));
Expand All @@ -302,6 +316,13 @@ void TKqpComputeActor::HandleExecute(TEvKqpCompute::TEvScanError::TPtr& ev) {
ReportStateAndMaybeDie(YdbStatusToDqStatus(status, EStatusCompatibilityLevel::WithUnauthorized), issues);
}

ui64 TKqpComputeActor::CalculateFreeSpace() const {
YQL_ENSURE(ScanData);
const auto storedBytes = ScanData->GetStoredBytes();
const auto channelBufferSize = GetMemoryLimits().ChannelBufferSize;
return channelBufferSize > storedBytes ? channelBufferSize - storedBytes : 0;
}

IActor* CreateKqpComputeActor(const TActorId& executerId, ui64 txId, NDqProto::TDqTask* task,
IDqAsyncIoFactory::TPtr asyncIoFactory,
const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits,
Expand Down
6 changes: 6 additions & 0 deletions ydb/core/kqp/compute_actor/kqp_pure_compute_actor.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ class TKqpComputeActor : public NScheduler::TSchedulableComputeActorBase<TKqpCom

STFUNC(StateFunc);

ui64 GetSourcesState();

void PollSources(ui64 prevFreeSpace);

protected:
ui64 CalcMkqlMemoryLimit() override;

Expand All @@ -59,6 +63,8 @@ class TKqpComputeActor : public NScheduler::TSchedulableComputeActorBase<TKqpCom

bool IsDebugLogEnabled(const TActorSystem* actorSystem);

ui64 CalculateFreeSpace() const;

private:
NMiniKQL::TKqpScanComputeContext ComputeCtx;
TMaybe<NKikimrTxDataShard::TKqpTransaction::TScanTaskMeta> Meta;
Expand Down
8 changes: 8 additions & 0 deletions ydb/core/sys_view/auth/auth_scan_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,10 @@ class TAuthScanBase : public TScanActorBase<TDerived> {
}

void ContinueScan() {
if (IsNavigatePathInProgress) {
return;
}

while (DeepFirstSearchStack) {
auto& last = DeepFirstSearchStack.back();

Expand Down Expand Up @@ -155,6 +159,7 @@ class TAuthScanBase : public TScanActorBase<TDerived> {
}

void Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev, const TActorContext& ctx) {
IsNavigatePathInProgress = false;
THolder<NSchemeCache::TSchemeCacheNavigate> request(ev->Get()->Request.Release());

Y_ABORT_UNLESS(request->ResultSet.size() == 1);
Expand Down Expand Up @@ -197,6 +202,8 @@ class TAuthScanBase : public TScanActorBase<TDerived> {
}

void NavigatePath(TPath path) {
IsNavigatePathInProgress = true;

auto request = MakeHolder<NSchemeCache::TSchemeCacheNavigate>();

auto& entry = request->ResultSet.emplace_back();
Expand Down Expand Up @@ -260,6 +267,7 @@ class TAuthScanBase : public TScanActorBase<TDerived> {
bool RequireUserAdministratorAccess;
std::optional<TString> PathFrom, PathTo;
TVector<TTraversingChildren> DeepFirstSearchStack;
bool IsNavigatePathInProgress = false;
};

}
20 changes: 4 additions & 16 deletions ydb/core/sys_view/auth/users.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@ namespace NKikimr::NSysView::NAuth {
using namespace NSchemeShard;
using namespace NActors;

class TUsersScan : public TScanActorBase<TUsersScan> {
class TUsersScan : public TScanActorWithoutBackPressure<TUsersScan> {
public:
using TBase = TScanActorBase<TUsersScan>;
using TBase = TScanActorWithoutBackPressure<TUsersScan>;

TUsersScan(const NActors::TActorId& ownerId, ui32 scanId,
const NKikimrSysView::TSysViewDescription& sysViewInfo,
Expand All @@ -31,7 +31,7 @@ class TUsersScan : public TScanActorBase<TUsersScan> {
STFUNC(StateScan) {
switch (ev->GetTypeRewrite()) {
HFunc(TEvSchemeShard::TEvListUsersResult, Handle);
hFunc(NKqp::TEvKqpCompute::TEvScanDataAck, Handle);
sFunc(NKqp::TEvKqpCompute::TEvScanDataAck, HandleAck);
hFunc(TEvPipeCache::TEvDeliveryProblem, Handle);
hFunc(NKqp::TEvKqp::TEvAbortExecution, TBase::HandleAbortExecution);
cFunc(TEvents::TEvWakeup::EventType, TBase::HandleTimeout);
Expand All @@ -43,24 +43,12 @@ class TUsersScan : public TScanActorBase<TUsersScan> {
}

protected:
void ProceedToScan() override {
TBase::Become(&TUsersScan::StateScan);

void StartScan() final {
//NOTE: here is the earliest point when Base::DatabaseOwner is already set
bool isClusterAdmin = IsAdministrator(AppData(), UserToken.Get());
bool isDatabaseAdmin = (AppData()->FeatureFlags.GetEnableDatabaseAdmin() && IsDatabaseAdministrator(UserToken.Get(), TBase::DatabaseOwner));
IsAdmin = isClusterAdmin || isDatabaseAdmin;

if (TBase::AckReceived) {
StartScan();
}
}

void Handle(NKqp::TEvKqpCompute::TEvScanDataAck::TPtr&) {
StartScan();
}

void StartScan() {
auto request = MakeHolder<TEvSchemeShard::TEvListUsers>();

LOG_TRACE_S(TlsActivationContext->AsActorContext(), NKikimrServices::SYSTEM_VIEWS,
Expand Down
35 changes: 35 additions & 0 deletions ydb/core/sys_view/common/scan_actor_base_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,7 @@ class TScanActorBase : public TActorBootstrapped<TDerived> {
switch (ev->GetTypeRewrite()) {
hFunc(NKqp::TEvKqpCompute::TEvScanDataAck, HandleScanAck);
hFunc(TEvSysView::TEvGetScanLimiterResult, HandleLimiter);
hFunc(NKqp::TEvKqp::TEvAbortExecution, HandleAbortExecution);
cFunc(TEvents::TEvWakeup::EventType, HandleTimeout);
cFunc(TEvents::TEvPoison::EventType, this->PassAway);
default:
Expand Down Expand Up @@ -394,6 +395,40 @@ class TScanActorBase : public TActorBootstrapped<TDerived> {
} FailState = OK;
};

template <typename TDerived>
class TScanActorWithoutBackPressure : public TScanActorBase<TDerived> {
using TBase = TScanActorBase<TDerived>;

public:
using TBase::TBase;

protected:
// Should scan all data inside call
virtual void StartScan() = 0;

void HandleAck() {
TBase::AckReceived = true;
DoScan();
}

void ProceedToScan() final {
TBase::Become(&TDerived::StateScan);
if (TBase::AckReceived) {
DoScan();
}
}

private:
void DoScan() {
if (!ScanStarted) {
ScanStarted = true;
StartScan();
}
}

private:
bool ScanStarted = false;
};

} // NSysView
} // NKikimr
15 changes: 9 additions & 6 deletions ydb/core/sys_view/compile_cache/compile_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,12 @@ class TCompileCacheQueriesScan : public NKikimr::NSysView::TScanActorBase<TCompi
return;
}

// if feature flag is not set -- return only for self node
if (!AppData()->FeatureFlags.GetEnableCompileCacheView()) {
PendingNodesInitialized = true;
PendingNodes.emplace_back(SelfId().NodeId());
}

if (AckReceived) {
StartScan();
}
Expand All @@ -154,13 +160,9 @@ class TCompileCacheQueriesScan : public NKikimr::NSysView::TScanActorBase<TCompi
ReplyEmptyAndDie();
return;
}
// if feature flag is not set -- return only for self node
if (!AppData()->FeatureFlags.GetEnableCompileCacheView()) {
PendingNodesInitialized = true;
PendingNodes.emplace_back(SelfId().NodeId());
}

if (!PendingNodesInitialized) {
if (!PendingNodesInitialized && !PendingRequest) {
PendingRequest = true;
Send(NKqp::MakeKqpProxyID(SelfId().NodeId()), new NKikimr::NKqp::TEvKqp::TEvListProxyNodesRequest());
return;
}
Expand Down Expand Up @@ -196,6 +198,7 @@ class TCompileCacheQueriesScan : public NKikimr::NSysView::TScanActorBase<TCompi
}

void Handle(NKqp::TEvKqp::TEvListProxyNodesResponse::TPtr& ev) {
PendingRequest = false;
if (AppData()->FeatureFlags.GetEnableCompileCacheView()) {
auto& proxies = ev->Get()->ProxyNodes;
std::sort(proxies.begin(), proxies.end());
Expand Down
19 changes: 4 additions & 15 deletions ydb/core/sys_view/nodes/nodes.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@ namespace NSysView {
using namespace NActors;
using namespace NNodeWhiteboard;

class TNodesScan : public TScanActorBase<TNodesScan> {
class TNodesScan : public TScanActorWithoutBackPressure<TNodesScan> {
public:
using TBase = TScanActorBase<TNodesScan>;
using TBase = TScanActorWithoutBackPressure<TNodesScan>;

static constexpr auto ActorActivityType() {
return NKikimrServices::TActivity::KQP_SYSTEM_VIEW_SCAN;
Expand Down Expand Up @@ -57,7 +57,7 @@ class TNodesScan : public TScanActorBase<TNodesScan> {

STFUNC(StateScan) {
switch (ev->GetTypeRewrite()) {
hFunc(NKqp::TEvKqpCompute::TEvScanDataAck, Handle);
sFunc(NKqp::TEvKqpCompute::TEvScanDataAck, HandleAck);
hFunc(TEvInterconnect::TEvNodesInfo, Handle);
hFunc(TEvWhiteboard::TEvSystemStateResponse, Handle);
hFunc(TEvents::TEvUndelivered, Undelivered);
Expand All @@ -73,14 +73,7 @@ class TNodesScan : public TScanActorBase<TNodesScan> {
}

private:
void ProceedToScan() override {
Become(&TNodesScan::StateScan);
if (AckReceived) {
StartScan();
}
}

void StartScan() {
void StartScan() final {
if (IsEmptyRange || TenantNodes.empty()) {
ReplyEmptyAndDie();
return;
Expand All @@ -98,10 +91,6 @@ class TNodesScan : public TScanActorBase<TNodesScan> {
}
}

void Handle(NKqp::TEvKqpCompute::TEvScanDataAck::TPtr&) {
StartScan();
}

void Handle(TEvInterconnect::TEvNodesInfo::TPtr& ev) {
THolder<TEvInterconnect::TEvNodesInfo> nodesInfo = ev->Release();

Expand Down
7 changes: 7 additions & 0 deletions ydb/core/sys_view/query_stats/query_stats.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,12 @@ class TQueryStatsScan : public TScanActorBase<TQueryStatsScan<TGreater>> {
return;
}

if (OldScanStarted) {
return;
}

OldScanStarted = true;

NodesToRequest.reserve(this->TenantNodes.size());
for (const auto& nodeId : this->TenantNodes) {
Nodes[nodeId] = TRetryState{false, 0, StartRetryInterval};
Expand Down Expand Up @@ -504,6 +510,7 @@ class TQueryStatsScan : public TScanActorBase<TQueryStatsScan<TGreater>> {

THolder<TScanQueryHistory<TGreater>> History;

bool OldScanStarted = false;
bool UseProcessor = false;
NKikimrSysView::TEvGetQueryMetricsRequest Request;
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@ namespace NSysView {
using namespace NActors;
using namespace NNodeWhiteboard;

class TResourcePoolClassifiersScan : public TScanActorBase<TResourcePoolClassifiersScan> {
class TResourcePoolClassifiersScan : public TScanActorWithoutBackPressure<TResourcePoolClassifiersScan> {
public:
using TBase = TScanActorBase<TResourcePoolClassifiersScan>;
using TBase = TScanActorWithoutBackPressure<TResourcePoolClassifiersScan>;

static constexpr auto ActorActivityType() {
return NKikimrServices::TActivity::KQP_SYSTEM_VIEW_SCAN;
Expand All @@ -45,7 +45,7 @@ class TResourcePoolClassifiersScan : public TScanActorBase<TResourcePoolClassifi
STFUNC(StateScan) {
try {
switch (ev->GetTypeRewrite()) {
hFunc(NKqp::TEvKqpCompute::TEvScanDataAck, Handle);
sFunc(NKqp::TEvKqpCompute::TEvScanDataAck, HandleAck);
hFunc(NMetadata::NProvider::TEvRefreshSubscriberData, Handle)
hFunc(NKqp::NWorkload::TEvFetchDatabaseResponse, Handle);
hFunc(NKqp::TEvKqp::TEvAbortExecution, HandleAbortExecution);
Expand All @@ -63,24 +63,13 @@ class TResourcePoolClassifiersScan : public TScanActorBase<TResourcePoolClassifi
}

private:
void ProceedToScan() override {
Become(&TResourcePoolClassifiersScan::StateScan);
if (AckReceived) {
StartScan();
}
}

void StartScan() {
void StartScan() final {
if (!NMetadata::NProvider::TServiceOperator::IsEnabled()) {
ReplyEmptyAndDie();
}
Register(NKqp::NWorkload::CreateDatabaseFetcherActor(SelfId(), Database, UserToken, NACLib::EAccessRights::GenericFull));
}

void Handle(NKqp::TEvKqpCompute::TEvScanDataAck::TPtr&) {
StartScan();
}

void Handle(NKqp::NWorkload::TEvFetchDatabaseResponse::TPtr& ev) {
auto& event = *ev->Get();
if (event.Status != Ydb::StatusIds::SUCCESS) {
Expand Down
Loading
Loading