From 946dafdc2efd2283102b4b52b5facecec28b8d73 Mon Sep 17 00:00:00 2001 From: Andrey Zaspa Date: Fri, 3 Oct 2025 11:00:23 +0000 Subject: [PATCH 1/2] Filled DatabaseName in SchemeCache requests --- ydb/core/discovery/discovery.cpp | 1 + ydb/core/graph/service/service_impl.cpp | 2 + ydb/core/grpc_services/rpc_kh_describe.cpp | 2 + ydb/core/grpc_services/rpc_object_storage.cpp | 26 ++++++------ .../grpc_services/rpc_rate_limiter_api.cpp | 1 + ydb/core/grpc_services/rpc_read_columns.cpp | 2 + ydb/core/grpc_services/rpc_read_rows.cpp | 1 + ydb/core/health_check/health_check.cpp | 2 + .../actors/kafka_sasl_auth_actor.cpp | 2 + .../executer_actor/kqp_scheme_executer.cpp | 3 +- .../kqp/executer_actor/kqp_table_resolver.cpp | 1 + ydb/core/kqp/gateway/actors/analyze_actor.cpp | 40 ++++++++++--------- ydb/core/kqp/gateway/actors/analyze_actor.h | 12 +++--- ydb/core/kqp/gateway/kqp_ic_gateway.cpp | 2 +- ydb/core/kqp/host/kqp_runner.cpp | 7 +++- .../kqp/query_compiler/kqp_query_compiler.cpp | 11 +++-- .../kqp/query_compiler/kqp_query_compiler.h | 2 +- ydb/core/kqp/runtime/kqp_write_actor.cpp | 26 +++++++----- .../session_actor/kqp_temp_tables_manager.cpp | 1 + .../local_pgwire/local_pgwire_auth_actor.cpp | 4 +- ydb/core/mind/bscontroller/virtual_group.cpp | 5 ++- ydb/core/mind/hive/hive_domains.cpp | 2 + ydb/core/mind/node_broker.cpp | 10 +++-- ydb/core/protos/kqp.proto | 3 +- 24 files changed, 108 insertions(+), 60 deletions(-) diff --git a/ydb/core/discovery/discovery.cpp b/ydb/core/discovery/discovery.cpp index fba9d5d922bd..daef49ddbad3 100644 --- a/ydb/core/discovery/discovery.cpp +++ b/ydb/core/discovery/discovery.cpp @@ -751,6 +751,7 @@ class TDiscoverer: public TActorBootstrapped { << ": path# " << id); auto request = MakeHolder(); + request->DatabaseName = CanonizePath(AppData()->DomainsInfo->GetDomain()->Name); auto& entry = request->ResultSet.emplace_back(); entry.Operation = NSchemeCache::TSchemeCacheNavigate::OpPath; diff --git a/ydb/core/graph/service/service_impl.cpp b/ydb/core/graph/service/service_impl.cpp index b07c057c3a4e..40a6ec8ba732 100644 --- a/ydb/core/graph/service/service_impl.cpp +++ b/ydb/core/graph/service/service_impl.cpp @@ -57,6 +57,8 @@ class TGraphService : public TActor { BLOG_D("ResolveDatabase " << Database); TAutoPtr request(new NSchemeCache::TSchemeCacheNavigate()); + request->DatabaseName = Database; + NSchemeCache::TSchemeCacheNavigate::TEntry entry; entry.Operation = NSchemeCache::TSchemeCacheNavigate::OpList; entry.SyncVersion = false; diff --git a/ydb/core/grpc_services/rpc_kh_describe.cpp b/ydb/core/grpc_services/rpc_kh_describe.cpp index 2d9bcf69723e..475e82a2cf6e 100644 --- a/ydb/core/grpc_services/rpc_kh_describe.cpp +++ b/ydb/core/grpc_services/rpc_kh_describe.cpp @@ -106,6 +106,8 @@ class TKikhouseDescribeTableRPC : public TActorBootstrapped request(new NSchemeCache::TSchemeCacheNavigate()); + request->DatabaseName = CanonizePath(Request->GetDatabaseName().GetOrElse("")); + NSchemeCache::TSchemeCacheNavigate::TEntry entry; entry.Path = std::move(path); if (entry.Path.empty()) { diff --git a/ydb/core/grpc_services/rpc_object_storage.cpp b/ydb/core/grpc_services/rpc_object_storage.cpp index 9757fd226dbb..32e4c2a14850 100644 --- a/ydb/core/grpc_services/rpc_object_storage.cpp +++ b/ydb/core/grpc_services/rpc_object_storage.cpp @@ -133,6 +133,8 @@ class TObjectStorageListingRequestGrpc : public TActorBootstrapped request(new NSchemeCache::TSchemeCacheNavigate()); + request->DatabaseName = CanonizePath(GrpcRequest->GetDatabaseName().GetOrElse("")); + NSchemeCache::TSchemeCacheNavigate::TEntry entry; entry.Path = NKikimr::SplitPath(table); if (entry.Path.empty()) { @@ -372,7 +374,7 @@ class TObjectStorageListingRequestGrpc : public TActorBootstrappedRecord.SetPathColumnDelimiter(Request->Getpath_column_delimiter()); ev->Record.SetSerializedStartAfterKeySuffix(StartAfterSuffixColumns.GetBuffer()); ev->Record.SetMaxKeys(MaxKeys - ContentsRows.size() - CommonPrefixesRows.size()); - + if (!CommonPrefixesRows.empty()) { // Next shard might have the same common prefix, need to skip it ev->Record.SetLastCommonPrefix(CommonPrefixesRows.back()); @@ -564,7 +566,7 @@ class TObjectStorageListingRequestGrpc : public TActorBootstrappedRecord.mutable_filter(); - + for (const auto& colId : Filter.ColumnIds) { filter->add_columns(colId); } @@ -624,7 +626,7 @@ class TObjectStorageListingRequestGrpc : public TActorBootstrappedGetPartitions(); - + for (CurrentShardIdx++; CurrentShardIdx < partitions.size(); CurrentShardIdx++) { auto& partition = KeyRange->GetPartitions()[CurrentShardIdx]; @@ -693,7 +695,7 @@ class TObjectStorageListingRequestGrpc : public TActorBootstrappedmutable_type()->mutable_optional_type()->mutable_item() = NYdb::TProtoAccessor::GetProto(type); *col->mutable_name() = colMeta.Name; } @@ -754,7 +756,7 @@ class TObjectStorageListingRequestGrpc : public TActorBootstrapped(colMeta.PType.GetDecimalType().GetPrecision()), static_cast(colMeta.PType.GetDecimalType().GetScale())}); vb.Decimal(decimal); } else { @@ -801,10 +803,10 @@ class TObjectStorageListingRequestGrpc : public TActorBootstrapped 0) { lastDirectory = CommonPrefixesRows[CommonPrefixesRows.size() - 1]; } - + if (isTruncated && (lastDirectory || lastFile)) { NKikimrTxDataShard::TObjectStorageListingContinuationToken token; - + if (lastDirectory > lastFile) { token.set_last_path(lastDirectory); token.set_is_folder(true); @@ -814,7 +816,7 @@ class TObjectStorageListingRequestGrpc : public TActorBootstrappedset_next_continuation_token(serializedToken); } @@ -824,7 +826,7 @@ class TObjectStorageListingRequestGrpc : public TActorBootstrappedRaiseIssue(NYql::ExceptionToIssue(ex)); GrpcRequest->ReplyWithYdbStatus(Ydb::StatusIds::INTERNAL_ERROR); } - + Finished = true; Die(ctx); } diff --git a/ydb/core/grpc_services/rpc_rate_limiter_api.cpp b/ydb/core/grpc_services/rpc_rate_limiter_api.cpp index fe9ad87cfc50..08607b89a002 100644 --- a/ydb/core/grpc_services/rpc_rate_limiter_api.cpp +++ b/ydb/core/grpc_services/rpc_rate_limiter_api.cpp @@ -169,6 +169,7 @@ class TRateLimiterControlRequest : public TRateLimiterRequest(); + req->DatabaseName = CanonizePath(this->Request_->GetDatabaseName().GetOrElse(DatabaseFromDomain(AppData()))); req->ResultSet.emplace_back(); req->ResultSet.back().Path.swap(path); req->ResultSet.back().Operation = NSchemeCache::TSchemeCacheNavigate::OpPath; diff --git a/ydb/core/grpc_services/rpc_read_columns.cpp b/ydb/core/grpc_services/rpc_read_columns.cpp index 2df1866b978e..47e9f31e4c4b 100644 --- a/ydb/core/grpc_services/rpc_read_columns.cpp +++ b/ydb/core/grpc_services/rpc_read_columns.cpp @@ -154,6 +154,8 @@ class TReadColumnsRPC : public TActorBootstrapped { WaitingResolveReply = true; } else { TAutoPtr request(new NSchemeCache::TSchemeCacheNavigate()); + request->DatabaseName = CanonizePath(Request->GetDatabaseName().GetOrElse("")); + NSchemeCache::TSchemeCacheNavigate::TEntry entry; entry.Path = std::move(path); if (entry.Path.empty()) { diff --git a/ydb/core/grpc_services/rpc_read_rows.cpp b/ydb/core/grpc_services/rpc_read_rows.cpp index 8f73d6d4358d..ca2c30b02cb0 100644 --- a/ydb/core/grpc_services/rpc_read_rows.cpp +++ b/ydb/core/grpc_services/rpc_read_rows.cpp @@ -360,6 +360,7 @@ class TReadRowsRPC : public TActorBootstrapped { entry.SyncVersion = false; entry.ShowPrivatePath = false; auto request = std::make_unique(); + request->DatabaseName = CanonizePath(GetDatabase()); request->ResultSet.emplace_back(entry); Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvNavigateKeySet(request.release()), 0, 0, Span.GetTraceId()); return true; diff --git a/ydb/core/health_check/health_check.cpp b/ydb/core/health_check/health_check.cpp index ba3dc5a93975..1f16f5ac27c5 100644 --- a/ydb/core/health_check/health_check.cpp +++ b/ydb/core/health_check/health_check.cpp @@ -1067,6 +1067,7 @@ class TSelfCheckRequest : public TActorBootstrapped { [[nodiscard]] TRequestResponse MakeRequestSchemeCacheNavigate(const TString& path, ui64 cookie) { THolder request = MakeHolder(); request->Cookie = cookie; + request->DatabaseName = CanonizePath(AppData()->DomainsInfo->GetDomain()->Name); TSchemeCacheNavigate::TEntry& entry = request->ResultSet.emplace_back(); entry.Path = NKikimr::SplitPath(path); entry.Operation = TSchemeCacheNavigate::EOp::OpPath; @@ -1080,6 +1081,7 @@ class TSelfCheckRequest : public TActorBootstrapped { [[nodiscard]] TRequestResponse MakeRequestSchemeCacheNavigate(const TPathId& pathId, ui64 cookie) { THolder request = MakeHolder(); request->Cookie = cookie; + request->DatabaseName = CanonizePath(AppData()->DomainsInfo->GetDomain()->Name); TSchemeCacheNavigate::TEntry& entry = request->ResultSet.emplace_back(); entry.TableId.PathId = pathId; entry.RequestType = TSchemeCacheNavigate::TEntry::ERequestType::ByTableId; diff --git a/ydb/core/kafka_proxy/actors/kafka_sasl_auth_actor.cpp b/ydb/core/kafka_proxy/actors/kafka_sasl_auth_actor.cpp index 7a42b3c836e9..e1a233ed1da3 100644 --- a/ydb/core/kafka_proxy/actors/kafka_sasl_auth_actor.cpp +++ b/ydb/core/kafka_proxy/actors/kafka_sasl_auth_actor.cpp @@ -189,6 +189,8 @@ void TKafkaSaslAuthActor::SendDescribeRequest(const TActorContext& ctx) { void TKafkaSaslAuthActor::GetPathByPathId(const TPathId& pathId, const TActorContext& ctx) { auto schemeCacheRequest = std::make_unique(); + schemeCacheRequest->DatabaseName = DatabasePath; + NKikimr::NSchemeCache::TSchemeCacheNavigate::TEntry entry; entry.TableId.PathId = pathId; entry.RequestType = NKikimr::NSchemeCache::TSchemeCacheNavigate::TEntry::ERequestType::ByTableId; diff --git a/ydb/core/kqp/executer_actor/kqp_scheme_executer.cpp b/ydb/core/kqp/executer_actor/kqp_scheme_executer.cpp index c0352c6ebce7..fee10744086b 100644 --- a/ydb/core/kqp/executer_actor/kqp_scheme_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_scheme_executer.cpp @@ -200,6 +200,7 @@ class TKqpSchemeExecuter : public TActorBootstrapped { AFL_ENSURE(dirPath.size() >= 2); auto request = MakeHolder(); + request->DatabaseName = Database; TVector path; for (const auto& part : dirPath) { @@ -564,7 +565,7 @@ class TKqpSchemeExecuter : public TActorBootstrapped { auto analyzePromise = NewPromise(); TVector columns{analyzeOperation.columns().begin(), analyzeOperation.columns().end()}; - IActor* analyzeActor = new TAnalyzeActor(analyzeOperation.GetTablePath(), columns, analyzePromise); + IActor* analyzeActor = new TAnalyzeActor(Database, analyzeOperation.GetTablePath(), columns, analyzePromise); auto actorSystem = TActivationContext::ActorSystem(); RegisterWithSameMailbox(analyzeActor); diff --git a/ydb/core/kqp/executer_actor/kqp_table_resolver.cpp b/ydb/core/kqp/executer_actor/kqp_table_resolver.cpp index de7e059ea81d..1a7d629fe7f2 100644 --- a/ydb/core/kqp/executer_actor/kqp_table_resolver.cpp +++ b/ydb/core/kqp/executer_actor/kqp_table_resolver.cpp @@ -324,6 +324,7 @@ class TKqpTableResolver : public TActorBootstrapped { void ResolveKeys() { auto requestNavigate = std::make_unique(); auto request = MakeHolder(); + request->DatabaseName = TasksGraph.GetMeta().Database; request->ResultSet.reserve(TasksGraph.GetStagesInfo().size()); if (UserToken && !UserToken->GetSerializedToken().empty()) { request->UserToken = UserToken; diff --git a/ydb/core/kqp/gateway/actors/analyze_actor.cpp b/ydb/core/kqp/gateway/actors/analyze_actor.cpp index 6e5619f9e360..ffe34261cc5e 100644 --- a/ydb/core/kqp/gateway/actors/analyze_actor.cpp +++ b/ydb/core/kqp/gateway/actors/analyze_actor.cpp @@ -15,16 +15,18 @@ enum { using TNavigate = NSchemeCache::TSchemeCacheNavigate; -TAnalyzeActor::TAnalyzeActor(TString tablePath, TVector columns, NThreading::TPromise promise) - : TablePath(tablePath) - , Columns(columns) +TAnalyzeActor::TAnalyzeActor(const TString& database, const TString& tablePath, + const TVector& columns, NThreading::TPromise promise) + : Database(database) + , TablePath(tablePath) + , Columns(columns) , Promise(promise) , OperationId(UlidGen.Next(TActivationContext::Now()).ToBinary()) {} void TAnalyzeActor::Bootstrap() { - using TNavigate = NSchemeCache::TSchemeCacheNavigate; auto navigate = std::make_unique(); + navigate->DatabaseName = Database; auto& entry = navigate->ResultSet.emplace_back(); entry.Path = SplitPath(TablePath); entry.Operation = TNavigate::EOp::OpTable; @@ -41,16 +43,16 @@ void TAnalyzeActor::Handle(NStat::TEvStatistics::TEvAnalyzeResponse::TPtr& ev, c const auto& record = ev->Get()->Record; const TString operationId = record.GetOperationId(); - const auto status = record.GetStatus(); + const auto status = record.GetStatus(); if (status != NKikimrStat::TEvAnalyzeResponse::STATUS_SUCCESS) { - ALOG_CRIT(NKikimrServices::KQP_GATEWAY, + ALOG_CRIT(NKikimrServices::KQP_GATEWAY, "TAnalyzeActor, TEvAnalyzeResponse has status=" << status); } if (operationId != OperationId) { - ALOG_CRIT(NKikimrServices::KQP_GATEWAY, - "TAnalyzeActor, TEvAnalyzeResponse has operationId=" << operationId + ALOG_CRIT(NKikimrServices::KQP_GATEWAY, + "TAnalyzeActor, TEvAnalyzeResponse has operationId=" << operationId << " , but expected " << OperationId); } @@ -82,7 +84,7 @@ void TAnalyzeActor::Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& Promise.SetValue( NYql::NCommon::ResultFromIssues( error, - TStringBuilder() << "Can't get statistics aggregator ID. " << entry.Status, + TStringBuilder() << "Can't get statistics aggregator ID. " << entry.Status, {} ) ); @@ -104,14 +106,14 @@ void TAnalyzeActor::Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& } return; } - + PathId = entry.TableId.PathId; auto& domainInfo = entry.DomainInfo; auto navigateDomainKey = [this] (TPathId domainKey) { - using TNavigate = NSchemeCache::TSchemeCacheNavigate; auto navigate = std::make_unique(); + navigate->DatabaseName = Database; auto& entry = navigate->ResultSet.emplace_back(); entry.TableId = TTableId(domainKey.OwnerId, domainKey.LocalPathId); entry.Operation = TNavigate::EOp::OpPath; @@ -127,8 +129,8 @@ void TAnalyzeActor::Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& SendStatisticsAggregatorAnalyze(entry, ctx); return; } - - navigateDomainKey(domainInfo->DomainKey); + + navigateDomainKey(domainInfo->DomainKey); } else { navigateDomainKey(domainInfo->ResourcesDomainKey); } @@ -153,7 +155,7 @@ void TAnalyzeActor::Handle(TEvPipeCache::TEvDeliveryProblem::TPtr& ev, const TAc Promise.SetValue( NYql::NCommon::ResultFromError( YqlIssue( - {}, NYql::TIssuesIds::UNEXPECTED, + {}, NYql::TIssuesIds::UNEXPECTED, TStringBuilder() << "Can't establish connection with the Statistics Aggregator!" ) ) @@ -178,7 +180,7 @@ void TAnalyzeActor::Handle(TEvAnalyzePrivate::TEvAnalyzeRetry::TPtr& ev, const T ); } -void TAnalyzeActor::SendStatisticsAggregatorAnalyze(const NSchemeCache::TSchemeCacheNavigate::TEntry& entry, const TActorContext& ctx) { +void TAnalyzeActor::SendStatisticsAggregatorAnalyze(const TNavigate::TEntry& entry, const TActorContext& ctx) { Y_ABORT_UNLESS(entry.DomainInfo->Params.HasStatisticsAggregator()); StatisticsAggregatorId = entry.DomainInfo->Params.GetStatisticsAggregator(); @@ -199,7 +201,7 @@ void TAnalyzeActor::SendStatisticsAggregatorAnalyze(const NSchemeCache::TSchemeC Promise.SetValue( NYql::NCommon::ResultFromError( YqlIssue( - {}, NYql::TIssuesIds::UNEXPECTED, + {}, NYql::TIssuesIds::UNEXPECTED, TStringBuilder() << "No such column: " << columnName << " in the " << TablePath ) ) @@ -222,14 +224,14 @@ void TAnalyzeActor::SendStatisticsAggregatorAnalyze(const NSchemeCache::TSchemeC void TAnalyzeActor::HandleUnexpectedEvent(ui32 typeRewrite) { ALOG_CRIT( - NKikimrServices::KQP_GATEWAY, + NKikimrServices::KQP_GATEWAY, "TAnalyzeActor, unexpected event, request type: " << typeRewrite; ); - + Promise.SetValue( NYql::NCommon::ResultFromError( YqlIssue( - {}, NYql::TIssuesIds::UNEXPECTED, + {}, NYql::TIssuesIds::UNEXPECTED, TStringBuilder() << "Unexpected event: " << typeRewrite ) ) diff --git a/ydb/core/kqp/gateway/actors/analyze_actor.h b/ydb/core/kqp/gateway/actors/analyze_actor.h index 3ba5f7e7f830..171ba5ac5779 100644 --- a/ydb/core/kqp/gateway/actors/analyze_actor.h +++ b/ydb/core/kqp/gateway/actors/analyze_actor.h @@ -19,9 +19,10 @@ struct TEvAnalyzePrivate { struct TEvAnalyzeRetry : public TEventLocal {}; }; -class TAnalyzeActor : public NActors::TActorBootstrapped { +class TAnalyzeActor : public NActors::TActorBootstrapped { public: - TAnalyzeActor(TString tablePath, TVector columns, NThreading::TPromise promise); + TAnalyzeActor(const TString& database,const TString& tablePath, + const TVector& columns, NThreading::TPromise promise); void Bootstrap(); @@ -31,7 +32,7 @@ class TAnalyzeActor : public NActors::TActorBootstrapped { HFunc(NStat::TEvStatistics::TEvAnalyzeResponse, Handle); HFunc(TEvPipeCache::TEvDeliveryProblem, Handle); HFunc(TEvAnalyzePrivate::TEvAnalyzeRetry, Handle); - default: + default: HandleUnexpectedEvent(ev->GetTypeRewrite()); } } @@ -53,8 +54,9 @@ class TAnalyzeActor : public NActors::TActorBootstrapped { TDuration CalcBackoffTime(); private: - TString TablePath; - TVector Columns; + const TString Database; + const TString TablePath; + const TVector Columns; NThreading::TPromise Promise; // For Statistics Aggregator std::optional StatisticsAggregatorId; diff --git a/ydb/core/kqp/gateway/kqp_ic_gateway.cpp b/ydb/core/kqp/gateway/kqp_ic_gateway.cpp index 823a910b5533..ce4cb04def00 100644 --- a/ydb/core/kqp/gateway/kqp_ic_gateway.cpp +++ b/ydb/core/kqp/gateway/kqp_ic_gateway.cpp @@ -1506,7 +1506,7 @@ class TKikimrIcGateway : public IKqpGateway { } auto analyzePromise = NewPromise(); - IActor* analyzeActor = new TAnalyzeActor(settings.TablePath, settings.Columns, analyzePromise); + IActor* analyzeActor = new TAnalyzeActor(Database, settings.TablePath, settings.Columns, analyzePromise); RegisterActor(analyzeActor); return analyzePromise.GetFuture(); diff --git a/ydb/core/kqp/host/kqp_runner.cpp b/ydb/core/kqp/host/kqp_runner.cpp index 358fe6ab382e..0df48fe05127 100644 --- a/ydb/core/kqp/host/kqp_runner.cpp +++ b/ydb/core/kqp/host/kqp_runner.cpp @@ -56,6 +56,7 @@ class TCompilePhysicalQueryTransformer : public TSyncTransformerBase { public: TCompilePhysicalQueryTransformer( const TString& cluster, + const TString& database, TKqlTransformContext& transformCtx, TKqpOptimizeContext& optimizeCtx, TTypeAnnotationContext& typesCtx, @@ -63,6 +64,7 @@ class TCompilePhysicalQueryTransformer : public TSyncTransformerBase { const TKikimrConfiguration::TPtr& config ) : Cluster(cluster) + , Database(database) , TransformCtx(transformCtx) , OptimizeCtx(optimizeCtx) , TypesCtx(typesCtx) @@ -80,7 +82,7 @@ class TCompilePhysicalQueryTransformer : public TSyncTransformerBase { TKqpPhysicalQuery physicalQuery(input); YQL_ENSURE(TransformCtx.DataQueryBlocks); - auto compiler = CreateKqpQueryCompiler(Cluster, OptimizeCtx.Tables, FuncRegistry, TypesCtx, Config); + auto compiler = CreateKqpQueryCompiler(Cluster, Database, OptimizeCtx.Tables, FuncRegistry, TypesCtx, Config); auto ret = compiler->CompilePhysicalQuery(physicalQuery, *TransformCtx.DataQueryBlocks, *preparedQuery.MutablePhysicalQuery(), ctx); if (!ret) { ctx.AddError(TIssue(ctx.GetPosition(input->Pos()), "Failed to compile physical query.")); @@ -99,6 +101,7 @@ class TCompilePhysicalQueryTransformer : public TSyncTransformerBase { private: const TString Cluster; + const TString Database; TKqlTransformContext& TransformCtx; TKqpOptimizeContext& OptimizeCtx; TTypeAnnotationContext& TypesCtx; @@ -458,6 +461,7 @@ class TKqpRunner : public IKqpRunner { .Build(false); TAutoPtr compilePhysicalQuery(new TCompilePhysicalQueryTransformer(Cluster, + SessionCtx->GetDatabase(), *TransformCtx, *OptimizeCtx, *typesCtx, @@ -465,6 +469,7 @@ class TKqpRunner : public IKqpRunner { Config)); TAutoPtr newRBOCompilePhysicalQuery(new TCompilePhysicalQueryTransformer(Cluster, + SessionCtx->GetDatabase(), *TransformCtx, *OptimizeCtx, *typesCtx, diff --git a/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp b/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp index 76b7c3a63f43..97b367c9b569 100644 --- a/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp +++ b/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp @@ -591,9 +591,10 @@ TIssues ApplyOverridePlannerSettings(const TString& overridePlannerJson, NKqpPro class TKqpQueryCompiler : public IKqpQueryCompiler { public: - TKqpQueryCompiler(const TString& cluster, const TIntrusivePtr tablesData, + TKqpQueryCompiler(const TString& cluster, const TString& database, const TIntrusivePtr tablesData, const NMiniKQL::IFunctionRegistry& funcRegistry, TTypeAnnotationContext& typesCtx, NYql::TKikimrConfiguration::TPtr config) : Cluster(cluster) + , Database(database) , TablesData(tablesData) , FuncRegistry(funcRegistry) , Alloc(__LOCATION__, TAlignedPagePoolCounters(), funcRegistry.SupportsSizedAllocators()) @@ -1250,6 +1251,7 @@ class TKqpQueryCompiler : public IKqpQueryCompiler { NKqpProto::TKqpInternalSink& internalSinkProto = *protoSink->MutableInternalSink(); internalSinkProto.SetType(TString(NYql::KqpTableSinkName)); NKikimrKqp::TKqpTableSinkSettings settingsProto; + settingsProto.SetDatabase(Database); const auto& tupleType = stage.Ref().GetTypeAnn()->Cast(); YQL_ENSURE(tupleType); @@ -1836,7 +1838,8 @@ class TKqpQueryCompiler : public IKqpQueryCompiler { } private: - TString Cluster; + const TString Cluster; + const TString Database; const TIntrusivePtr TablesData; const IFunctionRegistry& FuncRegistry; NMiniKQL::TScopedAlloc Alloc; @@ -1850,11 +1853,11 @@ class TKqpQueryCompiler : public IKqpQueryCompiler { } // namespace -TIntrusivePtr CreateKqpQueryCompiler(const TString& cluster, +TIntrusivePtr CreateKqpQueryCompiler(const TString& cluster, const TString& database, const TIntrusivePtr tablesData, const IFunctionRegistry& funcRegistry, TTypeAnnotationContext& typesCtx, NYql::TKikimrConfiguration::TPtr config) { - return MakeIntrusive(cluster, tablesData, funcRegistry, typesCtx, config); + return MakeIntrusive(cluster, database, tablesData, funcRegistry, typesCtx, config); } } // namespace NKqp diff --git a/ydb/core/kqp/query_compiler/kqp_query_compiler.h b/ydb/core/kqp/query_compiler/kqp_query_compiler.h index fd122e729b7a..c9266c662388 100644 --- a/ydb/core/kqp/query_compiler/kqp_query_compiler.h +++ b/ydb/core/kqp/query_compiler/kqp_query_compiler.h @@ -17,7 +17,7 @@ class IKqpQueryCompiler : public TThrRefBase { NYql::TExprContext& ctx) = 0; }; -TIntrusivePtr CreateKqpQueryCompiler(const TString& cluster, +TIntrusivePtr CreateKqpQueryCompiler(const TString& cluster, const TString& database, const TIntrusivePtr tablesData, const NMiniKQL::IFunctionRegistry& funcRegistry, NYql::TTypeAnnotationContext& typesCtx, NYql::TKikimrConfiguration::TPtr config); diff --git a/ydb/core/kqp/runtime/kqp_write_actor.cpp b/ydb/core/kqp/runtime/kqp_write_actor.cpp index 26d8c17beee5..de42fe80ac5d 100644 --- a/ydb/core/kqp/runtime/kqp_write_actor.cpp +++ b/ydb/core/kqp/runtime/kqp_write_actor.cpp @@ -219,7 +219,7 @@ struct TKqpTableWriterStatistics { tableStats->SetWriteBytes(tableStats->GetWriteBytes() + WriteBytes); tableStats->SetEraseRows(tableStats->GetEraseRows() + EraseRows); tableStats->SetEraseBytes(tableStats->GetEraseBytes() + EraseBytes); - + ReadRows = 0; ReadBytes = 0; WriteRows = 0; @@ -272,6 +272,7 @@ class TKqpTableWriteActor : public TActorBootstrapped { public: TKqpTableWriteActor( IKqpTableWriterCallbacks* callbacks, + const TString& database, const TTableId& tableId, const TStringBuf tablePath, const ui64 lockTxId, @@ -289,6 +290,7 @@ class TKqpTableWriteActor : public TActorBootstrapped { , Alloc(alloc) , MvccSnapshot(mvccSnapshot) , LockMode(lockMode) + , Database(database) , TableId(tableId) , TablePath(tablePath) , LockTxId(lockTxId) @@ -526,6 +528,7 @@ class TKqpTableWriteActor : public TActorBootstrapped { CA_LOG_D("Resolve TableId=" << TableId); TAutoPtr request(new NSchemeCache::TSchemeCacheNavigate()); + request->DatabaseName = Database; NSchemeCache::TSchemeCacheNavigate::TEntry entry; entry.TableId = TableId; entry.RequestType = NSchemeCache::TSchemeCacheNavigate::TEntry::ERequestType::ByTableId; @@ -718,7 +721,7 @@ class TKqpTableWriteActor : public TActorBootstrapped { << " Sink=" << this->SelfId() << "." << getIssues().ToOneLineString()); TxManager->SetError(ev->Get()->Record.GetOrigin()); - + if (InconsistentTx) { ResetShardRetries(ev->Get()->Record.GetOrigin(), ev->Cookie); RetryResolve(); @@ -778,7 +781,7 @@ class TKqpTableWriteActor : public TActorBootstrapped { getIssues()); } return; - } + } case NKikimrDataEvents::TEvWriteResult::STATUS_OVERLOADED: { CA_LOG_W("Got OVERLOADED for table `" << TablePath << "`." @@ -1396,6 +1399,7 @@ class TKqpTableWriteActor : public TActorBootstrapped { const std::optional MvccSnapshot; const NKikimrDataEvents::ELockMode LockMode; + const TString Database; const TTableId TableId; const TString TablePath; @@ -1748,6 +1752,7 @@ class TKqpDirectWriteActor : public TActorBootstrapped, pu WriteTableActor = new TKqpTableWriteActor( this, + Settings.GetDatabase(), TableId, Settings.GetTable().GetPath(), Settings.GetLockTxId(), @@ -2044,6 +2049,7 @@ struct TTransactionSettings { }; struct TWriteSettings { + TString Database; TTableId TableId; TString TablePath; // for error messages NKikimrDataEvents::TEvWrite::TOperation::EOperationType OperationType; @@ -2243,6 +2249,7 @@ class TKqpBufferWriteActor :public TActorBootstrapped, pub } TKqpTableWriteActor* ptr = new TKqpTableWriteActor( this, + settings.Database, tableId, tablePath, LockTxId, @@ -2275,7 +2282,7 @@ class TKqpBufferWriteActor :public TActorBootstrapped, pub {}); return false; } - AFL_ENSURE(writeActor->GetTableId() == tableId); + AFL_ENSURE(writeActor->GetTableId() == tableId); return true; }; @@ -2398,7 +2405,7 @@ class TKqpBufferWriteActor :public TActorBootstrapped, pub message.From = ev->Sender; message.Close = ev->Get()->Close; message.Data = ev->Get()->Data; - + Process(); } @@ -2438,7 +2445,7 @@ class TKqpBufferWriteActor :public TActorBootstrapped, pub if (isEmpty) { OnFlushed(); } - } + } return true; } @@ -2739,7 +2746,7 @@ class TKqpBufferWriteActor :public TActorBootstrapped, pub for (auto& [tabletId, t] : topicTxs) { auto& transaction = t.tx; - + if (!isImmediateCommit) { FillTopicsCommit(transaction, TxManager); } @@ -2916,7 +2923,7 @@ class TKqpBufferWriteActor :public TActorBootstrapped, pub void HandlePrepare(TEvPersQueue::TEvProposeTransactionResult::TPtr& ev) { auto& event = ev->Get()->Record; const ui64 tabletId = event.GetOrigin(); - + CA_LOG_D("Got ProposeTransactionResult" << ", PQ tablet: " << tabletId << ", status: " << NKikimrPQ::TEvProposeTransactionResult_EStatus_Name(event.GetStatus())); @@ -2933,7 +2940,7 @@ class TKqpBufferWriteActor :public TActorBootstrapped, pub void HandleCommit(TEvPersQueue::TEvProposeTransactionResult::TPtr& ev) { auto& event = ev->Get()->Record; const ui64 tabletId = event.GetOrigin(); - + CA_LOG_D("Got ProposeTransactionResult" << ", PQ tablet: " << tabletId << ", status: " << NKikimrPQ::TEvProposeTransactionResult_EStatus_Name(event.GetStatus())); @@ -3844,6 +3851,7 @@ class TKqpForwardWriteActor : public TActorBootstrapped, Settings.GetLookupColumns().end()); ev->Settings = TWriteSettings{ + .Database = Settings.GetDatabase(), .TableId = TableId, .TablePath = Settings.GetTable().GetPath(), .OperationType = GetOperation(Settings.GetType()), diff --git a/ydb/core/kqp/session_actor/kqp_temp_tables_manager.cpp b/ydb/core/kqp/session_actor/kqp_temp_tables_manager.cpp index 0cb7e4124042..772c8d012260 100644 --- a/ydb/core/kqp/session_actor/kqp_temp_tables_manager.cpp +++ b/ydb/core/kqp/session_actor/kqp_temp_tables_manager.cpp @@ -100,6 +100,7 @@ class TKqpTempTablesManager : public TActorBootstrapped { void TraverseNext() { auto schemeCacheRequest = MakeHolder(); + schemeCacheRequest->DatabaseName = Database; schemeCacheRequest->UserToken = UserToken; schemeCacheRequest->ResultSet.resize(PathsToTraverse.size()); diff --git a/ydb/core/local_pgwire/local_pgwire_auth_actor.cpp b/ydb/core/local_pgwire/local_pgwire_auth_actor.cpp index 896247f599b9..33a74c4e2f4c 100644 --- a/ydb/core/local_pgwire/local_pgwire_auth_actor.cpp +++ b/ydb/core/local_pgwire/local_pgwire_auth_actor.cpp @@ -43,7 +43,7 @@ class TPgYdbAuthActor : public NActors::TActorBootstrapped { }; TPgWireAuthData PgWireAuthData; - TActorId PgYdbProxy; + TActorId PgYdbProxy; TString DatabaseId; TString FolderId; @@ -162,6 +162,8 @@ class TPgYdbAuthActor : public NActors::TActorBootstrapped { void SendDescribeRequest() { auto schemeCacheRequest = std::make_unique(); + schemeCacheRequest->DatabaseName = PgWireAuthData.DatabasePath; + NKikimr::NSchemeCache::TSchemeCacheNavigate::TEntry entry; entry.Path = NKikimr::SplitPath(PgWireAuthData.DatabasePath); entry.Operation = NKikimr::NSchemeCache::TSchemeCacheNavigate::OpPath; diff --git a/ydb/core/mind/bscontroller/virtual_group.cpp b/ydb/core/mind/bscontroller/virtual_group.cpp index 42f2df79e037..ee45d2ab999c 100644 --- a/ydb/core/mind/bscontroller/virtual_group.cpp +++ b/ydb/core/mind/bscontroller/virtual_group.cpp @@ -457,10 +457,11 @@ namespace NKikimr::NBsController { Y_ABORT_UNLESS(group->Database); Y_ABORT_UNLESS(!group->HiveId); const TString& database = *group->Database; + TString domainPath; const auto& domainsInfo = AppData()->DomainsInfo; if (const auto& domain = domainsInfo->Domain) { - const TString domainPath = TStringBuilder() << '/' << domain->Name; + domainPath = CanonizePath(domain->Name); if (database == domainPath || database.StartsWith(TStringBuilder() << domainPath << '/')) { RootHiveId = domainsInfo->GetHive(); if (RootHiveId == TDomainsInfo::BadTabletId) { @@ -470,6 +471,8 @@ namespace NKikimr::NBsController { } auto req = MakeHolder(); + req->DatabaseName = domainPath; + auto& item = req->ResultSet.emplace_back(); item.Path = NKikimr::SplitPath(database); item.RedirectRequired = false; diff --git a/ydb/core/mind/hive/hive_domains.cpp b/ydb/core/mind/hive/hive_domains.cpp index 39933537cf3b..081b16c312a7 100644 --- a/ydb/core/mind/hive/hive_domains.cpp +++ b/ydb/core/mind/hive/hive_domains.cpp @@ -31,6 +31,8 @@ bool THive::SeenDomain(TSubDomainKey domain) { void THive::ResolveDomain(TSubDomainKey domain) { THolder request = MakeHolder(); + request->DatabaseName = RootDomainName; + request->ResultSet.emplace_back(); auto& entry = request->ResultSet.back(); entry.TableId = TTableId(domain.first, domain.second); diff --git a/ydb/core/mind/node_broker.cpp b/ydb/core/mind/node_broker.cpp index 2ef78408fe9b..cb10c77740b8 100644 --- a/ydb/core/mind/node_broker.cpp +++ b/ydb/core/mind/node_broker.cpp @@ -1394,7 +1394,7 @@ void TNodeBroker::TDirtyState::DbUpdateNodeLocation(const TNodeInfo &node, } void TNodeBroker::TDirtyState::DbReleaseSlotIndex(const TNodeInfo &node, - TTransactionContext &txc) + TTransactionContext &txc) { LOG_DEBUG_S(TActorContext::AsActorContext(), NKikimrServices::NODE_BROKER, @@ -1405,7 +1405,7 @@ void TNodeBroker::TDirtyState::DbReleaseSlotIndex(const TNodeInfo &node, db.Table().Key(node.NodeId) .UpdateToNull(); } - + void TNodeBroker::TDirtyState::DbUpdateNodeAuthorizedByCertificate(const TNodeInfo &node, TTransactionContext &txc) { @@ -1539,6 +1539,8 @@ void TNodeBroker::Handle(TEvNodeBroker::TEvRegistrationRequest::TPtr &ev, if (record.HasPath()) { auto req = MakeHolder(); + req->DatabaseName = CanonizePath(AppData()->DomainsInfo->GetDomain()->Name); + auto& rset = req->ResultSet; rset.emplace_back(); auto& item = rset.back(); @@ -1601,9 +1603,9 @@ void TNodeBroker::Handle(TEvNodeBroker::TEvRegistrationRequest::TPtr &ev, void TNodeBroker::Handle(TEvNodeBroker::TEvGracefulShutdownRequest::TPtr &ev, const TActorContext &ctx) { LOG_TRACE_S(ctx, NKikimrServices::NODE_BROKER, "Handle TEvNodeBroker::TEvGracefulShutdownRequest" - << ": request# " << ev->Get()->Record.ShortDebugString()); + << ": request# " << ev->Get()->Record.ShortDebugString()); TabletCounters->Cumulative()[COUNTER_GRACEFUL_SHUTDOWN_REQUESTS].Increment(1); - Execute(CreateTxGracefulShutdown(ev), ctx); + Execute(CreateTxGracefulShutdown(ev), ctx); } void TNodeBroker::Handle(TEvNodeBroker::TEvExtendLeaseRequest::TPtr &ev, diff --git a/ydb/core/protos/kqp.proto b/ydb/core/protos/kqp.proto index db8e17ef13c7..945488ed21d9 100644 --- a/ydb/core/protos/kqp.proto +++ b/ydb/core/protos/kqp.proto @@ -660,7 +660,7 @@ message TEvStartKqpTasksRequest { optional NKikimrDataEvents.ELockMode LockMode = 19; optional string UserToken = 20; optional bool SupportShuttingDown = 21 [default = true]; - + reserved 15; reserved 16; reserved 18; @@ -866,6 +866,7 @@ message TKqpTableSinkSettings { repeated TKqpTableSinkIndexSettings Indexes = 19; repeated TKqpColumnMetadataProto LookupColumns = 20; optional bool IsIndexImplTable = 21; + optional string Database = 22; } message TKqpStreamLookupSettings { From ca3bde3de6980eafa15aaf8092e2d5edf34a9590 Mon Sep 17 00:00:00 2001 From: Andrey Zaspa Date: Wed, 8 Oct 2025 16:23:49 +0000 Subject: [PATCH 2/2] Filled DatabaseName in SchemeCache requests P.2 --- ydb/core/fq/libs/actors/run_actor.cpp | 1 + .../grpc_services/rpc_rate_limiter_api.cpp | 7 +++++-- ydb/core/kesus/proxy/events.h | 6 ++++-- ydb/core/kesus/proxy/proxy.cpp | 18 +++++++++++------- .../pqtablet/partition/account_read_quoter.cpp | 2 +- ydb/core/quoter/public/quoter.h | 4 +++- ydb/core/quoter/quoter_service.cpp | 4 +++- .../quota_requester.cpp | 2 +- .../quoter_service_bandwidth_test/server.cpp | 4 ++++ .../quoter_service_bandwidth_test/server.h | 1 + ydb/core/quoter/quoter_service_ut.cpp | 4 +++- ydb/core/quoter/ut_helpers.cpp | 2 +- .../actors/compute/dq_async_compute_actor.cpp | 4 ++-- ydb/library/yql/dq/proto/dq_tasks.proto | 1 + ydb/library/yql/dq/runtime/dq_tasks_runner.h | 6 +++++- ydb/services/kesus/grpc_service.cpp | 3 ++- 16 files changed, 48 insertions(+), 21 deletions(-) diff --git a/ydb/core/fq/libs/actors/run_actor.cpp b/ydb/core/fq/libs/actors/run_actor.cpp index 178c484a0c4c..29873862665d 100644 --- a/ydb/core/fq/libs/actors/run_actor.cpp +++ b/ydb/core/fq/libs/actors/run_actor.cpp @@ -930,6 +930,7 @@ class TRunActor : public NActors::TActorBootstrapped { if (Params.Resources.rate_limiter_path()) { const TString rateLimiterResource = GetRateLimiterResourcePath(Params.CloudId, Params.Scope.ParseFolder(), Params.QueryId); for (auto& task : *ev->Get()->GraphParams.MutableTasks()) { + task.SetRateLimiterDatabase(Params.TenantName); task.SetRateLimiter(Params.Resources.rate_limiter_path()); task.SetRateLimiterResource(rateLimiterResource); } diff --git a/ydb/core/grpc_services/rpc_rate_limiter_api.cpp b/ydb/core/grpc_services/rpc_rate_limiter_api.cpp index 08607b89a002..affcd82213ae 100644 --- a/ydb/core/grpc_services/rpc_rate_limiter_api.cpp +++ b/ydb/core/grpc_services/rpc_rate_limiter_api.cpp @@ -621,17 +621,20 @@ class TAcquireRateLimiterResourceRPC : public TRateLimiterRequestunits_case() == Ydb::RateLimiter::AcquireResourceRequest::UnitsCase::kRequired) { SendLeaf( - TEvQuota::TResourceLeaf(GetProtoRequest()->coordination_node_path(), + TEvQuota::TResourceLeaf(database, + GetProtoRequest()->coordination_node_path(), GetProtoRequest()->resource_path(), GetProtoRequest()->required())); return; } SendLeaf( - TEvQuota::TResourceLeaf(GetProtoRequest()->coordination_node_path(), + TEvQuota::TResourceLeaf(database, + GetProtoRequest()->coordination_node_path(), GetProtoRequest()->resource_path(), GetProtoRequest()->used(), true)); diff --git a/ydb/core/kesus/proxy/events.h b/ydb/core/kesus/proxy/events.h index b9694f7d3b42..f454925b68f2 100644 --- a/ydb/core/kesus/proxy/events.h +++ b/ydb/core/kesus/proxy/events.h @@ -28,10 +28,12 @@ struct TEvKesusProxy { "expected EvEnd <= EventSpaceEnd(TKikimrEvents::ES_KESUS_PROXY)"); struct TEvResolveKesusProxy : public TEventLocal { + const TString Database; const TString KesusPath; - explicit TEvResolveKesusProxy(const TString& kesusPath) - : KesusPath(kesusPath) + TEvResolveKesusProxy(const TString& database, const TString& kesusPath) + : Database(database) + , KesusPath(kesusPath) {} }; diff --git a/ydb/core/kesus/proxy/proxy.cpp b/ydb/core/kesus/proxy/proxy.cpp index b6849a3312e9..781fff4506bc 100644 --- a/ydb/core/kesus/proxy/proxy.cpp +++ b/ydb/core/kesus/proxy/proxy.cpp @@ -73,7 +73,7 @@ class TKesusProxyService : public TActor { } private: - IActor* CreateResolveActor(const TString& kesusPath); + IActor* CreateResolveActor(const TString& database, const TString& kesusPath); void Handle(TEvKesusProxy::TEvResolveKesusProxy::TPtr& ev) { const auto* msg = ev->Get(); @@ -103,7 +103,7 @@ class TKesusProxyService : public TActor { // Recheck schemecache for changes LOG_TRACE_S(ctx, NKikimrServices::KESUS_PROXY, "Starting resolve for kesus " << msg->KesusPath.Quote()); - RegisterWithSameMailbox(CreateResolveActor(msg->KesusPath)); + RegisterWithSameMailbox(CreateResolveActor(msg->Database, msg->KesusPath)); entry.State = CACHE_STATE_RESOLVING; [[fallthrough]]; @@ -220,21 +220,25 @@ class TKesusProxyService : public TActor { class TKesusProxyService::TResolveActor : public TActorBootstrapped { private: const TActorId Owner; + const TString Database; const TString KesusPath; public: - TResolveActor(const TActorId& owner, const TString& kesusPath) + TResolveActor(const TActorId& owner, const TString& database, const TString& kesusPath) : Owner(owner) + , Database(CanonizePath(database)) , KesusPath(kesusPath) {} void Bootstrap(const TActorContext& ctx) { LOG_TRACE_S(ctx, NKikimrServices::KESUS_PROXY, "Sending resolve request to SchemeCache: " << KesusPath.Quote()); - auto request = MakeHolder(); + auto request = MakeHolder(); + request->DatabaseName = Database; + auto& entry = request->ResultSet.emplace_back(); entry.Path = SplitPath(KesusPath); - entry.Operation = NSchemeCache::TSchemeCacheNavigate::OpPath; + entry.Operation = TSchemeCacheNavigate::OpPath; Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvNavigateKeySet(request.Release())); Become(&TThis::StateWork); } @@ -262,8 +266,8 @@ class TKesusProxyService::TResolveActor : public TActorBootstrappedInc(); THolder req(new NSchemeCache::TSchemeCacheNavigate()); + req->DatabaseName = leaf.Database; + req->ResultSet.emplace_back(); req->ResultSet.back().Path.swap(path); req->ResultSet.back().Operation = NSchemeCache::TSchemeCacheNavigate::OpPath; @@ -1469,7 +1471,7 @@ TString TQuoterService::PrintEvent(const TEvQuota::TEvRequest::TPtr& ev) { } ret << " { " << leaf.Amount << ", "; if (leaf.Quoter) { - ret << "\"" << leaf.Quoter << "\":\"" << leaf.Resource << "\""; + ret << "\"" << leaf.Database << "\":\"" << leaf.Quoter << "\":\"" << leaf.Resource << "\""; } else { ret << leaf.QuoterId << ":" << leaf.ResourceId; } diff --git a/ydb/core/quoter/quoter_service_bandwidth_test/quota_requester.cpp b/ydb/core/quoter/quoter_service_bandwidth_test/quota_requester.cpp index c5db95e89b9b..a1220470bd7e 100644 --- a/ydb/core/quoter/quoter_service_bandwidth_test/quota_requester.cpp +++ b/ydb/core/quoter/quoter_service_bandwidth_test/quota_requester.cpp @@ -97,7 +97,7 @@ TKesusQuotaRequester::TKesusQuotaRequester(const NKikimr::TOptions& opts, NKikim THolder TKesusQuotaRequester::MakeQuoterRequest() { TVector reqs = { - TEvQuota::TResourceLeaf(KesusPath, ResourcePath, 1.0) + TEvQuota::TResourceLeaf(TTestServer::GetDomainPath(), KesusPath, ResourcePath, 1.0) }; return MakeHolder(TEvQuota::EResourceOperator::And, std::move(reqs), Opts.QuotaRequestDeadline); } diff --git a/ydb/core/quoter/quoter_service_bandwidth_test/server.cpp b/ydb/core/quoter/quoter_service_bandwidth_test/server.cpp index 1a5c4371cbf5..72b055d41364 100644 --- a/ydb/core/quoter/quoter_service_bandwidth_test/server.cpp +++ b/ydb/core/quoter/quoter_service_bandwidth_test/server.cpp @@ -19,6 +19,10 @@ TTestServer::TTestServer(const TOptions &opts) RunServer(); } +TString TTestServer::GetDomainPath() { + return TStringBuilder() << "/" << Tests::TestDomainName; +} + std::pair TTestServer::GetKesusPathAndName(size_t i) { return {Tests::TestDomainName, TStringBuilder() << "Kesus_" << i}; } diff --git a/ydb/core/quoter/quoter_service_bandwidth_test/server.h b/ydb/core/quoter/quoter_service_bandwidth_test/server.h index 59522eb5bfac..6068b1dbe729 100644 --- a/ydb/core/quoter/quoter_service_bandwidth_test/server.h +++ b/ydb/core/quoter/quoter_service_bandwidth_test/server.h @@ -23,6 +23,7 @@ class TTestServer { void RunQuotaRequesters(TRequestStats& stats); + static TString GetDomainPath(); static std::pair GetKesusPathAndName(size_t i); static TString GetKesusPath(size_t i); static TString GetKesusResource(size_t i); diff --git a/ydb/core/quoter/quoter_service_ut.cpp b/ydb/core/quoter/quoter_service_ut.cpp index 696dfc03ee94..46b378fb9d22 100644 --- a/ydb/core/quoter/quoter_service_ut.cpp +++ b/ydb/core/quoter/quoter_service_ut.cpp @@ -190,18 +190,20 @@ Y_UNIT_TEST_SUITE(TQuoterServiceTest) { constexpr double secondsForWait = static_cast(waitDuration.MicroSeconds()) / 1000000.0; constexpr double doubleRate = static_cast(rate); + TString database; TString quoter; TString resource; if (resType == ESpeedTestResourceType::KesusResource) { CreateKesus(server); CreateKesusResource(server, doubleRate); + database = TStringBuilder() << "/" << Tests::TestDomainName; quoter = TStringBuilder() << "/" << Tests::TestDomainName << "/KesusQuoter"; resource = "Res"; } const TEvQuota::TResourceLeaf resLeaf = resType == ESpeedTestResourceType::StaticTaggedRateResource ? TEvQuota::TResourceLeaf(TEvQuota::TResourceLeaf::QuoterSystem, TEvQuota::TResourceLeaf::MakeTaggedRateRes(42, rate), 1) : - TEvQuota::TResourceLeaf(quoter, resource, 1); + TEvQuota::TResourceLeaf(database, quoter, resource, 1); for (size_t iteration = 0; iteration < 2; ++iteration) { const TInstant start = TInstant::Now(); diff --git a/ydb/core/quoter/ut_helpers.cpp b/ydb/core/quoter/ut_helpers.cpp index e791d95d4c64..e51db80d0839 100644 --- a/ydb/core/quoter/ut_helpers.cpp +++ b/ydb/core/quoter/ut_helpers.cpp @@ -134,7 +134,7 @@ void TKesusQuoterTestSetup::SendGetQuotaRequest(const std::vector res; res.reserve(resources.size()); for (auto&& [kesusPath, resourcePath, amount] : resources) { - res.emplace_back(kesusPath, resourcePath, amount); + res.emplace_back("/" + DEFAULT_KESUS_PARENT_PATH, kesusPath, resourcePath, amount); } GetServer().GetRuntime()->Send(new IEventHandle(MakeQuoterServiceID(), GetEdgeActor(), new TEvQuota::TEvRequest(operation, std::move(res), deadline))); } diff --git a/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp b/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp index 1f5d79a6d3cb..4c435cdd0f90 100644 --- a/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp +++ b/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp @@ -660,7 +660,7 @@ class TDqAsyncComputeActor : public TDqComputeActorBaseGetRateLimiterResource(); } + const TProtoStringType& GetRateLimiterDatabase() const { + return Task_->GetRateLimiterDatabase(); + } + const TProtoStringType& GetRateLimiter() const { return Task_->GetRateLimiter(); } diff --git a/ydb/services/kesus/grpc_service.cpp b/ydb/services/kesus/grpc_service.cpp index 600c8ba14447..af07d15fdd99 100644 --- a/ydb/services/kesus/grpc_service.cpp +++ b/ydb/services/kesus/grpc_service.cpp @@ -150,9 +150,10 @@ class TGRpcSessionActor PingPeriod = MinPingPeriod; } + const TString database = RequestEvent->GetDatabaseName().GetOrElse(""); KesusPath = StartRequest->Record.session_start().path(); - auto resolve = MakeHolder(KesusPath); + auto resolve = MakeHolder(database, KesusPath); if (!Send(MakeKesusProxyServiceId(), resolve.Release())) { RequestEvent->Finish(Ydb::StatusIds::UNSUPPORTED, grpc::Status(grpc::StatusCode::UNIMPLEMENTED, "Coordination service not implemented on this server"));