Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions ydb/core/discovery/discovery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -751,6 +751,7 @@ class TDiscoverer: public TActorBootstrapped<TDiscoverer> {
<< ": path# " << id);

auto request = MakeHolder<NSchemeCache::TSchemeCacheNavigate>();
request->DatabaseName = CanonizePath(AppData()->DomainsInfo->GetDomain()->Name);

auto& entry = request->ResultSet.emplace_back();
entry.Operation = NSchemeCache::TSchemeCacheNavigate::OpPath;
Expand Down
1 change: 1 addition & 0 deletions ydb/core/fq/libs/actors/run_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -930,6 +930,7 @@ class TRunActor : public NActors::TActorBootstrapped<TRunActor> {
if (Params.Resources.rate_limiter_path()) {
const TString rateLimiterResource = GetRateLimiterResourcePath(Params.CloudId, Params.Scope.ParseFolder(), Params.QueryId);
for (auto& task : *ev->Get()->GraphParams.MutableTasks()) {
task.SetRateLimiterDatabase(Params.TenantName);
task.SetRateLimiter(Params.Resources.rate_limiter_path());
task.SetRateLimiterResource(rateLimiterResource);
}
Expand Down
2 changes: 2 additions & 0 deletions ydb/core/graph/service/service_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ class TGraphService : public TActor<TGraphService> {

BLOG_D("ResolveDatabase " << Database);
TAutoPtr<NSchemeCache::TSchemeCacheNavigate> request(new NSchemeCache::TSchemeCacheNavigate());
request->DatabaseName = Database;

NSchemeCache::TSchemeCacheNavigate::TEntry entry;
entry.Operation = NSchemeCache::TSchemeCacheNavigate::OpList;
entry.SyncVersion = false;
Expand Down
2 changes: 2 additions & 0 deletions ydb/core/grpc_services/rpc_kh_describe.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,8 @@ class TKikhouseDescribeTableRPC : public TActorBootstrapped<TKikhouseDescribeTab
WaitingResolveReply = true;
} else {
TAutoPtr<NSchemeCache::TSchemeCacheNavigate> request(new NSchemeCache::TSchemeCacheNavigate());
request->DatabaseName = CanonizePath(Request->GetDatabaseName().GetOrElse(""));

NSchemeCache::TSchemeCacheNavigate::TEntry entry;
entry.Path = std::move(path);
if (entry.Path.empty()) {
Expand Down
26 changes: 14 additions & 12 deletions ydb/core/grpc_services/rpc_object_storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,8 @@ class TObjectStorageListingRequestGrpc : public TActorBootstrapped<TObjectStorag
// TODO: check all params;

TAutoPtr<NSchemeCache::TSchemeCacheNavigate> request(new NSchemeCache::TSchemeCacheNavigate());
request->DatabaseName = CanonizePath(GrpcRequest->GetDatabaseName().GetOrElse(""));

NSchemeCache::TSchemeCacheNavigate::TEntry entry;
entry.Path = NKikimr::SplitPath(table);
if (entry.Path.empty()) {
Expand Down Expand Up @@ -372,7 +374,7 @@ class TObjectStorageListingRequestGrpc : public TActorBootstrapped<TObjectStorag
TString err;

bool filterParsedOk = CellsFromTuple(&filterType, columnValues, typesRef, true, false, cells, err, owner);

if (!filterParsedOk) {
ReplyWithError(Ydb::StatusIds::BAD_REQUEST, Sprintf("Invalid filter: '%s'", err.data()), ctx);
return false;
Expand Down Expand Up @@ -545,7 +547,7 @@ class TObjectStorageListingRequestGrpc : public TActorBootstrapped<TObjectStorag
ev->Record.SetPathColumnDelimiter(Request->Getpath_column_delimiter());
ev->Record.SetSerializedStartAfterKeySuffix(StartAfterSuffixColumns.GetBuffer());
ev->Record.SetMaxKeys(MaxKeys - ContentsRows.size() - CommonPrefixesRows.size());

if (!CommonPrefixesRows.empty()) {
// Next shard might have the same common prefix, need to skip it
ev->Record.SetLastCommonPrefix(CommonPrefixesRows.back());
Expand All @@ -564,7 +566,7 @@ class TObjectStorageListingRequestGrpc : public TActorBootstrapped<TObjectStorag

if (!Filter.ColumnIds.empty()) {
auto* filter = ev->Record.mutable_filter();

for (const auto& colId : Filter.ColumnIds) {
filter->add_columns(colId);
}
Expand Down Expand Up @@ -624,7 +626,7 @@ class TObjectStorageListingRequestGrpc : public TActorBootstrapped<TObjectStorag
afterLastFolderPrefix.Parse(prefixColumns);

auto& partitions = KeyRange->GetPartitions();

for (CurrentShardIdx++; CurrentShardIdx < partitions.size(); CurrentShardIdx++) {
auto& partition = KeyRange->GetPartitions()[CurrentShardIdx];

Expand Down Expand Up @@ -693,7 +695,7 @@ class TObjectStorageListingRequestGrpc : public TActorBootstrapped<TObjectStorag
shardResponse.GetMoreRows()) {
if (!FindNextShard()) {
ReplySuccess(ctx, (hasMoreShards && shardResponse.GetMoreRows()) || maxKeysExhausted);
return;
return;
}
MakeShardRequest(CurrentShardIdx, ctx);
} else {
Expand All @@ -714,7 +716,7 @@ class TObjectStorageListingRequestGrpc : public TActorBootstrapped<TObjectStorag
return NYdb::TTypeBuilder().Pg(getPgTypeFromColMeta(colMeta)).Build();
case NScheme::NTypeIds::Decimal:
return NYdb::TTypeBuilder().Decimal(NYdb::TDecimalType(
typeInfo.GetDecimalType().GetPrecision(),
typeInfo.GetDecimalType().GetPrecision(),
typeInfo.GetDecimalType().GetScale()))
.Build();
default:
Expand All @@ -727,7 +729,7 @@ class TObjectStorageListingRequestGrpc : public TActorBootstrapped<TObjectStorag
for (const auto& colMeta : columns) {
const auto type = getTypeFromColMeta(colMeta);
auto* col = resultSet.Addcolumns();

*col->mutable_type()->mutable_optional_type()->mutable_item() = NYdb::TProtoAccessor::GetProto(type);
*col->mutable_name() = colMeta.Name;
}
Expand All @@ -754,7 +756,7 @@ class TObjectStorageListingRequestGrpc : public TActorBootstrapped<TObjectStorag
Ydb::Value valueProto;
valueProto.set_low_128(loHi.first);
valueProto.set_high_128(loHi.second);
const NYdb::TDecimalValue decimal(valueProto,
const NYdb::TDecimalValue decimal(valueProto,
{static_cast<ui8>(colMeta.PType.GetDecimalType().GetPrecision()), static_cast<ui8>(colMeta.PType.GetDecimalType().GetScale())});
vb.Decimal(decimal);
} else {
Expand Down Expand Up @@ -801,10 +803,10 @@ class TObjectStorageListingRequestGrpc : public TActorBootstrapped<TObjectStorag
if (CommonPrefixesRows.size() > 0) {
lastDirectory = CommonPrefixesRows[CommonPrefixesRows.size() - 1];
}

if (isTruncated && (lastDirectory || lastFile)) {
NKikimrTxDataShard::TObjectStorageListingContinuationToken token;

if (lastDirectory > lastFile) {
token.set_last_path(lastDirectory);
token.set_is_folder(true);
Expand All @@ -814,7 +816,7 @@ class TObjectStorageListingRequestGrpc : public TActorBootstrapped<TObjectStorag
}

TString serializedToken = token.SerializeAsString();

resp->set_next_continuation_token(serializedToken);
}

Expand All @@ -824,7 +826,7 @@ class TObjectStorageListingRequestGrpc : public TActorBootstrapped<TObjectStorag
GrpcRequest->RaiseIssue(NYql::ExceptionToIssue(ex));
GrpcRequest->ReplyWithYdbStatus(Ydb::StatusIds::INTERNAL_ERROR);
}

Finished = true;
Die(ctx);
}
Expand Down
8 changes: 6 additions & 2 deletions ydb/core/grpc_services/rpc_rate_limiter_api.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@ class TRateLimiterControlRequest : public TRateLimiterRequest<TRateLimiterContro
}

auto req = MakeHolder<NSchemeCache::TSchemeCacheNavigate>();
req->DatabaseName = CanonizePath(this->Request_->GetDatabaseName().GetOrElse(DatabaseFromDomain(AppData())));
req->ResultSet.emplace_back();
req->ResultSet.back().Path.swap(path);
req->ResultSet.back().Operation = NSchemeCache::TSchemeCacheNavigate::OpPath;
Expand Down Expand Up @@ -620,17 +621,20 @@ class TAcquireRateLimiterResourceRPC : public TRateLimiterRequest<TAcquireRateLi

void SendRequest() {
UnsafeBecome(&TAcquireRateLimiterResourceRPC::StateFunc);
const TString database = CanonizePath(Request().GetDatabaseName().GetOrElse(""));

if (GetProtoRequest()->units_case() == Ydb::RateLimiter::AcquireResourceRequest::UnitsCase::kRequired) {
SendLeaf(
TEvQuota::TResourceLeaf(GetProtoRequest()->coordination_node_path(),
TEvQuota::TResourceLeaf(database,
GetProtoRequest()->coordination_node_path(),
GetProtoRequest()->resource_path(),
GetProtoRequest()->required()));
return;
}

SendLeaf(
TEvQuota::TResourceLeaf(GetProtoRequest()->coordination_node_path(),
TEvQuota::TResourceLeaf(database,
GetProtoRequest()->coordination_node_path(),
GetProtoRequest()->resource_path(),
GetProtoRequest()->used(),
true));
Expand Down
2 changes: 2 additions & 0 deletions ydb/core/grpc_services/rpc_read_columns.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,8 @@ class TReadColumnsRPC : public TActorBootstrapped<TReadColumnsRPC> {
WaitingResolveReply = true;
} else {
TAutoPtr<NSchemeCache::TSchemeCacheNavigate> request(new NSchemeCache::TSchemeCacheNavigate());
request->DatabaseName = CanonizePath(Request->GetDatabaseName().GetOrElse(""));

NSchemeCache::TSchemeCacheNavigate::TEntry entry;
entry.Path = std::move(path);
if (entry.Path.empty()) {
Expand Down
1 change: 1 addition & 0 deletions ydb/core/grpc_services/rpc_read_rows.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,7 @@ class TReadRowsRPC : public TActorBootstrapped<TReadRowsRPC> {
entry.SyncVersion = false;
entry.ShowPrivatePath = false;
auto request = std::make_unique<NSchemeCache::TSchemeCacheNavigate>();
request->DatabaseName = CanonizePath(GetDatabase());
request->ResultSet.emplace_back(entry);
Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvNavigateKeySet(request.release()), 0, 0, Span.GetTraceId());
return true;
Expand Down
2 changes: 2 additions & 0 deletions ydb/core/health_check/health_check.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1067,6 +1067,7 @@ class TSelfCheckRequest : public TActorBootstrapped<TSelfCheckRequest> {
[[nodiscard]] TRequestResponse<TEvTxProxySchemeCache::TEvNavigateKeySetResult> MakeRequestSchemeCacheNavigate(const TString& path, ui64 cookie) {
THolder<TSchemeCacheNavigate> request = MakeHolder<TSchemeCacheNavigate>();
request->Cookie = cookie;
request->DatabaseName = CanonizePath(AppData()->DomainsInfo->GetDomain()->Name);
TSchemeCacheNavigate::TEntry& entry = request->ResultSet.emplace_back();
entry.Path = NKikimr::SplitPath(path);
entry.Operation = TSchemeCacheNavigate::EOp::OpPath;
Expand All @@ -1080,6 +1081,7 @@ class TSelfCheckRequest : public TActorBootstrapped<TSelfCheckRequest> {
[[nodiscard]] TRequestResponse<TEvTxProxySchemeCache::TEvNavigateKeySetResult> MakeRequestSchemeCacheNavigate(const TPathId& pathId, ui64 cookie) {
THolder<TSchemeCacheNavigate> request = MakeHolder<TSchemeCacheNavigate>();
request->Cookie = cookie;
request->DatabaseName = CanonizePath(AppData()->DomainsInfo->GetDomain()->Name);
TSchemeCacheNavigate::TEntry& entry = request->ResultSet.emplace_back();
entry.TableId.PathId = pathId;
entry.RequestType = TSchemeCacheNavigate::TEntry::ERequestType::ByTableId;
Expand Down
2 changes: 2 additions & 0 deletions ydb/core/kafka_proxy/actors/kafka_sasl_auth_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,8 @@ void TKafkaSaslAuthActor::SendDescribeRequest(const TActorContext& ctx) {

void TKafkaSaslAuthActor::GetPathByPathId(const TPathId& pathId, const TActorContext& ctx) {
auto schemeCacheRequest = std::make_unique<NKikimr::NSchemeCache::TSchemeCacheNavigate>();
schemeCacheRequest->DatabaseName = DatabasePath;

NKikimr::NSchemeCache::TSchemeCacheNavigate::TEntry entry;
entry.TableId.PathId = pathId;
entry.RequestType = NKikimr::NSchemeCache::TSchemeCacheNavigate::TEntry::ERequestType::ByTableId;
Expand Down
6 changes: 4 additions & 2 deletions ydb/core/kesus/proxy/events.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,12 @@ struct TEvKesusProxy {
"expected EvEnd <= EventSpaceEnd(TKikimrEvents::ES_KESUS_PROXY)");

struct TEvResolveKesusProxy : public TEventLocal<TEvResolveKesusProxy, EvResolveKesusProxy> {
const TString Database;
const TString KesusPath;

explicit TEvResolveKesusProxy(const TString& kesusPath)
: KesusPath(kesusPath)
TEvResolveKesusProxy(const TString& database, const TString& kesusPath)
: Database(database)
, KesusPath(kesusPath)
{}
};

Expand Down
18 changes: 11 additions & 7 deletions ydb/core/kesus/proxy/proxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ class TKesusProxyService : public TActor<TKesusProxyService> {
}

private:
IActor* CreateResolveActor(const TString& kesusPath);
IActor* CreateResolveActor(const TString& database, const TString& kesusPath);

void Handle(TEvKesusProxy::TEvResolveKesusProxy::TPtr& ev) {
const auto* msg = ev->Get();
Expand Down Expand Up @@ -103,7 +103,7 @@ class TKesusProxyService : public TActor<TKesusProxyService> {
// Recheck schemecache for changes
LOG_TRACE_S(ctx, NKikimrServices::KESUS_PROXY,
"Starting resolve for kesus " << msg->KesusPath.Quote());
RegisterWithSameMailbox(CreateResolveActor(msg->KesusPath));
RegisterWithSameMailbox(CreateResolveActor(msg->Database, msg->KesusPath));
entry.State = CACHE_STATE_RESOLVING;
[[fallthrough]];

Expand Down Expand Up @@ -220,21 +220,25 @@ class TKesusProxyService : public TActor<TKesusProxyService> {
class TKesusProxyService::TResolveActor : public TActorBootstrapped<TResolveActor> {
private:
const TActorId Owner;
const TString Database;
const TString KesusPath;

public:
TResolveActor(const TActorId& owner, const TString& kesusPath)
TResolveActor(const TActorId& owner, const TString& database, const TString& kesusPath)
: Owner(owner)
, Database(CanonizePath(database))
, KesusPath(kesusPath)
{}

void Bootstrap(const TActorContext& ctx) {
LOG_TRACE_S(ctx, NKikimrServices::KESUS_PROXY,
"Sending resolve request to SchemeCache: " << KesusPath.Quote());
auto request = MakeHolder<NSchemeCache::TSchemeCacheNavigate>();
auto request = MakeHolder<TSchemeCacheNavigate>();
request->DatabaseName = Database;

auto& entry = request->ResultSet.emplace_back();
entry.Path = SplitPath(KesusPath);
entry.Operation = NSchemeCache::TSchemeCacheNavigate::OpPath;
entry.Operation = TSchemeCacheNavigate::OpPath;
Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvNavigateKeySet(request.Release()));
Become(&TThis::StateWork);
}
Expand Down Expand Up @@ -262,8 +266,8 @@ class TKesusProxyService::TResolveActor : public TActorBootstrapped<TResolveActo
}
};

IActor* TKesusProxyService::CreateResolveActor(const TString& kesusPath) {
return new TResolveActor(SelfId(), kesusPath);
IActor* TKesusProxyService::CreateResolveActor(const TString& database, const TString& kesusPath) {
return new TResolveActor(SelfId(), database, kesusPath);
}

TActorId MakeKesusProxyServiceId() {
Expand Down
3 changes: 2 additions & 1 deletion ydb/core/kqp/executer_actor/kqp_scheme_executer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,7 @@ class TKqpSchemeExecuter : public TActorBootstrapped<TKqpSchemeExecuter> {
AFL_ENSURE(dirPath.size() >= 2);

auto request = MakeHolder<NSchemeCache::TSchemeCacheNavigate>();
request->DatabaseName = Database;
TVector<TString> path;

for (const auto& part : dirPath) {
Expand Down Expand Up @@ -564,7 +565,7 @@ class TKqpSchemeExecuter : public TActorBootstrapped<TKqpSchemeExecuter> {
auto analyzePromise = NewPromise<IKqpGateway::TGenericResult>();

TVector<TString> columns{analyzeOperation.columns().begin(), analyzeOperation.columns().end()};
IActor* analyzeActor = new TAnalyzeActor(analyzeOperation.GetTablePath(), columns, analyzePromise);
IActor* analyzeActor = new TAnalyzeActor(Database, analyzeOperation.GetTablePath(), columns, analyzePromise);

auto actorSystem = TActivationContext::ActorSystem();
RegisterWithSameMailbox(analyzeActor);
Expand Down
1 change: 1 addition & 0 deletions ydb/core/kqp/executer_actor/kqp_table_resolver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,7 @@ class TKqpTableResolver : public TActorBootstrapped<TKqpTableResolver> {
void ResolveKeys() {
auto requestNavigate = std::make_unique<NSchemeCache::TSchemeCacheNavigate>();
auto request = MakeHolder<NSchemeCache::TSchemeCacheRequest>();
request->DatabaseName = TasksGraph.GetMeta().Database;
request->ResultSet.reserve(TasksGraph.GetStagesInfo().size());
if (UserToken && !UserToken->GetSerializedToken().empty()) {
request->UserToken = UserToken;
Expand Down
Loading
Loading