Skip to content
Open
60 changes: 54 additions & 6 deletions examples/python/getting-started/consumer.py
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
PARTITION_ID = 0
BATCHES_LIMIT = 5

ArgNamespace = namedtuple("ArgNamespace", ["tcp_server_address"])
ArgNamespace = namedtuple("ArgNamespace", ["tcp_server_address", "tls", "tls_ca_file", "username", "password"])


class ValidateUrl(argparse.Action):
Expand All @@ -57,23 +57,71 @@ def parse_args():
action=ValidateUrl,
default="127.0.0.1:8090",
)
return parser.parse_args()

parser.add_argument(
"--tls",
action="store_true",
default=False,
help="Enable TLS for TCP connection",
)
parser.add_argument(
"--tls-ca-file",
default="",
help="Path to TLS CA certificate file",
)
parser.add_argument(
"--username",
default="iggy",
help="Username for authentication",
)
parser.add_argument(
"--password",
default="iggy",
help="Password for authentication",
)
args = parser.parse_args()

# Validate TLS requirements
if args.tls and not args.tls_ca_file:
parser.error("--tls requires --tls-ca-file")

return ArgNamespace(**vars(args))

def build_connection_string(args) -> str:
"""Build a connection string with TLS support."""

conn_str = f"iggy://{args.username}:{args.password}@{args.tcp_server_address}"

if args.tls:
# Extract domain from server address (host:port -> host)
host = args.tcp_server_address.split(":")[0]
query_params = [f"tls=true", f"tls_domain={host}"]

# Add CA file if provided
if args.tls_ca_file:
query_params.append(f"tls_ca_file={args.tls_ca_file}")
conn_str += "?" + "&".join(query_params)

return conn_str

async def main():
args: ArgNamespace = parse_args()
client = IggyClient(args.tcp_server_address)

# Build connection string with TLS support
connection_string = build_connection_string(args)
logger.info(f"Connection string: {connection_string}")

client = IggyClient.from_connection_string(connection_string)
try:
logger.info("Connecting to IggyClient...")
await client.connect()
logger.info("Connected. Logging in user...")
await client.login_user("iggy", "iggy")
await client.login_user(args.username, args.password)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if the credentials are provided in the connection string, then auto_login is enabled, so calling login is no longer needed as a separate step

logger.info("Logged in.")
await consume_messages(client)
except Exception as error:
logger.exception("Exception occurred in main function: {}", error)


async def consume_messages(client: IggyClient):
interval = 0.5 # 500 milliseconds in seconds for asyncio.sleep
logger.info(
Expand Down
59 changes: 54 additions & 5 deletions examples/python/getting-started/producer.py
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
PARTITION_ID = 0
BATCHES_LIMIT = 5

ArgNamespace = namedtuple("ArgNamespace", ["tcp_server_address"])
ArgNamespace = namedtuple("ArgNamespace", ["tcp_server_address", "tls", "tls_ca_file", "username", "password"])


class ValidateUrl(argparse.Action):
Expand All @@ -58,16 +58,65 @@ def parse_args():
action=ValidateUrl,
default="127.0.0.1:8090",
)
return parser.parse_args()

parser.add_argument(
"--tls",
action="store_true",
default=False,
help="Enable TLS for TCP connection",
)
parser.add_argument(
"--tls-ca-file",
default="",
help="Path to TLS CA certificate file",
)
parser.add_argument(
"--username",
default="iggy",
help="Username for authentication",
)
parser.add_argument(
"--password",
default="iggy",
help="Password for authentication",
)
args = parser.parse_args()

# Validate TLS requirements
if args.tls and not args.tls_ca_file:
parser.error("--tls requires --tls-ca-file")

return ArgNamespace(**vars(args))

def build_connection_string(args) -> str:
"""Build a connection string with TLS support."""

conn_str = f"iggy://{args.username}:{args.password}@{args.tcp_server_address}"

if args.tls:
# Extract domain from server address (host:port -> host)
host = args.tcp_server_address.split(":")[0]
query_params = [f"tls=true", f"tls_domain={host}"]

# Add CA file if provided
if args.tls_ca_file:
query_params.append(f"tls_ca_file={args.tls_ca_file}")
conn_str += "?" + "&".join(query_params)

return conn_str

async def main():
args: ArgNamespace = parse_args()
client = IggyClient(args.tcp_server_address)
# Build connection string with TLS support
connection_string = build_connection_string(args)
logger.info(f"Connection string: {connection_string}")
logger.info(f"Connecting to {args.tcp_server_address} (TLS: {args.tls})")


client = IggyClient.from_connection_string(connection_string)
logger.info("Connecting to IggyClient")
await client.connect()
logger.info("Connected. Logging in user...")
await client.login_user("iggy", "iggy")
await client.login_user(args.username, args.password)
logger.info("Logged in.")
await init_system(client)
await produce_messages(client)
Expand Down
2 changes: 1 addition & 1 deletion foreign/python/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,5 +85,5 @@ def pytest_collection_modifyitems(config, items):
"""Modify test collection to add markers automatically."""
for item in items:
# Mark all tests in test_iggy_sdk.py as integration tests
if "test_iggy_sdk" in item.nodeid:
if "test_iggy_sdk" in item.nodeid or "test_tls" in item.nodeid:
item.add_marker(pytest.mark.integration)
189 changes: 189 additions & 0 deletions foreign/python/tests/test_tls.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

"""
Integration tests for TLS connectivity using testcontainers.

These tests spin up a TLS-enabled Iggy server in a Docker container
so they are fully self-contained and work in CI without a pre-running
TLS server.

Requirements:
- Docker running locally
- testcontainers[docker] installed (in [testing-docker] extras)
- CA certificate available at core/certs/iggy_ca_cert.pem
"""

