Skip to content

Commit 40fc975

Browse files
committed
fix
1 parent a1d65d1 commit 40fc975

File tree

10 files changed

+79
-93
lines changed

10 files changed

+79
-93
lines changed

tests/integration/topic/CMakeLists.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,5 +33,5 @@ add_ydb_test(NAME topic_direct_read_it GTEST
3333
integration
3434
ENV
3535
PQ_EXPERIMENTAL_DIRECT_READ=1
36-
MANY_MESSAGES_DISABLED=1
36+
YDB_TEST_TOPIC_PATH=topic_direct_read
3737
)

tests/integration/topic/basic_usage.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
static const bool EnableDirectRead = !std::string{std::getenv("PQ_EXPERIMENTAL_DIRECT_READ") ? std::getenv("PQ_EXPERIMENTAL_DIRECT_READ") : ""}.empty();
1717

1818

19-
namespace NYdb::NPersQueue::NTests {
19+
namespace NYdb::inline V3::NPersQueue::NTests {
2020

2121
class TSimpleWriteSessionTestAdapter {
2222
public:
@@ -40,7 +40,7 @@ std::uint64_t TSimpleWriteSessionTestAdapter::GetAcquiredMessagesCount() const {
4040

4141
}
4242

43-
namespace NYdb::NTopic::NTests {
43+
namespace NYdb::inline V3::NTopic::NTests {
4444

4545
class TManagedExecutor : public IExecutor {
4646
public:

tests/integration/topic/describe_topic.cpp

Lines changed: 29 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44

55
#include <thread>
66

7-
namespace NYdb::NTopic::NTests {
7+
namespace NYdb::inline V3::NTopic::NTests {
88

99
class Describe : public TTopicTestFixture {
1010
protected:
@@ -20,24 +20,24 @@ class Describe : public TTopicTestFixture {
2020
const auto& description = result.GetTopicDescription();
2121

2222
const auto& partitions = description.GetPartitions();
23-
EXPECT_EQ(partitions.size(), 1);
23+
EXPECT_EQ(partitions.size(), 1u);
2424

2525
const auto& partition = partitions[0];
2626
EXPECT_TRUE(partition.GetActive());
27-
EXPECT_EQ(partition.GetPartitionId(), 0);
27+
EXPECT_EQ(partition.GetPartitionId(), 0u);
2828

2929
if (requireStats) {
3030
const auto& stats = description.GetTopicStats();
3131

3232
if (requireNonEmptyStats) {
33-
EXPECT_GT(stats.GetStoreSizeBytes(), 0);
34-
EXPECT_GT(stats.GetBytesWrittenPerMinute(), 0);
35-
EXPECT_GT(stats.GetBytesWrittenPerHour(), 0);
36-
EXPECT_GT(stats.GetBytesWrittenPerDay(), 0);
33+
EXPECT_GT(stats.GetStoreSizeBytes(), 0u);
34+
EXPECT_GT(stats.GetBytesWrittenPerMinute(), 0u);
35+
EXPECT_GT(stats.GetBytesWrittenPerHour(), 0u);
36+
EXPECT_GT(stats.GetBytesWrittenPerDay(), 0u);
3737
EXPECT_GT(stats.GetMaxWriteTimeLag(), TDuration::Zero());
3838
EXPECT_GT(stats.GetMinLastWriteTime(), TInstant::Zero());
3939
} else {
40-
EXPECT_EQ(stats.GetStoreSizeBytes(), 0);
40+
EXPECT_EQ(stats.GetStoreSizeBytes(), 0u);
4141
}
4242
}
4343

@@ -62,11 +62,11 @@ class Describe : public TTopicTestFixture {
6262
const auto& description = result.GetConsumerDescription();
6363

6464
const auto& partitions = description.GetPartitions();
65-
EXPECT_EQ(partitions.size(), 1);
65+
EXPECT_EQ(partitions.size(), 1u);
6666

6767
const auto& partition = partitions[0];
6868
EXPECT_TRUE(partition.GetActive());
69-
EXPECT_EQ(partition.GetPartitionId(), 0);
69+
EXPECT_EQ(partition.GetPartitionId(), 0u);
7070

7171
if (requireStats) {
7272
const auto& stats = partition.GetPartitionStats();
@@ -75,23 +75,23 @@ class Describe : public TTopicTestFixture {
7575
EXPECT_TRUE(consumerStats);
7676

7777
if (requireNonEmptyStats) {
78-
EXPECT_GE(stats->GetStartOffset(), 0);
79-
EXPECT_GE(stats->GetEndOffset(), 0);
80-
EXPECT_GT(stats->GetStoreSizeBytes(), 0);
78+
EXPECT_GE(stats->GetStartOffset(), 0u);
79+
EXPECT_GE(stats->GetEndOffset(), 0u);
80+
EXPECT_GT(stats->GetStoreSizeBytes(), 0u);
8181
EXPECT_GT(stats->GetLastWriteTime(), TInstant::Zero());
8282
EXPECT_GT(stats->GetMaxWriteTimeLag(), TDuration::Zero());
83-
EXPECT_GT(stats->GetBytesWrittenPerMinute(), 0);
84-
EXPECT_GT(stats->GetBytesWrittenPerHour(), 0);
85-
EXPECT_GT(stats->GetBytesWrittenPerDay(), 0);
83+
EXPECT_GT(stats->GetBytesWrittenPerMinute(), 0u);
84+
EXPECT_GT(stats->GetBytesWrittenPerHour(), 0u);
85+
EXPECT_GT(stats->GetBytesWrittenPerDay(), 0u);
8686

87-
EXPECT_GT(consumerStats->GetLastReadOffset(), 0);
88-
EXPECT_GT(consumerStats->GetCommittedOffset(), 0);
89-
EXPECT_GE(consumerStats->GetReadSessionId().size(), 0);
87+
EXPECT_GT(consumerStats->GetLastReadOffset(), 0u);
88+
EXPECT_GT(consumerStats->GetCommittedOffset(), 0u);
89+
EXPECT_GE(consumerStats->GetReadSessionId().size(), 0u);
9090
EXPECT_EQ(consumerStats->GetReaderName(), "");
9191
EXPECT_GE(consumerStats->GetMaxWriteTimeLag(), TDuration::Seconds(100));
9292
} else {
93-
EXPECT_EQ(stats->GetStartOffset(), 0);
94-
EXPECT_EQ(consumerStats->GetLastReadOffset(), 0);
93+
EXPECT_EQ(stats->GetStartOffset(), 0u);
94+
EXPECT_EQ(consumerStats->GetLastReadOffset(), 0u);
9595
}
9696
}
9797

@@ -109,7 +109,7 @@ class Describe : public TTopicTestFixture {
109109
settings.IncludeStats(requireStats);
110110
settings.IncludeLocation(requireLocation);
111111

112-
std::int64_t testPartitionId = 0;
112+
std::uint64_t testPartitionId = 0;
113113

114114
{
115115
auto result = client.DescribePartition(GetTopicPath(), testPartitionId, settings).GetValueSync();
@@ -126,16 +126,16 @@ class Describe : public TTopicTestFixture {
126126
EXPECT_TRUE(stats);
127127

128128
if (requireNonEmptyStats) {
129-
EXPECT_GE(stats->GetStartOffset(), 0);
130-
EXPECT_GE(stats->GetEndOffset(), 0);
131-
EXPECT_GT(stats->GetStoreSizeBytes(), 0);
129+
EXPECT_GE(stats->GetStartOffset(), 0u);
130+
EXPECT_GE(stats->GetEndOffset(), 0u);
131+
EXPECT_GT(stats->GetStoreSizeBytes(), 0u);
132132
EXPECT_GT(stats->GetLastWriteTime(), TInstant::Zero());
133133
EXPECT_GT(stats->GetMaxWriteTimeLag(), TDuration::Zero());
134-
EXPECT_GT(stats->GetBytesWrittenPerMinute(), 0);
135-
EXPECT_GT(stats->GetBytesWrittenPerHour(), 0);
136-
EXPECT_GT(stats->GetBytesWrittenPerDay(), 0);
134+
EXPECT_GT(stats->GetBytesWrittenPerMinute(), 0u);
135+
EXPECT_GT(stats->GetBytesWrittenPerHour(), 0u);
136+
EXPECT_GT(stats->GetBytesWrittenPerDay(), 0u);
137137
} else {
138-
EXPECT_EQ(stats->GetStoreSizeBytes(), 0);
138+
EXPECT_EQ(stats->GetStoreSizeBytes(), 0u);
139139
}
140140
}
141141

tests/integration/topic/direct_read.cpp

Lines changed: 14 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ using namespace ::testing; // Google mock.
2727

2828

2929

30-
namespace NYdb::NTopic::NTests {
30+
namespace NYdb::inline V3::NTopic::NTests {
3131

3232
namespace {
3333
const char* SERVER_SESSION_ID = "server-session-id-1";
@@ -903,7 +903,7 @@ TEST_F(DirectReadWithClient, OneMessage) {
903903
ASSERT_EVENT_TYPE(*event, TReadSessionEvent::TDataReceivedEvent);
904904
auto& dataReceived = std::get<TReadSessionEvent::TDataReceivedEvent>(*event);
905905
auto& messages = dataReceived.GetMessages();
906-
ASSERT_EQ(messages.size(), 1);
906+
ASSERT_EQ(messages.size(), 1u);
907907
dataReceived.Commit();
908908
}
909909

@@ -924,10 +924,6 @@ TEST_F(DirectReadWithClient, ManyMessages) {
924924
so the server sends multiple DirectReadResponses.
925925
*/
926926

927-
if (std::getenv("MANY_MESSAGES_DISABLED")) {
928-
GTEST_SKIP() << "Many messages test is disabled";
929-
}
930-
931927
DropTopic(GetTopicPath());
932928

933929
constexpr std::size_t partitionCount = 2;
@@ -940,12 +936,13 @@ TEST_F(DirectReadWithClient, ManyMessages) {
940936

941937
// Write messages to all partitions:
942938
for (std::size_t partitionId = 0; partitionId < partitionCount; ++partitionId) {
939+
std::string messageGroup = "test-message_group_id_" + std::to_string(partitionId);
943940
auto settings = TWriteSessionSettings()
944941
.Path(GetTopicPath())
945942
.Codec(ECodec::RAW)
946943
.PartitionId(partitionId)
947-
.ProducerId("test-message_group_id")
948-
.MessageGroupId("test-message_group_id");
944+
.ProducerId(messageGroup)
945+
.MessageGroupId(messageGroup);
949946

950947
auto writer = client.CreateSimpleBlockingWriteSession(settings);
951948
for (std::size_t i = 0; i < messageCount; ++i) {
@@ -1106,7 +1103,7 @@ TEST_F(DirectReadWithControlSession, StopPartitionSessionGracefully) {
11061103

11071104
EXPECT_CALL(*setup.MockReadProcessor, OnStartPartitionSessionResponse(_))
11081105
.WillOnce(Invoke([&startPartitionSessionRequest](const Ydb::Topic::StreamReadMessage::StartPartitionSessionResponse& resp) {
1109-
ASSERT_EQ(resp.partition_session_id(), startPartitionSessionRequest.PartitionSessionId);
1106+
ASSERT_EQ(static_cast<std::uint64_t>(resp.partition_session_id()), startPartitionSessionRequest.PartitionSessionId);
11101107
}));
11111108

11121109
EXPECT_CALL(*setup.MockReadProcessor, OnDirectReadAck(_))
@@ -1127,14 +1124,14 @@ TEST_F(DirectReadWithControlSession, StopPartitionSessionGracefully) {
11271124
EXPECT_CALL(*setup.MockDirectReadProcessor, OnInitRequest(_))
11281125
.WillOnce(Invoke([&setup](const Ydb::Topic::StreamDirectReadMessage::InitRequest& req) {
11291126
ASSERT_EQ(req.session_id(), SERVER_SESSION_ID);
1130-
ASSERT_EQ(req.topics_read_settings_size(), setup.ReadSessionSettings.Topics_.size());
1127+
ASSERT_EQ(static_cast<std::size_t>(req.topics_read_settings_size()), setup.ReadSessionSettings.Topics_.size());
11311128
ASSERT_EQ(req.topics_read_settings(0).path(), setup.ReadSessionSettings.Topics_[0].Path_);
11321129
ASSERT_EQ(req.consumer(), setup.ReadSessionSettings.ConsumerName_);
11331130
}));
11341131

11351132
EXPECT_CALL(*setup.MockDirectReadProcessor, OnStartDirectReadPartitionSessionRequest(_))
11361133
.WillOnce(Invoke([&startPartitionSessionRequest](const Ydb::Topic::StreamDirectReadMessage::StartDirectReadPartitionSessionRequest& request) {
1137-
ASSERT_EQ(request.partition_session_id(), startPartitionSessionRequest.PartitionSessionId);
1134+
ASSERT_EQ(static_cast<std::uint64_t>(request.partition_session_id()), startPartitionSessionRequest.PartitionSessionId);
11381135
ASSERT_EQ(request.generation(), startPartitionSessionRequest.Generation);
11391136
}));
11401137

@@ -1254,7 +1251,7 @@ TEST_F(DirectReadWithControlSession, StopPartitionSession) {
12541251

12551252
EXPECT_CALL(*setup.MockReadProcessor, OnStartPartitionSessionResponse(_))
12561253
.WillOnce(Invoke([&startPartitionSessionRequest](const Ydb::Topic::StreamReadMessage::StartPartitionSessionResponse& resp) {
1257-
ASSERT_EQ(resp.partition_session_id(), startPartitionSessionRequest.PartitionSessionId);
1254+
ASSERT_EQ(static_cast<std::uint64_t>(resp.partition_session_id()), startPartitionSessionRequest.PartitionSessionId);
12581255
}));
12591256

12601257
EXPECT_CALL(*setup.MockReadProcessor, OnDirectReadAck(_))
@@ -1275,14 +1272,14 @@ TEST_F(DirectReadWithControlSession, StopPartitionSession) {
12751272
EXPECT_CALL(*setup.MockDirectReadProcessor, OnInitRequest(_))
12761273
.WillOnce(Invoke([&setup](const Ydb::Topic::StreamDirectReadMessage::InitRequest& req) {
12771274
ASSERT_EQ(req.session_id(), SERVER_SESSION_ID);
1278-
ASSERT_EQ(req.topics_read_settings_size(), setup.ReadSessionSettings.Topics_.size());
1275+
ASSERT_EQ(static_cast<std::size_t>(req.topics_read_settings_size()), setup.ReadSessionSettings.Topics_.size());
12791276
ASSERT_EQ(req.topics_read_settings(0).path(), setup.ReadSessionSettings.Topics_[0].Path_);
12801277
ASSERT_EQ(req.consumer(), setup.ReadSessionSettings.ConsumerName_);
12811278
}));
12821279

12831280
EXPECT_CALL(*setup.MockDirectReadProcessor, OnStartDirectReadPartitionSessionRequest(_))
12841281
.WillOnce(Invoke([&startPartitionSessionRequest](const Ydb::Topic::StreamDirectReadMessage::StartDirectReadPartitionSessionRequest& request) {
1285-
ASSERT_EQ(request.partition_session_id(), startPartitionSessionRequest.PartitionSessionId);
1282+
ASSERT_EQ(static_cast<std::uint64_t>(request.partition_session_id()), startPartitionSessionRequest.PartitionSessionId);
12861283
ASSERT_EQ(request.generation(), startPartitionSessionRequest.Generation);
12871284
}));
12881285

@@ -1422,7 +1419,7 @@ TEST_F(DirectReadWithControlSession, EmptyDirectReadResponse) {
14221419

14231420
EXPECT_CALL(*setup.MockReadProcessor, OnStartPartitionSessionResponse(_))
14241421
.WillOnce(Invoke([&startPartitionSessionRequest](const Ydb::Topic::StreamReadMessage::StartPartitionSessionResponse& resp) {
1425-
ASSERT_EQ(resp.partition_session_id(), startPartitionSessionRequest.PartitionSessionId);
1422+
ASSERT_EQ(static_cast<std::uint64_t>(resp.partition_session_id()), startPartitionSessionRequest.PartitionSessionId);
14261423
}));
14271424

14281425
EXPECT_CALL(*setup.MockReadProcessor, OnDirectReadAck(_))
@@ -1448,14 +1445,14 @@ TEST_F(DirectReadWithControlSession, EmptyDirectReadResponse) {
14481445
EXPECT_CALL(*setup.MockDirectReadProcessor, OnInitRequest(_))
14491446
.WillOnce(Invoke([&setup](const Ydb::Topic::StreamDirectReadMessage::InitRequest& req) {
14501447
ASSERT_EQ(req.session_id(), SERVER_SESSION_ID);
1451-
ASSERT_EQ(req.topics_read_settings_size(), setup.ReadSessionSettings.Topics_.size());
1448+
ASSERT_EQ(static_cast<std::size_t>(req.topics_read_settings_size()), setup.ReadSessionSettings.Topics_.size());
14521449
ASSERT_EQ(req.topics_read_settings(0).path(), setup.ReadSessionSettings.Topics_[0].Path_);
14531450
ASSERT_EQ(req.consumer(), setup.ReadSessionSettings.ConsumerName_);
14541451
}));
14551452

14561453
EXPECT_CALL(*setup.MockDirectReadProcessor, OnStartDirectReadPartitionSessionRequest(_))
14571454
.WillOnce(Invoke([&startPartitionSessionRequest](const Ydb::Topic::StreamDirectReadMessage::StartDirectReadPartitionSessionRequest& request) {
1458-
ASSERT_EQ(request.partition_session_id(), startPartitionSessionRequest.PartitionSessionId);
1455+
ASSERT_EQ(static_cast<std::uint64_t>(request.partition_session_id()), startPartitionSessionRequest.PartitionSessionId);
14591456
ASSERT_EQ(request.generation(), startPartitionSessionRequest.Generation);
14601457
}));
14611458
}

tests/integration/topic/local_partition.cpp

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,7 @@
1212
#include <format>
1313
#include <thread>
1414

15-
using namespace NYdb;
16-
17-
namespace NYdb::NTopic::NTests {
15+
namespace NYdb::inline V3::NTopic::NTests {
1816

1917
struct TYdbPqTestRetryState : IRetryPolicy::IRetryState {
2018
TYdbPqTestRetryState(std::function<void ()> retryCallback, std::function<void ()> destroyCallback, const TDuration& delay)
@@ -70,7 +68,7 @@ struct TYdbPqTestRetryPolicy : IRetryPolicy {
7068
}
7169
std::this_thread::sleep_for(std::chrono::milliseconds(100));
7270
}
73-
EXPECT_EQ(CurrentRetries.load(), 0);
71+
EXPECT_EQ(CurrentRetries.load(), 0u);
7472
}
7573
auto retryCb = [this]() mutable {this->RetryDone();};
7674
auto destroyCb = [this]() mutable {this->StateDestroyed();};
@@ -207,7 +205,7 @@ class LocalPartition : public TTopicTestFixture {
207205
dataReceived->Commit();
208206

209207
auto& messages = dataReceived->GetMessages();
210-
EXPECT_EQ(messages.size(), 1);
208+
EXPECT_EQ(messages.size(), 1u);
211209
EXPECT_EQ(messages[0].GetData(), "message");
212210

213211
event = readSession->GetEvent(true);

tests/integration/topic/setup/fixture.cpp

Lines changed: 12 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -2,21 +2,22 @@
22

33
#include <ydb-cpp-sdk/client/discovery/discovery.h>
44

5-
static std::string binaryName = "";
5+
#include <util/system/execpath.h>
66

7-
namespace NYdb::NTopic::NTests {
7+
namespace NYdb::inline V3::NTopic::NTests {
88

99
void TTopicTestFixture::SetUp() {
1010
TTopicClient client(MakeDriver());
1111

1212
const testing::TestInfo* const testInfo = testing::UnitTest::GetInstance()->current_test_info();
13+
std::filesystem::path execPath(std::string{GetExecPath()});
1314

1415
std::stringstream builder;
15-
builder << std::getenv("YDB_TEST_ROOT") << "/" << binaryName << "/" << testInfo->test_suite_name() << "_" << testInfo->name();
16-
TopicPath = builder.str();
16+
builder << std::getenv("YDB_TEST_ROOT") << "/" << execPath.filename().string() << "/" << testInfo->test_suite_name() << "_" << testInfo->name();
17+
TopicPath_ = builder.str();
1718

18-
client.DropTopic(TopicPath).GetValueSync();
19-
CreateTopic(TopicPath);
19+
client.DropTopic(TopicPath_).GetValueSync();
20+
CreateTopic(TopicPath_);
2021
}
2122

2223
void TTopicTestFixture::TearDown() {
@@ -43,17 +44,17 @@ void TTopicTestFixture::CreateTopic(const std::string& path, const std::string&
4344
topics.AppendConsumers(consumers);
4445

4546
auto status = client.CreateTopic(path, topics).GetValueSync();
46-
EXPECT_TRUE(status.IsSuccess()) << ToString(status);
47+
Y_ENSURE(status.IsSuccess(), status);
4748
}
4849

4950
std::string TTopicTestFixture::GetTopicPath() {
50-
return TopicPath;
51+
return TopicPath_;
5152
}
5253

5354
void TTopicTestFixture::DropTopic(const std::string& path) {
5455
TTopicClient client(MakeDriver());
5556
auto status = client.DropTopic(path).GetValueSync();
56-
EXPECT_TRUE(status.IsSuccess()) << ToString(status);
57+
Y_ENSURE(status.IsSuccess(), status);
5758
}
5859

5960
TDriverConfig TTopicTestFixture::MakeDriverConfig() const {
@@ -69,9 +70,7 @@ TDriver TTopicTestFixture::MakeDriver() const {
6970

7071
std::uint16_t TTopicTestFixture::GetPort() const {
7172
auto endpoint = std::getenv("YDB_ENDPOINT");
72-
if (!endpoint) {
73-
throw std::runtime_error("YDB_ENDPOINT is not set");
74-
}
73+
Y_ENSURE(endpoint, "YDB_ENDPOINT is not set");
7574

7675
auto portPos = std::string(endpoint).find(":");
7776
return std::stoi(std::string(endpoint).substr(portPos + 1));
@@ -80,7 +79,7 @@ std::uint16_t TTopicTestFixture::GetPort() const {
8079
std::vector<std::uint32_t> TTopicTestFixture::GetNodeIds() const {
8180
NDiscovery::TDiscoveryClient client(MakeDriver());
8281
auto result = client.ListEndpoints().GetValueSync();
83-
EXPECT_TRUE(result.IsSuccess());
82+
Y_ENSURE(result.IsSuccess(), static_cast<TStatus>(result));
8483

8584
std::vector<std::uint32_t> nodeIds;
8685
for (const auto& endpoint : result.GetEndpointsInfo()) {
@@ -91,11 +90,3 @@ std::vector<std::uint32_t> TTopicTestFixture::GetNodeIds() const {
9190
}
9291

9392
}
94-
95-
int main(int argc, char** argv) {
96-
std::filesystem::path binaryPath(argv[0]);
97-
binaryName = binaryPath.filename().string();
98-
99-
::testing::InitGoogleTest(&argc, argv);
100-
return RUN_ALL_TESTS();
101-
}

tests/integration/topic/setup/fixture.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44

55
#include <gtest/gtest.h>
66

7-
namespace NYdb::NTopic::NTests {
7+
namespace NYdb::inline V3::NTopic::NTests {
88

99
class TTopicTestFixture : public ::testing::Test {
1010
public:
@@ -25,7 +25,7 @@ class TTopicTestFixture : public ::testing::Test {
2525
std::uint16_t GetPort() const;
2626
std::vector<std::uint32_t> GetNodeIds() const;
2727
private:
28-
std::string TopicPath;
28+
std::string TopicPath_;
2929
};
3030

3131
}

0 commit comments

Comments
 (0)