A serverless ETL pipeline that fetches currency exchange rates from the Open Exchange Rates API, stores the raw responses in S3, and loads them into Snowflake, using AWS Lambda.
This project provides an automated solution for retrieving real-time currency exchange rates and storing them in a structured format in Snowflake. The pipeline involves:
- Fetching exchange rates from the Open Exchange Rates API via AWS Lambda
- Storing the raw JSON response in an S3 bucket
- Processing and loading the data into Snowflake tables for analysis
- AWS Lambda: Executes the main ETL process on a schedule
- Amazon S3: Stores raw exchange rate data as JSON files
- AWS Secrets Manager: Securely manages database credentials
- Snowflake: Stores and processes the exchange rate data
- AWS Account with appropriate permissions
- Snowflake account with database creation privileges
- Open Exchange Rates API key (https://openexchangerates.org/)
- Python 3.9
- Create an IAM role with the following permissions:
  - S3FullAccess
  - Lambda execution role
  - EventBridge access
  - Lambda full access
- Create a Secrets Manager secret with the ID db/currency-exchange-rate containing Snowflake credentials:

      {
        "fusion_snowflake": {
          "username": "your_username",
          "password": "your_password",
          "account_name": "your_snowflake_account"
        }
      }

- Create an S3 bucket for storing the raw exchange rate data
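
As a minimal sketch of how the secret above might be consumed, the helper below parses the secret's JSON string and checks that the three documented fields are present. The function name `parse_snowflake_secret` is hypothetical and is not taken from `code/snowflake_provider.py`. In a Lambda function, the input string would typically come from boto3's `get_secret_value` call on the Secrets Manager client (the `SecretString` field of the response); that call is left out here so the example runs without AWS access.

```python
import json
from typing import Dict

# Fields documented in the secret layout above.
REQUIRED_KEYS = ("username", "password", "account_name")


def parse_snowflake_secret(
    secret_string: str, top_level_key: str = "fusion_snowflake"
) -> Dict[str, str]:
    """Return the Snowflake credentials stored in a secret's JSON string.

    Hypothetical helper: the real project may read the secret differently.
    """
    payload = json.loads(secret_string)
    if top_level_key not in payload:
        raise KeyError(f"secret is missing top-level key {top_level_key!r}")

    creds = payload[top_level_key]
    # Treat absent and empty values the same way: both are unusable.
    missing = [key for key in REQUIRED_KEYS if not creds.get(key)]
    if missing:
        raise ValueError(f"secret is missing fields: {', '.join(missing)}")

    return {key: creds[key] for key in REQUIRED_KEYS}
```

Failing early on a malformed secret tends to give a clearer error than a failed Snowflake login later in the run.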
- Run the SQL scripts in code/snowflake.sql to:
  - Create the CURRENCY_DB database and CURRENCY schema
  - Create the necessary tables (EXCHANGE_RATES_RAW, EXCHANGE_RATES_STG, EXCHANGE_RATES)
  - Create the stored procedure for data processing
- Create a new Lambda function using Python 3.8+
- Upload the code from code/lambda_function.py and code/snowflake_provider.py
- Set up the following environment variables:

      environment: DEV
      oer_app_id: YOUR-APP-KEY
      oer_base_currency: USD
      oer_base_url: https://openexchangerates.org/api/latest.json
      region_name: us-east-1
      s3_bucket_name: --------
      snowflake_db: CURRENCY_DB
      snowflake_role: ACCOUNTADMIN
      snowflake_wh: COMPUTE_WH

- Configure an EventBridge (CloudWatch Events) trigger to run the Lambda function on your desired schedule
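
The environment variables listed above could be read and validated at cold start along the following lines. This is a sketch only: `PipelineConfig`, `load_config`, and `build_request_url` are hypothetical names, not functions from `code/lambda_function.py`. The URL builder assumes the API key and base currency are passed as the `app_id` and `base` query parameters of the `latest.json` endpoint.

```python
import os
from dataclasses import dataclass, fields
from typing import Mapping
from urllib.parse import urlencode


@dataclass(frozen=True)
class PipelineConfig:
    """One attribute per environment variable from the setup steps."""

    environment: str
    oer_app_id: str
    oer_base_currency: str
    oer_base_url: str
    region_name: str
    s3_bucket_name: str
    snowflake_db: str
    snowflake_role: str
    snowflake_wh: str


def load_config(env: Mapping[str, str] = os.environ) -> PipelineConfig:
    """Build the config from a mapping, failing if any variable is unset."""
    names = [f.name for f in fields(PipelineConfig)]
    missing = [name for name in names if not env.get(name)]
    if missing:
        raise RuntimeError(f"missing environment variables: {', '.join(missing)}")
    return PipelineConfig(**{name: env[name] for name in names})


def build_request_url(cfg: PipelineConfig) -> str:
    """Compose the Open Exchange Rates request URL from the config."""
    query = urlencode({"app_id": cfg.oer_app_id, "base": cfg.oer_base_currency})
    return f"{cfg.oer_base_url}?{query}"
```

Passing the mapping as a parameter, rather than reading `os.environ` directly inside the function, makes the loader easy to exercise locally with a plain dictionary.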
.
├── code/
│ ├── lambda_function.py # Main AWS Lambda function
│ ├── snowflake_provider.py # Snowflake connection and query utility
│ └── snowflake.sql # SQL scripts for Snowflake setup
├── environment-variables.txt # Required environment variables
├── roles.txt # Required AWS IAM roles
├── secret-manager.txt # Instructions for secrets management
└── convention.txt # Project conventions and guidelines
- The Lambda function is triggered on a schedule
- It fetches the latest exchange rates from the Open Exchange Rates API
- The raw JSON response is stored in the S3 bucket with a structured path: exchange_rates/{year}/{month}/{day}/exchange-rates-{hour}.json
- The data is inserted into the EXCHANGE_RATES_RAW table in Snowflake
- A stored procedure (SP_EXCHANGE_RATE_LOADING) processes the raw data:
  - Extracts relevant information into the staging table
  - Merges the staging data into the final EXCHANGE_RATES table
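
Two of the steps above can be illustrated in a few lines of Python. The sketch below builds the S3 object key from a fetch time and flattens a `latest.json` response into one row per currency. Both function names are hypothetical, and zero-padding of the month, day, and hour is an assumption. The actual extraction happens inside the stored procedure in code/snowflake.sql, so `flatten_rates` only shows the shape of that transformation; the row layout is assumed and is not the real table schema.

```python
from datetime import datetime, timezone
from typing import Any, Dict, List, Tuple

# (base currency, target currency, rate, rate timestamp in UTC)
RateRow = Tuple[str, str, float, datetime]


def build_s3_key(fetched_at: datetime) -> str:
    """Return exchange_rates/{year}/{month}/{day}/exchange-rates-{hour}.json.

    Assumption: month, day, and hour are zero-padded to two digits.
    """
    return (
        f"exchange_rates/{fetched_at:%Y}/{fetched_at:%m}/{fetched_at:%d}/"
        f"exchange-rates-{fetched_at:%H}.json"
    )


def flatten_rates(payload: Dict[str, Any]) -> List[RateRow]:
    """Turn the API response into one row per target currency.

    Relies on the "timestamp" (Unix seconds), "base", and "rates"
    fields of the latest.json response.
    """
    base = payload["base"]
    as_of = datetime.fromtimestamp(payload["timestamp"], tz=timezone.utc)
    # Sort by currency code so the output order is deterministic.
    return [
        (base, currency, float(rate), as_of)
        for currency, rate in sorted(payload["rates"].items())
    ]
```

For example, a fetch at 07:00 UTC on 5 March 2024 would map to `exchange_rates/2024/03/05/exchange-rates-07.json` under the zero-padding assumption.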
- AWS CloudWatch can be used to monitor Lambda function executions
- Snowflake provides query history and usage monitoring
- Check the Snowflake tables regularly to ensure data is being loaded correctly
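
The regular table check could be reduced to a freshness test like the one sketched below. The helper `is_stale` is hypothetical, and the two-hour default threshold is an assumption that should be matched to the actual EventBridge schedule. How the most recent load time is obtained (for example, from a query against the final table) is left open, since the table's columns are not described here.

```python
from datetime import datetime, timedelta, timezone


def is_stale(
    last_loaded_at: datetime,
    now: datetime,
    max_age: timedelta = timedelta(hours=2),  # assumed threshold
) -> bool:
    """Return True when the latest load is older than max_age."""
    return now - last_loaded_at > max_age


# Example: a load three hours old exceeds the assumed two-hour threshold.
now = datetime(2024, 3, 5, 12, 0, tzinfo=timezone.utc)
stale = is_stale(now - timedelta(hours=3), now)
```

Both timestamps should be timezone-aware (or both naive); mixing the two raises a `TypeError` in Python.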