Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions CHANGES.rst
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,13 @@ Changelog
0.13.0 (????-??-??)
===================

New features:

* Allow SASL Connections to Periodically Re-Authenticate (`KIP-368`_) (pr #1105 by @kprzybyla)

.. _KIP-368: https://cwiki.apache.org/confluence/display/KAFKA/KIP-368%3A+Allow+SASL+Connections+to+Periodically+Re-Authenticate


Improved Documentation:

* Fix incomplete documentation for `AIOKafkaConsumer.offset_for_times``
Expand Down
121 changes: 121 additions & 0 deletions aiokafka/conn.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@

READER_LIMIT = 2**16
SASL_QOP_AUTH = 1
SASL_REQUEST_API_KEYS = frozenset(
request.API_KEY for request in (*SaslHandShakeRequest, *SaslAuthenticateRequest)
)


class CloseReason(IntEnum):
Expand Down Expand Up @@ -116,6 +119,24 @@ async def create_conn(
return conn


def calculate_sasl_reauthentication_time(session_lifetime_ms: int) -> int:
"""
Calculates the SASL session re-authentication time following
the Java Kafka implementation from SaslClientAuthenticator.java
ReauthInfo#setAuthenticationEndAndSessionReauthenticationTimes.

The re-authentication factor is calculated by choosing random value
between 0.85 and 0.95, which accounts for both network latency and clock
drift as well as potential jitter which may cause re-authentication storm
across many channels simultaneously.
"""

reauthentication_time_factor: float = random.uniform(0.85, 0.95)
expiration_time: float = (session_lifetime_ms * reauthentication_time_factor) / 1000

return int(expiration_time)


class AIOKafkaProtocol(asyncio.StreamReaderProtocol):
def __init__(self, closed_fut, *args, loop, **kw):
self._closed_fut = closed_fut
Expand Down Expand Up @@ -184,6 +205,9 @@ def __init__(
self._sasl_kerberos_service_name = sasl_kerberos_service_name
self._sasl_kerberos_domain_name = sasl_kerberos_domain_name
self._sasl_oauth_token_provider = sasl_oauth_token_provider
self._sasl_reauthentication_task = None
self._sasl_reauthentication_done = asyncio.Event()
self._sasl_reauthentication_done.set()

# Version hint is the version determined by initial client bootstrap
self._version_hint = version_hint
Expand Down Expand Up @@ -358,6 +382,16 @@ async def _do_sasl_handshake(self):
raise exc
auth_bytes = resp.sasl_auth_bytes

if (
hasattr(resp, "session_lifetime_ms")
and resp.session_lifetime_ms != 0
):
self._sasl_reauthentication_task = (
self._create_sasl_reauthentication_task(
resp.session_lifetime_ms
)
)

if self._sasl_mechanism == "GSSAPI":
log.info("Authenticated as %s via GSSAPI", self.sasl_principal)
elif self._sasl_mechanism == "OAUTHBEARER":
Expand All @@ -369,6 +403,69 @@ async def _do_sasl_handshake(self):
self._sasl_mechanism,
)

def _create_sasl_reauthentication_task(
self, session_lifetime_ms: int
) -> asyncio.Task:
self_ref = weakref.ref(self)
timeout = calculate_sasl_reauthentication_time(session_lifetime_ms)

log.info(
"SASL re-authentication required after %ds for connection %s:%s",
timeout,
self._host,
self._port,
)

sasl_reauthentication_task = create_task(
self._sasl_reauthentication(self_ref, timeout)
)
sasl_reauthentication_task.add_done_callback(
functools.partial(self._on_sasl_reauthentication_task_error, self_ref)
)

return sasl_reauthentication_task

@staticmethod
async def _sasl_reauthentication(
self_ref: weakref.ReferenceType["AIOKafkaConnection"],
sasl_reauthentication_time: int,
) -> None:
self = self_ref()
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The weak reference pattern is defeated by acquiring self before the sleep. Consider moving await asyncio.sleep() before getting a strong ref to self. Or explain why we should keep the strong ref while sleeping


if self is None:
return

await asyncio.sleep(sasl_reauthentication_time)
self._sasl_reauthentication_done.clear()

await self._do_sasl_handshake()
self._sasl_reauthentication_done.set()

log.info(
"SASL re-authentication complete for connection %s:%s",
self._host,
self._port,
)

@staticmethod
def _on_sasl_reauthentication_task_error(
self_ref: weakref.ReferenceType["AIOKafkaConnection"],
sasl_reauthentication_task: asyncio.Task,
) -> None:
if sasl_reauthentication_task.cancelled():
return

try:
sasl_reauthentication_task.result()
except BaseException as exc:
if not isinstance(exc, (OSError, EOFError, ConnectionError)):
log.exception("Unexpected exception in AIOKafkaConnection")

self = self_ref()

if self is not None:
self.close(reason=CloseReason.AUTH_FAILURE, exc=exc)

def authenticator_plain(self):
return SaslPlainAuthenticator(
loop=self._loop,
Expand Down Expand Up @@ -458,6 +555,18 @@ def send(self, request, expect_response=True):
f"No connection to broker at {self._host}:{self._port}"
)

if (
self._sasl_reauthentication_done.is_set()
or request.API_KEY in SASL_REQUEST_API_KEYS
):
return self._send(request=request, expect_response=expect_response)

return self._send_after_sasl_reauthentication(
request=request,
expect_response=expect_response,
)

def _send(self, request, expect_response=True):
correlation_id = self._next_correlation_id()
header = request.build_request_header(
correlation_id=correlation_id, client_id=self._client_id
Expand All @@ -482,6 +591,11 @@ def send(self, request, expect_response=True):
)
return wait_for(fut, self._request_timeout)

async def _send_after_sasl_reauthentication(self, request, expect_response):
await self._sasl_reauthentication_done.wait()

return await self._send(request=request, expect_response=expect_response)

def _send_sasl_token(self, payload, expect_response=True):
if self._writer is None:
raise Errors.KafkaConnectionError(
Expand Down Expand Up @@ -530,6 +644,13 @@ def close(self, reason=None, exc=None):
if self._idle_handle is not None:
self._idle_handle.cancel()

if (
self._sasl_reauthentication_task is not None
and not self._sasl_reauthentication_task.done()
):
self._sasl_reauthentication_task.cancel()
self._sasl_reauthentication_task = None

# transport.close() will close socket, but not right ahead. Return
# a future in case we need to wait on it.
return self._closed_fut
Expand Down
68 changes: 42 additions & 26 deletions docker/scripts/start-kafka.sh
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
#!/bin/sh
#!/bin/bash

OPTIONS=""
OPTIONS=()
PATH="$HOME/bin:$PATH"

# Configure the default number of log partitions per topic
if [ ! -z "$NUM_PARTITIONS" ]; then
echo "default number of partition: $NUM_PARTITIONS"
OPTIONS="$OPTIONS --override num.partitions=$NUM_PARTITIONS"
OPTIONS+=("--override" "num.partitions=$NUM_PARTITIONS")
fi

# Set the external host and port
Expand All @@ -16,38 +16,54 @@ echo "advertised port: $ADVERTISED_PORT"
LISTENERS="PLAINTEXT://:$ADVERTISED_PORT"
ADVERTISED_LISTENERS="PLAINTEXT://$ADVERTISED_HOST:$ADVERTISED_PORT"

if [ ! -z "$ADVERTISED_SSL_PORT" ]; then
if [[ ! -z "$ADVERTISED_SSL_PORT" ]]; then
echo "advertised ssl port: $ADVERTISED_SSL_PORT"

# SSL options
OPTIONS="$OPTIONS --override ssl.protocol=TLS"
OPTIONS="$OPTIONS --override ssl.enabled.protocols=TLSv1.2,TLSv1.1,TLSv1"
OPTIONS="$OPTIONS --override ssl.keystore.type=JKS"
OPTIONS="$OPTIONS --override ssl.keystore.location=/ssl_cert/br_server.keystore.jks"
OPTIONS="$OPTIONS --override ssl.keystore.password=abcdefgh"
OPTIONS="$OPTIONS --override ssl.key.password=abcdefgh"
OPTIONS="$OPTIONS --override ssl.truststore.type=JKS"
OPTIONS="$OPTIONS --override ssl.truststore.location=/ssl_cert/br_server.truststore.jks"
OPTIONS="$OPTIONS --override ssl.truststore.password=abcdefgh"
OPTIONS="$OPTIONS --override ssl.client.auth=required"
OPTIONS="$OPTIONS --override security.inter.broker.protocol=SSL"
OPTIONS="$OPTIONS --override ssl.endpoint.identification.algorithm="
OPTIONS+=("--override" "ssl.protocol=TLS")
OPTIONS+=("--override" "ssl.enabled.protocols=TLSv1.2,TLSv1.1,TLSv1")
OPTIONS+=("--override" "ssl.keystore.type=JKS")
OPTIONS+=("--override" "ssl.keystore.location=/ssl_cert/br_server.keystore.jks")
OPTIONS+=("--override" "ssl.keystore.password=abcdefgh")
OPTIONS+=("--override" "ssl.key.password=abcdefgh")
OPTIONS+=("--override" "ssl.truststore.type=JKS")
OPTIONS+=("--override" "ssl.truststore.location=/ssl_cert/br_server.truststore.jks")
OPTIONS+=("--override" "ssl.truststore.password=abcdefgh")
OPTIONS+=("--override" "ssl.client.auth=required")
OPTIONS+=("--override" "security.inter.broker.protocol=SSL")
OPTIONS+=("--override" "ssl.endpoint.identification.algorithm=")

LISTENERS="$LISTENERS,SSL://:$ADVERTISED_SSL_PORT"
ADVERTISED_LISTENERS="$ADVERTISED_LISTENERS,SSL://$ADVERTISED_HOST:$ADVERTISED_SSL_PORT"
fi

if [ ! -z "$SASL_MECHANISMS" ]; then
if [[ ! -z "$SASL_MECHANISMS" ]]; then
echo "sasl mechanisms: $SASL_MECHANISMS"
echo "advertised sasl plaintext port: $ADVERTISED_SASL_PLAINTEXT_PORT"
echo "advertised sasl ssl port: $ADVERTISED_SASL_SSL_PORT"

OPTIONS="$OPTIONS --override sasl.enabled.mechanisms=$SASL_MECHANISMS"
OPTIONS="$OPTIONS --override sasl.kerberos.service.name=kafka"
OPTIONS="$OPTIONS --override authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer"
OPTIONS="$OPTIONS --override allow.everyone.if.no.acl.found=true"
OPTIONS+=("--override" "sasl.enabled.mechanisms=$SASL_MECHANISMS")
OPTIONS+=("--override" "sasl.kerberos.service.name=kafka")
OPTIONS+=("--override" "authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer")
OPTIONS+=("--override" "allow.everyone.if.no.acl.found=true")
export KAFKA_OPTS="-Djava.security.auth.login.config=/etc/kafka/$SASL_JAAS_FILE"

# OAUTHBEARER configuration is incompatible with other SASL configurations present in JAAS file
if [[ "$SASL_MECHANISMS" == "OAUTHBEARER" ]]; then
OPTIONS+=("--override" "listener.name.sasl_plaintext.oauthbearer.sasl.jaas.config=
org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required
unsecuredLoginStringClaim_sub=\"producer\"
unsecuredValidatorAllowableClockSkewMs=\"3000\";"
)
OPTIONS+=("--override" "listener.name.sasl_ssl.oauthbearer.sasl.jaas.config=
org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required
unsecuredLoginStringClaim_sub=\"consumer\"
unsecuredValidatorAllowableClockSkewMs=\"3000\";"
)
OPTIONS+=("--override" "listener.name.sasl_plaintext.oauthbearer.connections.max.reauth.ms=3600000")
OPTIONS+=("--override" "listener.name.sasl_ssl.oauthbearer.connections.max.reauth.ms=3600000")
fi

LISTENERS="$LISTENERS,SASL_PLAINTEXT://:$ADVERTISED_SASL_PLAINTEXT_PORT"
ADVERTISED_LISTENERS="$ADVERTISED_LISTENERS,SASL_PLAINTEXT://$ADVERTISED_HOST:$ADVERTISED_SASL_PLAINTEXT_PORT"

Expand All @@ -56,13 +72,13 @@ if [ ! -z "$SASL_MECHANISMS" ]; then
fi

# Enable auto creation of topics
OPTIONS="$OPTIONS --override auto.create.topics.enable=true"
OPTIONS="$OPTIONS --override listeners=$LISTENERS"
OPTIONS="$OPTIONS --override advertised.listeners=$ADVERTISED_LISTENERS"
OPTIONS="$OPTIONS --override super.users=User:admin"
OPTIONS+=("--override" "auto.create.topics.enable=true")
OPTIONS+=("--override" "listeners=$LISTENERS")
OPTIONS+=("--override" "advertised.listeners=$ADVERTISED_LISTENERS")
OPTIONS+=("--override" "super.users=User:admin")


# Run Kafka
echo "$KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server.properties $OPTIONS"

exec $KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server.properties $OPTIONS
exec $KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server.properties "${OPTIONS[@]}"
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ asyncio_mode = "auto"
addopts = ["--strict-config", "--strict-markers"]
markers = [
"ssl: Tests that require SSL certificates to run",
"oauthbearer: Tests that require SASL OAUTHBEARER mechanism to run",
]
filterwarnings = [
"error",
Expand Down
1 change: 1 addition & 0 deletions requirements-ci.txt
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,4 @@ Pygments==2.18.0
gssapi==1.9.0
async-timeout==4.0.3
cramjam==2.9.0
pyjwt==2.10.1
Loading