@@ -192,9 +192,9 @@ class TFixture : public NUnitTest::TBaseFixture {
192192
193193 NTable::TDataQueryResult ExecuteDataQuery (NTable::TSession session, const TString& query, const NTable::TTxControl& control);
194194
195- void Read_Exactly_N_Messages_From_Topic (const TString& topicPath,
196- const TString& consumerName,
197- size_t count);
195+ TVector<TString> Read_Exactly_N_Messages_From_Topic (const TString& topicPath,
196+ const TString& consumerName,
197+ size_t count);
198198
199199 struct TAvgWriteBytes {
200200 ui64 PerSec = 0 ;
@@ -1069,16 +1069,22 @@ void TFixture::RestartLongTxService()
10691069 }
10701070}
10711071
1072- void TFixture::Read_Exactly_N_Messages_From_Topic (const TString& topicPath,
1073- const TString& consumerName,
1074- size_t limit)
1072+ TVector<TString> TFixture::Read_Exactly_N_Messages_From_Topic (const TString& topicPath,
1073+ const TString& consumerName,
1074+ size_t limit)
10751075{
1076- size_t count = 0 ;
1077- while (count < limit) {
1076+ TVector<TString> result;
1077+
1078+ while (result.size () < limit) {
10781079 auto messages = ReadFromTopic (topicPath, consumerName, TDuration::Seconds (2 ));
1079- count += messages.size ();
1080+ for (auto & m : messages) {
1081+ result.push_back (std::move (m));
1082+ }
10801083 }
1081- UNIT_ASSERT_VALUES_EQUAL (count, limit);
1084+
1085+ UNIT_ASSERT_VALUES_EQUAL (result.size (), limit);
1086+
1087+ return result;
10821088}
10831089
10841090auto TFixture::GetAvgWriteBytes (const TString& topicName,
@@ -1141,15 +1147,13 @@ Y_UNIT_TEST_F(WriteToTopic_Demo_1, TFixture)
11411147 CommitTx (tx, EStatus::SUCCESS);
11421148
11431149 {
1144- auto messages = ReadFromTopic (" topic_A" , TEST_CONSUMER, TDuration::Seconds (2 ));
1145- UNIT_ASSERT_VALUES_EQUAL (messages.size (), 4 );
1150+ auto messages = Read_Exactly_N_Messages_From_Topic (" topic_A" , TEST_CONSUMER, 4 );
11461151 UNIT_ASSERT_VALUES_EQUAL (messages[0 ], " message #1" );
11471152 UNIT_ASSERT_VALUES_EQUAL (messages[3 ], " message #4" );
11481153 }
11491154
11501155 {
1151- auto messages = ReadFromTopic (" topic_B" , TEST_CONSUMER, TDuration::Seconds (2 ));
1152- UNIT_ASSERT_VALUES_EQUAL (messages.size (), 5 );
1156+ auto messages = Read_Exactly_N_Messages_From_Topic (" topic_B" , TEST_CONSUMER, 5 );
11531157 UNIT_ASSERT_VALUES_EQUAL (messages[0 ], " message #5" );
11541158 UNIT_ASSERT_VALUES_EQUAL (messages[4 ], " message #9" );
11551159 }
@@ -1190,15 +1194,13 @@ Y_UNIT_TEST_F(WriteToTopic_Demo_2, TFixture)
11901194 CommitTx (tx, EStatus::SUCCESS);
11911195
11921196 {
1193- auto messages = ReadFromTopic (" topic_A" , TEST_CONSUMER, TDuration::Seconds (2 ));
1194- UNIT_ASSERT_VALUES_EQUAL (messages.size (), 4 );
1197+ auto messages = Read_Exactly_N_Messages_From_Topic (" topic_A" , TEST_CONSUMER, 4 );
11951198 UNIT_ASSERT_VALUES_EQUAL (messages[0 ], " message #1" );
11961199 UNIT_ASSERT_VALUES_EQUAL (messages[3 ], " message #4" );
11971200 }
11981201
11991202 {
1200- auto messages = ReadFromTopic (" topic_B" , TEST_CONSUMER, TDuration::Seconds (2 ));
1201- UNIT_ASSERT_VALUES_EQUAL (messages.size (), 3 );
1203+ auto messages = Read_Exactly_N_Messages_From_Topic (" topic_B" , TEST_CONSUMER, 3 );
12021204 UNIT_ASSERT_VALUES_EQUAL (messages[0 ], " message #7" );
12031205 UNIT_ASSERT_VALUES_EQUAL (messages[2 ], " message #9" );
12041206 }
@@ -1299,15 +1301,13 @@ Y_UNIT_TEST_F(WriteToTopic_Demo_5, TFixture)
12991301 }
13001302
13011303 {
1302- auto messages = ReadFromTopic (" topic_A" , TEST_CONSUMER, TDuration::Seconds (2 ));
1303- UNIT_ASSERT_VALUES_EQUAL (messages.size (), 2 );
1304+ auto messages = Read_Exactly_N_Messages_From_Topic (" topic_A" , TEST_CONSUMER, 2 );
13041305 UNIT_ASSERT_VALUES_EQUAL (messages[0 ], " message #1" );
13051306 UNIT_ASSERT_VALUES_EQUAL (messages[1 ], " message #3" );
13061307 }
13071308
13081309 {
1309- auto messages = ReadFromTopic (" topic_B" , TEST_CONSUMER, TDuration::Seconds (2 ));
1310- UNIT_ASSERT_VALUES_EQUAL (messages.size (), 2 );
1310+ auto messages = Read_Exactly_N_Messages_From_Topic (" topic_B" , TEST_CONSUMER, 2 );
13111311 UNIT_ASSERT_VALUES_EQUAL (messages[0 ], " message #2" );
13121312 UNIT_ASSERT_VALUES_EQUAL (messages[1 ], " message #4" );
13131313 }
@@ -1331,8 +1331,7 @@ Y_UNIT_TEST_F(WriteToTopic_Demo_6, TFixture)
13311331 CommitTx (tx, EStatus::SUCCESS);
13321332
13331333 {
1334- auto messages = ReadFromTopic (" topic_A" , TEST_CONSUMER, TDuration::Seconds (2 ));
1335- UNIT_ASSERT_VALUES_EQUAL (messages.size (), 2 );
1334+ auto messages = Read_Exactly_N_Messages_From_Topic (" topic_A" , TEST_CONSUMER, 2 );
13361335 UNIT_ASSERT_VALUES_EQUAL (messages[0 ], " message #1" );
13371336 UNIT_ASSERT_VALUES_EQUAL (messages[1 ], " message #2" );
13381337 }
@@ -1357,17 +1356,15 @@ Y_UNIT_TEST_F(WriteToTopic_Demo_7, TFixture)
13571356 WriteToTopic (" topic_A" , TEST_MESSAGE_GROUP_ID_1, " message #6" , &tx);
13581357
13591358 {
1360- auto messages = ReadFromTopic (" topic_A" , TEST_CONSUMER, TDuration::Seconds (2 ));
1361- UNIT_ASSERT_VALUES_EQUAL (messages.size (), 2 );
1359+ auto messages = Read_Exactly_N_Messages_From_Topic (" topic_A" , TEST_CONSUMER, 2 );
13621360 UNIT_ASSERT_VALUES_EQUAL (messages[0 ], " message #3" );
13631361 UNIT_ASSERT_VALUES_EQUAL (messages[1 ], " message #4" );
13641362 }
13651363
13661364 CommitTx (tx, EStatus::SUCCESS);
13671365
13681366 {
1369- auto messages = ReadFromTopic (" topic_A" , TEST_CONSUMER, TDuration::Seconds (2 ));
1370- UNIT_ASSERT_VALUES_EQUAL (messages.size (), 4 );
1367+ auto messages = Read_Exactly_N_Messages_From_Topic (" topic_A" , TEST_CONSUMER, 4 );
13711368 UNIT_ASSERT_VALUES_EQUAL (messages[0 ], " message #1" );
13721369 UNIT_ASSERT_VALUES_EQUAL (messages[3 ], " message #6" );
13731370 }
@@ -1456,8 +1453,7 @@ Y_UNIT_TEST_F(WriteToTopic_Demo_10, TFixture)
14561453 }
14571454
14581455 {
1459- auto messages = ReadFromTopic (" topic_A" , TEST_CONSUMER, TDuration::Seconds (2 ));
1460- UNIT_ASSERT_VALUES_EQUAL (messages.size (), 2 );
1456+ auto messages = Read_Exactly_N_Messages_From_Topic (" topic_A" , TEST_CONSUMER, 2 );
14611457 UNIT_ASSERT_VALUES_EQUAL (messages[0 ], " message #1" );
14621458 UNIT_ASSERT_VALUES_EQUAL (messages[1 ], " message #2" );
14631459 }
@@ -1753,8 +1749,7 @@ Y_UNIT_TEST_F(WriteToTopic_Demo_15, TFixture)
17531749
17541750 CommitTx (tx, EStatus::SUCCESS);
17551751
1756- auto messages = ReadFromTopic (" topic_A" , TEST_CONSUMER, TDuration::Seconds (2 ));
1757- UNIT_ASSERT_VALUES_EQUAL (messages.size (), 2 );
1752+ auto messages = Read_Exactly_N_Messages_From_Topic (" topic_A" , TEST_CONSUMER, 2 );
17581753 UNIT_ASSERT_VALUES_EQUAL (messages[0 ], " message #1" );
17591754 UNIT_ASSERT_VALUES_EQUAL (messages[1 ], " message #2" );
17601755}
@@ -1773,8 +1768,7 @@ Y_UNIT_TEST_F(WriteToTopic_Demo_16, TFixture)
17731768
17741769 CommitTx (tx, EStatus::SUCCESS);
17751770
1776- auto messages = ReadFromTopic (" topic_A" , TEST_CONSUMER, TDuration::Seconds (2 ));
1777- UNIT_ASSERT_VALUES_EQUAL (messages.size (), 2 );
1771+ auto messages = Read_Exactly_N_Messages_From_Topic (" topic_A" , TEST_CONSUMER, 2 );
17781772 UNIT_ASSERT_VALUES_EQUAL (messages[0 ], " message #1" );
17791773 UNIT_ASSERT_VALUES_EQUAL (messages[1 ], " message #2" );
17801774}
@@ -1800,8 +1794,7 @@ Y_UNIT_TEST_F(WriteToTopic_Demo_17, TFixture)
18001794
18011795 // RestartPQTablet("topic_A", 0);
18021796
1803- auto messages = ReadFromTopic (" topic_A" , TEST_CONSUMER, TDuration::Seconds (2 ));
1804- UNIT_ASSERT_VALUES_EQUAL (messages.size (), 8 );
1797+ auto messages = Read_Exactly_N_Messages_From_Topic (" topic_A" , TEST_CONSUMER, 8 );
18051798 UNIT_ASSERT_VALUES_EQUAL (messages[0 ].size (), 22'000'000 );
18061799 UNIT_ASSERT_VALUES_EQUAL (messages[1 ].size (), 100 );
18071800 UNIT_ASSERT_VALUES_EQUAL (messages[2 ].size (), 200 );
@@ -2033,8 +2026,7 @@ Y_UNIT_TEST_F(WriteToTopic_Demo_25, TFixture)
20332026
20342027 CommitTx (tx, EStatus::SUCCESS);
20352028
2036- messages = ReadFromTopic (" topic_B" , TEST_CONSUMER, TDuration::Seconds (2 ));
2037- UNIT_ASSERT_VALUES_EQUAL (messages.size (), 3 );
2029+ Read_Exactly_N_Messages_From_Topic (" topic_B" , TEST_CONSUMER, 3 );
20382030}
20392031
20402032Y_UNIT_TEST_F (WriteToTopic_Demo_26, TFixture)
@@ -2219,8 +2211,7 @@ Y_UNIT_TEST_F(WriteToTopic_Demo_39, TFixture)
22192211
22202212 CommitTx (tx, EStatus::SUCCESS);
22212213
2222- auto messages = ReadFromTopic (" topic_A" , " consumer" , TDuration::Seconds (2 ));
2223- UNIT_ASSERT_VALUES_EQUAL (messages.size (), 2 );
2214+ Read_Exactly_N_Messages_From_Topic (" topic_A" , " consumer" , 2 );
22242215}
22252216
22262217Y_UNIT_TEST_F (ReadRuleGeneration, TFixture)
@@ -2240,8 +2231,7 @@ Y_UNIT_TEST_F(ReadRuleGeneration, TFixture)
22402231 AddConsumer (TString{TEST_TOPIC}, {" consumer-1" });
22412232
22422233 // We read messages from the topic and committed offsets
2243- auto messages = ReadFromTopic (TString{TEST_TOPIC}, " consumer-1" , TDuration::Seconds (2 ));
2244- UNIT_ASSERT_VALUES_EQUAL (messages.size (), 3 );
2234+ Read_Exactly_N_Messages_From_Topic (TString{TEST_TOPIC}, " consumer-1" , 3 );
22452235 CloseTopicReadSession (TString{TEST_TOPIC}, " consumer-1" );
22462236
22472237 // And then the Logbroker team turned on the feature flag
@@ -2254,8 +2244,7 @@ Y_UNIT_TEST_F(ReadRuleGeneration, TFixture)
22542244 AddConsumer (TString{TEST_TOPIC}, {" consumer-2" });
22552245
22562246 // And they wanted to continue reading their messages
2257- messages = ReadFromTopic (TString{TEST_TOPIC}, " consumer-1" , TDuration::Seconds (2 ));
2258- UNIT_ASSERT_VALUES_EQUAL (messages.size (), 1 );
2247+ Read_Exactly_N_Messages_From_Topic (TString{TEST_TOPIC}, " consumer-1" , 1 );
22592248}
22602249
22612250Y_UNIT_TEST_F (WriteToTopic_Demo_40, TFixture)
0 commit comments