import os
import uuid

import pytest
from apache_iggy import IggyClient, PollingStrategy
from apache_iggy import SendMessage as Message
from testcontainers.core.container import DockerContainer
from testcontainers.core.waiting_utils import wait_for_logs

from .utils import wait_for_ping, wait_for_server

# Paths resolved relative to this file → repo root
REPO_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "..", ".."))
CERTS_DIR = os.path.join(REPO_ROOT, "core", "certs")
CA_FILE = os.path.join(CERTS_DIR, "iggy_ca_cert.pem")

IGGY_IMAGE = os.environ.get("IGGY_SERVER_DOCKER_IMAGE", "apache/iggy:edge")
CONTAINER_TCP_PORT = 8090


@pytest.fixture(scope="module")
def tls_container():
"""Start a TLS-enabled Iggy server in Docker."""
container = (
DockerContainer(IGGY_IMAGE)
.with_exposed_ports(CONTAINER_TCP_PORT)
.with_env("IGGY_ROOT_USERNAME", "iggy")
.with_env("IGGY_ROOT_PASSWORD", "iggy")
.with_env("IGGY_TCP_TLS_ENABLED", "true")
.with_env("IGGY_TCP_TLS_CERT_FILE", "/app/certs/iggy_cert.pem")
.with_env("IGGY_TCP_TLS_KEY_FILE", "/app/certs/iggy_key.pem")
.with_env("IGGY_TCP_ADDRESS", f"0.0.0.0:{CONTAINER_TCP_PORT}")
.with_volume_mapping(CERTS_DIR, "/app/certs", "ro")
.with_kwargs(privileged=True)
)
container.start()
# Wait for the server to be ready inside the container
wait_for_logs(container, "Iggy server is running", timeout=60)
yield container
container.stop()


@pytest.fixture(scope="module")
async def tls_client(tls_container) -> IggyClient:
"""Create an authenticated client connected to the TLS container."""
host = "localhost"
port = tls_container.get_exposed_port(CONTAINER_TCP_PORT)

wait_for_server(host, int(port))

conn_str = (
f"iggy+tcp://iggy:iggy@{host}:{port}"
f"?tls=true&tls_domain={host}&tls_ca_file={CA_FILE}"
)
client = IggyClient.from_connection_string(conn_str)
await client.connect()
await wait_for_ping(client)
await client.login_user("iggy", "iggy")
return client


@pytest.mark.integration
class TestTlsConnectivity:
"""Test TLS connection establishment and basic operations."""

@pytest.mark.asyncio
async def test_ping_over_tls(self, tls_client: IggyClient):
"""Test that the server responds to ping over a TLS connection."""
await tls_client.ping()

@pytest.mark.asyncio
async def test_client_not_none(self, tls_client: IggyClient):
"""Test that the TLS client fixture is properly initialized."""
assert tls_client is not None

@pytest.mark.asyncio
async def test_create_stream_over_tls(self, tls_client: IggyClient):
"""Test creating and getting a stream over TLS."""
stream_name = f"tls-test-stream-{uuid.uuid4().hex[:8]}"
await tls_client.create_stream(stream_name)
stream = await tls_client.get_stream(stream_name)
assert stream is not None

@pytest.mark.asyncio
async def test_produce_and_consume_over_tls(self, tls_client: IggyClient):
"""Test producing and consuming messages over TLS."""
stream_name = f"tls-msg-stream-{uuid.uuid4().hex[:8]}"
topic_name = "tls-test-topic"
partition_id = 0

# Create stream and topic
await tls_client.create_stream(stream_name)
await tls_client.create_topic(
stream_name, topic_name, partitions_count=1
)

# Produce messages
test_messages = [f"tls-message-{i}" for i in range(3)]
messages = [Message(msg) for msg in test_messages]
await tls_client.send_messages(
stream=stream_name,
topic=topic_name,
partitioning=partition_id,
messages=messages,
)

# Consume messages
polled = await tls_client.poll_messages(
stream=stream_name,
topic=topic_name,
partition_id=partition_id,
polling_strategy=PollingStrategy.First(),
count=10,
auto_commit=True,
)
assert len(polled) >= len(test_messages)

# Verify message payloads
for i, expected_msg in enumerate(test_messages):
if i < len(polled):
assert polled[i].payload().decode("utf-8") == expected_msg


@pytest.mark.integration
class TestTlsConnectionString:
"""Test TLS connection string variations."""

@pytest.mark.asyncio
async def test_connection_string_with_tls_params(self, tls_container):
"""Test creating a client from a connection string with TLS parameters."""
host = "localhost"
port = tls_container.get_exposed_port(CONTAINER_TCP_PORT)

wait_for_server(host, int(port))

conn_str = (
f"iggy+tcp://iggy:iggy@{host}:{port}"
f"?tls=true&tls_domain={host}&tls_ca_file={CA_FILE}"
)
client = IggyClient.from_connection_string(conn_str)
await client.connect()
await wait_for_ping(client)
await client.ping()

@pytest.mark.asyncio
async def test_connect_without_tls_should_fail(self, tls_container):
"""Test that connecting without TLS to a TLS-enabled server fails."""
host = "localhost"
port = tls_container.get_exposed_port(CONTAINER_TCP_PORT)

wait_for_server(host, int(port))

# Connect without TLS to a TLS-enabled server
# IggyClient constructor requires IP address, not hostname
client = IggyClient(f"127.0.0.1:{port}")
await client.connect()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In this case, doesn't connect already throw an error?
Also, I think the client set up should be as close as possible (we just want to test the difference tls/no tls), so you should use connection string, but without the tls part.


with pytest.raises(RuntimeError):
await client.login_user("iggy", "iggy")
Loading
Loading