# Real-Time Stock Data Pipeline

This project demonstrates a real-time streaming pipeline that simulates stock market data, publishes it to Apache Kafka, consumes and stores it in AWS S3, and queries it with AWS Athena.
## Architecture

```
+-------------+      +-------------+      +-------------+      +---------------+
| Stock Data  | ---> |    Kafka    | ---> |  Consumer   | ---> |   S3 Bucket   |
|  Simulator  |      |  (Broker)   |      |  (Python)   |      | (CSV/Parquet) |
+-------------+      +-------------+      +-------------+      +---------------+
                                                                       |
                                                                       v
                                                                +-------------+
                                                                | AWS Athena  |
                                                                +-------------+
```
## Project Structure

```
real-time-stock-data-pipeline/
├── kafka/
│   ├── kafka_installation.md    # Kafka + Zookeeper setup guide
│   ├── start_kafka.sh           # Script to start Kafka and Zookeeper
│   └── create_topic.sh          # Script to create Kafka topic
│
├── producer/
│   └── producer.py              # Simulates stock data and sends to Kafka
│
├── consumer/
│   └── consumer_to_s3.py        # Consumes Kafka data and uploads to S3
│
├── aws/
│   ├── s3_bucket_setup.md       # How to create an S3 bucket
│   ├── athena_query.sql         # SQL to create Athena table
│   └── iam_policy.json          # IAM policy for S3 and Athena access
│
├── data/
│   └── stock_data.csv           # Sample CSV for local testing
│
├── scripts/
│   ├── requirements.txt         # Python dependencies
│   └── setup_env.sh             # Optional env setup script
│
├── docs/
│   └── architecture_diagram.png # Visual overview of the architecture
│
├── .gitignore                   # Ignore __pycache__, .env, etc.
├── README.md                    # This documentation
└── LICENSE                      # Project license (MIT)
```
## Kafka Setup

Install Java, then download and extract Kafka:

```bash
sudo apt update && sudo apt upgrade -y
sudo apt install default-jdk -y
wget https://downloads.apache.org/kafka/3.0.0/kafka_2.13-3.0.0.tgz
tar -xvzf kafka_2.13-3.0.0.tgz
cd kafka_2.13-3.0.0
```

Start Zookeeper and the Kafka broker, each in its own terminal (both commands block):

```bash
# Start Zookeeper
bin/zookeeper-server-start.sh config/zookeeper.properties

# Start Kafka broker
bin/kafka-server-start.sh config/server.properties
```

Create the topic the producer will write to:

```bash
bin/kafka-topics.sh --create \
  --topic stock-data \
  --bootstrap-server localhost:9092 \
  --partitions 1 --replication-factor 1
```
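To confirm the topic exists before starting the pipeline, here is a minimal check from Python (a sketch, assuming `kafka-python` is installed and the broker is on `localhost:9092`):

```python
from kafka import KafkaConsumer

# Connect to the local broker and list the topics it knows about.
consumer = KafkaConsumer(bootstrap_servers='localhost:9092')
print('stock-data' in consumer.topics())  # expect: True
consumer.close()
```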
## Producer (`producer/producer.py`)

Simulates stock ticks and publishes one JSON message per second to the `stock-data` topic:

```python
from kafka import KafkaProducer
import json, time, random

# Serialize each message as UTF-8 JSON
producer = KafkaProducer(
    bootstrap_servers='localhost:9092',
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

tickers = ['AAPL', 'GOOG', 'TSLA', 'MSFT']

while True:
    data = {
        'ticker': random.choice(tickers),
        'price': round(random.uniform(100, 1500), 2),
        'timestamp': time.time()
    }
    producer.send('stock-data', value=data)
    print("Produced:", data)
    time.sleep(1)
```
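One caveat: `kafka-python` buffers sends in the background, so the last few messages can be lost if the script is killed mid-batch. A hedged variant (not part of the repo code) that flushes pending records on Ctrl-C:

```python
from kafka import KafkaProducer
import json, time, random

producer = KafkaProducer(
    bootstrap_servers='localhost:9092',
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

try:
    while True:
        tick = {'ticker': random.choice(['AAPL', 'GOOG', 'TSLA', 'MSFT']),
                'price': round(random.uniform(100, 1500), 2),
                'timestamp': time.time()}
        producer.send('stock-data', value=tick)
        time.sleep(1)
except KeyboardInterrupt:
    pass
finally:
    producer.flush()  # block until buffered records reach the broker
    producer.close()
```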
## Consumer (`consumer/consumer_to_s3.py`)

Consumes ticks from Kafka, appends them to a local CSV, and uploads a snapshot of the file to S3 after every message:

```python
from kafka import KafkaConsumer
import json, boto3, csv, os
from datetime import datetime

# Deserialize each message from UTF-8 JSON
consumer = KafkaConsumer(
    'stock-data',
    bootstrap_servers='localhost:9092',
    value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)

s3 = boto3.client('s3')
bucket_name = 'your-s3-bucket-name'
local_file = 'stock_data.csv'

# Create the local CSV with a header row on first run
if not os.path.exists(local_file):
    with open(local_file, 'w', newline='') as f:
        csv.writer(f).writerow(['ticker', 'price', 'timestamp'])

for message in consumer:
    data = message.value
    with open(local_file, 'a', newline='') as f:
        csv.writer(f).writerow([data['ticker'], data['price'],
                                datetime.fromtimestamp(data['timestamp'])])
    # Upload the full file under a timestamped key after each message
    s3.upload_file(local_file, bucket_name, f'stock_data/{datetime.now().isoformat()}.csv')
    print("Uploaded:", data)
```
## AWS S3 Setup

- Create a new S3 bucket
- Name it `your-s3-bucket-name` (or substitute your own name throughout)
- Allow write permissions via the IAM policy below
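If you prefer to script this step, a hedged boto3 sketch (the bucket name is a placeholder; note the region caveat):

```python
import boto3

s3 = boto3.client('s3', region_name='us-east-1')

# In us-east-1 no CreateBucketConfiguration may be passed; in any other
# region you must add CreateBucketConfiguration={'LocationConstraint': '<region>'}.
s3.create_bucket(Bucket='your-s3-bucket-name')
```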
### IAM Policy (`aws/iam_policy.json`)

Note that `s3:ListBucket` applies to the bucket itself, not to objects, so the policy needs both the bucket ARN and the object ARN:

```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": ["s3:PutObject", "s3:GetObject", "s3:ListBucket"],
      "Resource": [
        "arn:aws:s3:::your-s3-bucket-name",
        "arn:aws:s3:::your-s3-bucket-name/*"
      ]
    }
  ]
}
```
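A quick smoke test that the attached credentials can actually write and list (a throwaway sketch; the key name is arbitrary):

```python
import boto3

s3 = boto3.client('s3')
bucket = 'your-s3-bucket-name'

# Write a tiny object, then confirm it shows up in a listing.
s3.put_object(Bucket=bucket, Key='stock_data/_smoke_test.txt', Body=b'ok')
resp = s3.list_objects_v2(Bucket=bucket, Prefix='stock_data/')
print([obj['Key'] for obj in resp.get('Contents', [])])
```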
## Athena Table (`aws/athena_query.sql`)

Because the consumer writes a header row into each CSV, the table skips the first line of every file; `timestamp` is also a reserved word in Athena DDL, so it is backquoted:

```sql
CREATE EXTERNAL TABLE stock_data (
  ticker string,
  price double,
  `timestamp` string
)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
WITH SERDEPROPERTIES (
  'separatorChar' = ',',
  'quoteChar' = '"'
)
LOCATION 's3://your-s3-bucket-name/stock_data/'
TBLPROPERTIES (
  'has_encrypted_data' = 'false',
  'skip.header.line.count' = '1'
);
```
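Queries can also be launched from Python. A hedged sketch using boto3's Athena client (the database name and results location are assumptions; adjust to your account):

```python
import boto3, time

athena = boto3.client('athena')

# Start the query; Athena writes result files to the given S3 location.
qid = athena.start_query_execution(
    QueryString='SELECT ticker, avg(price) AS avg_price FROM stock_data GROUP BY ticker',
    QueryExecutionContext={'Database': 'default'},  # assumed database
    ResultConfiguration={'OutputLocation': 's3://your-s3-bucket-name/athena-results/'}
)['QueryExecutionId']

# Poll until the query reaches a terminal state.
while True:
    state = athena.get_query_execution(QueryExecutionId=qid)['QueryExecution']['Status']['State']
    if state in ('SUCCEEDED', 'FAILED', 'CANCELLED'):
        break
    time.sleep(1)

if state == 'SUCCEEDED':
    # First row is the header; remaining rows are data.
    for row in athena.get_query_results(QueryExecutionId=qid)['ResultSet']['Rows']:
        print([col.get('VarCharValue') for col in row['Data']])
```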
## Python Dependencies (`scripts/requirements.txt`)

```
kafka-python
boto3
```
## Environment Setup

```bash
python3 -m venv venv
source venv/bin/activate
pip install -r scripts/requirements.txt
```

## Future Improvements

- Stream data using Apache Avro or Parquet for better efficiency (see the Parquet sketch after this list)
- Integrate AWS Glue for schema discovery
- Visualize data using Amazon QuickSight or Streamlit
- Add monitoring/logging with Prometheus + Grafana
- Dockerize the whole system for reproducibility
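To illustrate the first bullet, a minimal sketch of converting a batch of ticks to Parquet before upload (assumes `pandas` and `pyarrow` are added to the requirements; the sample rows are made up):

```python
import pandas as pd  # requires: pandas, pyarrow

ticks = [
    {'ticker': 'AAPL', 'price': 187.22, 'timestamp': 1700000000.0},
    {'ticker': 'TSLA', 'price': 241.05, 'timestamp': 1700000001.0},
]

# Columnar, compressed, and schema-carrying -- far smaller than CSV at scale.
pd.DataFrame(ticks).to_parquet('stock_batch.parquet', index=False)
```

Athena would then read these files with a Parquet table definition instead of OpenCSVSerde.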
## License

This project is licensed under the MIT License.