Skip to content

Commit ffddccc

Browse files
authored
Kafka server balancing (#26299)
1 parent c0a911c commit ffddccc

36 files changed

+1324
-233
lines changed

ydb/core/kafka_proxy/actors/actors.h

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,13 @@
11
#pragma once
22

33
#include <ydb/core/base/path.h>
4+
#include <ydb/core/kafka_proxy/kafka_messages.h>
45
#include <ydb/core/persqueue/public/pq_rl_helpers.h>
56
#include <ydb/core/protos/config.pb.h>
67
#include <ydb/library/aclib/aclib.h>
78
#include <ydb/public/api/protos/persqueue_error_codes_v1.pb.h>
89
#include <ydb/public/api/protos/draft/persqueue_error_codes.pb.h> // strange
910

10-
#include <ydb/core/kafka_proxy/kafka_messages.h>
11-
1211
namespace NKafka {
1312

1413
static constexpr int ProxyNodeId = 1;
@@ -23,13 +22,29 @@ enum EAuthSteps {
2322
FAILED
2423
};
2524

25+
// Balancing mode of a Kafka read session; TReadSession::BalancingMode defaults to Server.
// NOTE(review): presumably Server means balancing is driven by the proxy/server side and
// Native means Kafka-native group balancing — confirm against the read session proxy actor.
enum class EBalancingMode {
26+
Server,
27+
Native,
28+
};
29+
30+
// Read-session state embedded in TContext as the ReadSession member.
struct TReadSession {
31+
// Balancing mode of the session; default-initialized to Server.
EBalancingMode BalancingMode = EBalancingMode::Server;
32+
// Optional balancing mode, empty by default.
// NOTE(review): presumably a requested mode that has not taken effect yet — confirm where it is applied.
std::optional<EBalancingMode> PendingBalancingMode;
33+
// NOTE(review): presumably the actor created by CreateKafkaReadSessionProxyActor — confirm.
TActorId ProxyActorId;
34+
};
35+
2636
struct TContext {
2737
using TPtr = std::shared_ptr<TContext>;
2838

2939
TContext(const NKikimrConfig::TKafkaProxyConfig& config)
3040
: Config(config) {
3141
}
3242

43+
// Copy constructor that copies ONLY the Config reference from `other`.
// No other member appears in the initializer list, so the remaining fields keep
// their default initializers; the visible call sites assign ConnectionId,
// UserToken and DatabasePath themselves after constructing the copy.
// NOTE(review): this is narrower than what a copy constructor normally implies — confirm it is intentional.
TContext(const TContext& other)
44+
: Config(other.Config)
45+
{
46+
}
47+
3348
const NKikimrConfig::TKafkaProxyConfig& Config;
3449

3550
TActorId ConnectionId;
@@ -49,13 +64,13 @@ struct TContext {
4964
TString ClientDC;
5065
bool IsServerless = false;
5166
bool RequireAuthentication = false;
67+
TReadSession ReadSession;
5268

5369
NKikimr::NPQ::TRlContext RlContext;
5470

55-
bool Authenticated() {
5671

72+
bool Authenticated() {
5773
return !RequireAuthentication || AuthenticationStep == SUCCESS;
58-
5974
}
6075
};
6176

@@ -77,6 +92,10 @@ class TMessagePtr {
7792
return Ptr;
7893
}
7994

95+
// Dereferences the stored pointer and returns a reference to the message.
// No null check is done here; operator bool reports whether Ptr is non-null.
T& operator*() const {
96+
return *Ptr;
97+
}
98+
8099
operator bool() const {
81100
return nullptr != Ptr;
82101
}
@@ -178,6 +197,7 @@ NActors::IActor* CreateKafkaMetadataActor(const TContext::TPtr context, const ui
178197
const TMessagePtr<TMetadataRequestData>& message,
179198
const TActorId& discoveryCacheActor);
180199
NActors::IActor* CreateKafkaProduceActor(const TContext::TPtr context);
200+
NActors::IActor* CreateKafkaReadSessionProxyActor(const TContext::TPtr context, ui64 cookie);
181201
NActors::IActor* CreateKafkaReadSessionActor(const TContext::TPtr context, ui64 cookie);
182202
NActors::IActor* CreateKafkaBalancerActor(const TContext::TPtr context, ui64 cookie);
183203
NActors::IActor* CreateKafkaSaslHandshakeActor(const TContext::TPtr context, const ui64 correlationId, const TMessagePtr<TSaslHandshakeRequestData>& message);

ydb/core/kafka_proxy/actors/kafka_balancer_actor.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -351,7 +351,7 @@ void TKafkaBalancerActor::JoinStepCheckGroupState(NKqp::TEvKqp::TEvQueryResponse
351351
if (SessionTimeoutMs > MAX_SESSION_TIMEOUT_MS || SessionTimeoutMs < MIN_SESSION_TIMEOUT_MS) {
352352
SendJoinGroupResponseFail(ctx, CorrelationId,
353353
EKafkaErrors::INVALID_SESSION_TIMEOUT,
354-
"Invalid session timeout");
354+
TStringBuilder() << "Invalid session timeout " << SessionTimeoutMs);
355355
return;
356356
}
357357

ydb/core/kafka_proxy/actors/kafka_fetch_actor.cpp

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,10 +30,10 @@ void TKafkaFetchActor::Bootstrap(const NActors::TActorContext& ctx) {
3030

3131
void TKafkaFetchActor::SendFetchRequests(const TActorContext& ctx) {
3232
Response->Responses.resize(FetchRequestData->Topics.size());
33-
KAFKA_LOG_D(TStringBuilder() << "Fetch actor: New request. DatabasePath: " << Context->DatabasePath << " MaxWaitMs: " << FetchRequestData->MaxWaitMs << " MaxBytes: " << FetchRequestData->MaxBytes);
33+
KAFKA_LOG_D(TStringBuilder() << "Fetch actor: New request. DatabasePath: " << Context->DatabasePath
34+
<< " MaxWaitMs: " << FetchRequestData->MaxWaitMs << " MaxBytes: " << FetchRequestData->MaxBytes);
3435
for (size_t topicIndex = 0; topicIndex < Response->Responses.size(); topicIndex++) {
35-
TVector<NKikimr::NPQ::TPartitionFetchRequest> partPQRequests;
36-
PrepareFetchRequestData(topicIndex, partPQRequests);
36+
auto partPQRequests = PrepareFetchRequestData(topicIndex);
3737
auto ruPerRequest = topicIndex == 0 && Context->Config.GetMeteringV2Enabled();
3838
auto consumer = Context->GroupId.empty() ? NKikimr::NPQ::CLIENTID_WITHOUT_CONSUMER : Context->GroupId;
3939
NKikimr::NPQ::TFetchRequestSettings request {
@@ -55,11 +55,13 @@ void TKafkaFetchActor::SendFetchRequests(const TActorContext& ctx) {
5555
}
5656
}
5757

58-
void TKafkaFetchActor::PrepareFetchRequestData(const size_t topicIndex, TVector<NKikimr::NPQ::TPartitionFetchRequest>& partPQRequests) {
58+
TVector<NKikimr::NPQ::TPartitionFetchRequest> TKafkaFetchActor::PrepareFetchRequestData(const size_t topicIndex) {
5959
auto& topicKafkaRequest = FetchRequestData->Topics[topicIndex];
6060
TFetchResponseData::TFetchableTopicResponse& topicKafkaResponse = Response->Responses[topicIndex];
61+
6162
topicKafkaResponse.Topic = topicKafkaRequest.Topic;
6263

64+
TVector<NKikimr::NPQ::TPartitionFetchRequest> partPQRequests;
6365
partPQRequests.resize(topicKafkaRequest.Partitions.size());
6466
topicKafkaResponse.Partitions.resize(topicKafkaRequest.Partitions.size());
6567
for (size_t partIndex = 0; partIndex < topicKafkaRequest.Partitions.size(); partIndex++) {
@@ -74,6 +76,7 @@ void TKafkaFetchActor::PrepareFetchRequestData(const size_t topicIndex, TVector<
7476
partPQRequest.Offset = partKafkaRequest.FetchOffset;
7577
partPQRequest.MaxBytes = partKafkaRequest.PartitionMaxBytes;
7678
}
79+
return partPQRequests;
7780
}
7881

7982
void TKafkaFetchActor::Handle(NKikimr::TEvPQ::TEvFetchResponse::TPtr& ev, const TActorContext& ctx) {

ydb/core/kafka_proxy/actors/kafka_fetch_actor.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ class TKafkaFetchActor: public NActors::TActorBootstrapped<TKafkaFetchActor> {
3737
void Handle(NKikimr::TEvPQ::TEvFetchResponse::TPtr& ev, const TActorContext& ctx);
3838

3939
void SendFetchRequests(const TActorContext& ctx);
40-
void PrepareFetchRequestData(const size_t topicIndex, TVector<NKikimr::NPQ::TPartitionFetchRequest>& partPQRequests);
40+
TVector<NKikimr::NPQ::TPartitionFetchRequest> PrepareFetchRequestData(const size_t topicIndex);
4141
void HandleErrorResponse(const NKikimr::TEvPQ::TEvFetchResponse::TPtr& ev, TFetchResponseData::TFetchableTopicResponse& topicResponse);
4242
void HandleSuccessResponse(const NKikimr::TEvPQ::TEvFetchResponse::TPtr& ev, TFetchResponseData::TFetchableTopicResponse& topicResponse, const TActorContext& ctx);
4343
void FillRecordsBatch(const NKikimrClient::TPersQueueFetchResponse_TPartResult& partPQResponse, TKafkaRecordBatch& recordsBatch, const TActorContext& ctx);

ydb/core/kafka_proxy/actors/kafka_metadata_actor.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -296,7 +296,7 @@ void TKafkaMetadataActor::SendCreateTopicsRequest(const TString& topicName, ui32
296296
topicToCreate.NumPartitions = Context->Config.GetTopicCreationDefaultPartitions();
297297
message->Topics.push_back(topicToCreate);
298298
TContext::TPtr ContextForTopicCreation;
299-
ContextForTopicCreation = std::make_shared<TContext>(TContext(Context->Config));
299+
ContextForTopicCreation = std::make_shared<TContext>(TContext(*Context));
300300
ContextForTopicCreation->ConnectionId = ctx.SelfID;
301301
ContextForTopicCreation->UserToken = Context->UserToken;
302302
ContextForTopicCreation->DatabasePath = Context->DatabasePath;

ydb/core/kafka_proxy/actors/kafka_offset_fetch_actor.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -522,7 +522,7 @@ void TKafkaOffsetFetchActor::CreateTopicIfNecessary(const TString& topicName,
522522
message->Topics.push_back(topicToCreate);
523523

524524
TContext::TPtr ContextForTopicCreation;
525-
ContextForTopicCreation = std::make_shared<TContext>(TContext(Context->Config));
525+
ContextForTopicCreation = std::make_shared<TContext>(TContext(*Context));
526526
ContextForTopicCreation->ConnectionId = ctx.SelfID;
527527
ContextForTopicCreation->UserToken = Context->UserToken;
528528
ContextForTopicCreation->DatabasePath = Context->DatabasePath;

ydb/core/kafka_proxy/actors/kafka_read_session_actor.cpp

Lines changed: 24 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
#include "kafka_read_session_actor.h"
2+
#include "kafka_read_session_utils.h"
23

34
namespace NKafka {
45
static constexpr TDuration WAKEUP_INTERVAL = TDuration::Seconds(1);
@@ -100,7 +101,7 @@ void TKafkaReadSessionActor::HandleJoinGroup(TEvKafka::TEvJoinGroupRequest::TPtr
100101
<< "_" << "kafka";
101102

102103
if (!supportedProtocolFound) {
103-
SendJoinGroupResponseFail(ctx, ev->Get()->CorrelationId, INCONSISTENT_GROUP_PROTOCOL, TStringBuilder() << "unsupported assign protocol. Must be " << SUPPORTED_ASSIGN_STRATEGY);
104+
SendJoinGroupResponseFail(ctx, ev->Get()->CorrelationId, INCONSISTENT_GROUP_PROTOCOL, TStringBuilder() << "unsupported assign protocol. Must be " << ASSIGN_STRATEGY_ROUNDROBIN);
104105
CloseReadSession(ctx);
105106
return;
106107
}
@@ -231,7 +232,7 @@ void TKafkaReadSessionActor::SendJoinGroupResponseOk(const TActorContext&, ui64
231232
TJoinGroupResponseData::TPtr response = std::make_shared<TJoinGroupResponseData>();
232233

233234
response->ProtocolType = SUPPORTED_JOIN_GROUP_PROTOCOL;
234-
response->ProtocolName = SUPPORTED_ASSIGN_STRATEGY;
235+
response->ProtocolName = ASSIGN_STRATEGY_ROUNDROBIN;
235236
response->ErrorCode = EKafkaErrors::NONE_ERROR;
236237
response->GenerationId = GenerationId;
237238
response->MemberId = Session;
@@ -252,7 +253,7 @@ void TKafkaReadSessionActor::SendSyncGroupResponseOk(const TActorContext& ctx, u
252253
TSyncGroupResponseData::TPtr response = std::make_shared<TSyncGroupResponseData>();
253254

254255
response->ProtocolType = SUPPORTED_JOIN_GROUP_PROTOCOL;
255-
response->ProtocolName = SUPPORTED_ASSIGN_STRATEGY;
256+
response->ProtocolName = ASSIGN_STRATEGY_ROUNDROBIN;
256257
response->ErrorCode = EKafkaErrors::NONE_ERROR;
257258

258259
auto assignment = BuildAssignmentAndInformBalancerIfRelease(ctx);
@@ -323,17 +324,28 @@ bool TKafkaReadSessionActor::CheckHeartbeatIsExpired() {
323324
return now - LastHeartbeatTime > MaxHeartbeatTimeoutMs;
324325
}
325326

326-
bool TKafkaReadSessionActor::TryFillTopicsToRead(const TMessagePtr<TJoinGroupRequestData> joinGroupRequestData, THashSet<TString>& topics) {
327-
auto supportedProtocolFound = false;
328-
for (auto protocol: joinGroupRequestData->Protocols) {
329-
KAFKA_LOG_D("JOIN_GROUP assign protocol supported by client: " << protocol.Name);
330-
if (protocol.Name == SUPPORTED_ASSIGN_STRATEGY) {
331-
FillTopicsFromJoinGroupMetadata(protocol.Metadata, topics);
332-
supportedProtocolFound = true;
333-
break;
327+
// Validates the JoinGroup request and collects the topics it subscribes to.
// Returns false when request->ProtocolType is not SUPPORTED_JOIN_GROUP_PROTOCOL or when none of
// request->Protocols is named ASSIGN_STRATEGY_ROUNDROBIN or ASSIGN_STRATEGY_SERVER; otherwise
// fills `topics` (and OriginalTopicNames) from the request's subscriptions and returns true.
bool TKafkaReadSessionActor::TryFillTopicsToRead(const TMessagePtr<TJoinGroupRequestData> request, THashSet<TString>& topics) {
328+
auto validProtocol = request->ProtocolType == SUPPORTED_JOIN_GROUP_PROTOCOL
329+
&& AnyOf(request->Protocols, [](const TJoinGroupRequestData::TJoinGroupRequestProtocol& p) {
330+
return p.Name == ASSIGN_STRATEGY_ROUNDROBIN || p.Name == ASSIGN_STRATEGY_SERVER;
331+
});
332+
if (!validProtocol) {
333+
return false;
334+
}
335+
336+
// NOTE(review): `result` is dereferenced with -> below without a visible check;
// GetSubscriptions is defined elsewhere — confirm it cannot come back empty.
auto result = GetSubscriptions(*request);
337+
// NOTE(review): `auto topic` copies each element; a const reference would likely suffice — confirm element type.
for (auto topic: result->Topics) {
338+
if (topic.has_value()) {
339+
KAFKA_LOG_D("JOIN_GROUP requested topic to read: " << topic);
340+
341+
auto normalizedTopicName = NormalizePath(Context->DatabasePath, topic.value());
342+
// Remember the client-supplied name under both the normalized path and its "/streamImpl" variant.
OriginalTopicNames[normalizedTopicName] = topic.value();
343+
OriginalTopicNames[normalizedTopicName + "/streamImpl"] = topic.value();
344+
topics.emplace(normalizedTopicName);
334345
}
335346
}
336-
return supportedProtocolFound;
347+
348+
return true;
337349
}
338350

339351
TConsumerProtocolAssignment TKafkaReadSessionActor::BuildAssignmentAndInformBalancerIfRelease(const TActorContext& ctx) {
@@ -376,26 +388,6 @@ TConsumerProtocolAssignment TKafkaReadSessionActor::BuildAssignmentAndInformBala
376388
return assignment;
377389
}
378390

379-
void TKafkaReadSessionActor::FillTopicsFromJoinGroupMetadata(TKafkaBytes& metadata, THashSet<TString>& topics) {
380-
TKafkaVersion version = *(TKafkaVersion*)(metadata.value().data() + sizeof(TKafkaVersion));
381-
382-
TBuffer buffer(metadata.value().data() + sizeof(TKafkaVersion), metadata.value().size_bytes() - sizeof(TKafkaVersion));
383-
TKafkaReadable readable(buffer);
384-
385-
TConsumerProtocolSubscription result;
386-
result.Read(readable, version);
387-
388-
for (auto topic: result.Topics) {
389-
if (topic.has_value()) {
390-
auto normalizedTopicName = NormalizePath(Context->DatabasePath, topic.value());
391-
OriginalTopicNames[normalizedTopicName] = topic.value();
392-
OriginalTopicNames[normalizedTopicName + "/streamImpl"] = topic.value();
393-
topics.emplace(normalizedTopicName);
394-
KAFKA_LOG_D("JOIN_GROUP requested topic to read: " << topic);
395-
}
396-
}
397-
}
398-
399391
void TKafkaReadSessionActor::HandlePipeConnected(TEvTabletPipe::TEvClientConnected::TPtr& ev, const TActorContext&) {
400392
const auto* msg = ev->Get();
401393
if (msg->Status != NKikimrProto::OK) {

ydb/core/kafka_proxy/actors/kafka_read_session_actor.h

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -52,9 +52,6 @@ namespace NKafka {
5252
* <----------------
5353
*/
5454

55-
static const TString SUPPORTED_ASSIGN_STRATEGY = "roundrobin";
56-
static const TString SUPPORTED_JOIN_GROUP_PROTOCOL = "consumer";
57-
5855
class TKafkaReadSessionActor: public NActors::TActorBootstrapped<TKafkaReadSessionActor> {
5956

6057
enum EReadSessionSteps {
@@ -157,7 +154,6 @@ struct TNextRequestError {
157154
bool CheckHeartbeatIsExpired();
158155
bool CheckTopicsListAreChanged(const TMessagePtr<TJoinGroupRequestData> joinGroupRequestData);
159156
bool TryFillTopicsToRead(const TMessagePtr<TJoinGroupRequestData> joinGroupRequestData, THashSet<TString>& topics);
160-
void FillTopicsFromJoinGroupMetadata(TKafkaBytes& metadata, THashSet<TString>& topics);
161157

162158
private:
163159
const TContext::TPtr Context;

0 commit comments

Comments
 (0)