- Install Airflow, MySQL, and MongoDB on a Google Cloud VM
- Create a GCS bucket
- Create two BigQuery datasets: a staging dataset for data preparation and processing, and a final dataset ready for analysis or deployment
- Set up a Google Cloud connection for Airflow
- Configure Airflow SMTP to send alert emails when a task fails
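The SMTP alerting is configured in the `[smtp]` section of `airflow.cfg`; a minimal sketch assuming a Gmail relay, where the address and app password are placeholders:

```ini
[smtp]
smtp_host = smtp.gmail.com
smtp_starttls = True
smtp_ssl = False
smtp_user = your_email@gmail.com
smtp_password = your_app_password
smtp_port = 587
smtp_mail_from = your_email@gmail.com
```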
- Flow: Extract data => Migrate data => Load data to the data staging area => Transform and load data => Create data mart
- DAG
  - Run at 7 AM every day
  - Retry 3 times, each time 5 minutes apart
  - Send an alert email when a task fails
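The schedule, retry, and alerting settings above map onto Airflow's `default_args` and a cron expression roughly as follows; the DAG id and alert address are assumptions, not taken from the project:

```python
from datetime import timedelta

# Hypothetical settings matching the DAG description above
default_args = {
    "retries": 3,                         # retry a failed task 3 times
    "retry_delay": timedelta(minutes=5),  # 5 minutes between retries
    "email_on_failure": True,             # alert email when a task fails
    "email": ["data-team@example.com"],   # assumed recipient
}

schedule_interval = "0 7 * * *"  # cron: every day at 7 AM

# With Airflow installed, the DAG would be declared roughly as:
# from airflow import DAG
# with DAG("scraping_etl", default_args=default_args,
#          schedule_interval=schedule_interval, catchup=False) as dag:
#     ...
```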
- Extract
  - Extract the `newegg-data` table from the `scraped_data` database in MySQL to a `newegg_data.csv` file: `extract-newegg-data`
  - Extract the `tiki-data` collection from the `scraped_data` database in MongoDB to a `tiki_data.json` file: `extract-tiki-data`
    - Use the `sed` command to remove HTML tags from the JSON file: `sed -E 's/<[^>]*>//g'`
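The `sed` tag-stripping step can also be done in Python with the same regular expression; a minimal sketch (the function name is made up for illustration):

```python
import re

def strip_html_tags(text: str) -> str:
    # Same pattern as sed -E 's/<[^>]*>//g': drop every <...> span
    return re.sub(r"<[^>]*>", "", text)

print(strip_html_tags("<p>RAM <b>16GB</b></p>"))  # -> RAM 16GB
```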
- Migrate
  - Migrate the `newegg_data.csv` (`migrate-Newegg-data`) and `tiki_data.json` (`migrate-Tiki-data`) files to a GCS bucket
- Load
  - Load the `newegg_data.csv` and `tiki_data.json` files from the GCS bucket to the data staging area in BigQuery
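One way to back the migrate and load tasks is with `gsutil` and `bq` commands run from Airflow (e.g. via a `BashOperator`); a sketch under the assumptions that the bucket name is a placeholder, the staging dataset is the `staging_warehouse` referenced in the transform query, and the tiki file is newline-delimited JSON:

```python
# Placeholder bucket name; substitute your own
BUCKET = "your-gcs-bucket"

# Copy the extracted files into the GCS bucket
migrate_newegg = f"gsutil cp newegg_data.csv gs://{BUCKET}/"
migrate_tiki = f"gsutil cp tiki_data.json gs://{BUCKET}/"

# Load the files from GCS into the staging dataset in BigQuery
load_newegg = (
    f"bq load --source_format=CSV --autodetect "
    f"staging_warehouse.newegg_data gs://{BUCKET}/newegg_data.csv"
)
load_tiki = (
    f"bq load --source_format=NEWLINE_DELIMITED_JSON --autodetect "
    f"staging_warehouse.tiki_data gs://{BUCKET}/tiki_data.json"
)
```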
- Transform tiki-data:

```sql
SELECT
  *
  , (price * all_time_quantity_sold) total_revenue -- find the total revenue of each product
  , DATE_SUB(CURRENT_DATE(), INTERVAL day_ago_created DAY) created_date -- create the created_date column
FROM `project_id.staging_warehouse.tiki_data`
WHERE stock_item.qty IS NOT NULL; -- get only products still in stock
```
- Load data from the data staging area to a database in BigQuery
- Create a data mart for other teams to use
  - Tiki-data:

```sql
-- 'Xuất xứ' is the product-origin attribute ("origin" in Vietnamese)
WITH product_origin AS (
  SELECT
    h.id
    , h.categories.name category
    , a.value origin
  FROM `project_id.scraped_data.tiki_data` h
    , UNNEST(specifications) s
    , UNNEST(s.attributes) a
  WHERE a.name = 'Xuất xứ'
)
SELECT
  t.id
  , t.categories.name category
  , t.current_seller.name seller_name
  , t.all_time_quantity_sold
  , t.price
  , t.rating_average
  , p.origin
FROM `project_id.scraped_data.tiki_data` t
LEFT JOIN product_origin p ON t.id = p.id;
```

  - Newegg-data:

```sql
SELECT
  itemID
  , brand
  , total_price
  , rating
FROM `project_id.scraped_data.newegg_data`;
```