33#include < ydb-cpp-sdk/client/topic/client.h>
44#include < ydb-cpp-sdk/client/table/table.h>
55#include < src/client/persqueue_public/ut/ut_utils/ut_utils.h>
6+
67#include < ydb/core/cms/console/console.h>
78#include < ydb/core/keyvalue/keyvalue_events.h>
89#include < ydb/core/persqueue/key.h>
910#include < ydb/core/persqueue/blob.h>
10-
11+ # include < ydb/core/persqueue/events/global.h >
1112#include < ydb/core/tx/long_tx_service/public/events.h>
1213
13- #include < library/cpp/logger/stream .h>
14+ #include < ydb/core/persqueue/ut/common/autoscaling_ut_common .h>
1415
16+ #include < library/cpp/logger/stream.h>
1517#include < library/cpp/testing/unittest/registar.h>
1618
1719namespace NYdb ::NTopic::NTests {
1820
1921const auto TEST_MESSAGE_GROUP_ID_1 = TEST_MESSAGE_GROUP_ID + " _1" ;
2022const auto TEST_MESSAGE_GROUP_ID_2 = TEST_MESSAGE_GROUP_ID + " _2" ;
23+ const auto TEST_MESSAGE_GROUP_ID_3 = TEST_MESSAGE_GROUP_ID + " _3" ;
24+ const auto TEST_MESSAGE_GROUP_ID_4 = TEST_MESSAGE_GROUP_ID + " _4" ;
2125
2226Y_UNIT_TEST_SUITE (TxUsage) {
2327
@@ -79,9 +83,16 @@ class TFixture : public NUnitTest::TBaseFixture {
7983 const TString& consumer = TEST_CONSUMER,
8084 size_t partitionCount = 1 ,
8185 std::optional<size_t > maxPartitionCount = std::nullopt );
82- void DescribeTopic (const TString& path);
86+ TTopicDescription DescribeTopic (const TString& path);
8387
84- void AddConsumer (const TString& topic, const TVector<TString>& consumers);
88+ void AddConsumer (const TString& topicPath, const TVector<TString>& consumers);
89+ void AlterAutoPartitioning (const TString& topicPath,
90+ ui64 minActivePartitions,
91+ ui64 maxActivePartitions,
92+ EAutoPartitioningStrategy strategy,
93+ TDuration stabilizationWindow,
94+ ui64 downUtilizationPercent,
95+ ui64 upUtilizationPercent);
8596
8697 void WriteToTopicWithInvalidTxId (bool invalidTxId);
8798
@@ -185,6 +196,24 @@ class TFixture : public NUnitTest::TBaseFixture {
185196 const TString& consumerName,
186197 size_t count);
187198
199+ struct TAvgWriteBytes {
200+ ui64 PerSec = 0 ;
201+ ui64 PerMin = 0 ;
202+ ui64 PerHour = 0 ;
203+ ui64 PerDay = 0 ;
204+ };
205+
206+ TAvgWriteBytes GetAvgWriteBytes (const TString& topicPath,
207+ ui32 partitionId);
208+
209+ void CheckAvgWriteBytes (const TString& topicPath,
210+ ui32 partitionId,
211+ size_t minSize, size_t maxSize);
212+
213+ void SplitPartition (const TString& topicPath,
214+ ui32 partitionId,
215+ const TString& boundary);
216+
188217private:
189218 template <class E >
190219 E ReadEvent (TTopicReadSessionPtr reader, NTable::TTransaction& tx);
@@ -213,6 +242,8 @@ class TFixture : public NUnitTest::TBaseFixture {
213242
214243 THashMap<std::pair<TString, TString>, TTopicWriteSessionContext> TopicWriteSessions;
215244 THashMap<TString, TTopicReadSessionPtr> TopicReadSessions;
245+
246+ ui64 SchemaTxId = 1000 ;
216247};
217248
218249TFixture::TTableRecord::TTableRecord (const TString& key, const TString& value) :
@@ -225,6 +256,7 @@ void TFixture::SetUp(NUnitTest::TTestContext&)
225256{
226257 NKikimr::Tests::TServerSettings settings = TTopicSdkTestSetup::MakeServerSettings ();
227258 settings.SetEnableTopicServiceTx (true );
259+ settings.SetEnableTopicSplitMerge (true );
228260 settings.SetEnablePQConfigTransactionsAtSchemeShard (true );
229261
230262 Setup = std::make_unique<TTopicSdkTestSetup>(TEST_CASE_NAME, settings);
@@ -405,7 +437,7 @@ void TFixture::CreateTopic(const TString& path,
405437 Setup->CreateTopic (path, consumer, partitionCount, maxPartitionCount);
406438}
407439
408- void TFixture::AddConsumer (const TString& path ,
440+ void TFixture::AddConsumer (const TString& topicPath ,
409441 const TVector<TString>& consumers)
410442{
411443 NTopic::TTopicClient client (GetDriver ());
@@ -415,13 +447,41 @@ void TFixture::AddConsumer(const TString& path,
415447 settings.BeginAddConsumer (consumer);
416448 }
417449
418- auto result = client.AlterTopic (path , settings).GetValueSync ();
450+ auto result = client.AlterTopic (topicPath , settings).GetValueSync ();
419451 UNIT_ASSERT_C (result.IsSuccess (), result.GetIssues ().ToString ());
420452}
421453
422- void TFixture::DescribeTopic (const TString& path)
454+ void TFixture::AlterAutoPartitioning (const TString& topicPath,
455+ ui64 minActivePartitions,
456+ ui64 maxActivePartitions,
457+ EAutoPartitioningStrategy strategy,
458+ TDuration stabilizationWindow,
459+ ui64 downUtilizationPercent,
460+ ui64 upUtilizationPercent)
423461{
424- Setup->DescribeTopic (path);
462+ NTopic::TTopicClient client (GetDriver ());
463+ NTopic::TAlterTopicSettings settings;
464+
465+ settings
466+ .BeginAlterPartitioningSettings ()
467+ .MinActivePartitions (minActivePartitions)
468+ .MaxActivePartitions (maxActivePartitions)
469+ .BeginAlterAutoPartitioningSettings ()
470+ .Strategy (strategy)
471+ .StabilizationWindow (stabilizationWindow)
472+ .DownUtilizationPercent (downUtilizationPercent)
473+ .UpUtilizationPercent (upUtilizationPercent)
474+ .EndAlterAutoPartitioningSettings ()
475+ .EndAlterTopicPartitioningSettings ()
476+ ;
477+
478+ auto result = client.AlterTopic (topicPath, settings).GetValueSync ();
479+ UNIT_ASSERT_C (result.IsSuccess (), result.GetIssues ().ToString ());
480+ }
481+
482+ TTopicDescription TFixture::DescribeTopic (const TString& path)
483+ {
484+ return Setup->DescribeTopic (path);
425485}
426486
427487const TDriver& TFixture::GetDriver () const
@@ -545,11 +605,11 @@ Y_UNIT_TEST_F(Offsets_Cannot_Be_Promoted_When_Reading_In_A_Transaction, TFixture
545605 UNIT_ASSERT_EXCEPTION (ReadMessage (reader, {.Tx = tx, .CommitOffsets = true }), yexception);
546606}
547607
548- // Y_UNIT_TEST_F(WriteToTopic_Invalid_Session, TFixture)
549- // {
550- // WriteToTopicWithInvalidTxId(false);
551- // }
552- //
608+ Y_UNIT_TEST_F (WriteToTopic_Invalid_Session, TFixture)
609+ {
610+ WriteToTopicWithInvalidTxId (false );
611+ }
612+
553613// Y_UNIT_TEST_F(WriteToTopic_Invalid_Tx, TFixture)
554614// {
555615// WriteToTopicWithInvalidTxId(true);
@@ -1021,6 +1081,34 @@ void TFixture::Read_Exactly_N_Messages_From_Topic(const TString& topicPath,
10211081 UNIT_ASSERT_VALUES_EQUAL (count, limit);
10221082}
10231083
1084+ auto TFixture::GetAvgWriteBytes (const TString& topicName,
1085+ ui32 partitionId) -> TAvgWriteBytes
1086+ {
1087+ auto & runtime = Setup->GetRuntime ();
1088+ TActorId edge = runtime.AllocateEdgeActor ();
1089+ ui64 tabletId = GetTopicTabletId (edge, " /Root/" + topicName, partitionId);
1090+
1091+ runtime.SendToPipe (tabletId, edge, new NKikimr::TEvPersQueue::TEvStatus ());
1092+ auto response = runtime.GrabEdgeEvent <NKikimr::TEvPersQueue::TEvStatusResponse>();
1093+
1094+ UNIT_ASSERT_VALUES_EQUAL (tabletId, response->Record .GetTabletId ());
1095+
1096+ TAvgWriteBytes result;
1097+
1098+ for (size_t i = 0 ; i < response->Record .PartResultSize (); ++i) {
1099+ const auto & partition = response->Record .GetPartResult (i);
1100+ if (partition.GetPartition () == static_cast <int >(partitionId)) {
1101+ result.PerSec = partition.GetAvgWriteSpeedPerSec ();
1102+ result.PerMin = partition.GetAvgWriteSpeedPerMin ();
1103+ result.PerHour = partition.GetAvgWriteSpeedPerHour ();
1104+ result.PerDay = partition.GetAvgWriteSpeedPerDay ();
1105+ break ;
1106+ }
1107+ }
1108+
1109+ return result;
1110+ }
1111+
10241112Y_UNIT_TEST_F (WriteToTopic_Demo_1, TFixture)
10251113{
10261114 CreateTopic (" topic_A" );
@@ -2265,6 +2353,150 @@ Y_UNIT_TEST_F(WriteToTopic_Demo_44, TFixture)
22652353 Read_Exactly_N_Messages_From_Topic (" topic_A" , TEST_CONSUMER, 100 );
22662354}
22672355
2356+ void TFixture::CheckAvgWriteBytes (const TString& topicPath,
2357+ ui32 partitionId,
2358+ size_t minSize, size_t maxSize)
2359+ {
2360+ #define UNIT_ASSERT_AVGWRITEBYTES (v, minSize, maxSize ) \
2361+ UNIT_ASSERT_LE_C (minSize, v, " , actual " << minSize << " > " << v); \
2362+ UNIT_ASSERT_LE_C (v, maxSize, " , actual " << v << " > " << maxSize);
2363+
2364+ auto avgWriteBytes = GetAvgWriteBytes (topicPath, partitionId);
2365+
2366+ UNIT_ASSERT_AVGWRITEBYTES (avgWriteBytes.PerSec , minSize, maxSize);
2367+ UNIT_ASSERT_AVGWRITEBYTES (avgWriteBytes.PerMin , minSize, maxSize);
2368+ UNIT_ASSERT_AVGWRITEBYTES (avgWriteBytes.PerHour , minSize, maxSize);
2369+ UNIT_ASSERT_AVGWRITEBYTES (avgWriteBytes.PerDay , minSize, maxSize);
2370+
2371+ #undef UNIT_ASSERT_AVGWRITEBYTES
2372+ }
2373+
2374+ void TFixture::SplitPartition (const TString& topicName,
2375+ ui32 partitionId,
2376+ const TString& boundary)
2377+ {
2378+ NKikimr::NPQ::NTest::SplitPartition (Setup->GetRuntime (),
2379+ ++SchemaTxId,
2380+ topicName,
2381+ partitionId,
2382+ boundary);
2383+ }
2384+
2385+ Y_UNIT_TEST_F (WriteToTopic_Demo_45, TFixture)
2386+ {
2387+ // Writing to a topic in a transaction affects the `AvgWriteBytes` indicator
2388+ CreateTopic (" topic_A" , TEST_CONSUMER, 2 );
2389+
2390+ auto session = CreateTableSession ();
2391+ auto tx = BeginTx (session);
2392+
2393+ TString message (1'000 , ' x' );
2394+
2395+ WriteToTopic (" topic_A" , TEST_MESSAGE_GROUP_ID_1, message, &tx, 0 );
2396+ WriteToTopic (" topic_A" , TEST_MESSAGE_GROUP_ID_1, message, &tx, 0 );
2397+
2398+ WriteToTopic (" topic_A" , TEST_MESSAGE_GROUP_ID_2, message, &tx, 1 );
2399+
2400+ CommitTx (tx, EStatus::SUCCESS);
2401+
2402+ size_t minSize = (message.size () + TEST_MESSAGE_GROUP_ID_1.size ()) * 2 ;
2403+ size_t maxSize = minSize + 200 ;
2404+
2405+ CheckAvgWriteBytes (" topic_A" , 0 , minSize, maxSize);
2406+
2407+ minSize = (message.size () + TEST_MESSAGE_GROUP_ID_2.size ());
2408+ maxSize = minSize + 200 ;
2409+
2410+ CheckAvgWriteBytes (" topic_A" , 1 , minSize, maxSize);
2411+ }
2412+
2413+ Y_UNIT_TEST_F (WriteToTopic_Demo_46, TFixture)
2414+ {
2415+ // The `split` operation of the topic partition affects the writing in the transaction.
2416+ // The transaction commit should fail with an error
2417+ CreateTopic (" topic_A" , TEST_CONSUMER, 2 , 10 );
2418+
2419+ auto session = CreateTableSession ();
2420+ auto tx = BeginTx (session);
2421+
2422+ TString message (1'000 , ' x' );
2423+
2424+ WriteToTopic (" topic_A" , TEST_MESSAGE_GROUP_ID_1, message, &tx, 0 );
2425+ WriteToTopic (" topic_A" , TEST_MESSAGE_GROUP_ID_1, message, &tx, 0 );
2426+
2427+ WriteToTopic (" topic_A" , TEST_MESSAGE_GROUP_ID_2, message, &tx, 1 );
2428+
2429+ WaitForAcks (" topic_A" , TEST_MESSAGE_GROUP_ID_2);
2430+
2431+ SplitPartition (" topic_A" , 1 , " \xC0 " );
2432+
2433+ WriteToTopic (" topic_A" , TEST_MESSAGE_GROUP_ID_2, message, &tx, 1 );
2434+
2435+ CommitTx (tx, EStatus::ABORTED);
2436+ }
2437+
2438+ Y_UNIT_TEST_F (WriteToTopic_Demo_47, TFixture)
2439+ {
2440+ // The `split` operation of the topic partition does not affect the reading in the transaction.
2441+ CreateTopic (" topic_A" , TEST_CONSUMER, 2 , 10 );
2442+
2443+ TString message (1'000 , ' x' );
2444+
2445+ WriteToTopic (" topic_A" , TEST_MESSAGE_GROUP_ID_1, message, nullptr , 0 );
2446+ WriteToTopic (" topic_A" , TEST_MESSAGE_GROUP_ID_1, message, nullptr , 0 );
2447+
2448+ WriteToTopic (" topic_A" , TEST_MESSAGE_GROUP_ID_2, message, nullptr , 1 );
2449+ WriteToTopic (" topic_A" , TEST_MESSAGE_GROUP_ID_2, message, nullptr , 1 );
2450+
2451+ WaitForAcks (" topic_A" , TEST_MESSAGE_GROUP_ID_1);
2452+ WaitForAcks (" topic_A" , TEST_MESSAGE_GROUP_ID_2);
2453+
2454+ SplitPartition (" topic_A" , 1 , " \xC0 " );
2455+
2456+ auto session = CreateTableSession ();
2457+ auto tx = BeginTx (session);
2458+
2459+ auto messages = ReadFromTopic (" topic_A" , TEST_CONSUMER, TDuration::Seconds (2 ), &tx, 0 );
2460+ UNIT_ASSERT_VALUES_EQUAL (messages.size (), 2 );
2461+
2462+ CloseTopicReadSession (" topic_A" , TEST_CONSUMER);
2463+
2464+ messages = ReadFromTopic (" topic_A" , TEST_CONSUMER, TDuration::Seconds (2 ), &tx, 1 );
2465+ UNIT_ASSERT_VALUES_EQUAL (messages.size (), 2 );
2466+
2467+ CommitTx (tx, EStatus::SUCCESS);
2468+ }
2469+
2470+ Y_UNIT_TEST_F (WriteToTopic_Demo_48, TFixture)
2471+ {
2472+ // the commit of a transaction affects the split of the partition
2473+ CreateTopic (" topic_A" , TEST_CONSUMER, 2 , 10 );
2474+ AlterAutoPartitioning (" topic_A" , 2 , 10 , EAutoPartitioningStrategy::ScaleUp, TDuration::Seconds (2 ), 1 , 2 );
2475+
2476+ auto session = CreateTableSession ();
2477+ auto tx = BeginTx (session);
2478+
2479+ TString message (1_MB, ' x' );
2480+
2481+ WriteToTopic (" topic_A" , TEST_MESSAGE_GROUP_ID_1, message, &tx, 0 );
2482+ WriteToTopic (" topic_A" , TEST_MESSAGE_GROUP_ID_1, message, &tx, 0 );
2483+ WriteToTopic (" topic_A" , TEST_MESSAGE_GROUP_ID_3, message, &tx, 0 );
2484+ WriteToTopic (" topic_A" , TEST_MESSAGE_GROUP_ID_3, message, &tx, 0 );
2485+
2486+ WriteToTopic (" topic_A" , TEST_MESSAGE_GROUP_ID_2, message, &tx, 1 );
2487+ WriteToTopic (" topic_A" , TEST_MESSAGE_GROUP_ID_2, message, &tx, 1 );
2488+ WriteToTopic (" topic_A" , TEST_MESSAGE_GROUP_ID_4, message, &tx, 1 );
2489+ WriteToTopic (" topic_A" , TEST_MESSAGE_GROUP_ID_4, message, &tx, 1 );
2490+
2491+ CommitTx (tx, EStatus::SUCCESS);
2492+
2493+ Sleep (TDuration::Seconds (5 ));
2494+
2495+ auto topicDescription = DescribeTopic (" topic_A" );
2496+
2497+ UNIT_ASSERT_GT (topicDescription.GetTotalPartitionsCount (), 2 );
2498+ }
2499+
22682500}
22692501
22702502}
0 commit comments