@@ -2910,6 +2910,60 @@ Y_UNIT_TEST_F(Sinks_Olap_WriteToTopicAndTable_3, TFixtureSinks)
29102910
29112911 CheckTabletKeys (" topic_A" );
29122912}
2913+
2914+ Y_UNIT_TEST_F (Write_Random_Sized_Messages_In_Wide_Transactions, TFixture)
2915+ {
2916+ // The test verifies the simultaneous execution of several transactions. There is a topic
2917+ // with PARTITIONS_COUNT partitions. In each transaction, the test writes to all the partitions.
2918+ // The size of the messages is random. Such that both large blobs in the body and small ones in
2919+ // the head of the partition are obtained. Message sizes are multiples of 500 KB. This way we
2920+ // will make sure that when committing transactions, the division into blocks is taken into account.
2921+
2922+ const size_t PARTITIONS_COUNT = 20 ;
2923+ const size_t TXS_COUNT = 100 ;
2924+
2925+ CreateTopic (" topic_A" , TEST_CONSUMER, PARTITIONS_COUNT);
2926+
2927+ std::vector<NTable::TSession> sessions;
2928+ std::vector<NTable::TTransaction> transactions;
2929+
2930+ // We open TXS_COUNT transactions and write messages to the topic.
2931+ for (size_t i = 0 ; i < TXS_COUNT; ++i) {
2932+ sessions.push_back (CreateTableSession ());
2933+ auto & session = sessions.back ();
2934+
2935+ transactions.push_back (BeginTx (session));
2936+ auto & tx = transactions.back ();
2937+
2938+ for (size_t j = 0 ; j < PARTITIONS_COUNT; ++j) {
2939+ TString sourceId = TEST_MESSAGE_GROUP_ID;
2940+ sourceId += " _" ;
2941+ sourceId += ToString (i);
2942+ sourceId += " _" ;
2943+ sourceId += ToString (j);
2944+
2945+ size_t count = RandomNumber<size_t >(20 ) + 3 ;
2946+ WriteToTopic (" topic_A" , sourceId, TString (512 * 1000 * count, ' x' ), &tx, j);
2947+
2948+ WaitForAcks (" topic_A" , sourceId);
2949+ }
2950+ }
2951+
2952+ // We are doing an asynchronous commit of transactions. They will be executed simultaneously.
2953+ std::vector<NTable::TAsyncCommitTransactionResult> futures;
2954+
2955+ for (size_t i = 0 ; i < TXS_COUNT; ++i) {
2956+ futures.push_back (transactions[i].Commit ());
2957+ }
2958+
2959+ // All transactions must be completed successfully.
2960+ for (size_t i = 0 ; i < TXS_COUNT; ++i) {
2961+ futures[i].Wait ();
2962+ const auto & result = futures[i].GetValueSync ();
2963+ UNIT_ASSERT_VALUES_EQUAL_C (result.GetStatus (), EStatus::SUCCESS, result.GetIssues ().ToString ());
2964+ }
2965+ }
2966+
29132967}
29142968
29152969}
0 commit comments