Skip to content

Commit 1fabba3

Browse files
committed
Improvements of #25843 (#26164)
1 parent 6d9ea7e commit 1fabba3

File tree

7 files changed

+404
-280
lines changed

7 files changed

+404
-280
lines changed

ydb/core/persqueue/public/fetcher/fetch_request_actor.cpp

Lines changed: 227 additions & 235 deletions
Large diffs are not rendered by default.

ydb/core/persqueue/public/fetcher/fetch_request_actor.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,8 @@ struct TFetchRequestSettings {
2929
TString Database;
3030
TString Consumer;
3131
TVector<TPartitionFetchRequest> Partitions;
32-
ui64 MaxWaitTimeMs;
33-
ui64 TotalMaxBytes;
32+
ui64 MaxWaitTimeMs = 0;
33+
ui64 TotalMaxBytes = 0;
3434

3535
bool RuPerRequest = false;
3636
ui64 RequestId = 0;

ydb/core/persqueue/public/fetcher/fetch_request_ut.cpp

Lines changed: 150 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,12 @@
44
#include <ydb/core/persqueue/events/internal.h>
55
#include <ydb/core/testlib/tenant_runtime.h>
66
#include <ydb/core/tx/scheme_board/cache.h>
7-
#include <ydb/public/sdk/cpp/src/client/persqueue_public/ut/ut_utils/ut_utils.h>
7+
#include <ydb/public/sdk/cpp/src/client/topic/ut/ut_utils/topic_sdk_test_setup.h>
88

99
namespace NKikimr::NPQ {
1010
using namespace NPersQueue;
1111
//using namespace NYdb::NTopic;
12-
using namespace NYdb::NPersQueue::NTests;
12+
using namespace NYdb::NTopic::NTests;
1313

1414

1515
Y_UNIT_TEST_SUITE(TFetchRequestTests) {
@@ -26,7 +26,7 @@ Y_UNIT_TEST_SUITE(TFetchRequestTests) {
2626
}
2727

2828
Y_UNIT_TEST(HappyWay) {
29-
auto setup = std::make_shared<TPersQueueYdbSdkTestSetup>(TEST_CASE_NAME);
29+
auto setup = std::make_shared<TTopicSdkTestSetup>(TEST_CASE_NAME);
3030
setup->GetServer().EnableLogs(
3131
{ NKikimrServices::TX_PROXY_SCHEME_CACHE, NKikimrServices::PQ_FETCH_REQUEST },
3232
NActors::NLog::PRI_DEBUG
@@ -36,28 +36,18 @@ Y_UNIT_TEST_SUITE(TFetchRequestTests) {
3636

3737
ui32 totalPartitions = 5;
3838
setup->CreateTopic("topic1", "dc1", totalPartitions);
39-
setup->CreateTopic("topic2", "dc1", totalPartitions);
40-
auto pqClient = setup->GetPersQueueClient();
41-
auto settings1 = TWriteSessionSettings().Path("topic1").MessageGroupId("src-id").PartitionGroupId(2);
42-
auto settings2 = TWriteSessionSettings().Path("topic2").MessageGroupId("src-id").PartitionGroupId(4);
43-
auto ws1 = pqClient.CreateSimpleBlockingWriteSession(settings1);
44-
auto ws2 = pqClient.CreateSimpleBlockingWriteSession(settings2);
45-
46-
ws1->Write("Data 1-1");
47-
ws1->Write("Data 1-2");
48-
ws1->Write("Data 1-3");
49-
50-
ws2->Write("Data 2-1");
51-
ws2->Write("Data 2-2");
52-
53-
ws1->Close();
54-
ws2->Close();
39+
setup->Write("/Root/topic1", "Data 1-1", 1);
40+
setup->Write("/Root/topic1", "Data 1-2", 1);
41+
setup->Write("/Root/topic1", "Data 1-3", 1);
5542

43+
setup->CreateTopic("topic2", "dc1", totalPartitions);
44+
setup->Write("/Root/topic1", "Data 2-1", 3);
45+
setup->Write("/Root/topic1", "Data 2-2", 3);
5646

5747
auto edgeId = runtime.AllocateEdgeActor();
58-
TPartitionFetchRequest p1{"Root/PQ/rt3.dc1--topic1", 1, 1, 10000};
59-
TPartitionFetchRequest p2{"Root/PQ/rt3.dc1--topic2", 3, 0, 10000};
60-
TPartitionFetchRequest pbad{"Root/PQ/rt3.dc1--topic2", 2, 1, 10000};
48+
TPartitionFetchRequest p1{"/Root/topic1", 1, 1, 10000};
49+
TPartitionFetchRequest p2{"/Root/topic2", 3, 0, 10000};
50+
TPartitionFetchRequest pbad{"/Root/topic2", 2, 1, 10000};
6151

6252
TFetchRequestSettings settings{{}, NKikimr::NPQ::CLIENTID_WITHOUT_CONSUMER, {p1, p2, pbad}, 1000, 1000};
6353
auto fetchId = runtime.Register(CreatePQFetchRequestActor(settings, MakeSchemeCacheID(), edgeId));
@@ -68,25 +58,143 @@ Y_UNIT_TEST_SUITE(TFetchRequestTests) {
6858
UNIT_ASSERT_C(ev->Status == Ydb::StatusIds::SUCCESS, ev->Message);
6959
Cerr << "Got event: " << ev->Response.DebugString() << Endl;
7060
UNIT_ASSERT_VALUES_EQUAL(ev->Response.PartResultSize(), 3);
71-
for (const auto& part : ev->Response.GetPartResult()) {
72-
if (part.GetTopic().Contains("topic1")) {
73-
UNIT_ASSERT(part.GetReadResult().GetErrorCode() == NPersQueue::NErrorCode::EErrorCode::OK);
74-
UNIT_ASSERT_VALUES_EQUAL(part.GetPartition(), 1);
75-
UNIT_ASSERT_VALUES_EQUAL(part.GetReadResult().GetResult(0).GetOffset(), 1);
76-
} else {
77-
UNIT_ASSERT(part.GetTopic().Contains("topic2"));
78-
if (part.GetPartition() == 2) {
79-
UNIT_ASSERT(part.GetReadResult().GetErrorCode() != NPersQueue::NErrorCode::EErrorCode::OK);
80-
} else {
81-
UNIT_ASSERT_VALUES_EQUAL(part.GetPartition(), 3);
82-
UNIT_ASSERT_VALUES_EQUAL(part.GetReadResult().GetResult(0).GetOffset(), 0);
83-
}
84-
}
61+
62+
{
63+
auto& result = ev->Response.GetPartResult(0);
64+
UNIT_ASSERT_VALUES_EQUAL(result.GetTopic(), "/Root/topic1");
65+
UNIT_ASSERT_VALUES_EQUAL(result.GetPartition(), 1);
66+
UNIT_ASSERT_VALUES_EQUAL(NPersQueue::NErrorCode::EErrorCode_Name(result.GetReadResult().GetErrorCode()),
67+
NPersQueue::NErrorCode::EErrorCode_Name(NPersQueue::NErrorCode::EErrorCode::OK));
68+
UNIT_ASSERT_VALUES_EQUAL(result.GetReadResult().GetMaxOffset(), 3);
69+
}
70+
71+
{
72+
auto& result = ev->Response.GetPartResult(1);
73+
UNIT_ASSERT_VALUES_EQUAL(result.GetTopic(), "/Root/topic2");
74+
UNIT_ASSERT_VALUES_EQUAL(result.GetPartition(), 3);
75+
UNIT_ASSERT_VALUES_EQUAL(NPersQueue::NErrorCode::EErrorCode_Name(result.GetReadResult().GetErrorCode()),
76+
NPersQueue::NErrorCode::EErrorCode_Name(NPersQueue::NErrorCode::EErrorCode::OK));
77+
UNIT_ASSERT_VALUES_EQUAL(result.GetReadResult().GetMaxOffset(), 0);
78+
}
79+
80+
{
81+
auto& result = ev->Response.GetPartResult(2);
82+
UNIT_ASSERT_VALUES_EQUAL(result.GetTopic(), "/Root/topic2");
83+
UNIT_ASSERT_VALUES_EQUAL(result.GetPartition(), 2);
84+
UNIT_ASSERT_VALUES_EQUAL(NPersQueue::NErrorCode::EErrorCode_Name(result.GetReadResult().GetErrorCode()),
85+
NPersQueue::NErrorCode::EErrorCode_Name(NPersQueue::NErrorCode::EErrorCode::READ_ERROR_TOO_BIG_OFFSET));
86+
UNIT_ASSERT_VALUES_EQUAL(result.GetReadResult().GetMaxOffset(), 0);
87+
}
88+
}
89+
90+
Y_UNIT_TEST(SmallBytesRead) {
91+
auto setup = std::make_shared<TTopicSdkTestSetup>(TEST_CASE_NAME);
92+
setup->GetServer().EnableLogs(
93+
{ NKikimrServices::TX_PROXY_SCHEME_CACHE, NKikimrServices::PQ_FETCH_REQUEST },
94+
NActors::NLog::PRI_DEBUG
95+
);
96+
auto& runtime = setup->GetRuntime();
97+
StartSchemeCache(runtime);
98+
99+
setup->CreateTopic("topic1", "dc1", 2);
100+
setup->Write("/Root/topic1", TString(2_KB, 'a'), 0);
101+
102+
TPartitionFetchRequest p1 {"/Root/topic1", 0, 0, 1_KB};
103+
TPartitionFetchRequest p2 {"/Root/topic1", 1, 0, 1_KB};
104+
105+
TFetchRequestSettings settings{
106+
.Database = {},
107+
.Consumer = NKikimr::NPQ::CLIENTID_WITHOUT_CONSUMER,
108+
.Partitions = {p1, p2},
109+
.MaxWaitTimeMs = 1000,
110+
.TotalMaxBytes = 100
111+
};
112+
113+
auto edgeId = runtime.AllocateEdgeActor();
114+
auto fetchActorId = runtime.Register(CreatePQFetchRequestActor(settings, MakeSchemeCacheID(), edgeId));
115+
runtime.EnableScheduleForActor(fetchActorId);
116+
runtime.DispatchEvents();
117+
118+
auto ev = runtime.GrabEdgeEvent<TEvPQ::TEvFetchResponse>();
119+
Cerr << ev->Response.DebugString() << Endl;
120+
UNIT_ASSERT_C(ev->Status == Ydb::StatusIds::SUCCESS, ev->Message);
121+
122+
UNIT_ASSERT_VALUES_EQUAL(ev->Response.PartResultSize(), 2);
123+
124+
{
125+
auto& result = ev->Response.GetPartResult(0);
126+
UNIT_ASSERT_VALUES_EQUAL(result.GetPartition(), 0);
127+
UNIT_ASSERT_VALUES_EQUAL(NPersQueue::NErrorCode::EErrorCode_Name(result.GetReadResult().GetErrorCode()),
128+
NPersQueue::NErrorCode::EErrorCode_Name(NPersQueue::NErrorCode::EErrorCode::OK));
129+
UNIT_ASSERT_VALUES_EQUAL(result.GetReadResult().GetMaxOffset(), 1);
130+
UNIT_ASSERT_VALUES_EQUAL(result.GetReadResult().ResultSize(), 1);
131+
UNIT_ASSERT_VALUES_EQUAL(result.GetReadResult().GetResult(0).GetUncompressedSize(), 2_KB);
132+
}
133+
134+
{
135+
auto& result = ev->Response.GetPartResult(1);
136+
UNIT_ASSERT_VALUES_EQUAL(result.GetPartition(), 1);
137+
UNIT_ASSERT_VALUES_EQUAL(NPersQueue::NErrorCode::EErrorCode_Name(result.GetReadResult().GetErrorCode()),
138+
NPersQueue::NErrorCode::EErrorCode_Name(NPersQueue::NErrorCode::EErrorCode::READ_NOT_DONE));
139+
UNIT_ASSERT_VALUES_EQUAL(result.GetReadResult().GetMaxOffset(), 0);
140+
UNIT_ASSERT_VALUES_EQUAL(result.GetReadResult().ResultSize(), 0);
141+
}
142+
}
143+
144+
Y_UNIT_TEST(EmptyTopic) {
145+
auto setup = std::make_shared<TTopicSdkTestSetup>(TEST_CASE_NAME);
146+
setup->GetServer().EnableLogs(
147+
{ NKikimrServices::TX_PROXY_SCHEME_CACHE, NKikimrServices::PQ_FETCH_REQUEST },
148+
NActors::NLog::PRI_DEBUG
149+
);
150+
auto& runtime = setup->GetRuntime();
151+
StartSchemeCache(runtime);
152+
153+
setup->CreateTopic("topic1", "dc1", 2);
154+
155+
TPartitionFetchRequest p1 {"/Root/topic1", 0, 0, 1_KB};
156+
TPartitionFetchRequest p2 {"/Root/topic1", 1, 0, 1_KB};
157+
158+
TFetchRequestSettings settings{
159+
.Database = {},
160+
.Consumer = NKikimr::NPQ::CLIENTID_WITHOUT_CONSUMER,
161+
.Partitions = {p1, p2},
162+
.MaxWaitTimeMs = 100,
163+
.TotalMaxBytes = 100
164+
};
165+
166+
auto edgeId = runtime.AllocateEdgeActor();
167+
auto fetchActorId = runtime.Register(CreatePQFetchRequestActor(settings, MakeSchemeCacheID(), edgeId));
168+
runtime.EnableScheduleForActor(fetchActorId);
169+
runtime.DispatchEvents();
170+
171+
auto ev = runtime.GrabEdgeEvent<TEvPQ::TEvFetchResponse>();
172+
Cerr << ev->Response.DebugString() << Endl;
173+
UNIT_ASSERT_C(ev->Status == Ydb::StatusIds::SUCCESS, ev->Message);
174+
175+
UNIT_ASSERT_VALUES_EQUAL(ev->Response.PartResultSize(), 2);
176+
177+
{
178+
auto& result = ev->Response.GetPartResult(0);
179+
UNIT_ASSERT_VALUES_EQUAL(result.GetPartition(), 0);
180+
UNIT_ASSERT_VALUES_EQUAL(NPersQueue::NErrorCode::EErrorCode_Name(result.GetReadResult().GetErrorCode()),
181+
NPersQueue::NErrorCode::EErrorCode_Name(NPersQueue::NErrorCode::EErrorCode::READ_NOT_DONE));
182+
UNIT_ASSERT_VALUES_EQUAL(result.GetReadResult().GetMaxOffset(), 0);
183+
UNIT_ASSERT_VALUES_EQUAL(result.GetReadResult().ResultSize(), 0);
184+
}
185+
186+
{
187+
auto& result = ev->Response.GetPartResult(1);
188+
UNIT_ASSERT_VALUES_EQUAL(result.GetPartition(), 1);
189+
UNIT_ASSERT_VALUES_EQUAL(NPersQueue::NErrorCode::EErrorCode_Name(result.GetReadResult().GetErrorCode()),
190+
NPersQueue::NErrorCode::EErrorCode_Name(NPersQueue::NErrorCode::EErrorCode::READ_NOT_DONE));
191+
UNIT_ASSERT_VALUES_EQUAL(result.GetReadResult().GetMaxOffset(), 0);
192+
UNIT_ASSERT_VALUES_EQUAL(result.GetReadResult().ResultSize(), 0);
85193
}
86194
}
87195

88196
Y_UNIT_TEST(BadTopicName) {
89-
auto setup = std::make_shared<TPersQueueYdbSdkTestSetup>(TEST_CASE_NAME);
197+
auto setup = std::make_shared<TTopicSdkTestSetup>(TEST_CASE_NAME);
90198
auto& runtime = setup->GetRuntime();
91199
StartSchemeCache(runtime);
92200

@@ -96,8 +204,8 @@ Y_UNIT_TEST_SUITE(TFetchRequestTests) {
96204
setup->CreateTopic("topic1", "dc1", totalPartitions);
97205

98206
auto edgeId = runtime.AllocateEdgeActor();
99-
TPartitionFetchRequest p1{"Root/PQ/rt3.dc1--topic1", 1, 1, 10000};
100-
TPartitionFetchRequest p2{"Root/PQ/rt3.dc1--topic2", 3, 0, 10000};
207+
TPartitionFetchRequest p1{"/Root/topic1", 1, 1, 10000};
208+
TPartitionFetchRequest p2{"/Root/topic2", 3, 0, 10000};
101209

102210
TFetchRequestSettings settings{{}, NKikimr::NPQ::CLIENTID_WITHOUT_CONSUMER, {p1, p2}, 1000, 1000};
103211
auto fetchId = runtime.Register(CreatePQFetchRequestActor(settings, MakeSchemeCacheID(), edgeId));
@@ -108,7 +216,7 @@ Y_UNIT_TEST_SUITE(TFetchRequestTests) {
108216
}
109217

110218
Y_UNIT_TEST(CheckAccess) {
111-
auto setup = std::make_shared<TPersQueueYdbSdkTestSetup>(TEST_CASE_NAME);
219+
auto setup = std::make_shared<TTopicSdkTestSetup>(TEST_CASE_NAME);
112220
auto& runtime = setup->GetRuntime();
113221
runtime.SetLogPriority(NKikimrServices::PQ_FETCH_REQUEST, NActors::NLog::EPriority::PRI_DEBUG);
114222
StartSchemeCache(runtime);
@@ -117,12 +225,12 @@ Y_UNIT_TEST_SUITE(TFetchRequestTests) {
117225
setup->CreateTopic("topic1", "dc1", totalPartitions);
118226

119227
auto edgeId = runtime.AllocateEdgeActor();
120-
TPartitionFetchRequest p1{"Root/PQ/rt3.dc1--topic1", 1, 1, 10000};
228+
TPartitionFetchRequest p1{"/Root/topic1", 1, 1, 10000};
121229

122230
{
123231
NACLib::TDiffACL acl;
124232
acl.AddAccess(NACLib::EAccessType::Allow, NACLib::SelectRow, "user1@staff");
125-
setup->GetServer().AnnoyingClient->ModifyACL("/Root/PQ", "rt3.dc1--topic1", acl.SerializeAsString());
233+
setup->GetServer().AnnoyingClient->ModifyACL("/Root", "topic1", acl.SerializeAsString());
126234

127235
auto goodToken = MakeIntrusiveConst<NACLib::TUserToken>("user1@staff", TVector<TString>{});
128236
TFetchRequestSettings settings{

ydb/core/persqueue/public/fetcher/ut/ya.make

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ SRCS(
99
PEERDIR(
1010
library/cpp/testing/unittest
1111
ydb/public/sdk/cpp/src/client/persqueue_public/ut/ut_utils
12+
ydb/public/sdk/cpp/src/client/topic/ut/ut_utils
1213

1314
)
1415

ydb/public/sdk/cpp/src/client/persqueue_public/ut/ut_utils/ut_utils.h

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,19 @@ class TPersQueueYdbSdkTestSetup : public ::NPersQueue::SDKTestSetup {
9797
.ClusterDiscoveryMode(EClusterDiscoveryMode::On);
9898
return settings;
9999
}
100+
101+
void Write(const TString& topic, ui32 partitionId, const TString& data) {
102+
auto settings = TWriteSessionSettings()
103+
.Path(topic)
104+
.MessageGroupId("src-id")
105+
.PartitionGroupId(partitionId)
106+
.Codec(ECodec::RAW);
107+
auto writeSession = GetPersQueueClient().CreateSimpleBlockingWriteSession(settings);
108+
109+
writeSession->Write(data);
110+
111+
writeSession->Close();
112+
}
100113
};
101114

102115
struct TYDBClientEventLoop : public ::NPersQueue::IClientEventLoop {

ydb/public/sdk/cpp/src/client/topic/ut/ut_utils/topic_sdk_test_setup.cpp

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,10 +56,16 @@ TConsumerDescription TTopicSdkTestSetup::DescribeConsumer(const std::string& nam
5656
void TTopicSdkTestSetup::Write(const std::string& message, std::uint32_t partitionId,
5757
const std::optional<std::string> producer,
5858
std::optional<std::uint64_t> seqNo) {
59+
Write(GetTopicPath(), message, partitionId, producer, seqNo);
60+
}
61+
62+
void TTopicSdkTestSetup::Write(const std::string& topic, const std::string& message, std::uint32_t partitionId,
63+
const std::optional<std::string> producer,
64+
std::optional<std::uint64_t> seqNo) {
5965
TTopicClient client(MakeDriver());
6066

6167
TWriteSessionSettings settings;
62-
settings.Path(GetTopicPath());
68+
settings.Path(topic);
6369
settings.PartitionId(partitionId);
6470
settings.DeduplicationEnabled(producer.has_value());
6571
if (producer) {

ydb/public/sdk/cpp/src/client/topic/ut/ut_utils/topic_sdk_test_setup.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,10 @@ class TTopicSdkTestSetup : public ITopicTestSetup {
3232
const std::optional<std::string> producer = std::nullopt,
3333
std::optional<std::uint64_t> seqNo = std::nullopt);
3434

35+
void Write(const std::string& topic, const std::string& message, std::uint32_t partitionId = 0,
36+
const std::optional<std::string> producer = std::nullopt,
37+
std::optional<std::uint64_t> seqNo = std::nullopt);
38+
3539
struct TReadResult {
3640
std::shared_ptr<IReadSession> Reader;
3741
bool Timeout;

0 commit comments

Comments
 (0)