Skip to content

Commit 67a0e68

Browse files
authored
Fixed response code for kafka fetch request (#26245)
2 parents e9cdc65 + 1fabba3 commit 67a0e68

File tree

12 files changed

+756
-429
lines changed

12 files changed

+756
-429
lines changed

ydb/core/kafka_proxy/actors/actors.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,10 @@ inline EKafkaErrors ConvertErrorCode(NPersQueue::NErrorCode::EErrorCode code) {
120120
return EKafkaErrors::UNKNOWN_TOPIC_OR_PARTITION;
121121
case NPersQueue::NErrorCode::EErrorCode::READ_TIMEOUT:
122122
return EKafkaErrors::REQUEST_TIMED_OUT;
123+
case NPersQueue::NErrorCode::EErrorCode::READ_NOT_DONE:
124+
return EKafkaErrors::NONE_ERROR;
125+
case NPersQueue::NErrorCode::EErrorCode::TABLET_PIPE_DISCONNECTED:
126+
return EKafkaErrors::NOT_LEADER_OR_FOLLOWER;
123127
default:
124128
return EKafkaErrors::UNKNOWN_SERVER_ERROR;
125129
}

ydb/core/kafka_proxy/actors/kafka_fetch_actor.cpp

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,18 @@ void TKafkaFetchActor::SendFetchRequests(const TActorContext& ctx) {
3535
TVector<NKikimr::NPQ::TPartitionFetchRequest> partPQRequests;
3636
PrepareFetchRequestData(topicIndex, partPQRequests);
3737
auto ruPerRequest = topicIndex == 0 && Context->Config.GetMeteringV2Enabled();
38-
NKikimr::NPQ::TFetchRequestSettings request(Context->DatabasePath, partPQRequests, FetchRequestData->MaxWaitMs, FetchRequestData->MaxBytes, Context->RlContext, Context->UserToken, 0, ruPerRequest);
38+
auto consumer = Context->GroupId.empty() ? NKikimr::NPQ::CLIENTID_WITHOUT_CONSUMER : Context->GroupId;
39+
NKikimr::NPQ::TFetchRequestSettings request {
40+
.Database = Context->DatabasePath,
41+
.Consumer = consumer,
42+
.Partitions = partPQRequests,
43+
.MaxWaitTimeMs = FetchRequestData->MaxWaitMs < 0 ? 1000u : FetchRequestData->MaxWaitMs,
44+
.TotalMaxBytes = FetchRequestData->MaxBytes < 0 ? 8_MB : FetchRequestData->MaxBytes,
45+
.RuPerRequest = ruPerRequest,
46+
.RequestId = 0,
47+
.RlCtx = Context->RlContext,
48+
.UserToken = Context->UserToken
49+
};
3950
auto fetchActor = NKikimr::NPQ::CreatePQFetchRequestActor(request, NKikimr::MakeSchemeCacheID(), ctx.SelfID);
4051
auto actorId = ctx.Register(fetchActor);
4152
PendingResponses++;
@@ -62,7 +73,6 @@ void TKafkaFetchActor::PrepareFetchRequestData(const size_t topicIndex, TVector<
6273
partPQRequest.Partition = partKafkaRequest.Partition;
6374
partPQRequest.Offset = partKafkaRequest.FetchOffset;
6475
partPQRequest.MaxBytes = partKafkaRequest.PartitionMaxBytes;
65-
partPQRequest.ClientId = Context->GroupId.empty() ? NKikimr::NPQ::CLIENTID_WITHOUT_CONSUMER : Context->GroupId;
6676
}
6777
}
6878

ydb/core/kafka_proxy/actors/kafka_fetch_actor.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,12 @@
99
#include <ydb/core/persqueue/events/internal.h>
1010
#include <ydb/library/aclib/aclib.h>
1111

12-
1312
namespace NKafka {
1413

14+
struct TestAccessor;
15+
1516
class TKafkaFetchActor: public NActors::TActorBootstrapped<TKafkaFetchActor> {
17+
friend struct TestAccessor;
1618
public:
1719
TKafkaFetchActor(const TContext::TPtr context, const ui64 correlationId, const TMessagePtr<TFetchRequestData>& message)
1820
: Context(context)

ydb/core/kafka_proxy/ut/actors_ut.cpp

Lines changed: 84 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,9 @@
77
#include <ydb/core/base/statestorage.h>
88
#include <ydb/core/kafka_proxy/kafka_messages.h>
99
#include <ydb/core/kafka_proxy/actors/actors.h>
10-
#include <ydb/core/kafka_proxy/actors/kafka_metadata_actor.h>
1110
#include <ydb/core/kafka_proxy/actors/kafka_describe_configs_actor.h>
11+
#include <ydb/core/kafka_proxy/actors/kafka_fetch_actor.h>
12+
#include <ydb/core/kafka_proxy/actors/kafka_metadata_actor.h>
1213
#include <ydb/core/discovery/discovery.h>
1314

1415

@@ -87,13 +88,24 @@ TMetarequestTestParams SetupServer(const TString shortTopicName, bool serverless
8788
serverSettings.AppConfig->MutableKafkaProxyConfig()->MutableProxy()->SetPort(FAKE_SERVERLESS_KAFKA_PROXY_PORT);
8889
}
8990
NPersQueue::TTestServer server(serverSettings, true, {}, NActors::NLog::PRI_INFO, pm);
91+
server.EnableLogs({NKikimrServices::PERSQUEUE, NKikimrServices::PQ_FETCH_REQUEST});
9092

9193
server.AnnoyingClient->CreateTopic(fullTopicName, 1);
9294
server.WaitInit(shortTopicName);
9395

9496
return {std::move(server), kafkaPort, serverSettings.AppConfig->GetKafkaProxyConfig(), fullTopicName};
9597
}
9698

99+
namespace NKafka {
100+
101+
struct TestAccessor {
102+
static std::unordered_map<TActorId, size_t> GetTopicIndexes(TKafkaFetchActor* actor) {
103+
return actor->TopicIndexes;
104+
}
105+
};
106+
107+
}
108+
97109
namespace NKafka::NTests {
98110
Y_UNIT_TEST_SUITE(DiscoveryIsNotBroken) {
99111
void CheckEndpointsInDiscovery(bool withSsl, bool expectKafkaEndpoints) {
@@ -277,7 +289,6 @@ namespace NKafka::NTests {
277289
CheckKafkaMetaResponse(runtime, kafkaPort);
278290
}
279291

280-
281292
Y_UNIT_TEST(DiscoveryResponsesWithError) {
282293
auto [server, kafkaPort, config, topicName] = SetupServer("topic1");
283294

@@ -467,6 +478,77 @@ namespace NKafka::NTests {
467478
}
468479
}
469480

481+
Y_UNIT_TEST_SUITE(FetchActorTests) {
482+
std::pair<TActorId, NKafka::TKafkaFetchActor*> CreateFetchActor(
483+
const TActorId& edge, const TString& topic, auto* runtime, const auto& kafkaConfig
484+
) {
485+
TFetchRequestData::TPtr request = std::make_shared<TFetchRequestData>();
486+
request->MaxBytes = 10000;
487+
request->MaxWaitMs = 1000;
488+
request->Topics.resize(1);
489+
request->Topics[0].Topic = topic;
490+
request->Topics[0].Partitions.resize(1);
491+
request->Topics[0].Partitions[0].Partition = 0;
492+
request->Topics[0].Partitions[0].PartitionMaxBytes = 10000;
493+
494+
auto context = std::make_shared<TContext>(kafkaConfig);
495+
context->ConnectionId = edge;
496+
context->DatabasePath = "/Root";
497+
context->ResourceDatabasePath = "/Root";
498+
context->UserToken = new NACLib::TUserToken("root@builtin", {});
499+
500+
auto* actor = new NKafka::TKafkaFetchActor(context, 1, TMessagePtr<TFetchRequestData>(std::make_shared<TBuffer>(), request));
501+
TActorId actorId = runtime->Register(actor);
502+
runtime->EnableScheduleForActor(actorId);
503+
return {actorId, actor};
504+
}
505+
506+
Y_UNIT_TEST(FetchWithNoneData) {
507+
auto [server, kafkaPort, config, topicName] = SetupServer("topic1");
508+
509+
auto* runtime = server.GetRuntime();
510+
auto edge = runtime->AllocateEdgeActor();
511+
512+
CreateFetchActor(edge, {NKikimr::JoinPath({"/Root/PQ/", topicName})}, runtime, config);
513+
514+
TAutoPtr<IEventHandle> handle;
515+
auto* ev = runtime->GrabEdgeEvent<TEvKafka::TEvResponse>(handle);
516+
UNIT_ASSERT(ev);
517+
auto response = dynamic_cast<TFetchResponseData*>(ev->Response.get());
518+
519+
UNIT_ASSERT_VALUES_EQUAL(response->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));
520+
UNIT_ASSERT_VALUES_EQUAL(response->Responses.size(), 1);
521+
UNIT_ASSERT_VALUES_EQUAL(response->Responses[0].Partitions.size(), 1);
522+
UNIT_ASSERT_VALUES_EQUAL(response->Responses[0].Partitions[0].ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));
523+
}
524+
525+
Y_UNIT_TEST(FetchWithTimeout) {
526+
auto [server, kafkaPort, config, topicName] = SetupServer("topic1");
527+
528+
auto* runtime = server.GetRuntime();
529+
auto edge = runtime->AllocateEdgeActor();
530+
531+
auto [actorId, actor] = CreateFetchActor(edge, {NKikimr::JoinPath({"/Root/PQ/", topicName})}, runtime, config);
532+
Sleep(TDuration::MilliSeconds(100)); // wait actor willbe created
533+
534+
// emulate pipe error
535+
auto topicIndexes = TestAccessor::GetTopicIndexes(actor);
536+
UNIT_ASSERT(topicIndexes.size() == 1);
537+
auto fetchActorId = topicIndexes.begin()->first;
538+
runtime->Send(fetchActorId, fetchActorId, new TEvents::TEvWakeup(1000));
539+
540+
TAutoPtr<IEventHandle> handle;
541+
auto* ev = runtime->GrabEdgeEvent<TEvKafka::TEvResponse>(handle);
542+
UNIT_ASSERT(ev);
543+
auto response = dynamic_cast<TFetchResponseData*>(ev->Response.get());
544+
545+
UNIT_ASSERT_VALUES_EQUAL(response->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));
546+
UNIT_ASSERT_VALUES_EQUAL(response->Responses.size(), 1);
547+
UNIT_ASSERT_VALUES_EQUAL(response->Responses[0].Partitions.size(), 1);
548+
UNIT_ASSERT_VALUES_EQUAL(response->Responses[0].Partitions[0].ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));
549+
}
550+
}
551+
470552
Y_UNIT_TEST_SUITE(RequestUtilityActors) {
471553
Y_UNIT_TEST(DescribeConfigs) {
472554
auto [server, kafkaPort, config, topicName] = SetupServer("topic1");

ydb/core/kafka_proxy/ut/ut_protocol.cpp

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
#include <ydb/core/kafka_proxy/kafka_constants.h>
1313
#include <ydb/core/kafka_proxy/actors/actors.h>
1414
#include <ydb/core/kafka_proxy/kafka_transactional_producers_initializers.h>
15+
#include <ydb/core/persqueue/events/global.h>
1516
#include <ydb/core/persqueue/public/constants.h>
1617
#include <ydb/services/ydb/ydb_common_ut.h>
1718
#include <ydb/services/ydb/ydb_keys_ut.h>
@@ -1349,6 +1350,35 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) {
13491350
}
13501351
} // Y_UNIT_TEST(FetchScenarioWithJoinGroup)
13511352

1353+
Y_UNIT_TEST(FetchEmptyTopicScenario) {
1354+
TInsecureTestServer testServer("FetchEmptyTopicScenario");
1355+
1356+
TString protocolName = "roundrobin";
1357+
1358+
TString topicName = "/Root/topic-0-test";
1359+
TString group = "group-0-test";
1360+
1361+
1362+
NYdb::NTopic::TTopicClient pqClient(*testServer.Driver);
1363+
CreateTopic(pqClient, topicName, 1, { group });
1364+
1365+
TKafkaTestClient client(testServer.Port);
1366+
1367+
client.AuthenticateToKafka();
1368+
1369+
{
1370+
// Check FETCH
1371+
std::vector<std::pair<TString, std::vector<i32>>> topics {{topicName, {0}}};
1372+
auto msg = client.Fetch(topics);
1373+
UNIT_ASSERT_VALUES_EQUAL(msg->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));
1374+
UNIT_ASSERT_VALUES_EQUAL(msg->Responses.size(), 1);
1375+
UNIT_ASSERT_VALUES_EQUAL(msg->Responses[0].Partitions.size(), 1);
1376+
// To protect the clients from failing due to null records,
1377+
// Java SDK always convert null records to MemoryRecords.EMPTY
1378+
UNIT_ASSERT_VALUES_EQUAL(msg->Responses[0].Partitions[0].Records.has_value(), false);
1379+
}
1380+
} // Y_UNIT_TEST(FetchEmptyTopicScenario)
1381+
13521382
void RunBalanceScenarionTest(bool forFederation) {
13531383
TString protocolName = "roundrobin";
13541384
TInsecureTestServer testServer("2", false, false);
@@ -3091,7 +3121,7 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) {
30913121
auto alteredTopic = TTopicConfig(
30923122
shortTopic0Name,
30933123
1,
3094-
std::to_string(365 * 24 * 60 * 60 * 1000ul),
3124+
std::to_string(TDuration::Days(365).MilliSeconds()),
30953125
std::nullopt
30963126
);
30973127
auto msg = client.AlterConfigs({alteredTopic});

0 commit comments

Comments
 (0)