A simple Kafka → MySQL data ingestion pipeline with a live Flask dashboard
A real-time climate data pipeline that demonstrates modern data streaming architecture using Apache Kafka, Python, MySQL, and Flask.
This project showcases a complete data streaming solution for processing and visualizing climate data in real-time. The system ingests weather data through Kafka, stores it in MySQL, and presents it through a dynamic web dashboard.
```
┌─────────────┐    ┌─────────────┐    ┌─────────────┐    ┌─────────────┐
│  Producer   │───▶│    Kafka    │───▶│  Consumer   │───▶│    MySQL    │
│             │    │   Broker    │    │             │    │  Database   │
└─────────────┘    └─────────────┘    └─────────────┘    └─────────────┘
                                                                │
                                                                ▼
                                                         ┌─────────────┐
                                                         │    Flask    │
                                                         │  Dashboard  │
                                                         └─────────────┘
```
- Real-time Data Processing: Streams climate data using Apache Kafka
- Persistent Storage: Stores data in MySQL for historical analysis
- Live Dashboard: Web-based interface with auto-refresh functionality
- Scalable Architecture: Designed to handle high-volume data streams
- Simple Setup: Easy to deploy and configure
- Python 3.8+
- Apache Kafka 2.8+
- MySQL 8.0+
- Java 8+ (for Kafka)
- Clone the repository

  ```bash
  git clone https://github.com/razaqshaik/DataIngestion_Pipeline_Basic.git
  cd climate-data-streaming-dashboard
  ```
- Install Python dependencies

  ```bash
  pip install -r requirements.txt
  ```
- Set up the MySQL database

  ```sql
  CREATE DATABASE climate_db;
  USE climate_db;

  CREATE TABLE climate_data (
      id INT AUTO_INCREMENT PRIMARY KEY,
      state VARCHAR(50) NOT NULL,
      latitude FLOAT NOT NULL,
      longitude FLOAT NOT NULL,
      datetime DATETIME NOT NULL,
      temperature FLOAT NOT NULL,
      INDEX idx_datetime (datetime),
      INDEX idx_state (state)
  );
  ```
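As a sketch of how a consumer might map one decoded Kafka message onto this table (the `record_to_params` helper and its field handling are illustrative, not the repository's actual code):

```python
import json

# Parameterized INSERT matching the climate_data schema above.
INSERT_SQL = (
    "INSERT INTO climate_data (state, latitude, longitude, datetime, temperature) "
    "VALUES (%s, %s, %s, %s, %s)"
)

def record_to_params(message_value: bytes) -> tuple:
    """Decode one Kafka message and produce bind parameters for INSERT_SQL."""
    record = json.loads(message_value)
    return (
        record["state"],
        float(record["latitude"]),
        float(record["longitude"]),
        # MySQL DATETIME accepts 'YYYY-MM-DD HH:MM:SS'; the sample data in
        # this README uses ISO 8601 with a 'T' separator.
        record["datetime"].replace("T", " "),
        float(record["temperature"]),
    )
```

The resulting tuple would be passed to `cursor.execute(INSERT_SQL, params)` on a MySQL connection.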
- Configure Kafka

  ```bash
  # Start Zookeeper
  bin/zookeeper-server-start.sh config/zookeeper.properties

  # Start the Kafka server
  bin/kafka-server-start.sh config/server.properties

  # Create the topic
  bin/kafka-topics.sh --create \
    --topic climate-topic \
    --bootstrap-server localhost:9092 \
    --partitions 3 \
    --replication-factor 1
  ```
Create a .env file in the project root:
```env
# Database Configuration
DB_HOST=localhost
DB_PORT=3306
DB_NAME=climate_db
DB_USER=your_username
DB_PASSWORD=your_password

# Kafka Configuration
KAFKA_BOOTSTRAP_SERVERS=localhost:9092
KAFKA_TOPIC=climate-topic

# Flask Configuration
FLASK_HOST=127.0.0.1
FLASK_PORT=5000
FLASK_DEBUG=True
```

```
climate-data-streaming-dashboard/
├── 📁 src/
│   ├── producer.py          # Kafka producer for climate data
│   ├── consumer.py          # Kafka consumer with MySQL integration
│   └── app.py               # Flask web application
├── 📁 templates/
│   └── index.html           # Dashboard HTML template
├── 📁 static/
│   ├── css/
│   │   └── style.css        # Dashboard styling
│   └── js/
│       └── dashboard.js     # Frontend JavaScript
├── 📁 config/
│   └── database.py          # Database configuration
├── requirements.txt         # Python dependencies
├── .env.example             # Environment variables template
├── docker-compose.yml       # Docker setup (optional)
└── README.md                # This file
```
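A minimal sketch of what `config/database.py` could look like, reading the `.env` values above (this assumes something like python-dotenv has already loaded the file into the process environment; the `db_config` helper name is hypothetical):

```python
import os

def db_config() -> dict:
    """Collect database settings from the environment, with this README's defaults."""
    return {
        "host": os.getenv("DB_HOST", "localhost"),
        "port": int(os.getenv("DB_PORT", "3306")),
        "database": os.getenv("DB_NAME", "climate_db"),
        "user": os.getenv("DB_USER", ""),
        "password": os.getenv("DB_PASSWORD", ""),
    }
```

Keeping all settings behind one function means the producer, consumer, and Flask app never hard-code credentials.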
- Start the consumer

  ```bash
  python consumer.py
  ```

  This process listens to Kafka and inserts data into MySQL.
- Start the producer

  ```bash
  python producer.py
  ```

  Sends sample climate data to the Kafka topic.
- Start the dashboard

  ```bash
  python app.py
  ```

  Access the dashboard at http://127.0.0.1:5000.
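The producer and consumer have to agree on a wire format for messages on the topic. A sketch of JSON encode/decode helpers as they might be shared between `producer.py` and `consumer.py` (the function names are illustrative):

```python
import json

def encode_record(record: dict) -> bytes:
    """Serialize a climate record into the bytes sent to the Kafka topic."""
    return json.dumps(record).encode("utf-8")

def decode_record(value: bytes) -> dict:
    """Deserialize a message value as received by the consumer."""
    return json.loads(value.decode("utf-8"))
```

With a client library such as kafka-python, `encode_record` is the kind of function you would pass as the producer's `value_serializer`.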
- Real-time Updates: Auto-refreshes every 5 seconds
- Latest Data: Shows the most recent 10 climate records
- Interactive UI: Clean, responsive design
- Data Visualization: Temperature trends and geographical distribution
- Modify `producer.py` to connect to your data source
- Ensure the data format matches the expected schema:

  ```json
  {
    "state": "California",
    "latitude": 34.0522,
    "longitude": -118.2437,
    "datetime": "2024-01-15T10:30:00",
    "temperature": 22.5
  }
  ```
- Add new visualizations in `templates/index.html`
- Implement AJAX for smoother updates
- Add filtering and search capabilities
- Include additional weather metrics
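While wiring up a custom source, it can help to generate synthetic input that conforms to the schema above. A sketch (the states and temperature range are arbitrary examples, not project data):

```python
import random
from datetime import datetime, timezone

# Example coordinates keyed by state; extend or replace with real source data.
STATES = {
    "California": (34.0522, -118.2437),
    "Texas": (29.7604, -95.3698),
    "New York": (40.7128, -74.0060),
}

def make_sample_record() -> dict:
    """Build one schema-conformant climate record for a random state."""
    state = random.choice(list(STATES))
    lat, lon = STATES[state]
    return {
        "state": state,
        "latitude": lat,
        "longitude": lon,
        "datetime": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S"),
        "temperature": round(random.uniform(-10.0, 40.0), 1),
    }
```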
- Kafka Partitions: Increase partitions for higher throughput
- Database Indexing: Optimize queries with proper indexes
- Connection Pooling: Use connection pools for database operations
- Caching: Implement Redis for frequently accessed data
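The caching idea does not have to start with Redis: the effect for a hot query like the dashboard's "latest 10 records" can be sketched in-process first (a toy stand-in to illustrate the pattern, not the project's code):

```python
import time

class TTLCache:
    """Tiny time-based cache: recompute a value only after ttl_seconds elapse."""

    def __init__(self, ttl_seconds: float):
        self.ttl = ttl_seconds
        self._value = None
        self._expires_at = 0.0

    def get(self, compute):
        """Return the cached value, calling compute() only when the entry is stale."""
        now = time.monotonic()
        if now >= self._expires_at:
            self._value = compute()
            self._expires_at = now + self.ttl
        return self._value
```

Used as `TTLCache(5.0).get(fetch_latest_rows)` (where `fetch_latest_rows` is a hypothetical query function), it matches the dashboard's 5-second refresh, so at most one database round-trip happens per refresh interval no matter how many clients poll.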
```bash
# Build and run with Docker Compose
docker-compose up --build

# Run in detached mode
docker-compose up -d
```

- Fork the repository
- Create a feature branch (`git checkout -b feature/amazing-feature`)
- Commit your changes (`git commit -m 'Add amazing feature'`)
- Push to the branch (`git push origin feature/amazing-feature`)
- Open a Pull Request
This project is licensed under the MIT License - see the LICENSE file for details.
**Kafka Connection Error**

```bash
# Check if Kafka is running
netstat -an | grep 9092
```

**MySQL Connection Error**

```bash
# Verify the MySQL service
systemctl status mysql
```

**Dashboard Not Loading**
- Check if Flask is running on the correct port
- Verify database connection
- Ensure consumer is processing messages
For issues and questions:
- 📧 Email: razaqshaik03@gmail.com
- 🐛 Issues: GitHub Issues
- 💬 Discussions: GitHub Discussions
Built with ❤️ for real-time data processing
⭐ Star this repository if you found it helpful!