diff --git a/.github/import_generation.txt b/.github/import_generation.txt index 2bd5a0a98a3..409940768f2 100644 --- a/.github/import_generation.txt +++ b/.github/import_generation.txt @@ -1 +1 @@ -22 +23 diff --git a/.github/last_commit.txt b/.github/last_commit.txt index 4a1cba507c2..180c6e5e284 100644 --- a/.github/last_commit.txt +++ b/.github/last_commit.txt @@ -1 +1 @@ -a1abe1a8139be6a6188d79503a63aa2e5068c6c1 +9563c7b171619687436257273b34ceba261903ed diff --git a/examples/auth/ssa_delegation/main.cpp b/examples/auth/ssa_delegation/main.cpp new file mode 100644 index 00000000000..b3a678ee2b8 --- /dev/null +++ b/examples/auth/ssa_delegation/main.cpp @@ -0,0 +1,94 @@ +#include +#include +#include + +#include + + +int main(int argc, char** argv) { + std::string endpoint; + std::string database; + std::string serviceId; + std::string microserviceId; + std::string targetServiceAccountId; + std::string resourceId; + std::string resourceType; + std::string iamEndpoint; + bool useSsl = false; + NLastGetopt::TOpts opts = NLastGetopt::TOpts::Default(); + + opts.AddLongOption('e', "endpoint", "YDB endpoint").Required().RequiredArgument("HOST:PORT").StoreResult(&endpoint); + opts.AddLongOption('d', "database", "YDB database").Required().RequiredArgument("PATH").StoreResult(&database); + + opts.AddLongOption("ssl", "Use SSL").NoArgument().SetFlag(&useSsl); + + opts.AddLongOption("target-service-account-id", "Target service account id") + .Required() + .RequiredArgument("ID") + .StoreResult(&targetServiceAccountId); + + opts.AddLongOption("service-id", "Service id") + .RequiredArgument("ID") + .DefaultValue("ydb") + .StoreResult(&serviceId); + + opts.AddLongOption("microservice-id", "Microservice id") + .RequiredArgument("ID") + .DefaultValue("control-plane") + .StoreResult(µserviceId); + + opts.AddLongOption("resource-id", "Resource id") + .Required() + .RequiredArgument("ID") + .StoreResult(&resourceId); + + opts.AddLongOption("iam-endpoint", "IAM endpoint") + .RequiredArgument("HOST") + .DefaultValue("iam.api.cloud.yandex.net") + .StoreResult(&iamEndpoint); + + opts.AddLongOption("resource-type", "Resource type") + .RequiredArgument("STRING") + .DefaultValue("resource-manager.cloud") + .StoreResult(&resourceType); + + opts.SetFreeArgsMin(0); + + NLastGetopt::TOptsParseResult optsResult(&opts, argc, argv); + + NYdb::TIamServiceParams iamParams{ + .SystemServiceAccountCredentials = NYdb::CreateIamCredentialsProviderFactory(), + .ServiceId = serviceId, + .MicroserviceId = microserviceId, + .ResourceId = resourceId, + .ResourceType = resourceType, + .TargetServiceAccountId = targetServiceAccountId, + }; + + iamParams.Endpoint = iamEndpoint; + + auto config = NYdb::TDriverConfig() + .SetEndpoint(endpoint) + .SetDatabase(database) + .SetCredentialsProviderFactory(NYdb::CreateIamServiceCredentialsProviderFactory(iamParams)); + + if (useSsl) { + config.UseSecureConnection(); + } + + NYdb::TDriver driver(config); + NYdb::NQuery::TQueryClient client(driver); + + auto result = client.ExecuteQuery("SELECT 1", NYdb::NQuery::TTxControl::NoTx()).ExtractValueSync(); + if (!result.IsSuccess()) { + std::cerr << ToString(static_cast(result)) << std::endl; + return 1; + } + + auto parser = result.GetResultSetParser(0); + while (parser.TryNextRow()) { + std::cout << parser.ColumnParser(0).GetInt32() << std::endl; + } + + return 0; +} diff --git a/include/ydb-cpp-sdk/client/arrow/accessor.h b/include/ydb-cpp-sdk/client/arrow/accessor.h deleted file mode 100644 index 4ea22648fec..00000000000 --- a/include/ydb-cpp-sdk/client/arrow/accessor.h +++ /dev/null @@ -1,16 +0,0 @@ -#pragma once - -#include - -namespace NYdb::inline V3 { - -//! Provides access to Arrow batches of result set. It is not recommended to use this -//! class in client applications as it is an experimental feature. -class TArrowAccessor { -public: - static TResultSet::EFormat Format(const TResultSet& resultSet); - static const std::string& GetArrowSchema(const TResultSet& resultSet); - static const std::vector& GetArrowBatches(const TResultSet& resultSet); -}; - -} // namespace NYdb diff --git a/include/ydb-cpp-sdk/client/iam/common/generic_provider.h b/include/ydb-cpp-sdk/client/iam/common/generic_provider.h index c596fd6c752..f8ed9f38611 100644 --- a/include/ydb-cpp-sdk/client/iam/common/generic_provider.h +++ b/include/ydb-cpp-sdk/client/iam/common/generic_provider.h @@ -29,7 +29,10 @@ class TGrpcIamCredentialsProvider : public ICredentialsProvider { private: class TImpl : public std::enable_shared_from_this::TImpl> { public: - TImpl(const TIamEndpoint& iamEndpoint, const TRequestFiller& requestFiller, TAsyncRpc rpc) + TImpl(const TIamEndpoint& iamEndpoint, + const TRequestFiller& requestFiller, + TAsyncRpc rpc, + TCredentialsProviderPtr authTokenProvider = nullptr) : Rpc_(rpc) , Ticket_("") , NextTicketUpdate_(TInstant::Zero()) @@ -40,6 +43,7 @@ class TGrpcIamCredentialsProvider : public ICredentialsProvider { , NeedStop_(false) , BackoffTimeout_(BACKOFF_START) , Lock_() + , AuthTokenProvider_(authTokenProvider) { std::shared_ptr creds = nullptr; if (IamEndpoint_.EnableSsl) { @@ -82,15 +86,18 @@ class TGrpcIamCredentialsProvider : public ICredentialsProvider { RequestFiller_(req); - grpc::ClientContext context; + Context_ = std::make_unique(); auto deadline = gpr_time_add( gpr_now(GPR_CLOCK_MONOTONIC), gpr_time_from_micros(IamEndpoint_.RequestTimeout.MicroSeconds(), GPR_TIMESPAN)); - context.set_deadline(deadline); + Context_->set_deadline(deadline); + if (AuthTokenProvider_) { + Context_->AddMetadata("authorization", "Bearer " + AuthTokenProvider_->GetAuthInfo()); + } - (Stub_->async()->*Rpc_)(&context, &req, response.get(), std::move(cb)); + (Stub_->async()->*Rpc_)(Context_.get(), &req, response.get(), std::move(cb)); if (sync) { resultPromise.GetFuture().Wait(2 * IamEndpoint_.RequestTimeout); @@ -125,6 +132,7 @@ class TGrpcIamCredentialsProvider : public ICredentialsProvider { private: void ProcessIamResponse(grpc::Status&& status, TResponse&& result, bool sync) { + Context_.reset(); if (!status.ok()) { TDuration sleepDuration; { @@ -163,6 +171,8 @@ class TGrpcIamCredentialsProvider : public ICredentialsProvider { std::shared_ptr Channel_; std::shared_ptr Stub_; TAsyncRpc Rpc_; + std::unique_ptr Context_; + std::string Ticket_; TInstant NextTicketUpdate_; const TIamEndpoint IamEndpoint_; @@ -172,11 +182,15 @@ class TGrpcIamCredentialsProvider : public ICredentialsProvider { bool NeedStop_; TDuration BackoffTimeout_; TAdaptiveLock Lock_; + TCredentialsProviderPtr AuthTokenProvider_; }; public: - TGrpcIamCredentialsProvider(const TIamEndpoint& endpoint, const TRequestFiller& requestFiller, TAsyncRpc rpc) - : Impl_(std::make_shared(endpoint, requestFiller, rpc)) + TGrpcIamCredentialsProvider(const TIamEndpoint& endpoint, + const TRequestFiller& requestFiller, + TAsyncRpc rpc, + TCredentialsProviderPtr authTokenProvider = nullptr) + : Impl_(std::make_shared(endpoint, requestFiller, rpc, authTokenProvider)) { Impl_->UpdateTicket(true); } diff --git a/include/ydb-cpp-sdk/client/iam/common/types.h b/include/ydb-cpp-sdk/client/iam/common/types.h index 94ca785f09b..70f0c185a3d 100644 --- a/include/ydb-cpp-sdk/client/iam/common/types.h +++ b/include/ydb-cpp-sdk/client/iam/common/types.h @@ -8,7 +8,7 @@ #include #include -namespace NYdb { +namespace NYdb::inline V3 { namespace NIam { constexpr std::string_view DEFAULT_ENDPOINT = "iam.api.cloud.yandex.net"; diff --git a/include/ydb-cpp-sdk/client/iam_private/common/types.h b/include/ydb-cpp-sdk/client/iam_private/common/types.h index f7f070671eb..d4ceab857a3 100644 --- a/include/ydb-cpp-sdk/client/iam_private/common/types.h +++ b/include/ydb-cpp-sdk/client/iam_private/common/types.h @@ -5,6 +5,8 @@ namespace NYdb::inline V3 { struct TIamServiceParams : TIamEndpoint { + TCredentialsProviderFactoryPtr SystemServiceAccountCredentials; + std::string ServiceId; std::string MicroserviceId; std::string ResourceId; diff --git a/include/ydb-cpp-sdk/client/scheme/scheme.h b/include/ydb-cpp-sdk/client/scheme/scheme.h index f4bede6f1e1..ec0291f1e4e 100644 --- a/include/ydb-cpp-sdk/client/scheme/scheme.h +++ b/include/ydb-cpp-sdk/client/scheme/scheme.h @@ -52,6 +52,7 @@ enum class ESchemeEntryType : i32 { ResourcePool = 21, SysView = 22, Transfer = 23, + StreamingQuery = 24, }; struct TVirtualTimestamp { diff --git a/include/ydb-cpp-sdk/client/table/table.h b/include/ydb-cpp-sdk/client/table/table.h index f27de01b8d0..85d6a3ff648 100644 --- a/include/ydb-cpp-sdk/client/table/table.h +++ b/include/ydb-cpp-sdk/client/table/table.h @@ -652,6 +652,7 @@ class TColumnFamilyDescription { const std::string& GetName() const; std::optional GetData() const; std::optional GetCompression() const; + std::optional GetCacheMode() const; std::optional GetKeepInMemory() const; private: @@ -829,6 +830,7 @@ class TColumnFamilyBuilder { TColumnFamilyBuilder& SetData(const std::string& media); TColumnFamilyBuilder& SetCompression(EColumnFamilyCompression compression); + TColumnFamilyBuilder& SetCacheMode(EColumnFamilyCacheMode cacheMode); TColumnFamilyBuilder& SetKeepInMemory(bool enabled); TColumnFamilyDescription Build() const; @@ -895,6 +897,11 @@ class TTableColumnFamilyBuilder { return *this; } + TTableColumnFamilyBuilder& SetCacheMode(EColumnFamilyCacheMode cacheMode) { + Builder_.SetCacheMode(cacheMode); + return *this; + } + TTableColumnFamilyBuilder& SetKeepInMemory(bool enabled) { Builder_.SetKeepInMemory(enabled); return *this; @@ -1519,6 +1526,11 @@ class TAlterColumnFamilyBuilder { return *this; } + TAlterColumnFamilyBuilder& SetCacheMode(EColumnFamilyCacheMode cacheMode) { + Builder_.SetCacheMode(cacheMode); + return *this; + } + TAlterColumnFamilyBuilder& SetKeepInMemory(bool enabled) { Builder_.SetKeepInMemory(enabled); return *this; diff --git a/include/ydb-cpp-sdk/client/table/table_enum.h b/include/ydb-cpp-sdk/client/table/table_enum.h index 7c5197a84db..5d6aa6111f9 100644 --- a/include/ydb-cpp-sdk/client/table/table_enum.h +++ b/include/ydb-cpp-sdk/client/table/table_enum.h @@ -11,6 +11,12 @@ enum class EColumnFamilyCompression { LZ4, }; +//! Column family cache mode +enum class EColumnFamilyCacheMode { + Regular, + InMemory, +}; + //! State of build index operation enum class EBuildIndexState { Unspecified = 0, diff --git a/src/api/protos/ydb_monitoring.proto b/src/api/protos/ydb_monitoring.proto index 4609df2acb0..6d652244412 100644 --- a/src/api/protos/ydb_monitoring.proto +++ b/src/api/protos/ydb_monitoring.proto @@ -107,12 +107,18 @@ message LoadAverageStatus { uint32 cores = 3; } +message ClockSkewStatus { + StatusFlag.Status overall = 1; + float clock_skew = 2; // ms +} + message ComputeNodeStatus { string id = 1; StatusFlag.Status overall = 2; repeated ComputeTabletStatus tablets = 3; repeated ThreadPoolStatus pools = 4; LoadAverageStatus load = 5; + ClockSkewStatus clock_skew = 6; } message ComputeStatus { @@ -121,6 +127,7 @@ message ComputeStatus { repeated ComputeTabletStatus tablets = 3; float paths_quota_usage = 4; float shards_quota_usage = 5; + ClockSkewStatus clock_skew = 6; } message LocationNode { diff --git a/src/api/protos/ydb_scheme.proto b/src/api/protos/ydb_scheme.proto index f28abe48b7c..2273746bfc8 100644 --- a/src/api/protos/ydb_scheme.proto +++ b/src/api/protos/ydb_scheme.proto @@ -68,6 +68,7 @@ message Entry { RESOURCE_POOL = 21; TRANSFER = 23; SYS_VIEW = 24; + STREAMING_QUERY = 25; } // Name of scheme entry (dir2 of /dir1/dir2) diff --git a/src/api/protos/ydb_table.proto b/src/api/protos/ydb_table.proto index 42bba6085af..300dd73185d 100644 --- a/src/api/protos/ydb_table.proto +++ b/src/api/protos/ydb_table.proto @@ -301,6 +301,12 @@ message ColumnFamilyPolicy { COMPRESSED = 2; } + enum CacheMode { + CACHE_MODE_UNSPECIFIED = 0; + CACHE_MODE_REGULAR = 1; + CACHE_MODE_IN_MEMORY = 2; + } + // Name of the column family, the name "default" must be used for the // primary column family that contains as least primary key columns string name = 1; @@ -315,6 +321,9 @@ message ColumnFamilyPolicy { // Optionally specify whether data should be compressed Compression compression = 5; + + // When IN_MEMORY tries to keep the colums of the family in memory (default is REGULAR) + CacheMode cache_mode = 6; } message CompactionPolicy { @@ -553,6 +562,12 @@ message ColumnFamily { COMPRESSION_ZSTD = 3; } + enum CacheMode { + CACHE_MODE_UNSPECIFIED = 0; + CACHE_MODE_REGULAR = 1; + CACHE_MODE_IN_MEMORY = 2; + } + // Name of the column family string name = 1; @@ -570,6 +585,8 @@ message ColumnFamily { // For ZSTD compression level must be in range [-131072:22] // For other compression types compression level must be empty optional int32 compression_level = 5; + + CacheMode cache_mode = 6; } message PartitioningSettings { diff --git a/src/client/arrow/accessor.cpp b/src/client/arrow/accessor.cpp deleted file mode 100644 index f337357eca5..00000000000 --- a/src/client/arrow/accessor.cpp +++ /dev/null @@ -1,17 +0,0 @@ -#include - -namespace NYdb::inline V3 { - -TResultSet::EFormat TArrowAccessor::Format(const TResultSet& resultSet) { - return resultSet.Format(); -} - -const std::string& TArrowAccessor::GetArrowSchema(const TResultSet& resultSet) { - return resultSet.GetArrowSchema(); -} - -const std::vector& TArrowAccessor::GetArrowBatches(const TResultSet& resultSet) { - return resultSet.GetBytesData(); -} - -} // namespace NYdb diff --git a/src/client/iam_private/common/iam.h b/src/client/iam_private/common/iam.h index d84c0caa3ae..acf2cf16b15 100644 --- a/src/client/iam_private/common/iam.h +++ b/src/client/iam_private/common/iam.h @@ -17,7 +17,9 @@ class TIamServiceCredentialsProviderFactory : public ICredentialsProviderFactory req.set_resource_id(params.ResourceId); req.set_resource_type(params.ResourceType); req.set_target_service_account_id(params.TargetServiceAccountId); - }, &TService::Stub::async_interface::CreateForService) {} + }, + &TService::Stub::async_interface::CreateForService, + params.SystemServiceAccountCredentials->CreateProvider()) {} }; public: diff --git a/src/client/operation/impl.h b/src/client/operation/impl.h new file mode 100644 index 00000000000..e8c2d8ba8c9 --- /dev/null +++ b/src/client/operation/impl.h @@ -0,0 +1,146 @@ +#pragma once + +#include + +#define INCLUDE_YDB_INTERNAL_H +#include +#undef INCLUDE_YDB_INTERNAL_H + +#include +#include +#include + + +namespace NYdb::inline V3::NOperation { + +constexpr TDuration OPERATION_CLIENT_TIMEOUT = TDuration::Seconds(5); + +class TOperationClient::TImpl : public TClientImplCommon { + template + using TSimpleRpc = TGRpcConnectionsImpl::TSimpleRpc; + + template + TAsyncStatus Run(TRequest&& request, TSimpleRpc rpc) { + auto promise = NThreading::NewPromise(); + + auto extractor = [promise] + (TResponse* response, TPlainStatus status) mutable { + if (response) { + NYdb::NIssue::TIssues opIssues; + NYdb::NIssue::IssuesFromMessage(response->issues(), opIssues); + TStatus st(static_cast(response->status()), std::move(opIssues)); + promise.SetValue(std::move(st)); + } else { + TStatus st(std::move(status)); + promise.SetValue(std::move(st)); + } + }; + + TRpcRequestSettings rpcSettings; + rpcSettings.ClientTimeout = OPERATION_CLIENT_TIMEOUT; + + Connections_->Run( + std::move(request), + extractor, + rpc, + DbDriverState_, + rpcSettings); + + return promise.GetFuture(); + } + +public: + TImpl(std::shared_ptr&& connections, const TCommonClientSettings& settings) + : TClientImplCommon(std::move(connections), settings) + { + } + + template + NThreading::TFuture Get(Ydb::Operations::GetOperationRequest&& request) { + return RunOperation( + std::move(request), + &Ydb::Operation::V1::OperationService::Stub::AsyncGetOperation); + } + + TAsyncStatus Cancel(Ydb::Operations::CancelOperationRequest&& request) { + return Run(std::move(request), + &Ydb::Operation::V1::OperationService::Stub::AsyncCancelOperation); + } + + TAsyncStatus Forget(Ydb::Operations::ForgetOperationRequest&& request) { + return Run(std::move(request), + &Ydb::Operation::V1::OperationService::Stub::AsyncForgetOperation); + } + + template + NThreading::TFuture> List(Ydb::Operations::ListOperationsRequest&& request) { + auto promise = NThreading::NewPromise>(); + + auto extractor = [promise] + (Ydb::Operations::ListOperationsResponse* response, TPlainStatus status) mutable { + if (response) { + NYdb::NIssue::TIssues opIssues; + NYdb::NIssue::IssuesFromMessage(response->issues(), opIssues); + TStatus st(static_cast(response->status()), std::move(opIssues)); + + std::vector operations; + operations.reserve(response->operations_size()); + for (auto& operation : *response->mutable_operations()) { + NYdb::NIssue::TIssues opIssues; + NYdb::NIssue::IssuesFromMessage(operation.issues(), opIssues); + operations.emplace_back( + TStatus(static_cast(operation.status()), std::move(opIssues)), + std::move(operation)); + } + + promise.SetValue(TOperationsList(std::move(st), std::move(operations), response->next_page_token())); + } else { + TStatus st(std::move(status)); + promise.SetValue(TOperationsList(std::move(st))); + } + }; + + TRpcRequestSettings rpcSettings; + rpcSettings.ClientTimeout = OPERATION_CLIENT_TIMEOUT; + + Connections_->Run( + std::move(request), + extractor, + &Ydb::Operation::V1::OperationService::Stub::AsyncListOperations, + DbDriverState_, + rpcSettings); + + return promise.GetFuture(); + } +}; + +//////////////////////////////////////////////////////////////////////////////// + +template +NThreading::TFuture TOperationClient::Get(const TOperation::TOperationId& id) { + auto request = MakeRequest(); + request.set_id(TStringType{id.ToString()}); + + return Impl_->Get(std::move(request)); +} + +template +NThreading::TFuture> TOperationClient::List(const std::string& kind, std::uint64_t pageSize, const std::string& pageToken) { + auto request = MakeRequest(); + + request.set_kind(TStringType{kind}); + if (pageSize) { + request.set_page_size(pageSize); + } + if (!pageToken.empty()) { + request.set_page_token(TStringType{pageToken}); + } + + return Impl_->List(std::move(request)); +} + +} diff --git a/src/client/operation/operation.cpp b/src/client/operation/operation.cpp index 737c4847269..8823cf7f39c 100644 --- a/src/client/operation/operation.cpp +++ b/src/client/operation/operation.cpp @@ -11,218 +11,66 @@ #include #include -#ifdef YDB_SDK_USE_DRAFT_API -#include -#endif - -#include -#include -#include - -namespace NYdb::inline V3 { -namespace NOperation { - -constexpr TDuration OPERATION_CLIENT_TIMEOUT = TDuration::Seconds(5); - -using namespace NThreading; -using namespace Ydb::Operation; -using namespace Ydb::Operations; - -class TOperationClient::TImpl : public TClientImplCommon { - template - using TSimpleRpc = TGRpcConnectionsImpl::TSimpleRpc; - - template - TAsyncStatus Run(TRequest&& request, TSimpleRpc rpc) { - auto promise = NewPromise(); - - auto extractor = [promise] - (TResponse* response, TPlainStatus status) mutable { - if (response) { - NYdb::NIssue::TIssues opIssues; - NYdb::NIssue::IssuesFromMessage(response->issues(), opIssues); - TStatus st(static_cast(response->status()), std::move(opIssues)); - promise.SetValue(std::move(st)); - } else { - TStatus st(std::move(status)); - promise.SetValue(std::move(st)); - } - }; - - TRpcRequestSettings rpcSettings; - rpcSettings.ClientTimeout = OPERATION_CLIENT_TIMEOUT; - - Connections_->Run( - std::move(request), - extractor, - rpc, - DbDriverState_, - rpcSettings); - - return promise.GetFuture(); - } - -public: - TImpl(std::shared_ptr&& connections, const TCommonClientSettings& settings) - : TClientImplCommon(std::move(connections), settings) - { - } - - template - TFuture Get(GetOperationRequest&& request) { - return RunOperation( - std::move(request), - &V1::OperationService::Stub::AsyncGetOperation); - } - - TAsyncStatus Cancel(CancelOperationRequest&& request) { - return Run(std::move(request), - &V1::OperationService::Stub::AsyncCancelOperation); - } - - TAsyncStatus Forget(ForgetOperationRequest&& request) { - return Run(std::move(request), - &V1::OperationService::Stub::AsyncForgetOperation); - } - - template - TFuture> List(ListOperationsRequest&& request) { - auto promise = NewPromise>(); - - auto extractor = [promise] - (ListOperationsResponse* response, TPlainStatus status) mutable { - if (response) { - NYdb::NIssue::TIssues opIssues; - NYdb::NIssue::IssuesFromMessage(response->issues(), opIssues); - TStatus st(static_cast(response->status()), std::move(opIssues)); - - std::vector operations; - operations.reserve(response->operations_size()); - for (auto& operation : *response->mutable_operations()) { - NYdb::NIssue::TIssues opIssues; - NYdb::NIssue::IssuesFromMessage(operation.issues(), opIssues); - operations.emplace_back( - TStatus(static_cast(operation.status()), std::move(opIssues)), - std::move(operation)); - } - - promise.SetValue(TOperationsList(std::move(st), std::move(operations), response->next_page_token())); - } else { - TStatus st(std::move(status)); - promise.SetValue(TOperationsList(std::move(st))); - } - }; - - TRpcRequestSettings rpcSettings; - rpcSettings.ClientTimeout = OPERATION_CLIENT_TIMEOUT; - - Connections_->Run( - std::move(request), - extractor, - &V1::OperationService::Stub::AsyncListOperations, - DbDriverState_, - rpcSettings); - - return promise.GetFuture(); - } - -}; - -//////////////////////////////////////////////////////////////////////////////// +#include "impl.h" + + +namespace NYdb::inline V3::NOperation { TOperationClient::TOperationClient(const TDriver& driver, const TCommonClientSettings& settings) : Impl_(new TImpl(CreateInternalInterface(driver), settings)) { } -template -TFuture TOperationClient::Get(const TOperation::TOperationId& id) { - auto request = MakeRequest(); - request.set_id(TStringType{id.ToString()}); - - return Impl_->Get(std::move(request)); -} - TAsyncStatus TOperationClient::Cancel(const TOperation::TOperationId& id) { - auto request = MakeRequest(); + auto request = MakeRequest(); request.set_id(TStringType{id.ToString()}); return Impl_->Cancel(std::move(request)); } TAsyncStatus TOperationClient::Forget(const TOperation::TOperationId& id) { - auto request = MakeRequest(); + auto request = MakeRequest(); request.set_id(TStringType{id.ToString()}); return Impl_->Forget(std::move(request)); } -template -TFuture> TOperationClient::List(const std::string& kind, ui64 pageSize, const std::string& pageToken) { - auto request = MakeRequest(); - - request.set_kind(TStringType{kind}); - if (pageSize) { - request.set_page_size(pageSize); - } - if (!pageToken.empty()) { - request.set_page_token(TStringType{pageToken}); - } - - return Impl_->List(std::move(request)); -} - // Instantiations -template TFuture TOperationClient::Get(const TOperation::TOperationId& id); +template NThreading::TFuture TOperationClient::Get(const TOperation::TOperationId& id); template <> -TFuture> TOperationClient::List(ui64 pageSize, const std::string& pageToken) { +NThreading::TFuture> TOperationClient::List(std::uint64_t pageSize, const std::string& pageToken) { return List("ss/backgrounds", pageSize, pageToken); } -template TFuture TOperationClient::Get(const TOperation::TOperationId& id); +template NThreading::TFuture TOperationClient::Get(const TOperation::TOperationId& id); template <> -TFuture> TOperationClient::List(ui64 pageSize, const std::string& pageToken) { +NThreading::TFuture> TOperationClient::List(std::uint64_t pageSize, const std::string& pageToken) { // TODO: export -> export/yt return List("export", pageSize, pageToken); } -template TFuture TOperationClient::Get(const TOperation::TOperationId& id); +template NThreading::TFuture TOperationClient::Get(const TOperation::TOperationId& id); template <> -TFuture> TOperationClient::List(ui64 pageSize, const std::string& pageToken) { +NThreading::TFuture> TOperationClient::List(std::uint64_t pageSize, const std::string& pageToken) { return List("export/s3", pageSize, pageToken); } -template TFuture TOperationClient::Get(const TOperation::TOperationId& id); +template NThreading::TFuture TOperationClient::Get(const TOperation::TOperationId& id); template <> -TFuture> TOperationClient::List(ui64 pageSize, const std::string& pageToken) { +NThreading::TFuture> TOperationClient::List(std::uint64_t pageSize, const std::string& pageToken) { return List("import/s3", pageSize, pageToken); } -template TFuture TOperationClient::Get(const TOperation::TOperationId& id); +template NThreading::TFuture TOperationClient::Get(const TOperation::TOperationId& id); template <> -TFuture> TOperationClient::List(ui64 pageSize, const std::string& pageToken) { +NThreading::TFuture> TOperationClient::List(std::uint64_t pageSize, const std::string& pageToken) { return List("buildindex", pageSize, pageToken); } -template TFuture TOperationClient::Get(const TOperation::TOperationId& id); +template NThreading::TFuture TOperationClient::Get(const TOperation::TOperationId& id); template <> -TFuture> TOperationClient::List(ui64 pageSize, const std::string& pageToken) { +NThreading::TFuture> TOperationClient::List(std::uint64_t pageSize, const std::string& pageToken) { return List("scriptexec", pageSize, pageToken); } -#ifdef YDB_SDK_USE_DRAFT_API -template TFuture TOperationClient::Get(const TOperation::TOperationId& id); -template <> -TFuture> TOperationClient::List(ui64 pageSize, const std::string& pageToken) { - return List("incbackup", pageSize, pageToken); -} - -template TFuture TOperationClient::Get(const TOperation::TOperationId& id); -template <> -TFuture> TOperationClient::List(ui64 pageSize, const std::string& pageToken) { - return List("restore", pageSize, pageToken); -} -#endif - -} // namespace NOperation -} // namespace NYdb +} // namespace NYdb::NOperation diff --git a/src/client/scheme/scheme.cpp b/src/client/scheme/scheme.cpp index faabbf61f9e..f18aa0f556a 100644 --- a/src/client/scheme/scheme.cpp +++ b/src/client/scheme/scheme.cpp @@ -113,6 +113,8 @@ static ESchemeEntryType ConvertProtoEntryType(::Ydb::Scheme::Entry::Type entry) return ESchemeEntryType::SysView; case ::Ydb::Scheme::Entry::TRANSFER: return ESchemeEntryType::Transfer; + case ::Ydb::Scheme::Entry::STREAMING_QUERY: + return ESchemeEntryType::StreamingQuery; default: return ESchemeEntryType::Unknown; } @@ -146,6 +148,10 @@ void TSchemeEntry::SerializeTo(::Ydb::Scheme::ModifyPermissionsRequest& request) } TModifyPermissionsSettings::TModifyPermissionsSettings(const ::Ydb::Scheme::ModifyPermissionsRequest& request) { + if (request.clear_permissions()) { + AddClearAcl(); + } + for (const auto& action : request.actions()) { switch (action.action_case()) { case Ydb::Scheme::PermissionsAction::kGrant: diff --git a/src/client/table/table.cpp b/src/client/table/table.cpp index 97b0f282dd6..bbbe4a876c4 100644 --- a/src/client/table/table.cpp +++ b/src/client/table/table.cpp @@ -153,6 +153,17 @@ std::optional TColumnFamilyDescription::GetCompression } } +std::optional TColumnFamilyDescription::GetCacheMode() const { + switch (GetProto().cache_mode()) { + case Ydb::Table::ColumnFamily::CACHE_MODE_REGULAR: + return EColumnFamilyCacheMode::Regular; + case Ydb::Table::ColumnFamily::CACHE_MODE_IN_MEMORY: + return EColumnFamilyCacheMode::InMemory; + default: + return { }; + } +} + std::optional TColumnFamilyDescription::GetKeepInMemory() const { switch (GetProto().keep_in_memory()) { case Ydb::FeatureFlag::ENABLED: @@ -1088,6 +1099,18 @@ TColumnFamilyBuilder& TColumnFamilyBuilder::SetCompression(EColumnFamilyCompress return *this; } +TColumnFamilyBuilder& TColumnFamilyBuilder::SetCacheMode(EColumnFamilyCacheMode cacheMode) { + switch (cacheMode) { + case EColumnFamilyCacheMode::Regular: + Impl_->Proto.set_cache_mode(Ydb::Table::ColumnFamily::CACHE_MODE_REGULAR); + break; + case EColumnFamilyCacheMode::InMemory: + Impl_->Proto.set_cache_mode(Ydb::Table::ColumnFamily::CACHE_MODE_IN_MEMORY); + break; + } + return *this; +} + TColumnFamilyBuilder& TColumnFamilyBuilder::SetKeepInMemory(bool enabled) { Impl_->Proto.set_keep_in_memory(enabled ? Ydb::FeatureFlag::ENABLED : Ydb::FeatureFlag::DISABLED); return *this; diff --git a/src/client/topic/ut/basic_usage_ut.cpp b/src/client/topic/ut/basic_usage_ut.cpp index 5c4f381a19c..99d9af7c42a 100644 --- a/src/client/topic/ut/basic_usage_ut.cpp +++ b/src/client/topic/ut/basic_usage_ut.cpp @@ -101,6 +101,12 @@ void WriteAndReadToEndWithRestarts(TReadSessionSettings readSettings, TWriteSess } Y_UNIT_TEST_SUITE(BasicUsage) { + Y_UNIT_TEST(CreateTopicWithCustomName) { + TTopicSdkTestSetup setup{TEST_CASE_NAME, TTopicSdkTestSetup::MakeServerSettings(), false}; + const TString name = "test-topic-" + ToString(TInstant::Now().Seconds()); + setup.CreateTopic(name, TEST_CONSUMER, 1); + } + Y_UNIT_TEST(ReadWithoutConsumerWithRestarts) { if (EnableDirectRead) { // TODO(qyryq) Enable the test when LOGBROKER-9364 is done. diff --git a/src/client/topic/ut/topic_to_table_ut.cpp b/src/client/topic/ut/topic_to_table_ut.cpp index 03679095c81..1ae008fb56b 100644 --- a/src/client/topic/ut/topic_to_table_ut.cpp +++ b/src/client/topic/ut/topic_to_table_ut.cpp @@ -484,25 +484,33 @@ std::vector TFixture::TTableSession::Execute(const std::string& quer bool commit, const TParams& params) { - auto txTable = dynamic_cast(tx); - auto txControl = NTable::TTxControl::Tx(*txTable).CommitTx(commit); - - auto result = Session_.ExecuteDataQuery(query, txControl, params).GetValueSync(); - UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + while (true) { + auto txTable = dynamic_cast(tx); + auto txControl = NTable::TTxControl::Tx(*txTable).CommitTx(commit); - return std::move(result).ExtractResultSets(); + auto result = Session_.ExecuteDataQuery(query, txControl, params).GetValueSync(); + if (result.GetStatus() != EStatus::SESSION_BUSY) { + UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + return std::move(result).ExtractResultSets(); + } + std::this_thread::sleep_for(100ms); + } } TFixture::ISession::TExecuteInTxResult TFixture::TTableSession::ExecuteInTx(const std::string& query, bool commit, const TParams& params) { - auto txControl = NTable::TTxControl::BeginTx().CommitTx(commit); - - auto result = Session_.ExecuteDataQuery(query, txControl, params).GetValueSync(); - UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + while (true) { + auto txControl = NTable::TTxControl::BeginTx().CommitTx(commit); - return {std::move(result).ExtractResultSets(), std::make_unique(*result.GetTransaction())}; + auto result = Session_.ExecuteDataQuery(query, txControl, params).GetValueSync(); + if (result.GetStatus() != EStatus::SESSION_BUSY) { + UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + return {std::move(result).ExtractResultSets(), std::make_unique(*result.GetTransaction())}; + } + std::this_thread::sleep_for(100ms); + } } std::unique_ptr TFixture::TTableSession::BeginTx() @@ -577,25 +585,33 @@ std::vector TFixture::TQuerySession::Execute(const std::string& quer bool commit, const TParams& params) { - auto txQuery = dynamic_cast(tx); - auto txControl = NQuery::TTxControl::Tx(*txQuery).CommitTx(commit); - - auto result = Session_.ExecuteQuery(query, txControl, params).ExtractValueSync(); - UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + while (true) { + auto txQuery = dynamic_cast(tx); + auto txControl = NQuery::TTxControl::Tx(*txQuery).CommitTx(commit); - return result.GetResultSets(); + auto result = Session_.ExecuteQuery(query, txControl, params).ExtractValueSync(); + if (result.GetStatus() != EStatus::SESSION_BUSY) { + UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + return result.GetResultSets(); + } + std::this_thread::sleep_for(100ms); + } } TFixture::ISession::TExecuteInTxResult TFixture::TQuerySession::ExecuteInTx(const std::string& query, bool commit, const TParams& params) { - auto txControl = NQuery::TTxControl::BeginTx().CommitTx(commit); - - auto result = Session_.ExecuteQuery(query, txControl, params).ExtractValueSync(); - UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + while (true) { + auto txControl = NQuery::TTxControl::BeginTx().CommitTx(commit); - return {result.GetResultSets(), std::make_unique(*result.GetTransaction())}; + auto result = Session_.ExecuteQuery(query, txControl, params).ExtractValueSync(); + if (result.GetStatus() != EStatus::SESSION_BUSY) { + UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + return {result.GetResultSets(), std::make_unique(*result.GetTransaction())}; + } + std::this_thread::sleep_for(100ms); + } } std::unique_ptr TFixture::TQuerySession::BeginTx() diff --git a/src/client/topic/ut/ut_utils/topic_sdk_test_setup.cpp b/src/client/topic/ut/ut_utils/topic_sdk_test_setup.cpp index 639d99d790f..1a64d8a0d78 100644 --- a/src/client/topic/ut/ut_utils/topic_sdk_test_setup.cpp +++ b/src/client/topic/ut/ut_utils/topic_sdk_test_setup.cpp @@ -36,7 +36,7 @@ void TTopicSdkTestSetup::CreateTopic(const std::string& name, const std::string& { ITopicTestSetup::CreateTopic(name, consumer, partitionCount, maxPartitionCount, retention, important); - Server_.WaitInit(GetTopicPath()); + Server_.WaitInit(GetTopicPath(name)); } TConsumerDescription TTopicSdkTestSetup::DescribeConsumer(const std::string& name, const std::string& consumer)