@@ -181,6 +181,10 @@ class TFixture : public NUnitTest::TBaseFixture {
181181
182182 NTable::TDataQueryResult ExecuteDataQuery (NTable::TSession session, const TString& query, const NTable::TTxControl& control);
183183
184+ void Read_Exactly_N_Messages_From_Topic (const TString& topicPath,
185+ const TString& consumerName,
186+ size_t count);
187+
184188private:
185189 template <class E >
186190 E ReadEvent (TTopicReadSessionPtr reader, NTable::TTransaction& tx);
@@ -531,10 +535,13 @@ Y_UNIT_TEST_F(TwoSessionOneConsumer, TFixture)
531535Y_UNIT_TEST_F (Offsets_Cannot_Be_Promoted_When_Reading_In_A_Transaction, TFixture)
532536{
533537 WriteMessage (" message" );
538+
534539 auto session = CreateTableSession ();
535540 auto tx = BeginTx (session);
541+
536542 auto reader = CreateReader ();
537543 StartPartitionSession (reader, tx, 0 );
544+
538545 UNIT_ASSERT_EXCEPTION (ReadMessage (reader, {.Tx = tx, .CommitOffsets = true }), yexception);
539546}
540547
@@ -1002,6 +1009,18 @@ void TFixture::RestartLongTxService()
10021009 }
10031010}
10041011
1012+ void TFixture::Read_Exactly_N_Messages_From_Topic (const TString& topicPath,
1013+ const TString& consumerName,
1014+ size_t limit)
1015+ {
1016+ size_t count = 0 ;
1017+ while (count < limit) {
1018+ auto messages = ReadFromTopic (topicPath, consumerName, TDuration::Seconds (2 ));
1019+ count += messages.size ();
1020+ }
1021+ UNIT_ASSERT_VALUES_EQUAL (count, limit);
1022+ }
1023+
10051024Y_UNIT_TEST_F (WriteToTopic_Demo_1, TFixture)
10061025{
10071026 CreateTopic (" topic_A" );
@@ -2166,8 +2185,7 @@ Y_UNIT_TEST_F(WriteToTopic_Demo_40, TFixture)
21662185
21672186 CommitTx (tx, EStatus::SUCCESS);
21682187
2169- auto messages = ReadFromTopic (" topic_A" , TEST_CONSUMER, TDuration::Seconds (60 ));
2170- UNIT_ASSERT_VALUES_EQUAL (messages.size (), 100 );
2188+ Read_Exactly_N_Messages_From_Topic (" topic_A" , TEST_CONSUMER, 100 );
21712189}
21722190
21732191Y_UNIT_TEST_F (WriteToTopic_Demo_41, TFixture)
@@ -2202,8 +2220,7 @@ Y_UNIT_TEST_F(WriteToTopic_Demo_42, TFixture)
22022220
22032221 CommitTx (tx, EStatus::SUCCESS);
22042222
2205- auto messages = ReadFromTopic (" topic_A" , TEST_CONSUMER, TDuration::Seconds (2 ));
2206- UNIT_ASSERT_VALUES_EQUAL (messages.size (), 100 );
2223+ Read_Exactly_N_Messages_From_Topic (" topic_A" , TEST_CONSUMER, 100 );
22072224}
22082225
22092226Y_UNIT_TEST_F (WriteToTopic_Demo_43, TFixture)
@@ -2221,8 +2238,7 @@ Y_UNIT_TEST_F(WriteToTopic_Demo_43, TFixture)
22212238
22222239 ExecuteDataQuery (tableSession, " SELECT 1" , NTable::TTxControl::Tx (tx).CommitTx (true ));
22232240
2224- auto messages = ReadFromTopic (" topic_A" , TEST_CONSUMER, TDuration::Seconds (60 ));
2225- UNIT_ASSERT_VALUES_EQUAL (messages.size (), 100 );
2241+ Read_Exactly_N_Messages_From_Topic (" topic_A" , TEST_CONSUMER, 100 );
22262242}
22272243
22282244Y_UNIT_TEST_F (WriteToTopic_Demo_44, TFixture)
@@ -2246,8 +2262,7 @@ Y_UNIT_TEST_F(WriteToTopic_Demo_44, TFixture)
22462262
22472263 ExecuteDataQuery (tableSession, " SELECT 2" , NTable::TTxControl::Tx (tx).CommitTx (true ));
22482264
2249- messages = ReadFromTopic (" topic_A" , TEST_CONSUMER, TDuration::Seconds (60 ));
2250- UNIT_ASSERT_VALUES_EQUAL (messages.size (), 100 );
2265+ Read_Exactly_N_Messages_From_Topic (" topic_A" , TEST_CONSUMER, 100 );
22512266}
22522267
22532268}
0 commit comments