A real-time clickstream data processing demo using Kafka, PostgreSQL, Valkey, and OpenSearch running on Aiven managed services. Supports multiple streaming frameworks including a lightweight kafka-python baseline, Quix Streams, Bytewax, and Mage AI. This project demonstrates how to build real-time data pipelines that process clickstream data for e-commerce analytics.
```
┌─────────────────────────────────────────────────────────────────────────────┐
│ PORTS AND ADAPTERS ARCHITECTURE │
├─────────────────────────────────────────────────────────────────────────────┤
│ Framework Layer (STREAMING_IMPL) Infrastructure Layer │
│ ───────────────────────────────── ─────────────────────────────────── │
│ ┌───────────────────────────────────────────┐ PostgreSQLEventRepository │
│ │ default │ quix │ bytewax │ mage │ PostgreSQLSessionRepository │
│ │ (kafka- │ (Quix │(Bytewax)│ (Mage AI │ OpenSearchRepository │
│ │ python) │ Streams)│ │ pipelines) │ ValkeySessionState │
│ └───────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ Producer: CSV reader → Kafka → clickstream-events topic │
│ │
│ PostgreSQL Consumer: Kafka → events table + sessions table │
│ │
│ OpenSearch Consumer: Kafka → events index (optional) │
└─────────────────────────────────────────────────────────────────────────────┘
```
The pipeline supports multiple streaming framework implementations, selectable via the STREAMING_IMPL environment variable:
| Framework | `STREAMING_IMPL` | Description | Install |
|---|---|---|---|
| Default | `default` | Lightweight kafka-python baseline | Base install |
| Quix Streams | `quix` | Production-ready stream processing with DataFrames | Base install |
| Bytewax | `bytewax` | Python streaming with Rust performance | `pip install -e ".[bytewax]"` |
| Mage AI | `mage` | Visual pipeline orchestration | `pip install -e ".[mage]"` |
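The framework selection above is resolved at startup from the environment. The real resolution lives in `clickstream/framework/__init__.py` (`get_framework()`); the sketch below only illustrates the env-var lookup and validation, and the `_FRAMEWORKS` registry is a stand-in for the project's actual factory.

```python
import os

# Stand-in registry of valid STREAMING_IMPL values; the real project maps
# these to framework implementations in clickstream/framework/__init__.py.
_FRAMEWORKS = {"default", "quix", "bytewax", "mage"}

def get_framework_name() -> str:
    """Resolve the framework from STREAMING_IMPL, falling back to 'default'."""
    name = os.environ.get("STREAMING_IMPL", "default").lower()
    if name not in _FRAMEWORKS:
        raise ValueError(
            f"Unknown STREAMING_IMPL: {name!r} (expected one of {sorted(_FRAMEWORKS)})"
        )
    return name

os.environ["STREAMING_IMPL"] = "bytewax"
print(get_framework_name())  # -> bytewax
```

Validating the value up front gives an immediate, descriptive error instead of a failed import deep inside a consumer process.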
The default framework uses raw kafka-python with no additional streaming dependencies. It's ideal for:
- Learning the codebase without framework complexity
- Environments where minimal dependencies are preferred
- Baseline performance comparisons
Quix Streams provides a Pythonic API for Kafka stream processing with:
- Streaming DataFrames with functional transformations
- Custom sinks for any data store
- Automatic consumer group management
Bytewax is a Python-native stream processing framework with a Rust-based distributed engine:
- Dataflow API with operators like `map`, `filter`, `key_by`, and stateful operations
- Native Kafka connectors via `bytewax.connectors.kafka`
- Custom sinks using `DynamicSink` and `StatefulSinkPartition`
- Runs as a single Python process or distributed across workers
Mage AI offers visual pipeline orchestration with:
- DAG-based pipeline definitions
- Built-in data loaders, transformers, and exporters
- Web UI for pipeline monitoring
Set `STREAMING_IMPL` in your `.env` file:

```bash
# Lightweight baseline (default)
STREAMING_IMPL=default

# Quix Streams
STREAMING_IMPL=quix

# Bytewax (requires extra install: pip install -e ".[bytewax]")
STREAMING_IMPL=bytewax

# Mage AI (requires extra install: pip install -e ".[mage]")
STREAMING_IMPL=mage
```

All frameworks share the same infrastructure layer (repositories, session state), ensuring consistent data handling regardless of framework choice.
```bash
# Clone and setup
git clone https://github.com/tonypiazza/aiven-assignment.git
cd aiven-assignment

# Install dependencies (choose one)
uv venv && source .venv/bin/activate && uv sync && uv pip install -e .
# OR: python -m venv .venv && source .venv/bin/activate && pip install -e .

# Copy the example and update terraform.tfvars with your Aiven credentials
cp terraform/terraform.tfvars.example terraform/terraform.tfvars

# Provision infrastructure
cd terraform && terraform init && terraform apply && cd ..

# Generate .env from Terraform outputs (auto-configures all services)
clickstream config generate

# Run the pipelines
clickstream consumer start
clickstream producer start --limit 10000
clickstream consumer stop
clickstream status

# Cleanup
cd terraform && terraform destroy
```

For local development with Docker, see: local-docker.md
```bash
clickstream --help                        # Show all commands
clickstream status                        # Service health and counts
clickstream config show                   # Current configuration
clickstream config generate               # Generate .env from Terraform outputs
clickstream consumer start                # Start consumers (1 per partition)
clickstream consumer stop                 # Stop all consumers
clickstream consumer restart              # Restart consumers (e.g., after config change)
clickstream producer start                # Batch mode (fastest)
clickstream producer start --limit N      # First N events only
clickstream producer start --realtime     # Real-time replay (1x speed)
clickstream producer start --realtime 5x  # 5x faster than real-time
clickstream producer stop                 # Stop producer
clickstream data reset [-y]               # Reset all data stores
clickstream analytics [--json]            # Funnel metrics
clickstream benchmark [--limit N] [-y]    # Benchmark consumer throughput
clickstream opensearch init               # Import dashboards
```

```
clickstream/
├── app.py # CLI entry point
├── consumer_runner.py # PostgreSQL consumer process
├── opensearch_runner.py # OpenSearch consumer process
├── producer_runner.py # Kafka producer process
├── base/ # Abstract base classes (ports)
│ ├── repositories.py # EventRepository, SessionRepository, SearchRepository
│ └── ...
├── core/ # Domain logic
│ ├── models.py # Event, Session domain models
│ └── session_processor.py # Session aggregation logic
├── framework/ # Streaming framework implementations
│ ├── __init__.py # get_framework() factory
│ ├── base.py # StreamingFramework ABC
│ ├── default/ # kafka-python baseline (no extra dependencies)
│ │ ├── producer.py # CSV → Kafka producer
│ │ ├── postgresql_consumer.py
│ │ └── opensearch_consumer.py
│ ├── quix/ # Quix Streams implementation
│ │ ├── producer.py
│ │ ├── postgresql_consumer.py
│ │ ├── opensearch_consumer.py
│ │ └── sinks/ # Quix BatchingSink adapters
│ ├── bytewax/ # Bytewax implementation
│ │ ├── producer.py
│ │ ├── postgresql_consumer.py
│ │ ├── opensearch_consumer.py
│ │ └── sinks/ # Bytewax DynamicSink adapters
│ └── mage/ # Mage AI pipelines
│ ├── data_loaders/ # Kafka/CSV sources
│ ├── transformers/ # Event/session transforms
│ ├── data_exporters/ # PostgreSQL/OpenSearch sinks
│ └── pipelines/ # Pipeline definitions
├── infrastructure/ # Shared infrastructure (adapters)
│ ├── repositories/
│ │ └── postgresql.py # PostgreSQLEventRepository, PostgreSQLSessionRepository
│ ├── search/
│ │ └── opensearch.py # OpenSearchRepository
│ ├── cache/
│ │ └── valkey.py # ValkeyCache
│ └── kafka.py # Kafka utilities
├── cli/ # CLI commands
└── utils/ # Shared utilities
    ├── config.py # Pydantic settings
    ├── session_state.py # Valkey session management
    └── ...
```
The project uses a modular architecture with separate producer and consumer implementations that can be mixed and matched:
```python
from clickstream.producers import get_producer
from clickstream.consumers import get_consumer

# Get producer (controlled by PRODUCER_IMPL env var)
producer = get_producer()  # Returns KafkaPythonProducer or QuixProducer
producer.run(limit=1000)

# Get consumer (controlled by CONSUMER_IMPL env var)
consumer = get_consumer("postgresql")  # Returns implementation-specific consumer
consumer.run()
```

Available implementations:
- Producer: `kafka_python` (default), `quix`
- Consumer: `kafka_python` (default), `quix`, `mage`, `bytewax`
This separation allows you to:
- Use different implementations for producing vs consuming
- Compare performance across implementations
- Choose the right tool for your environment
- Add new implementations without changing application code
Session state is stored in Valkey (Redis-compatible) rather than framework-specific state stores:
- External state remains visible via the `clickstream status` command
- No data loss on consumer restart
- Preserves existing batch operations and retry logic
- Works seamlessly with Aiven Valkey
- Consistent behavior across all frameworks
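The externalized-state idea above can be sketched in a few lines. The real implementation lives in `utils/session_state.py` and talks to Valkey; here a plain dict stands in for the Valkey connection, and the `session:{visitor_id}` key scheme is illustrative, not the project's actual key format.

```python
import json
from typing import Optional

class SessionState:
    """Keeps in-flight session aggregates in an external key-value store so
    a consumer restart loses nothing. A dict stands in for the Valkey client;
    a real adapter would call the client's get/set with the same semantics."""

    def __init__(self, store=None):
        self.store = store if store is not None else {}  # stand-in for Valkey

    def _key(self, visitor_id: int) -> str:
        return f"session:{visitor_id}"  # illustrative key scheme

    def save(self, visitor_id: int, session: dict) -> None:
        # Valkey stores strings, so serialize the aggregate as JSON
        self.store[self._key(visitor_id)] = json.dumps(session)

    def load(self, visitor_id: int) -> Optional[dict]:
        raw = self.store.get(self._key(visitor_id))
        return json.loads(raw) if raw is not None else None

state = SessionState()
state.save(42, {"event_count": 3, "converted": False})
print(state.load(42))  # -> {'event_count': 3, 'converted': False}
```

Because every framework goes through the same interface, a restarted consumer simply reloads the open session for each visitor and continues aggregating.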
The project uses a Ports and Adapters (Hexagonal) architecture for persistence:
Infrastructure Repositories (shared by all frameworks):
- `PostgreSQLEventRepository` - Bulk inserts with `ON CONFLICT DO NOTHING`
- `PostgreSQLSessionRepository` - Complex upserts with array merging
- `OpenSearchRepository` - Bulk indexing with document ID-based deduplication
Framework-Specific Adapters (thin wrappers):
- Default: Direct repository calls from kafka-python consumers
- Quix: `BatchingSink` adapters that wrap repositories
- Bytewax: `DynamicSink` adapters that wrap repositories
- Mage: Data exporters that wrap repositories
This separation allows the infrastructure layer to be reused across all streaming frameworks while keeping framework-specific concerns isolated.
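The port/adapter split described above boils down to an abstract interface that every store implements. This is a toy sketch, not the project's actual `base/repositories.py`: the method name `save_events` and the in-memory adapter are assumptions, and the dedup key mimics PostgreSQL's `ON CONFLICT DO NOTHING` behavior in plain Python.

```python
from abc import ABC, abstractmethod

class EventRepository(ABC):
    """Port: what any event store must support."""

    @abstractmethod
    def save_events(self, events: list[dict]) -> int:
        """Persist a batch of events, returning how many were newly stored."""

class InMemoryEventRepository(EventRepository):
    """Adapter: a toy in-memory implementation. The real adapters
    (PostgreSQL, OpenSearch) live in clickstream/infrastructure/."""

    def __init__(self):
        self._seen = {}

    def save_events(self, events):
        # Deduplicate on (timestamp, visitorid, event), analogous to
        # ON CONFLICT DO NOTHING against a unique constraint
        inserted = 0
        for e in events:
            key = (e["timestamp"], e["visitorid"], e["event"])
            if key not in self._seen:
                self._seen[key] = e
                inserted += 1
        return inserted

repo: EventRepository = InMemoryEventRepository()
batch = [{"timestamp": 1, "visitorid": 7, "event": "view"},
         {"timestamp": 1, "visitorid": 7, "event": "view"}]  # duplicate
print(repo.save_events(batch))  # -> 1
```

A framework adapter (Quix sink, Bytewax sink, Mage exporter) only needs to collect a batch and call `save_events`; nothing framework-specific leaks into the store.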
Sessions are accumulated during event processing and flushed to PostgreSQL when Kafka offsets are committed. This provides:
- Efficient batching (not writing on every event)
- Consistency with Kafka offset commits
- Same behavior across all framework implementations
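The accumulate-then-flush pattern can be sketched as below. The class and callback names are illustrative, not the project's real API: `flush_fn` stands in for the session repository upsert and `commit_fn` for the Kafka consumer's offset commit.

```python
class SessionFlusher:
    """Accumulates session updates and flushes them only when Kafka
    offsets are committed, so writes happen in batches, not per event."""

    def __init__(self, flush_fn, commit_fn):
        self.flush_fn = flush_fn    # e.g. repository bulk upsert
        self.commit_fn = commit_fn  # e.g. consumer.commit()
        self.pending = {}

    def update(self, session_id, session):
        self.pending[session_id] = session  # coalesce repeated updates

    def commit(self):
        # Flush first, then commit offsets: if the flush fails, offsets stay
        # uncommitted and the events are reprocessed (at-least-once delivery).
        if self.pending:
            self.flush_fn(list(self.pending.values()))
            self.pending.clear()
        self.commit_fn()

flushed, commits = [], []
f = SessionFlusher(flushed.extend, lambda: commits.append(True))
f.update("s1", {"event_count": 1})
f.update("s1", {"event_count": 2})  # coalesced with the previous update
f.commit()
print(len(flushed), len(commits))  # -> 1 1
```

Note the ordering: flushing before committing offsets is what keeps the PostgreSQL state consistent with Kafka's view of what has been processed.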
The included `data/events.csv` contains ~2.7M clickstream events, including a small percentage of duplicates.
| Column | Type | Description |
|---|---|---|
| `timestamp` | Unix ms | Event timestamp |
| `visitorid` | Integer | Unique visitor identifier |
| `event` | Enum | `view`, `addtocart`, or `transaction` |
| `itemid` | Integer | Product identifier |
| `transactionid` | Integer | Transaction ID (nullable) |
Sessions are aggregated from events using a configurable timeout:
| Column | Type | Description |
|---|---|---|
| `session_id` | VARCHAR | Unique session identifier |
| `visitor_id` | BIGINT | Visitor identifier |
| `session_start` | TIMESTAMPTZ | Session start time |
| `session_end` | TIMESTAMPTZ | Session end time |
| `duration_seconds` | INTEGER | Session duration |
| `event_count` | INTEGER | Total events |
| `view_count` | INTEGER | Number of views |
| `cart_count` | INTEGER | Add-to-cart count |
| `transaction_count` | INTEGER | Transaction count |
| `items_viewed` | BIGINT[] | Viewed item IDs |
| `items_carted` | BIGINT[] | Carted item IDs |
| `items_purchased` | BIGINT[] | Purchased item IDs |
| `converted` | BOOLEAN | Had a transaction |
Kafka partitioning involves several key tradeoffs:
- Choosing the right partition count - Too few partitions limits parallelism and throughput; too many increases broker overhead, memory usage, and end-to-end latency. Once created, increasing partitions is possible but decreasing is not (requires topic recreation).
- Partition key selection - The partition key determines which partition receives each message. Poor key selection can lead to "hot partitions" where one partition receives disproportionate traffic.
- Ordering guarantees - Kafka only guarantees message ordering within a single partition. If strict global ordering is required, you're limited to a single partition.
- Consumer group rebalancing - When consumers join or leave a group, partitions are reassigned, causing temporary processing delays.
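The partition-key tradeoff above is easy to demonstrate. Kafka's default partitioner hashes the message key (using murmur2) modulo the partition count; the sketch below substitutes MD5 purely to get a deterministic, dependency-free stand-in, so the partition numbers will not match Kafka's, but the distribution behavior is the same.

```python
import hashlib
from collections import Counter

def partition_for(key: str, num_partitions: int) -> int:
    """Stand-in for Kafka's default partitioner: hash the key, mod the
    partition count. Kafka actually uses murmur2; MD5 here just keeps
    the demo deterministic without extra dependencies."""
    digest = hashlib.md5(key.encode()).digest()
    return int.from_bytes(digest[:4], "big") % num_partitions

# A high-cardinality key (visitor id) spreads load across partitions...
spread = Counter(partition_for(f"visitor-{i}", 4) for i in range(10_000))
# ...while a low-cardinality key (event type) creates hot partitions:
hot = Counter(partition_for(evt, 4) for evt in ["view"] * 9_000 + ["transaction"] * 1_000)

print(sorted(spread.values()))  # four roughly even counts
print(len(hot))                 # at most 2 partitions ever receive traffic
```

Keying by visitor also preserves per-visitor ordering, since all of one visitor's events land on the same partition, which is exactly what the session aggregation needs.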
For more information, see the Aiven documentation.
The chart above shows that throughput scales with the number of Kafka partitions, since each partition is consumed by a dedicated consumer process. Benchmarks show gains up to 4 partitions, with diminishing returns beyond that. To change the partition count, set `kafka_events_topic_partitions` in `terraform/terraform.tfvars`, then apply:
```bash
cd terraform && terraform apply && cd ..
```

Run your own benchmarks with:

```bash
clickstream benchmark
```

This command can be useful for understanding the impact of both partitioning and replication on performance.
OpenSearch is not provisioned by default. The OpenSearch consumer uses a separate Kafka consumer group (`clickstream-opensearch`), which enables automatic backfill: if you enable OpenSearch after events have already been consumed, the OpenSearch consumer automatically reads from the beginning of the topic and indexes all events.

To enable OpenSearch:
1. Set `opensearch_enabled = true` in `terraform/terraform.tfvars`
2. Apply the changes: `cd terraform && terraform apply && cd ..`
3. Regenerate `.env` to include OpenSearch credentials: `clickstream config generate -f`
4. Restart the consumers: `clickstream consumer restart`
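The automatic backfill works because the new consumer group has no committed offsets for the topic. Sketched as kafka-python-style settings (the group id comes from the docs above; the other values are illustrative assumptions, not the project's verified configuration):

```python
# Why backfill "just works" for a newly enabled OpenSearch consumer:
opensearch_consumer_config = {
    "group_id": "clickstream-opensearch",  # separate group -> independent offsets
    "auto_offset_reset": "earliest",       # no committed offsets -> start at the beginning
    "enable_auto_commit": False,           # commit only after a successful bulk index
}

# Because this group has never committed offsets for clickstream-events,
# its first poll starts from the earliest retained message and indexes the
# full history, even if the PostgreSQL consumer group is fully caught up.
print(opensearch_consumer_config["auto_offset_reset"])  # -> earliest
```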
First, initialize the dashboards:

```bash
clickstream opensearch init
```

The output should look like:

```
Initializing OpenSearch
✓ OpenSearch is reachable
✓ Index 'clickstream-events' exists (99,987 documents)
✓ Imported index pattern: clickstream-events*
✓ Imported 5 visualizations
✓ Imported 2 dashboards
Open dashboards at https://<your-opensearch-host>
```
Use the link to open the OpenSearch Dashboards web UI. The URL will be your Aiven OpenSearch service URL.
NOTE: When prompted for tenant, select Global.
```bash
cd terraform && terraform destroy
```

MIT

