ETL pipeline for crypto data built with Python.
About • Project Structure • Installation & Usage • Volume Conversion Notes
## About

This repository hosts a Python-based ETL pipeline that collects cryptocurrency spot data (candlesticks and general instrument info) from multiple exchanges. The pipeline supports:
- Initial Load: Full ingestion of all available data (limited by the API methods' restrictions; the hardcoded start date is 2025-01-01)
- Incremental Load: Updating only with newly available (T-2) data
- Custom Load: Updating data from a specified start date upon request
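The three load modes differ mainly in how the start date is chosen. A minimal sketch of that choice (function names here are illustrative; the actual date logic lives in the project's code):

```python
from datetime import date, timedelta

# Hardcoded start date for the initial load (from the description above).
INITIAL_START = date(2025, 1, 1)

def incremental_start(today: date) -> date:
    """T-2: the incremental load picks up data that became available two days ago."""
    return today - timedelta(days=2)

def custom_start(start_dt: str) -> date:
    """Parse a user-supplied start date (the -d START_DT option), assumed YYYY-MM-DD."""
    return date.fromisoformat(start_dt)
```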
Once the raw data is collected, it’s transformed using pandas, then loaded into a Postgres database (via SQLAlchemy). From there, you can explore and visualize the results via a connected Superset dashboard.
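As a rough end-to-end sketch of the collect-and-transform steps: the endpoint path, payload shape, and column names below are assumptions for illustration only; the real logic lives in `exchange.py` and `raw_etl.py`, and the real transform uses pandas.

```python
import json
from urllib.parse import urlencode
from urllib.request import urlopen

BASE_URL = "https://api.bybit.com"  # Bybit public v5 REST API (assumed endpoint)

def kline_url(symbol: str, interval: str = "D", limit: int = 200) -> str:
    # Build the spot-kline request URL; the network call itself is separate.
    params = {"category": "spot", "symbol": symbol, "interval": interval, "limit": limit}
    return f"{BASE_URL}/v5/market/kline?{urlencode(params)}"

def fetch_klines(symbol: str) -> dict:
    # Extract: perform the request and decode the raw JSON payload.
    with urlopen(kline_url(symbol)) as resp:
        return json.load(resp)

def transform_klines(payload: dict) -> list[dict]:
    # Transform: flatten the raw payload into row dicts ready for loading.
    cols = ("open_ts_ms", "open", "high", "low", "close", "volume")
    return [
        {"symbol": payload["symbol"], **dict(zip(cols, row))}
        for row in payload["list"]
    ]

# A hypothetical raw payload, shaped like what the RAW layer would store in JSONB:
raw = {"symbol": "BTCUSDT",
       "list": [["1735689600000", "93500.1", "94100.0", "93200.5", "93900.7", "12.34"]]}
rows = transform_klines(raw)
```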
## Project Structure

This project is organized into clear sections that reflect the ETL workflow:

- **Data Collection** (`exchange.py`)
  - A set of classes that interface with different crypto exchanges
  - Each class implements methods to:
    - Connect to an exchange's public REST API
    - Fetch the latest kline (candlestick) data and instrument info details
  - List of exchanges: `Bybit()`, `Binance()`, `Gateio()`, `Kraken()`, `Okx()`
- **Data Transformation & Loading** (`raw_etl.py`)
  - Classes that handle interaction between Python and the Postgres DB via SQLAlchemy:
    - `RawETLoader` stores methods for inserting, reading, and transforming RAW data
    - `DmETLoader` stores methods implementing the UPSERT strategy when loading transformed data into normalized structures
- **Database files** (`ddl.sql`)
  - DDL constructions for schemas and tables are provided.
  - RAW layer: contains `exchange_api_kline` and `exchange_api_instrument_info`
    - Both tables store raw data (API responses) in a JSONB field, are insert-only, and act as a data lake for the project
  - DM layer: consists of `dim_coin`, `tfct_coin`, and `tfct_exchange_rate`
    - They support an UPSERT data manipulation strategy:
      - Rows unmatched by a primary key in the source are inserted into the target.
      - Matched rows are updated in the target only if their insertion timestamp is greater.
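The UPSERT rule above (insert unmatched rows, update matched rows only when the incoming insertion timestamp is newer) can be illustrated with a self-contained SQLite sketch; the project itself targets Postgres, whose `INSERT ... ON CONFLICT` clause behaves the same way. Table and column names here are illustrative, not the project's actual DDL.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE dim_coin_demo (
        coin_id     TEXT PRIMARY KEY,
        price       REAL,
        inserted_at TEXT
    )
""")

UPSERT = """
    INSERT INTO dim_coin_demo (coin_id, price, inserted_at)
    VALUES (?, ?, ?)
    ON CONFLICT (coin_id) DO UPDATE SET
        price       = excluded.price,
        inserted_at = excluded.inserted_at
    WHERE excluded.inserted_at > dim_coin_demo.inserted_at
"""

conn.execute(UPSERT, ("BTC", 93500.0, "2025-06-01"))  # unmatched -> inserted
conn.execute(UPSERT, ("BTC", 94000.0, "2025-06-02"))  # newer timestamp -> updates
conn.execute(UPSERT, ("BTC", 90000.0, "2025-05-30"))  # older timestamp -> ignored

price, ts = conn.execute(
    "SELECT price, inserted_at FROM dim_coin_demo WHERE coin_id = 'BTC'"
).fetchone()
```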
## Installation & Usage

### Prerequisites

- Docker

### Steps

- Clone or download the repository:

  ```shell
  git clone https://github.com/butriman/bhft_test_task.git
  cd bhft_test_task
  ```

- Build and run the containers:

  ```shell
  docker compose build
  docker compose up -d postgres superset
  ```

  Make sure there are no errors in the logs (a dashboard-import error is acceptable).
- Launch the ETL module using:

  ```shell
  docker compose run python-scripts python main.py -m initial
  ```

  or

  ```shell
  docker compose run python-scripts python main.py -m incremental
  ```

  or (to load particular exchanges' data):

  ```shell
  docker compose run python-scripts python main.py -e Bybit Binance
  ```

  Overall options:

  ```
  -m [{initial,incremental,custom}] -d [START_DT] -e [{Bybit,Binance,Gateio,Kraken,Okx} ...]
  ```

- After the script finishes, go to the Superset UI at http://127.0.0.1:8088/ and log in with `superset` (both login and password). If the dashboard import via CLI failed, use the UI import to add the config `/dashboards/dashboard_spot_trade.zip`.
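The option set above can be sketched with `argparse`; this is a minimal reconstruction of the documented interface, not the project's actual `main.py`, and the defaults are assumptions.

```python
import argparse

EXCHANGES = ["Bybit", "Binance", "Gateio", "Kraken", "Okx"]

def build_parser() -> argparse.ArgumentParser:
    parser = argparse.ArgumentParser(description="Crypto spot-data ETL pipeline")
    parser.add_argument("-m", "--mode", choices=["initial", "incremental", "custom"],
                        default="incremental", help="load mode (default is an assumption)")
    parser.add_argument("-d", "--start-dt", metavar="START_DT",
                        help="start date for a custom load, e.g. 2025-03-01")
    parser.add_argument("-e", "--exchanges", nargs="+", choices=EXCHANGES,
                        default=EXCHANGES, help="exchanges to load")
    return parser

args = build_parser().parse_args(["-m", "custom", "-d", "2025-03-01", "-e", "Bybit", "Binance"])
```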
### Superset Dashboard

The Spot Trade Dashboard is ready to explore.
## Volume Conversion Notes

There is a block in the project aimed at converting the volume amount (the primary metric) to USDT.
Currently, this functionality is not fully implemented according to the original request. For now, it only takes spot pairs with a quote coin ≠ USDT and uses the quote coin/USDT instrument (or USDT/quote coin if the first is not found) with a volume-weighted average price as the exchange rate. For pairs not found using this logic, there is a USDT vs Quote table in the Superset dashboard, allowing you to filter out such pairs by clicking on the in_USDT value (via dashboard cross-filtering).
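The volume-weighted average price used as the exchange rate reduces to a simple formula; here is a sketch with hypothetical numbers (the real computation happens inside the project's transform step):

```python
def vwap(candles: list[tuple[float, float]]) -> float:
    """Volume-weighted average price over (price, volume) pairs."""
    total_volume = sum(v for _, v in candles)
    return sum(p * v for p, v in candles) / total_volume

# Hypothetical ETH/USDT candles used to convert ETH-quoted volume into USDT.
rate = vwap([(3000.0, 2.0), (3100.0, 1.0)])
eth_volume = 5.0
usdt_volume = eth_volume * rate
```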
For future development:
- Fix volume conversion
- Containerize the project
- Julia port
- Use .env files to avoid hardcoding connection strings and credentials