@@ -3358,6 +3358,64 @@ Y_UNIT_TEST_F(Write_And_Read_Gigant_Messages_2, TFixtureNoClient)
33583358 TestWriteAndReadMessages (4 , 61'000'000 , true );
33593359}
33603360
3361+ Y_UNIT_TEST_F (Write_50k_100times_50tx, TFixtureTable)
3362+ {
3363+ // 100 transactions. Write 100 50KB messages in each folder. Call the commit at the same time.
3364+ // As a result, there will be a lot of small blobs in the FastWrite zone of the main batch,
3365+ // which will be picked up by a compact. The scenario is similar to the work of Ya.Metrika.
3366+
3367+ const std::size_t PARTITIONS_COUNT = 2 ;
3368+ const std::size_t TXS_COUNT = 50 ;
3369+
3370+ auto makeSourceId = [](unsigned txId, unsigned partitionId) {
3371+ std::string sourceId = TEST_MESSAGE_GROUP_ID;
3372+ sourceId += " _" ;
3373+ sourceId += ToString (txId);
3374+ sourceId += " _" ;
3375+ sourceId += ToString (partitionId);
3376+ return sourceId;
3377+ };
3378+
3379+ CreateTopic (" topic_A" , TEST_CONSUMER, PARTITIONS_COUNT);
3380+
3381+ SetPartitionWriteSpeed (" topic_A" , 50'000'000 );
3382+
3383+ std::vector<std::unique_ptr<TFixture::ISession>> sessions;
3384+ std::vector<std::unique_ptr<TTransactionBase>> transactions;
3385+
3386+ for (std::size_t i = 0 ; i < TXS_COUNT; ++i) {
3387+ sessions.push_back (CreateSession ());
3388+ auto & session = sessions.back ();
3389+
3390+ transactions.push_back (session->BeginTx ());
3391+ auto & tx = transactions.back ();
3392+
3393+ auto sourceId = makeSourceId (i, 0 );
3394+ for (size_t j = 0 ; j < 100 ; ++j) {
3395+ WriteToTopic (" topic_A" , sourceId, std::string (50'000 , ' x' ), tx.get (), 0 );
3396+ }
3397+ WaitForAcks (" topic_A" , sourceId);
3398+
3399+ sourceId = makeSourceId (i, 1 );
3400+ WriteToTopic (" topic_A" , sourceId, std::string (50'000 , ' x' ), tx.get (), 1 );
3401+ WaitForAcks (" topic_A" , sourceId);
3402+ }
3403+
3404+ // We are doing an asynchronous commit of transactions. They will be executed simultaneously.
3405+ std::vector<TAsyncStatus> futures;
3406+
3407+ for (std::size_t i = 0 ; i < TXS_COUNT; ++i) {
3408+ futures.push_back (sessions[i]->AsyncCommitTx (*transactions[i]));
3409+ }
3410+
3411+ // All transactions must be completed successfully.
3412+ for (std::size_t i = 0 ; i < TXS_COUNT; ++i) {
3413+ futures[i].Wait ();
3414+ const auto & result = futures[i].GetValueSync ();
3415+ UNIT_ASSERT_EQUAL_C (result.GetStatus (), EStatus::SUCCESS, result.GetIssues ().ToString ());
3416+ }
3417+ }
3418+
33613419}
33623420
33633421}
0 commit comments