1- #include " ut_utils/managed_executor.h"
21#include " ut_utils/topic_sdk_test_setup.h"
2+
3+ #include < tests/integration/topic/utils/managed_executor.h>
4+
35#include < src/client/persqueue_public/ut/ut_utils/ut_utils.h>
46
57#include < ydb-cpp-sdk/client/topic/client.h>
1820
1921#include < util/stream/zlib.h>
2022
21- #include < future>
2223
24+ using namespace std ::chrono_literals;
2325
2426static const bool EnableDirectRead = !std::string{std::getenv (" PQ_EXPERIMENTAL_DIRECT_READ" ) ? std::getenv (" PQ_EXPERIMENTAL_DIRECT_READ" ) : " " }.empty();
2527
2628
27- namespace NYdb ::NTopic::NTests {
29+ namespace NYdb ::inline V3:: NTopic::NTests {
2830
29- void WriteAndReadToEndWithRestarts (TReadSessionSettings readSettings, TWriteSessionSettings writeSettings, const std::string& message, ui32 count, TTopicSdkTestSetup& setup, TIntrusivePtr<TManagedExecutor> decompressor) {
31+ void WriteAndReadToEndWithRestarts (TReadSessionSettings readSettings, TWriteSessionSettings writeSettings, const std::string& message, std:: uint32_t count, TTopicSdkTestSetup& setup, TIntrusivePtr<TManagedExecutor> decompressor) {
3032 auto client = setup.MakeClient ();
3133 auto session = client.CreateSimpleBlockingWriteSession (writeSettings);
3234
33- for (ui32 i = 1 ; i <= count; ++i) {
35+ for (std:: uint32_t i = 1 ; i <= count; ++i) {
3436 bool res = session->Write (message);
3537 UNIT_ASSERT (res);
3638 }
@@ -44,7 +46,7 @@ void WriteAndReadToEndWithRestarts(TReadSessionSettings readSettings, TWriteSess
4446
4547 auto WaitTasks = [&](auto f, size_t c) {
4648 while (f () < c) {
47- Sleep ( TDuration::MilliSeconds ( 100 ) );
49+ std::this_thread::sleep_for (100ms );
4850 };
4951 };
5052 auto WaitPlannedTasks = [&](auto e, size_t count) {
@@ -69,7 +71,7 @@ void WriteAndReadToEndWithRestarts(TReadSessionSettings readSettings, TWriteSess
6971 size_t completed = e->GetExecutedCount ();
7072
7173 setup.GetServer ().KillTopicPqrbTablet (setup.GetTopicPath ());
72- Sleep ( TDuration::MilliSeconds ( 100 ) );
74+ std::this_thread::sleep_for (100ms );
7375
7476 e->StartFuncs (tasks);
7577 WaitExecutedTasks (e, completed + n);
@@ -90,7 +92,7 @@ void WriteAndReadToEndWithRestarts(TReadSessionSettings readSettings, TWriteSess
9092
9193 ReadSession = topicClient.CreateReadSession (readSettings);
9294
93- ui32 i = 0 ;
95+ std:: uint32_t i = 0 ;
9496 while (AtomicGet (lastOffset) + 1 < count) {
9597 RunTasks (decompressor, {i++});
9698 }
@@ -109,7 +111,7 @@ Y_UNIT_TEST_SUITE(BasicUsage) {
109111 auto decompressor = CreateThreadPoolManagedExecutor (1 );
110112
111113 TReadSessionSettings readSettings;
112- TTopicReadSettings topic = TEST_TOPIC ;
114+ TTopicReadSettings topic = setup. GetTopicPath () ;
113115 topic.AppendPartitionIds (0 );
114116 readSettings
115117 .WithoutConsumer ()
@@ -121,13 +123,13 @@ Y_UNIT_TEST_SUITE(BasicUsage) {
121123
122124 TWriteSessionSettings writeSettings;
123125 writeSettings
124- .Path (TEST_TOPIC )
126+ .Path (setup. GetTopicPath () )
125127 .MessageGroupId (TEST_MESSAGE_GROUP_ID)
126128 .Codec (NTopic::ECodec::RAW)
127129 .CompressionExecutor (compressor);
128130
129131
130- ui32 count = 700 ;
132+ std:: uint32_t count = 700 ;
131133 std::string message (2'000 , ' x' );
132134
133135 WriteAndReadToEndWithRestarts (readSettings, writeSettings, message, count, setup, decompressor);
@@ -140,21 +142,21 @@ Y_UNIT_TEST_SUITE(BasicUsage) {
140142
141143 TReadSessionSettings readSettings;
142144 readSettings
143- .ConsumerName (TEST_CONSUMER )
145+ .ConsumerName (setup. GetConsumerName () )
144146 .MaxMemoryUsageBytes (1_MB)
145147 .DecompressionExecutor (decompressor)
146- .AppendTopics (TEST_TOPIC )
148+ .AppendTopics (setup. GetTopicPath () )
147149 // .DirectRead(EnableDirectRead)
148150 ;
149151
150152 TWriteSessionSettings writeSettings;
151153 writeSettings
152- .Path (TEST_TOPIC ).MessageGroupId (TEST_MESSAGE_GROUP_ID)
153- .Codec (NTopic:: ECodec::RAW)
154+ .Path (setup. GetTopicPath () ).MessageGroupId (TEST_MESSAGE_GROUP_ID)
155+ .Codec (ECodec::RAW)
154156 .CompressionExecutor (compressor);
155157
156158
157- ui32 count = 700 ;
159+ std:: uint32_t count = 700 ;
158160 std::string message (2'000 , ' x' );
159161
160162 WriteAndReadToEndWithRestarts (readSettings, writeSettings, message, count, setup, decompressor);
@@ -164,25 +166,25 @@ Y_UNIT_TEST_SUITE(BasicUsage) {
164166
165167 TTopicSdkTestSetup setup (TEST_CASE_NAME);
166168
167- NTopic:: TWriteSessionSettings writeSettings;
169+ TWriteSessionSettings writeSettings;
168170 writeSettings.Path (setup.GetTopicPath ()).MessageGroupId (TEST_MESSAGE_GROUP_ID);
169171 writeSettings.Path (setup.GetTopicPath ()).ProducerId (TEST_MESSAGE_GROUP_ID);
170- writeSettings.Codec (NTopic:: ECodec::RAW);
171- NTopic:: IExecutor::TPtr executor = new NTopic:: TSyncExecutor ();
172+ writeSettings.Codec (ECodec::RAW);
173+ IExecutor::TPtr executor = new TSyncExecutor ();
172174 writeSettings.CompressionExecutor (executor);
173175
174- ui64 count = 100u ;
176+ std:: uint64_t count = 100u ;
175177
176178 auto client = setup.MakeClient ();
177179 auto session = client.CreateSimpleBlockingWriteSession (writeSettings);
178180
179- TString messageBase = " message----" ;
181+ std::string messageBase = " message----" ;
180182
181183 for (auto i = 0u ; i < count; i++) {
182184 auto res = session->Write (messageBase);
183185 UNIT_ASSERT (res);
184186 if (i % 10 == 0 ) {
185- setup.GetServer ().KillTopicPqTablets (setup.GetTopicPath ());
187+ setup.GetServer ().KillTopicPqTablets (setup.GetFullTopicPath ());
186188 }
187189 }
188190 session->Close ();
0 commit comments