A high-performance real-time analytics pipeline that streams e-commerce clickstream data from an API through Confluent Cloud Kafka into ClickHouse.
Data generated by: https://github.com/kuldeep27396/clickstream-datagenerator
graph TD
subgraph "Data Sources"
A[Clickstream API<br/>10K msg/sec<br/>JSON Format]
end
subgraph "Message Broker"
B[Confluent Cloud Kafka<br/>• Topics & Partitions<br/>• Consumer Groups<br/>• SASL_SSL Security]
end
subgraph "Processing Layer"
C[Stream Consumer<br/>• Batch Processing<br/>• Error Handling<br/>• ClickHouse Integration]
end
subgraph "Analytics Storage"
D[ClickHouse Cloud<br/>• MergeTree Engine<br/>• Columnar Storage<br/>• Real-time Analytics]
end
subgraph "Monitoring & Insights"
E[Dashboard & Queries<br/>• Performance Metrics<br/>• Business Intelligence<br/>• Real-time Monitoring]
end
A -->|HTTPS Stream| B
B -->|Kafka Protocol| C
C -->|Batch Insert| D
D -->|SQL Queries| E
classDef source fill:#e1f5fe,stroke:#01579b,stroke-width:2px
classDef broker fill:#f3e5f5,stroke:#4a148c,stroke-width:2px
classDef process fill:#e8f5e8,stroke:#1b5e20,stroke-width:2px
classDef storage fill:#fff3e0,stroke:#e65100,stroke-width:2px
classDef monitor fill:#fce4ec,stroke:#880e4f,stroke-width:2px
class A source
class B broker
class C process
class D storage
class E monitor
graph LR
subgraph "Producer Service"
A1[API Client<br/>requests lib]
A2[Rate Limiter<br/>10K msg/sec]
A3[Message Formatter<br/>JSON Validation]
A4[Kafka Producer<br/>kafka-python]
A5[Batch Manager<br/>16KB batches]
A6[Compression<br/>gzip]
end
subgraph "Confluent Cloud Kafka"
B1[Topic: clickstream-events<br/>6 Partitions]
B2[Replication Factor: 3]
B3[Retention: 7 days]
B4[Consumer Group: analytics-group]
B5[Security: SASL_SSL]
B6[Compression: Snappy]
end
subgraph "Consumer Service"
C1[Kafka Consumer<br/>kafka-python]
C2[Offset Manager<br/>Auto-commit]
C3[Batch Processor<br/>1000 events/batch]
C4[Timestamp Parser<br/>ISO → Unix ms]
C5[Data Transformer<br/>Field Mapping]
C6[ClickHouse Client<br/>clickhouse-connect]
end
subgraph "ClickHouse Database"
D1[Database: ecommerce]
D2[Table: clickstream_events]
D3[Engine: MergeTree]
D4[Order Key: timestamp, user_id]
D5[Compression: LZ4]
D6[TTL: 30 days]
end
A1 --> A2 --> A3 --> A4 --> A5 --> A6
A6 -->|Produce| B1
B1 --> B2 --> B3 --> B4 --> B5 --> B6
B6 -->|Consume| C1
C1 --> C2 --> C3 --> C4 --> C5 --> C6
C6 -->|Insert| D1 --> D2 --> D3 --> D4 --> D5 --> D6
classDef producer fill:#e3f2fd,stroke:#1565c0,stroke-width:2px
classDef kafka fill:#f3e5f5,stroke:#7b1fa2,stroke-width:2px
classDef consumer fill:#e8f5e8,stroke:#2e7d32,stroke-width:2px
classDef clickhouse fill:#fff3e0,stroke:#ef6c00,stroke-width:2px
class A1,A2,A3,A4,A5,A6 producer
class B1,B2,B3,B4,B5,B6 kafka
class C1,C2,C3,C4,C5,C6 consumer
class D1,D2,D3,D4,D5,D6 clickhouse
flowchart TD
subgraph "External API"
A[Clickstream Generator<br/>Railway.app<br/>RESTful API]
end
subgraph "Producer Service"
B[HTTP Client<br/>Async Requests]
C[Message Queue<br/>Buffer: 32MB]
D[Kafka Producer<br/>Optimized Settings]
end
subgraph "Confluent Cloud"
E[Topic Partition 0]
F[Topic Partition 1]
G[Topic Partition 2]
H[Topic Partition 3]
I[Topic Partition 4]
J[Topic Partition 5]
end
subgraph "Consumer Service"
K[Consumer Instance 1<br/>Partitions: 0,1]
L[Consumer Instance 2<br/>Partitions: 2,3]
M[Consumer Instance 3<br/>Partitions: 4,5]
end
subgraph "ClickHouse Cluster"
N[Primary Node<br/>INSERT Operations]
O[Replica Nodes<br/>Read Replicas]
end
subgraph "Analytics Layer"
P[Query Engine<br/>SQL Processing]
Q[Dashboard<br/>Real-time Metrics]
R[BI Tools<br/>External Connections]
end
A -->|HTTPS| B -->|JSON| C -->|Batch| D
D -->|Round Robin| E & F & G & H & I & J
E --> K
F --> K
G --> L
H --> L
I --> M
J --> M
K & L & M -->|Batch Insert| N -->|Replicate| O
N & O -->|SELECT| P -->|Metrics| Q
O -->|Analytics| R
classDef external fill:#ffebee,stroke:#c62828,stroke-width:2px
classDef producer fill:#e8f5e8,stroke:#2e7d32,stroke-width:2px
classDef kafka fill:#e3f2fd,stroke:#1565c0,stroke-width:2px
classDef consumer fill:#fff3e0,stroke:#ef6c00,stroke-width:2px
classDef storage fill:#f3e5f5,stroke:#7b1fa2,stroke-width:2px
classDef analytics fill:#e0f2f1,stroke:#00695c,stroke-width:2px
class A external
class B,C,D producer
class E,F,G,H,I,J kafka
class K,L,M consumer
class N,O storage
class P,Q,R analytics
- Throughput: 650+ events/second sustained
- Total Events Processed: 510,414+ events
- Latency: Sub-second end-to-end
- Message Format: JSON with gzip compression
- Batch Processing: 1000 events per batch
- Database: ClickHouse with MergeTree engine
- Python 3.11+
- UV package manager
- Confluent Cloud account
- ClickHouse Cloud account
- Install dependencies:
  uv sync
- Configure environment:
  cp .env.example .env
  # Edit .env with your Confluent Cloud and ClickHouse credentials
- Run the producer:
  uv run python services/producer/src/simple_producer.py
- Run the consumer:
  uv run python services/consumer/src/simple_consumer.py
- Check data in ClickHouse:
  uv run python check_clickhouse.py
The Confluent Cloud dashboard shows the cluster configuration and monitoring interface.
Real-time message streaming from the API endpoint to Confluent Cloud topics.
Consumer group management showing active consumers and their offset tracking.
Topic configuration, partition layout, and message retention policies.
Real-time message processing statistics and throughput metrics.
Producer performance metrics showing message production rates and latency.
Consumer lag monitoring showing real-time consumption rates.
Detailed throughput analysis and message processing efficiency.
ClickHouse web interface for SQL queries and data analysis.
Data visualization showing clickstream event patterns and analytics.
Query performance metrics showing fast analytics on large datasets.
Real-time monitoring dashboard for system health and performance.
Database schema and table structure for clickstream events.
Confluent Cloud cluster management and resource allocation.
Data ingestion pipeline showing real-time event processing.
Comprehensive analytics dashboard with key metrics and KPIs.
Event type distribution and user behavior analytics.
Complete system overview showing all components and their status.
# Confluent Cloud Configuration
KAFKA_BOOTSTRAP_SERVERS=your-cluster.cloud.confluent.cloud:9092
KAFKA_TOPIC=clickstream-events
KAFKA_GROUP_ID=analytics-consumer-group
KAFKA_API_KEY=your-api-key
KAFKA_API_SECRET=your-api-secret
# ClickHouse Configuration
CLICKHOUSE_HOST=your-host.clickhouse.cloud
CLICKHOUSE_PORT=8443
CLICKHOUSE_DATABASE=ecommerce
CLICKHOUSE_USER=default
CLICKHOUSE_PASSWORD=your-password
CLICKHOUSE_SECURE=true
# API Configuration
API_URL=https://clickstream-datagenerator-production.up.railway.app/stream/interactions
API_RATE=10000
API_DURATION=3600
The producer is optimized for high throughput (a configuration sketch follows this list):
- Batch Size: 16KB
- Linger Time: 5ms
- Compression: Gzip
- In-flight Requests: 10
- Buffer Memory: 32MB
- Retries: 3
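As a hedged illustration (not the exact simple_producer.py implementation), these settings map roughly onto a kafka-python producer like this, with credentials taken from the .env file shown above:

```python
import json
import os

from kafka import KafkaProducer

# Producer tuned along the lines described above; values are illustrative.
producer = KafkaProducer(
    bootstrap_servers=os.environ["KAFKA_BOOTSTRAP_SERVERS"],
    security_protocol="SASL_SSL",
    sasl_mechanism="PLAIN",
    sasl_plain_username=os.environ["KAFKA_API_KEY"],
    sasl_plain_password=os.environ["KAFKA_API_SECRET"],
    compression_type="gzip",                    # Compression: gzip
    batch_size=16 * 1024,                       # Batch Size: 16KB
    linger_ms=5,                                # Linger Time: 5ms
    buffer_memory=32 * 1024 * 1024,             # Buffer Memory: 32MB
    retries=3,                                  # Retries: 3
    max_in_flight_requests_per_connection=10,   # In-flight Requests: 10
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)

# Send one event and flush; the real service streams continuously from the API.
producer.send(os.environ.get("KAFKA_TOPIC", "clickstream-events"), {"event_type": "page_view"})
producer.flush()
```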
The consumer uses batch processing for efficiency (see the sketch after this list):
- Batch Size: 1000 events
- Batch Timeout: 5 seconds
- Auto-commit: Enabled
- Offset Reset: Latest
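A minimal sketch of that batching loop, assuming the same .env variables and the clickstream_events schema below (this is illustrative, not the exact simple_consumer.py code):

```python
import json
import os
import time

import clickhouse_connect
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    os.environ.get("KAFKA_TOPIC", "clickstream-events"),
    bootstrap_servers=os.environ["KAFKA_BOOTSTRAP_SERVERS"],
    group_id=os.environ.get("KAFKA_GROUP_ID", "analytics-consumer-group"),
    security_protocol="SASL_SSL",
    sasl_mechanism="PLAIN",
    sasl_plain_username=os.environ["KAFKA_API_KEY"],
    sasl_plain_password=os.environ["KAFKA_API_SECRET"],
    enable_auto_commit=True,       # Auto-commit: Enabled
    auto_offset_reset="latest",    # Offset Reset: Latest
    value_deserializer=lambda m: json.loads(m.decode("utf-8")),
)

ch = clickhouse_connect.get_client(
    host=os.environ["CLICKHOUSE_HOST"],
    port=int(os.environ.get("CLICKHOUSE_PORT", "8443")),
    username=os.environ.get("CLICKHOUSE_USER", "default"),
    password=os.environ["CLICKHOUSE_PASSWORD"],
    database=os.environ.get("CLICKHOUSE_DATABASE", "ecommerce"),
    secure=True,
)

BATCH_SIZE, BATCH_TIMEOUT = 1000, 5.0
columns = ["user_id", "session_id", "timestamp", "event_type", "product_id",
           "product_category", "price", "quantity", "source"]
batch, last_flush = [], time.time()

for message in consumer:
    event = message.value
    # timestamp is assumed already converted to Unix ms; the real consumer parses ISO strings.
    batch.append([event.get(c) for c in columns])
    # Flush on size or age; a production consumer would use poll() so idle batches also flush.
    if len(batch) >= BATCH_SIZE or time.time() - last_flush >= BATCH_TIMEOUT:
        ch.insert("clickstream_events", batch, column_names=columns)
        batch, last_flush = [], time.time()
```

The rows map onto the clickstream_events table defined below.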
CREATE TABLE clickstream_events (
user_id String,
session_id String,
timestamp UInt64,
event_type String,
product_id String,
product_category String,
price Float64,
quantity Int32,
source String,
received_time DateTime DEFAULT now()
) ENGINE = MergeTree()
ORDER BY (timestamp, user_id);
-- Total events processed
SELECT COUNT(*) FROM clickstream_events;
-- Event type distribution
SELECT event_type, COUNT(*) as count
FROM clickstream_events
GROUP BY event_type
ORDER BY count DESC;
-- Revenue by category
SELECT product_category, SUM(price * quantity) as revenue
FROM clickstream_events
GROUP BY product_category
ORDER BY revenue DESC;
-- Events over time (hourly)
SELECT
toStartOfHour(received_time) as hour,
COUNT(*) as event_count
FROM clickstream_events
GROUP BY hour
ORDER BY hour;
-- Top users by activity
SELECT user_id, COUNT(*) as activity_count
FROM clickstream_events
GROUP BY user_id
ORDER BY activity_count DESC
LIMIT 10;
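These queries can also be run from Python with clickhouse-connect; a minimal sketch, assuming connection details matching the .env above:

```python
import clickhouse_connect

client = clickhouse_connect.get_client(
    host="your-host.clickhouse.cloud", port=8443,
    username="default", password="your-password",
    database="ecommerce", secure=True,
)

# Event type distribution, same as the SQL shown above.
result = client.query(
    "SELECT event_type, COUNT(*) AS count "
    "FROM clickstream_events GROUP BY event_type ORDER BY count DESC"
)
for event_type, count in result.result_rows:
    print(f"{event_type}: {count}")
```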
pie
title Performance Metrics Distribution
"Throughput (650+ msg/sec)" : 35
"Total Events (510K+)" : 30
"Storage Efficiency (5-10x)" : 20
"Query Performance (<1sec)" : 15
graph TB
subgraph "Current Scale"
A[Single Producer<br/>650 msg/sec]
B[6 Kafka Partitions<br/>Balanced Load]
C[3 Consumer Instances<br/>Auto-scaling]
D[ClickHouse Single Node<br/>Adequate Performance]
end
subgraph "Horizontal Scaling"
E[Multiple Producers<br/>Load Balancing]
F[12+ Kafka Partitions<br/>Higher Throughput]
G[6+ Consumer Instances<br/>Container Orchestration]
H[ClickHouse Cluster<br/>Distributed Processing]
end
subgraph "Vertical Scaling"
I[Enhanced Producer<br/>Larger Batches]
J[Optimized Kafka<br/>Tuned Parameters]
K[Improved Consumer<br/>Better Memory]
L[ClickHouse Upgrade<br/>More Resources]
end
A -.->|Scale Out| E
B -.->|Expand| F
C -.->|Multiply| G
D -.->|Cluster| H
A -.->|Optimize| I
B -.->|Tune| J
C -.->|Enhance| K
D -.->|Upgrade| L
classDef current fill:#e8f5e8,stroke:#2e7d32,stroke-width:2px
classDef horizontal fill:#e3f2fd,stroke:#1565c0,stroke-width:2px
classDef vertical fill:#fff3e0,stroke:#ef6c00,stroke-width:2px
class A,B,C,D current
class E,F,G,H horizontal
class I,J,K,L vertical
graph LR
subgraph "Producer Optimizations"
A[Batch Size: 16KB → 32KB]
B[Linger Time: 5ms → 10ms]
C[Buffer Memory: 32MB → 64MB]
D[Compression: gzip → zstd]
end
subgraph "Kafka Optimizations"
E[Partitions: 6 → 12]
F[Replication: 3 → 3]
G[Retention: 7d → 14d]
H[Compression: snappy → lz4]
end
subgraph "Consumer Optimizations"
I[Batch Size: 1000 → 2000]
J[Timeout: 5s → 3s]
K[Instances: 3 → 6]
L[Parallel Processing]
end
subgraph "ClickHouse Optimizations"
M[Partitioning: Daily]
N[Compression: LZ4 → ZSTD]
O[Replication: Async]
P[Materialized Views]
end
A -->|+15%| E
B -->|+10%| F
C -->|+20%| G
D -->|+25%| H
E -->|+30%| I
F -->|+15%| J
G -->|+40%| K
H -->|+20%| L
I -->|+50%| M
J -->|+25%| N
K -->|+35%| O
L -->|+45%| P
classDef producer fill:#e3f2fd,stroke:#1565c0,stroke-width:2px
classDef kafka fill:#f3e5f5,stroke:#7b1fa2,stroke-width:2px
classDef consumer fill:#e8f5e8,stroke:#2e7d32,stroke-width:2px
classDef clickhouse fill:#fff3e0,stroke:#ef6c00,stroke-width:2px
class A,B,C,D producer
class E,F,G,H kafka
class I,J,K,L consumer
class M,N,O,P clickhouse
graph TD
subgraph "Monitoring Layer"
A[Prometheus<br/>Metrics Collection]
B[Grafana<br/>Visualization]
C[AlertManager<br/>Notifications]
D[Health Checks<br/>Automated Testing]
end
subgraph "Producer Monitoring"
E[Message Rate<br/>Events/sec]
F[Error Rate<br/>Failed/Total]
G[Latency<br/>Publish Time]
H[Buffer Usage<br/>Memory/CPU]
end
subgraph "Kafka Monitoring"
I[Topic Throughput<br/>MB/sec]
J[Consumer Lag<br/>Offset Delay]
K[Partition Health<br/>Leader Status]
L[Network I/O<br/>Read/Write]
end
subgraph "Consumer Monitoring"
M[Processing Rate<br/>Events/sec]
N[Batch Efficiency<br/>Size/Frequency]
O[Error Handling<br/>Retry Count]
P[Resource Usage<br/>Memory/CPU]
end
subgraph "ClickHouse Monitoring"
Q[Query Performance<br/>Execution Time]
R[Storage Efficiency<br/>Compression Ratio]
S[Memory Usage<br/>Buffer Cache]
T[Replication Lag<br/>Sync Status]
end
E --> A
F --> A
G --> A
H --> A
I --> A
J --> A
K --> A
L --> A
M --> A
N --> A
O --> A
P --> A
Q --> A
R --> A
S --> A
T --> A
A --> B
A --> C
D --> A
classDef monitor fill:#e0f2f1,stroke:#00695c,stroke-width:2px
classDef producer fill:#e3f2fd,stroke:#1565c0,stroke-width:2px
classDef kafka fill:#f3e5f5,stroke:#7b1fa2,stroke-width:2px
classDef consumer fill:#e8f5e8,stroke:#2e7d32,stroke-width:2px
classDef clickhouse fill:#fff3e0,stroke:#ef6c00,stroke-width:2px
class A,B,C,D monitor
class E,F,G,H producer
class I,J,K,L kafka
class M,N,O,P consumer
class Q,R,S,T clickhouse
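Prometheus and Grafana appear here as a monitoring plan rather than code that ships with the project. If the consumer were instrumented with the prometheus_client library (an assumed extra dependency), exposing the processing-rate and error metrics sketched above could look like this:

```python
from prometheus_client import Counter, Gauge, start_http_server

# Metrics corresponding to the "Consumer Monitoring" boxes above; names are illustrative.
events_processed = Counter("consumer_events_processed_total", "Events written to ClickHouse")
insert_errors = Counter("consumer_insert_errors_total", "Failed ClickHouse batch inserts")
last_batch_size = Gauge("consumer_last_batch_size", "Rows in the most recent batch insert")

start_http_server(8000)  # Prometheus scrapes http://<consumer-host>:8000/metrics

def record_batch(batch, ok: bool) -> None:
    """Call after each batch insert attempt to update the exported metrics."""
    last_batch_size.set(len(batch))
    if ok:
        events_processed.inc(len(batch))
    else:
        insert_errors.inc()
```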
# Check Kafka connectivity
python debug_kafka.py
# Check ClickHouse data
python check_clickhouse.py
# Test ClickHouse connection
python test_clickhouse_insert.py
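The debug scripts above are specific to this repository; as a hedged illustration, a minimal connectivity check for both systems might look like this:

```python
import os

import clickhouse_connect
from kafka import KafkaConsumer

# ClickHouse: ping the server over HTTPS.
ch = clickhouse_connect.get_client(
    host=os.environ["CLICKHOUSE_HOST"], port=8443,
    username=os.environ.get("CLICKHOUSE_USER", "default"),
    password=os.environ["CLICKHOUSE_PASSWORD"], secure=True,
)
print("ClickHouse reachable:", ch.ping())

# Kafka: listing topics confirms SASL_SSL credentials and broker connectivity.
consumer = KafkaConsumer(
    bootstrap_servers=os.environ["KAFKA_BOOTSTRAP_SERVERS"],
    security_protocol="SASL_SSL", sasl_mechanism="PLAIN",
    sasl_plain_username=os.environ["KAFKA_API_KEY"],
    sasl_plain_password=os.environ["KAFKA_API_SECRET"],
)
print("Kafka topics:", consumer.topics())
```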
gantt
title Key Metrics Monitoring Timeline
dateFormat s
section Producer Metrics
Message Rate :active, prod-rate, 0, 60
Error Rate :active, prod-error, 0, 60
Latency :active, prod-latency, 0, 60
Buffer Usage :active, prod-buffer, 0, 60
section Kafka Metrics
Topic Throughput :active, kafka-throughput, 0, 60
Consumer Lag :active, kafka-lag, 0, 60
Partition Health :active, kafka-health, 0, 60
Network I/O :active, kafka-network, 0, 60
section Consumer Metrics
Processing Rate :active, cons-rate, 0, 60
Batch Efficiency :active, cons-batch, 0, 60
Error Handling :active, cons-error, 0, 60
Resource Usage :active, cons-resource, 0, 60
section ClickHouse Metrics
Query Performance :active, ch-query, 0, 60
Storage Efficiency :active, ch-storage, 0, 60
Memory Usage :active, ch-memory, 0, 60
Replication Lag :active, ch-replication, 0, 60
flowchart TD
A[System Issue Detected] --> B{Check Producer Health}
B -->|Healthy| C{Check Kafka Connectivity}
B -->|Issues| D[Producer Troubleshooting]
C -->|Connected| E{Check Consumer Status}
C -->|Disconnected| F[Kafka Troubleshooting]
E -->|Running| G{Check ClickHouse Health}
E -->|Stopped| H[Consumer Troubleshooting]
G -->|Healthy| I[Data Analysis]
G -->|Issues| J[ClickHouse Troubleshooting]
D --> K[Review Logs]
D --> L[Check Credentials]
D --> M[Verify Network]
F --> N[Check Bootstrap Servers]
F --> O[Validate Security Settings]
F --> P[Monitor Topic Health]
H --> Q[Check Consumer Groups]
H --> R[Review Processing Logs]
H --> S[Monitor Resource Usage]
J --> T[Optimize Queries]
J --> U[Check Storage Space]
J --> V[Review Schema Design]
classDef issue fill:#ffebee,stroke:#c62828,stroke-width:2px
classDef healthy fill:#e8f5e8,stroke:#2e7d32,stroke-width:2px
classDef troubleshooting fill:#fff3e0,stroke:#ef6c00,stroke-width:2px
classDef analysis fill:#e3f2fd,stroke:#1565c0,stroke-width:2px
class A issue
class B,C,E,G healthy
class D,F,H,J troubleshooting
class I analysis
stateDiagram-v2
[*] --> ProducerStart
ProducerStart --> APIConnection : Connect to API
APIConnection --> MessageProcessing : Success
APIConnection --> RetryConnection : Failed
RetryConnection --> APIConnection : After Delay
RetryConnection --> ProducerError : Max Retries
MessageProcessing --> KafkaPublish : Format Message
KafkaPublish --> BatchComplete : Success
KafkaPublish --> RetryPublish : Failed
RetryPublish --> KafkaPublish : Within Retries
RetryPublish --> ProducerError : Max Retries
BatchComplete --> ConsumerStart : Message Published
ConsumerStart --> KafkaConsume : Connect to Kafka
KafkaConsume --> DataProcessing : Message Received
KafkaConsume --> ConsumerError : Connection Failed
DataProcessing --> ClickHouseInsert : Transform Data
ClickHouseInsert --> InsertComplete : Success
ClickHouseInsert --> RetryInsert : Failed
RetryInsert --> ClickHouseInsert : Within Retries
RetryInsert --> ConsumerError : Max Retries
InsertComplete --> [*] : Processing Complete
ProducerError --> [*] : Alert & Stop
ConsumerError --> [*] : Alert & Stop
state ProducerStart {
[*] --> Initialize
Initialize --> ValidateConfig
ValidateConfig --> [*]
}
state ConsumerStart {
[*] --> Initialize
Initialize --> ValidateSettings
ValidateSettings --> [*]
}
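The retry states above (RetryConnection, RetryPublish, RetryInsert) all follow the same pattern: retry with a delay up to a maximum attempt count, then surface the error. A generic sketch of that pattern (the real services may implement it differently):

```python
import time

def with_retries(operation, max_retries: int = 3, base_delay: float = 1.0):
    """Run operation(), retrying with exponential backoff; re-raise after max_retries."""
    for attempt in range(1, max_retries + 1):
        try:
            return operation()
        except Exception:
            if attempt == max_retries:
                raise  # corresponds to the ProducerError / ConsumerError states
            time.sleep(base_delay * 2 ** (attempt - 1))

# Example: wrap a ClickHouse batch insert (ch, batch, columns as in the consumer sketch above).
# with_retries(lambda: ch.insert("clickstream_events", batch, column_names=columns))
```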
graph TD
subgraph "Producer Issues"
A1[Connection Timeout<br/>• Network Issues<br/>• API Down]
A2[Authentication Failed<br/>• Invalid Credentials<br/>• API Key Expired]
A3[Rate Limiting<br/>• Too Many Requests<br/>• API Throttling]
A4[Memory Issues<br/>• Buffer Overflow<br/>• Insufficient RAM]
end
subgraph "Kafka Issues"
B1[Topic Not Found<br/>• Misspelled Name<br/>• Permission Issues]
B2[Consumer Lag<br/>• Slow Processing<br/>• Under-provisioned]
B3[Partition Imbalance<br/>• Uneven Distribution<br/>• Broker Issues]
B4[Message Loss<br/>• Retention Policy<br/>• Broker Failure]
end
subgraph "Consumer Issues"
C1[Deserialization Errors<br/>• Invalid JSON<br/>• Schema Changes]
C2[Offset Management<br/>• Duplicate Processing<br/>• Data Loss]
C3[Processing Bottlenecks<br/>• CPU Bound<br/>• I/O Bound]
C4[ClickHouse Connection<br/>• Network Issues<br/>• Authentication]
end
subgraph "ClickHouse Issues"
D1[Query Timeouts<br/>• Complex Queries<br/>• Large Datasets]
D2[Insert Failures<br/>• Schema Mismatch<br/>• Data Types]
D3[Performance Issues<br/>• Missing Indexes<br/>• Poor Partitioning]
D4[Storage Issues<br/>• Disk Full<br/>• Corruption]
end
A1 -->|Solution| E1[Check Network<br/>Retry Logic<br/>Circuit Breaker]
A2 -->|Solution| E2[Refresh Credentials<br/>Check API Keys<br/>Update Tokens]
A3 -->|Solution| E3[Implement Backoff<br/>Rate Limiting<br/>Queue Messages]
A4 -->|Solution| E4[Increase Memory<br/>Optimize Batching<br/>Monitor Usage]
B1 -->|Solution| F1[Verify Topic Name<br/>Check Permissions<br/>Recreate Topic]
B2 -->|Solution| F2[Scale Consumers<br/>Optimize Processing<br/>Monitor Lag]
B3 -->|Solution| F3[Rebalance Partitions<br/>Add Brokers<br/>Monitor Health]
B4 -->|Solution| F4[Adjust Retention<br/>Enable Replication<br/>Monitor Brokers]
C1 -->|Solution| G1[Schema Validation<br/>Error Handling<br/>Data Recovery]
C2 -->|Solution| G2[Manual Offset Reset<br/>Idempotent Processing<br/>Checkpointing]
C3 -->|Solution| G3[Horizontal Scaling<br/>Optimize Code<br/>Profile Performance]
C4 -->|Solution| G4[Check Connection<br/>Update Credentials<br/>Retry Logic]
D1 -->|Solution| H1[Query Optimization<br/>Add Indexes<br/>Limit Results]
D2 -->|Solution| H2[Schema Validation<br/>Data Transformation<br/>Error Handling]
D3 -->|Solution| H3[Database Tuning<br/>Materialized Views<br/>Partitioning]
D4 -->|Solution| H4[Storage Cleanup<br/>Data Archival<br/>Disk Expansion]
classDef issue fill:#ffebee,stroke:#c62828,stroke-width:2px
classDef solution fill:#e8f5e8,stroke:#2e7d32,stroke-width:2px
class A1,A2,A3,A4,B1,B2,B3,B4,C1,C2,C3,C4,D1,D2,D3,D4 issue
class E1,E2,E3,E4,F1,F2,F3,F4,G1,G2,G3,G4,H1,H2,H3,H4 solution
├── services/
│   ├── producer/
│   │   └── src/
│   │       ├── simple_producer.py
│   │       └── config.py
│   └── consumer/
│       └── src/
│           ├── simple_consumer.py
│           └── config.py
├── pyproject.toml
├── .env
├── check_clickhouse.py
├── debug_kafka.py
└── test_clickhouse_insert.py
- New Event Types: Update the ClickHouse schema (see the sketch after this list)
- Additional Metrics: Modify consumer processing
- New Analytics: Create additional queries
- Performance Tuning: Adjust batch sizes and timeouts
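For example, supporting a new event attribute would mean extending the table and then mapping the field in the consumer. A hedged sketch using clickhouse-connect (the column name is hypothetical, chosen only for illustration):

```python
import clickhouse_connect

client = clickhouse_connect.get_client(
    host="your-host.clickhouse.cloud", port=8443,
    username="default", password="your-password",
    database="ecommerce", secure=True,
)

# Hypothetical new field; adjust the name and type to the actual event payload.
client.command(
    "ALTER TABLE clickstream_events ADD COLUMN IF NOT EXISTS referrer String DEFAULT ''"
)
```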
- Flink Integration: Replace simple consumer with Apache Flink
- Real-time Dashboards: Add Grafana or similar visualization
- Alerting System: Implement monitoring alerts
- Data Validation: Add schema validation
- Machine Learning: Add predictive analytics
- Multi-tenant: Support multiple clients
- Advanced Analytics: Customer journey analysis
- Performance Optimization: Further tuning and scaling
This project is for educational purposes.
- Fork the repository
- Create a feature branch
- Make your changes
- Test thoroughly
- Submit a pull request
Built with ❤️ using Confluent Cloud, ClickHouse, and Python
Processing 510,414+ events and counting...

















