Skip to content

Commit 1edb570

Browse files
Fix reading messages from Kafka API in no authentication mode (#13357)
1 parent 1ad8704 commit 1edb570

File tree

2 files changed

+9
-8
lines changed

2 files changed

+9
-8
lines changed

ydb/core/kafka_proxy/actors/actors.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,9 @@ struct TContext {
5151

5252
NKikimr::NPQ::TRlContext RlContext;
5353

54-
bool Authenticated() { return AuthenticationStep == SUCCESS; }
54+
bool Authenticated() {
55+
return !RequireAuthentication || AuthenticationStep == SUCCESS;
56+
}
5557
};
5658

5759
template<std::derived_from<TApiMessage> T>

ydb/core/kafka_proxy/kafka_connection.cpp

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -339,7 +339,7 @@ class TKafkaConnection: public TActorBootstrapped<TKafkaConnection>, public TNet
339339
if (Request->Header.ClientId.has_value() && Request->Header.ClientId != "") {
340340
Context->KafkaClient = Request->Header.ClientId.value();
341341
}
342-
342+
343343
switch (Request->Header.RequestApiKey) {
344344
case PRODUCE:
345345
HandleMessage(&Request->Header, Cast<TProduceRequestData>(Request), ctx);
@@ -627,10 +627,12 @@ class TKafkaConnection: public TActorBootstrapped<TKafkaConnection>, public TNet
627627
case INFLIGTH_CHECK:
628628
if (!Context->Authenticated() && !PendingRequestsQueue.empty()) {
629629
// Allow only one message to be processed at a time for non-authenticated users
630+
KAFKA_LOG_ERROR("DoRead: failed inflight check: there are " << PendingRequestsQueue.size() << " pending requests and user is not authnicated. Only one paraller request is allowed for a non-authenticated user.");
630631
return true;
631632
}
632633
if (InflightSize + Request->ExpectedSize > Context->Config.GetMaxInflightSize()) {
633634
// We limit the size of processed messages so as not to exceed the size of available memory
635+
KAFKA_LOG_ERROR("DoRead: failed inflight check: InflightSize + Request->ExpectedSize=" << InflightSize + Request->ExpectedSize << " > Context->Config.GetMaxInflightSize=" << Context->Config.GetMaxInflightSize());
634636
return true;
635637
}
636638
InflightSize += Request->ExpectedSize;
@@ -713,12 +715,9 @@ class TKafkaConnection: public TActorBootstrapped<TKafkaConnection>, public TNet
713715
}
714716

715717
bool RequireAuthentication(EApiKey apiKey) {
716-
bool configuredToAuthenticate = NKikimr::AppData()->EnforceUserTokenRequirement;
717-
bool apiKeyRequiresAuthentication = !(EApiKey::API_VERSIONS == apiKey ||
718-
EApiKey::SASL_HANDSHAKE == apiKey ||
719-
EApiKey::SASL_AUTHENTICATE == apiKey);
720-
721-
return configuredToAuthenticate && apiKeyRequiresAuthentication;
718+
return !(EApiKey::API_VERSIONS == apiKey ||
719+
EApiKey::SASL_HANDSHAKE == apiKey ||
720+
EApiKey::SASL_AUTHENTICATE == apiKey);
722721
}
723722

724723
void HandleConnected(TEvPollerReady::TPtr event, const TActorContext& ctx) {

0 commit comments

Comments
 (0)