-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy path als_pipeline_dag.py
More file actions
61 lines (52 loc) · 1.33 KB
/
als_pipeline_dag.py
File metadata and controls
61 lines (52 loc) · 1.33 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta
from dag_utils import script_cmd
# Defaults handed to the DAG below and applied by Airflow to each of its
# tasks: runs do not depend on the previous run's outcome, and a failed task
# is retried up to three times, five minutes apart.
# NOTE(review): start_date is a naive datetime, so Airflow interprets it in
# its configured default timezone — confirm that is the intended zone.
default_args = dict(
    owner='airflow',
    depends_on_past=False,
    start_date=datetime(2025, 5, 28),
    retries=3,
    retry_delay=timedelta(minutes=5),
)
# DAG for the ALS-based movie recommendation pipeline. With no schedule it
# only runs when triggered manually; catchup is disabled so Airflow never
# creates backfill runs for it.
# NOTE: ``schedule`` replaces ``schedule_interval``, which has been deprecated
# since Airflow 2.4 (it emits a deprecation warning) and is rejected by
# Airflow 3. This form requires Airflow >= 2.4.
dag = DAG(
    'als_movie_pipeline',
    default_args=default_args,
    description='Pipeline with ALS-based movie recommendation',
    schedule=None,
    catchup=False,
)
def _script_task(task_id, script):
    """Build a BashOperator on ``dag`` whose command is ``script_cmd(script)``."""
    return BashOperator(
        task_id=task_id,
        bash_command=script_cmd(script),
        dag=dag,
    )


# Ingestion tasks, one per source script.
ingest_imdb = _script_task('ingest_imdb', 'ingest_imdb_api.py')
ingest_netflix = _script_task('ingest_netflix', 'ingest_netflix_api.py')

# Formatting tasks, one per source.
format_imdb = _script_task('format_imdb', 'format_imdb.py')
format_netflix = _script_task('format_netflix', 'format_netflix.py')

# NOTE: the Python name is ``als_train`` while the Airflow task_id is
# 'als_recommend'. The task_id is left unchanged so existing run history and
# any references by id stay valid.
als_train = _script_task('als_recommend', 'als_recommend.py')

# Indexing task for the ALS step's output.
index_als = _script_task('index_als', 'index_als.py')
# Each source is formatted right after it is ingested. The ALS task sits
# downstream of both format tasks, and indexing runs after ALS.
ingest_imdb >> format_imdb >> als_train
ingest_netflix >> format_netflix >> als_train
als_train >> index_als