-
Notifications
You must be signed in to change notification settings - Fork 34
Description
การติดตั้ง Airflow (DevOps/CI/CD): Freelancer ต้องมั่นใจว่า Airflow Environment บน GCP (Cloud Composer หรือ VM) ถูกตั้งค่าและสามารถติดตั้ง Dependencies ที่จำเป็นได้
การเชื่อมต่อ Airflow และ Code: ต้องกำหนดวิธีการที่ Airflow จะเข้าถึงโค้ดในโฟลเดอร์ connectors/ และ etl_transforms/ (ส่วนใหญ่มักจะใช้ Docker Image หรือ Volume Sharing)
สร้างโค้ด Transform จริง: สร้างไฟล์ใน etl_transforms/data_cleaner.py เพื่อใช้ Pandas ในการทำความสะอาดและจัดรูปแบบข้อมูลให้เป็นมาตรฐานเดียวกัน ก่อนโหลดเข้า BigQueryfrom airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
--- 1. CONFIGURATION ---
กำหนด Argument พื้นฐานของ DAG
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
# สำหรับการดึงข้อมูลแบบ Incremental:
'start_date': datetime(2025, 12, 1),
}
--- 2. IMPORT CONNECTOR & TRANSFORM SCRIPTS ---
ต้อง Import ฟังก์ชันหลักจากไฟล์ที่ Freelancer สร้างไว้ใน connectors/ และ etl_transforms/
(ต้องมั่นใจว่า path ใน Airflow สามารถเข้าถึงโค้ดเหล่านี้ได้)
สมมติว่าไฟล์ connector และ transform ถูก Import เข้ามาใน Airflow Environment แล้ว
from connectors.shopee_connector import get_shopee_orders
from connectors.lazada_connector import get_lazada_orders
from etl_transforms.data_cleaner import clean_and_transform
from etl_transforms.bigquery_loader import load_to_bigquery # ฟังก์ชันสำหรับ Task 1.3
--- 3. DEFINE DAG ---
with DAG(
dag_id='marketplace_data_ingestion_v1',
default_args=default_args,
description='ETL pipeline for Shopee, Lazada, and TikTok data ingestion',
schedule_interval=timedelta(hours=24), # รันทุกวัน
catchup=False,
tags=['etl', 'data_ingestion', 'bigquery'],
) as dag:
# === TASK GROUP 1: DATA EXTRACTION (Task 1.1) ===
t1_shopee = PythonOperator(
task_id='extract_shopee_orders',
python_callable=get_shopee_orders,
# ต้องกำหนด op_kwargs เพื่อส่ง parameter (เช่น วันที่เริ่มต้น/สิ้นสุด) ไปให้ฟังก์ชัน
op_kwargs={'start_date': '{{ ds }}', 'end_date': '{{ next_ds }}'},
)
t1_lazada = PythonOperator(
task_id='extract_lazada_orders',
python_callable=get_lazada_orders,
op_kwargs={'start_date': '{{ ds }}', 'end_date': '{{ next_ds }}'},
)
# ... เพิ่ม t1_tiktok และ t1_jsterp ...
# === TASK GROUP 2: DATA TRANSFORMATION (Task 1.2) ===
t2_clean_data = PythonOperator(
task_id='clean_and_transform_raw_data',
# ฟังก์ชันนี้จะดึง Raw Data จาก GCS (ที่ Task 1.1 บันทึกไว้) มาประมวลผลด้วย Pandas
python_callable=clean_and_transform,
op_kwargs={'execution_date': '{{ ds }}'},
)
# === TASK GROUP 3: DATA LOADING (Task 1.3) ===
t3_load_to_dwh = PythonOperator(
task_id='load_transformed_data_to_bigquery',
# ฟังก์ชันนี้จะใช้ Airflow BigQuery Operator หรือโค้ด Python เพื่อโหลดเข้า BigQuery
python_callable=load_to_bigquery,
op_kwargs={'target_table': 'marketplace_orders_cleaned'},
)
# --- 4. DEFINE WORKFLOW ORDER ---
# กำหนดลำดับการทำงาน
# 1. ดึงข้อมูลทั้งหมดพร้อมกัน
[t1_shopee, t1_lazada] >> t2_clean_data
# 2. ทำความสะอาดข้อมูลเสร็จแล้วจึงโหลดเข้า DWH
t2_clean_data >> t3_load_to_dwh