diff --git a/.github/import_generation.txt b/.github/import_generation.txt index b6a7d89c68e..98d9bcb75a6 100644 --- a/.github/import_generation.txt +++ b/.github/import_generation.txt @@ -1 +1 @@ -16 +17 diff --git a/.github/last_commit.txt b/.github/last_commit.txt index 6d498020879..38a9334ad0c 100644 --- a/.github/last_commit.txt +++ b/.github/last_commit.txt @@ -1 +1 @@ -b6b887dc9b0107368d53dff40e8ddcbc04001b57 +ca39c1dc5e3592adab111f61e4aaec2021bfa95b diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt index 97119cff5e8..118dde30ced 100644 --- a/examples/CMakeLists.txt +++ b/examples/CMakeLists.txt @@ -4,5 +4,7 @@ add_subdirectory(pagination) add_subdirectory(secondary_index) add_subdirectory(secondary_index_builtin) add_subdirectory(topic_reader) +add_subdirectory(topic_writer/transaction) add_subdirectory(ttl) add_subdirectory(vector_index) +add_subdirectory(vector_index_builtin) diff --git a/examples/topic_writer/transaction/CMakeLists.txt b/examples/topic_writer/transaction/CMakeLists.txt new file mode 100644 index 00000000000..cbb717958c8 --- /dev/null +++ b/examples/topic_writer/transaction/CMakeLists.txt @@ -0,0 +1,36 @@ +add_executable(topic_writer_transaction) + +target_link_libraries(topic_writer_transaction + PUBLIC + yutil + YDB-CPP-SDK::Topic + YDB-CPP-SDK::Query +) + +target_sources(topic_writer_transaction + PRIVATE + main.cpp +) + +vcs_info(topic_writer_transaction) + +if (CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" OR CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64") + target_link_libraries(topic_writer_transaction PUBLIC + cpuid_check + ) +endif() + +if (CMAKE_SYSTEM_NAME STREQUAL "Linux") + target_link_options(topic_writer_transaction PRIVATE + -ldl + -lrt + -Wl,--no-as-needed + -lpthread + ) +elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin") + target_link_options(topic_writer_transaction PRIVATE + -Wl,-platform_version,macos,11.0,11.0 + -framework + CoreFoundation + ) +endif() diff --git a/examples/vector_index/CMakeLists.txt b/examples/vector_index/CMakeLists.txt index cd836a00b48..19249951e37 100644 --- a/examples/vector_index/CMakeLists.txt +++ b/examples/vector_index/CMakeLists.txt @@ -1,14 +1,15 @@ add_executable(vector_index) -target_link_libraries(vector_index PUBLIC - yutil - getopt - YDB-CPP-SDK::Table +target_link_libraries(vector_index + PUBLIC + yutil + getopt + YDB-CPP-SDK::Table ) target_sources(vector_index PRIVATE - ${YDB_SDK_SOURCE_DIR}/examples/vector_index/main.cpp - ${YDB_SDK_SOURCE_DIR}/examples/vector_index/vector_index.cpp + main.cpp + vector_index.cpp ) vcs_info(vector_index) diff --git a/examples/vector_index_builtin/CMakeLists.txt b/examples/vector_index_builtin/CMakeLists.txt new file mode 100644 index 00000000000..75c97b2c3c2 --- /dev/null +++ b/examples/vector_index_builtin/CMakeLists.txt @@ -0,0 +1,39 @@ +add_executable(vector_index_builtin) + +target_link_libraries(vector_index_builtin + PUBLIC + yutil + getopt + YDB-CPP-SDK::Query + YDB-CPP-SDK::Table + YDB-CPP-SDK::Helpers +) + +target_sources(vector_index_builtin + PRIVATE + main.cpp + vector_index.cpp +) + +vcs_info(vector_index_builtin) + +if (CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" OR CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64") + target_link_libraries(vector_index_builtin PUBLIC + cpuid_check + ) +endif() + +if (CMAKE_SYSTEM_NAME STREQUAL "Linux") + target_link_options(vector_index_builtin PRIVATE + -ldl + -lrt + -Wl,--no-as-needed + -lpthread + ) +elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin") + target_link_options(vector_index_builtin PRIVATE + -Wl,-platform_version,macos,11.0,11.0 + -framework + CoreFoundation + ) +endif() diff --git a/examples/vector_index_builtin/main.cpp b/examples/vector_index_builtin/main.cpp new file mode 100644 index 00000000000..198c7f4944f --- /dev/null +++ b/examples/vector_index_builtin/main.cpp @@ -0,0 +1,71 @@ +#include "vector_index.h" + +#include + + +void PrintResults(const std::vector& items) +{ + if (items.empty()) { + std::cout << "No items found" << std::endl; + return; + } + + for (const auto& item : items) { + std::cout << "[score=" << item.Score << "] " << item.Id << ": " << item.Document << std::endl; + } +} + +void VectorExample( + const std::string& endpoint, + const std::string& database, + const std::string& tableName, + const std::string& indexName) +{ + auto driverConfig = NYdb::CreateFromEnvironment(endpoint + "/?database=" + database); + NYdb::TDriver driver(driverConfig); + NYdb::NQuery::TQueryClient client(driver); + + try { + DropVectorTable(client, tableName); + CreateVectorTable(client, tableName); + std::vector items = { + {.Id = "1", .Document = "document 1", .Embedding = {0.98, 0.1, 0.01}}, + {.Id = "2", .Document = "document 2", .Embedding = {1.0, 0.05, 0.05}}, + {.Id = "3", .Document = "document 3", .Embedding = {0.9, 0.1, 0.1}}, + {.Id = "4", .Document = "document 4", .Embedding = {0.03, 0.0, 0.99}}, + {.Id = "5", .Document = "document 5", .Embedding = {0.0, 0.0, 0.99}}, + {.Id = "6", .Document = "document 6", .Embedding = {0.0, 0.02, 1.0}}, + {.Id = "7", .Document = "document 7", .Embedding = {0.0, 1.05, 0.05}}, + {.Id = "8", .Document = "document 8", .Embedding = {0.02, 0.98, 0.1}}, + {.Id = "9", .Document = "document 9", .Embedding = {0.0, 1.0, 0.05}}, + }; + InsertItems(client, tableName, items); + PrintResults(SearchItems(client, tableName, {1.0, 0.0, 0.0}, "CosineSimilarity", 3)); + AddIndex(driver, client, database, tableName, indexName, "similarity=cosine", 3, 1, 3); + PrintResults(SearchItems(client, tableName, {1.0, 0.0, 0.0}, "CosineSimilarity", 3, indexName)); + } catch (const std::exception& e) { + std::cerr << "Execution failed: " << e.what() << std::endl; + } + + driver.Stop(true); +} + +int main(int argc, char** argv) { + std::string endpoint; + std::string database; + std::string tableName; + std::string indexName; + + NLastGetopt::TOpts opts = NLastGetopt::TOpts::Default(); + + opts.AddLongOption('e', "endpoint", "YDB endpoint").Required().RequiredArgument("HOST:PORT").StoreResult(&endpoint); + opts.AddLongOption('d', "database", "YDB database").Required().RequiredArgument("DATABASE").StoreResult(&database); + opts.AddLongOption("table", "table name").Required().RequiredArgument("TABLE").StoreResult(&tableName); + opts.AddLongOption("index", "index name").Required().RequiredArgument("INDEX").StoreResult(&indexName); + + opts.SetFreeArgsMin(0); + NLastGetopt::TOptsParseResult result(&opts, argc, argv); + + VectorExample(endpoint, database, tableName, indexName); + return 0; +} diff --git a/examples/vector_index_builtin/vector_index.cpp b/examples/vector_index_builtin/vector_index.cpp new file mode 100644 index 00000000000..2df8d3e19d8 --- /dev/null +++ b/examples/vector_index_builtin/vector_index.cpp @@ -0,0 +1,177 @@ +#include "vector_index.h" + +#include + + +void DropVectorTable(NYdb::NQuery::TQueryClient& client, const std::string& tableName) +{ + NYdb::NStatusHelpers::ThrowOnError(client.RetryQuerySync([&](NYdb::NQuery::TSession session) { + return session.ExecuteQuery(std::format("DROP TABLE IF EXISTS {}", tableName), NYdb::NQuery::TTxControl::NoTx()).ExtractValueSync(); + })); + + std::cout << "Vector table dropped: " << tableName << std::endl; +} + +void CreateVectorTable(NYdb::NQuery::TQueryClient& client, const std::string& tableName) +{ + std::string query = std::format(R"( + CREATE TABLE IF NOT EXISTS `{}` ( + id Utf8, + document Utf8, + embedding String, + PRIMARY KEY (id) + ))", tableName); + + NYdb::NStatusHelpers::ThrowOnError(client.RetryQuerySync([&](NYdb::NQuery::TSession session) { + return session.ExecuteQuery(query, NYdb::NQuery::TTxControl::NoTx()).ExtractValueSync(); + })); + + std::cout << "Vector table created: " << tableName << std::endl; +} + +void InsertItems( + NYdb::NQuery::TQueryClient& client, + const std::string& tableName, + const std::vector& items) +{ + std::string query = std::format(R"( + DECLARE $items AS List + >>; + + UPSERT INTO `{0}` + ( + id, + document, + embedding + ) + SELECT + id, + document, + Untag(Knn::ToBinaryStringFloat(embedding), "FloatVector"), + FROM AS_TABLE($items); + )", tableName); + + NYdb::TParamsBuilder paramsBuilder; + auto& valueBuilder = paramsBuilder.AddParam("$items"); + valueBuilder.BeginList(); + for (const auto& item : items) { + valueBuilder.AddListItem(); + valueBuilder.BeginStruct(); + valueBuilder.AddMember("id").Utf8(item.Id); + valueBuilder.AddMember("document").Utf8(item.Document); + valueBuilder.AddMember("embedding").BeginList(); + for (const auto& value : item.Embedding) { + valueBuilder.AddListItem().Float(value); + } + valueBuilder.EndList(); + valueBuilder.EndStruct(); + } + valueBuilder.EndList(); + valueBuilder.Build(); + + NYdb::NStatusHelpers::ThrowOnError(client.RetryQuerySync([params = paramsBuilder.Build(), &query](NYdb::NQuery::TSession session) { + return session.ExecuteQuery(query, NYdb::NQuery::TTxControl::BeginTx(NYdb::NQuery::TTxSettings::SerializableRW()).CommitTx(), params).ExtractValueSync(); + })); + + std::cout << items.size() << " items inserted" << std::endl; +} + +void AddIndex( + NYdb::TDriver& driver, + NYdb::NQuery::TQueryClient& client, + const std::string& database, + const std::string& tableName, + const std::string& indexName, + const std::string& strategy, + std::uint64_t dim, + std::uint64_t levels, + std::uint64_t clusters) +{ + std::string query = std::format(R"( + ALTER TABLE `{0}` + ADD INDEX {1}__temp + GLOBAL USING vector_kmeans_tree + ON (embedding) + WITH ( + {2}, + vector_type="Float", + vector_dimension={3}, + levels={4}, + clusters={5} + ); + )", tableName, indexName, strategy, dim, levels, clusters); + + NYdb::NStatusHelpers::ThrowOnError(client.RetryQuerySync([&](NYdb::NQuery::TSession session) { + return session.ExecuteQuery(query, NYdb::NQuery::TTxControl::NoTx()).ExtractValueSync(); + })); + + NYdb::NTable::TTableClient tableClient(driver); + NYdb::NStatusHelpers::ThrowOnError(tableClient.RetryOperationSync([&](NYdb::NTable::TSession session) { + return session.AlterTable(database + "/" + tableName, NYdb::NTable::TAlterTableSettings() + .AppendRenameIndexes(NYdb::NTable::TRenameIndex{ + .SourceName_ = indexName + "__temp", + .DestinationName_ = indexName, + .ReplaceDestination_ = true + }) + ).ExtractValueSync(); + })); + + std::cout << "Table index `" << indexName << "` for table `" << tableName << "` added" << std::endl; +} + +std::vector SearchItems( + NYdb::NQuery::TQueryClient& client, + const std::string& tableName, + const std::vector& embedding, + const std::string& strategy, + std::uint64_t limit, + const std::optional& indexName) +{ + std::string viewIndex = indexName ? "VIEW " + *indexName : ""; + std::string sortOrder = strategy.ends_with("Similarity") ? "DESC" : "ASC"; + + std::string query = std::format(R"( + DECLARE $embedding as List; + + $TargetEmbedding = Knn::ToBinaryStringFloat($embedding); + + SELECT + id, + document, + Knn::{2}(embedding, $TargetEmbedding) as score + FROM {0} {1} + ORDER BY score + {3} + LIMIT {4}; + )", tableName, viewIndex, strategy, sortOrder, limit); + + NYdb::TParamsBuilder paramsBuilder; + auto& valueBuilder = paramsBuilder.AddParam("$embedding"); + valueBuilder.BeginList(); + for (auto value : embedding) { + valueBuilder.AddListItem().Float(value); + } + valueBuilder.EndList().Build(); + + std::vector result; + + NYdb::NStatusHelpers::ThrowOnError(client.RetryQuerySync([params = paramsBuilder.Build(), &query, &result](NYdb::NQuery::TSession session) { + auto execResult = session.ExecuteQuery(query, NYdb::NQuery::TTxControl::BeginTx(NYdb::NQuery::TTxSettings::SerializableRW()).CommitTx(), params).ExtractValueSync(); + if (execResult.IsSuccess()) { + auto parser = execResult.GetResultSetParser(0); + while (parser.TryNextRow()) { + result.push_back({ + .Id = *parser.ColumnParser(0).GetOptionalUtf8(), + .Document = *parser.ColumnParser(1).GetOptionalUtf8(), + .Score = *parser.ColumnParser(2).GetOptionalFloat() + }); + } + } + return execResult; + })); + + return result; +} diff --git a/examples/vector_index_builtin/vector_index.h b/examples/vector_index_builtin/vector_index.h new file mode 100644 index 00000000000..762f42b941a --- /dev/null +++ b/examples/vector_index_builtin/vector_index.h @@ -0,0 +1,47 @@ +#pragma once + +#include +#include +#include + +#include + +struct TItem { + std::string Id; + std::string Document; + std::vector Embedding; +}; + +struct TResultItem { + std::string Id; + std::string Document; + float Score; +}; + +void DropVectorTable(NYdb::NQuery::TQueryClient& client, const std::string& tableName); + +void CreateVectorTable(NYdb::NQuery::TQueryClient& client, const std::string& tableName); + +void InsertItems( + NYdb::NQuery::TQueryClient& client, + const std::string& tableName, + const std::vector& items); + +void AddIndex( + NYdb::TDriver& driver, + NYdb::NQuery::TQueryClient& client, + const std::string& database, + const std::string& tableName, + const std::string& indexName, + const std::string& strategy, + std::uint64_t dim, + std::uint64_t levels, + std::uint64_t clusters); + +std::vector SearchItems( + NYdb::NQuery::TQueryClient& client, + const std::string& tableName, + const std::vector& embedding, + const std::string& strategy, + std::uint64_t limit, + const std::optional& indexName = std::nullopt); diff --git a/include/ydb-cpp-sdk/client/driver/driver.h b/include/ydb-cpp-sdk/client/driver/driver.h index 60db7c097c3..ac07326994c 100644 --- a/include/ydb-cpp-sdk/client/driver/driver.h +++ b/include/ydb-cpp-sdk/client/driver/driver.h @@ -89,13 +89,12 @@ class TDriverConfig { //! Params is a optionally field to set policy settings //! default: EBalancingPolicy::UsePreferableLocation TDriverConfig& SetBalancingPolicy(EBalancingPolicy policy, const std::string& params = std::string()); - //! !!! EXPERIMENTAL !!! //! Set grpc level keep alive. If keepalive ping was delayed more than given timeout //! internal grpc routine fails request with TRANSIENT_FAILURE or TRANSPORT_UNAVAILABLE error //! Note: this timeout should not be too small to prevent fail due to //! network buffers delay. I.e. values less than 5 seconds may cause request failure //! even with fast network - //! default: disabled + //! default: enabled, 10 seconds TDriverConfig& SetGRpcKeepAliveTimeout(TDuration timeout); TDriverConfig& SetGRpcKeepAlivePermitWithoutCalls(bool permitWithoutCalls); //! Set inactive socket timeout. diff --git a/src/api/protos/draft/ydb_bridge.proto b/src/api/protos/draft/ydb_bridge.proto index 6c2df4398f9..196b57e5b8f 100644 --- a/src/api/protos/draft/ydb_bridge.proto +++ b/src/api/protos/draft/ydb_bridge.proto @@ -41,6 +41,8 @@ message UpdateClusterStateRequest { Ydb.Operations.OperationParams operation_params = 1; // List of desired pile states to update repeated PileStateUpdate updates = 2; + // If set, acquire quorum only for specific pile(s) + repeated uint32 specific_pile_ids = 3; } message UpdateClusterStateResponse { diff --git a/src/client/driver/driver.cpp b/src/client/driver/driver.cpp index 1016008baea..0ce0ef260d8 100644 --- a/src/client/driver/driver.cpp +++ b/src/client/driver/driver.cpp @@ -70,8 +70,8 @@ class TDriverConfig::TImpl : public IConnectionsParams { }; bool DrainOnDtors = true; TBalancingSettings BalancingSettings = TBalancingSettings{EBalancingPolicy::UsePreferableLocation, std::string()}; - TDuration GRpcKeepAliveTimeout; - bool GRpcKeepAlivePermitWithoutCalls = false; + TDuration GRpcKeepAliveTimeout = TDuration::Seconds(10); + bool GRpcKeepAlivePermitWithoutCalls = true; TDuration SocketIdleTimeout = TDuration::Minutes(6); uint64_t MemoryQuota = 0; uint64_t MaxInboundMessageSize = 0; diff --git a/src/client/query/client.cpp b/src/client/query/client.cpp index 7e446e8b1ae..ff7ccee5bd3 100644 --- a/src/client/query/client.cpp +++ b/src/client/query/client.cpp @@ -834,6 +834,8 @@ class TTransaction::TImpl : public std::enable_shared_from_this { } void AddPrecommitCallback(TPrecommitTransactionCallback cb) { + std::lock_guard lock(PrecommitCallbacksMutex); + if (!ChangesAreAccepted) { ythrow TContractViolation("Changes are no longer accepted"); } @@ -842,6 +844,8 @@ class TTransaction::TImpl : public std::enable_shared_from_this { } void AddOnFailureCallback(TOnFailureTransactionCallback cb) { + std::lock_guard lock(OnFailureCallbacksMutex); + if (!ChangesAreAccepted) { ythrow TContractViolation("Changes are no longer accepted"); } @@ -856,6 +860,9 @@ class TTransaction::TImpl : public std::enable_shared_from_this { bool ChangesAreAccepted = true; // haven't called Commit or Rollback yet std::vector PrecommitCallbacks; std::vector OnFailureCallbacks; + + std::mutex PrecommitCallbacksMutex; + std::mutex OnFailureCallbacksMutex; }; TTransaction::TTransaction(const TSession& session, const std::string& txId) diff --git a/src/client/table/impl/transaction.cpp b/src/client/table/impl/transaction.cpp index ecf4c072515..c52756181d6 100644 --- a/src/client/table/impl/transaction.cpp +++ b/src/client/table/impl/transaction.cpp @@ -92,6 +92,8 @@ TAsyncStatus TTransaction::TImpl::Rollback(const TRollbackTxSettings& settings) void TTransaction::TImpl::AddPrecommitCallback(TPrecommitTransactionCallback cb) { + std::lock_guard lock(PrecommitCallbacksMutex); + if (!ChangesAreAccepted) { ythrow TContractViolation("Changes are no longer accepted"); } @@ -101,6 +103,8 @@ void TTransaction::TImpl::AddPrecommitCallback(TPrecommitTransactionCallback cb) void TTransaction::TImpl::AddOnFailureCallback(TOnFailureTransactionCallback cb) { + std::lock_guard lock(OnFailureCallbacksMutex); + if (!ChangesAreAccepted) { ythrow TContractViolation("Changes are no longer accepted"); } diff --git a/src/client/table/impl/transaction.h b/src/client/table/impl/transaction.h index f5b6025aae9..3eb187975fd 100644 --- a/src/client/table/impl/transaction.h +++ b/src/client/table/impl/transaction.h @@ -2,6 +2,8 @@ #include +#include + namespace NYdb::inline V3::NTable { class TTransaction::TImpl : public std::enable_shared_from_this { @@ -40,6 +42,9 @@ class TTransaction::TImpl : public std::enable_shared_from_this { bool ChangesAreAccepted = true; // haven't called Commit or Rollback yet mutable std::vector PrecommitCallbacks; mutable std::vector OnFailureCallbacks; + + std::mutex PrecommitCallbacksMutex; + std::mutex OnFailureCallbacksMutex; }; } diff --git a/src/client/topic/ut/local_partition_ut.cpp b/src/client/topic/ut/local_partition_ut.cpp index d47812fd31c..19809f111c6 100644 --- a/src/client/topic/ut/local_partition_ut.cpp +++ b/src/client/topic/ut/local_partition_ut.cpp @@ -305,7 +305,7 @@ namespace NYdb::inline V3::NTopic::NTests { UNIT_ASSERT(writeSession->Write(NPQ::NTest::Msg("message_1.1", 2))); - std::uint64_t txId = 1006; + ui64 txId = 1006; NPQ::NTest::SplitPartition(setup, ++txId, 0, "a"); UNIT_ASSERT(writeSession->Write(NPQ::NTest::Msg("message_1.2", 3))); diff --git a/src/client/topic/ut/topic_to_table_ut.cpp b/src/client/topic/ut/topic_to_table_ut.cpp index 4274a464fd0..03679095c81 100644 --- a/src/client/topic/ut/topic_to_table_ut.cpp +++ b/src/client/topic/ut/topic_to_table_ut.cpp @@ -396,7 +396,7 @@ class TFixture : public NUnitTest::TBaseFixture { std::unordered_map, TTopicWriteSessionContext> TopicWriteSessions; std::unordered_map TopicReadSessions; - std::uint64_t SchemaTxId = 1000; + ui64 SchemaTxId = 1000; }; class TFixtureTable : public TFixture {