@@ -94,6 +94,8 @@ class TFixture : public NUnitTest::TBaseFixture {
9494 TDuration stabilizationWindow,
9595 ui64 downUtilizationPercent,
9696 ui64 upUtilizationPercent);
97+ void SetPartitionWriteSpeed (const std::string& topicPath,
98+ size_t bytesPerSeconds);
9799
98100 void WriteToTopicWithInvalidTxId (bool invalidTxId);
99101
@@ -511,6 +513,18 @@ void TFixture::AlterAutoPartitioning(const TString& topicPath,
511513 UNIT_ASSERT_C (result.IsSuccess (), result.GetIssues ().ToString ());
512514}
513515
516+ void TFixture::SetPartitionWriteSpeed (const std::string& topicPath,
517+ size_t bytesPerSeconds)
518+ {
519+ NTopic::TTopicClient client (GetDriver ());
520+ NTopic::TAlterTopicSettings settings;
521+
522+ settings.SetPartitionWriteSpeedBytesPerSecond (bytesPerSeconds);
523+
524+ auto result = client.AlterTopic (topicPath, settings).GetValueSync ();
525+ UNIT_ASSERT_C (result.IsSuccess (), result.GetIssues ().ToString ());
526+ }
527+
514528TTopicDescription TFixture::DescribeTopic (const TString& path)
515529{
516530 return Setup->DescribeTopic (path);
@@ -3005,9 +3019,6 @@ Y_UNIT_TEST_F(Sinks_Olap_WriteToTopicAndTable_3, TFixtureSinks)
30053019
30063020Y_UNIT_TEST_F (Write_Random_Sized_Messages_In_Wide_Transactions, TFixture)
30073021{
3008- // Consumes a lot of memory. Temporarily disabled
3009- return ;
3010-
30113022 // The test verifies the simultaneous execution of several transactions. There is a topic
30123023 // with PARTITIONS_COUNT partitions. In each transaction, the test writes to all the partitions.
30133024 // The size of the messages is random. Such that both large blobs in the body and small ones in
@@ -3019,6 +3030,8 @@ Y_UNIT_TEST_F(Write_Random_Sized_Messages_In_Wide_Transactions, TFixture)
30193030
30203031 CreateTopic (" topic_A" , TEST_CONSUMER, PARTITIONS_COUNT);
30213032
3033+ SetPartitionWriteSpeed (" topic_A" , 50'000'000 );
3034+
30223035 std::vector<NTable::TSession> sessions;
30233036 std::vector<NTable::TTransaction> transactions;
30243037
@@ -3059,6 +3072,141 @@ Y_UNIT_TEST_F(Write_Random_Sized_Messages_In_Wide_Transactions, TFixture)
30593072 }
30603073}
30613074
3075+ Y_UNIT_TEST_F (Write_Only_Big_Messages_In_Wide_Transactions, TFixture)
3076+ {
3077+ // The test verifies the simultaneous execution of several transactions. There is a topic `topic_A` and
3078+ // it contains a `PARTITIONS_COUNT' of partitions. In each transaction, the test writes to all partitions.
3079+ // The size of the messages is chosen so that only large blobs are recorded in the transaction and there
3080+ // are no records in the head. Thus, we verify that transaction bundling is working correctly.
3081+
3082+ const size_t PARTITIONS_COUNT = 20 ;
3083+ const size_t TXS_COUNT = 100 ;
3084+
3085+ CreateTopic (" topic_A" , TEST_CONSUMER, PARTITIONS_COUNT);
3086+
3087+ SetPartitionWriteSpeed (" topic_A" , 50'000'000 );
3088+
3089+ std::vector<NTable::TSession> sessions;
3090+ std::vector<NTable::TTransaction> transactions;
3091+
3092+ // We open TXS_COUNT transactions and write messages to the topic.
3093+ for (size_t i = 0 ; i < TXS_COUNT; ++i) {
3094+ sessions.push_back (CreateTableSession ());
3095+ auto & session = sessions.back ();
3096+
3097+ transactions.push_back (BeginTx (session));
3098+ auto & tx = transactions.back ();
3099+
3100+ for (size_t j = 0 ; j < PARTITIONS_COUNT; ++j) {
3101+ TString sourceId = TEST_MESSAGE_GROUP_ID;
3102+ sourceId += " _" ;
3103+ sourceId += ToString (i);
3104+ sourceId += " _" ;
3105+ sourceId += ToString (j);
3106+
3107+ WriteToTopic (" topic_A" , sourceId, TString (6'500'000 , ' x' ), &tx, j);
3108+
3109+ WaitForAcks (" topic_A" , sourceId);
3110+ }
3111+ }
3112+
3113+ // We are doing an asynchronous commit of transactions. They will be executed simultaneously.
3114+ std::vector<NTable::TAsyncCommitTransactionResult> futures;
3115+
3116+ for (size_t i = 0 ; i < TXS_COUNT; ++i) {
3117+ futures.push_back (transactions[i].Commit ());
3118+ }
3119+
3120+ // All transactions must be completed successfully.
3121+ for (size_t i = 0 ; i < TXS_COUNT; ++i) {
3122+ futures[i].Wait ();
3123+ const auto & result = futures[i].GetValueSync ();
3124+ UNIT_ASSERT_VALUES_EQUAL_C (result.GetStatus (), EStatus::SUCCESS, result.GetIssues ().ToString ());
3125+ }
3126+ }
3127+
3128+ Y_UNIT_TEST_F (Transactions_Conflict_On_SeqNo, TFixture)
3129+ {
3130+ const ui32 PARTITIONS_COUNT = 20 ;
3131+ const size_t TXS_COUNT = 100 ;
3132+
3133+ CreateTopic (" topic_A" , TEST_CONSUMER, PARTITIONS_COUNT);
3134+
3135+ SetPartitionWriteSpeed (" topic_A" , 50'000'000 );
3136+
3137+ auto tableSession = CreateTableSession ();
3138+ std::vector<std::shared_ptr<NTopic::ISimpleBlockingWriteSession>> topicWriteSessions;
3139+
3140+ for (ui32 i = 0 ; i < PARTITIONS_COUNT; ++i) {
3141+ TString sourceId = TEST_MESSAGE_GROUP_ID;
3142+ sourceId += " _" ;
3143+ sourceId += ToString (i);
3144+
3145+ NTopic::TTopicClient client (GetDriver ());
3146+ NTopic::TWriteSessionSettings options;
3147+ options.Path (" topic_A" );
3148+ options.ProducerId (sourceId);
3149+ options.MessageGroupId (sourceId);
3150+ options.PartitionId (i);
3151+ options.Codec (ECodec::RAW);
3152+
3153+ auto session = client.CreateSimpleBlockingWriteSession (options);
3154+
3155+ topicWriteSessions.push_back (std::move (session));
3156+ }
3157+
3158+ std::vector<NTable::TSession> sessions;
3159+ std::vector<NTable::TTransaction> transactions;
3160+
3161+ for (size_t i = 0 ; i < TXS_COUNT; ++i) {
3162+ sessions.push_back (CreateTableSession ());
3163+ auto & session = sessions.back ();
3164+
3165+ transactions.push_back (BeginTx (session));
3166+ auto & tx = transactions.back ();
3167+
3168+ for (size_t j = 0 ; j < PARTITIONS_COUNT; ++j) {
3169+ TString sourceId = TEST_MESSAGE_GROUP_ID;
3170+ sourceId += " _" ;
3171+ sourceId += ToString (j);
3172+
3173+ for (size_t k = 0 , count = RandomNumber<size_t >(20 ) + 1 ; k < count; ++k) {
3174+ const std::string data (RandomNumber<size_t >(1'000 ) + 100 , ' x' );
3175+ NTopic::TWriteMessage params (data);
3176+ params.Tx (tx);
3177+
3178+ topicWriteSessions[j]->Write (std::move (params));
3179+ }
3180+ }
3181+ }
3182+
3183+ std::vector<NTable::TAsyncCommitTransactionResult> futures;
3184+
3185+ for (size_t i = 0 ; i < TXS_COUNT; ++i) {
3186+ futures.push_back (transactions[i].Commit ());
3187+ }
3188+
3189+ // Some transactions should end with the error `ABORTED`
3190+ size_t successCount = 0 ;
3191+
3192+ for (size_t i = 0 ; i < TXS_COUNT; ++i) {
3193+ futures[i].Wait ();
3194+ const auto & result = futures[i].GetValueSync ();
3195+ switch (result.GetStatus ()) {
3196+ case EStatus::SUCCESS:
3197+ ++successCount;
3198+ break ;
3199+ case EStatus::ABORTED:
3200+ break ;
3201+ default :
3202+ UNIT_FAIL (" unexpected status: " << static_cast <const NYdb::TStatus&>(result));
3203+ break ;
3204+ }
3205+ }
3206+
3207+ UNIT_ASSERT_VALUES_UNEQUAL (successCount, TXS_COUNT);
3208+ }
3209+
30623210}
30633211
30643212}
0 commit comments