|
4 | 4 |
|
5 | 5 | # NOTE: Please delete all these comments once you have understood how to use me. |
6 | 6 |
|
7 | | -import os |
8 | | -import sys |
9 | 7 | from datetime import datetime |
10 | 8 |
|
11 | 9 | from airflow import DAG |
12 | 10 | from airflow.utils.task_group import TaskGroup |
13 | | - |
14 | | -# Please do not remove the following line. |
15 | | -sys.path.append(os.path.dirname(os.path.abspath(__file__))) |
16 | | -from task_factory import task_factory |
| 11 | +from gaiaflow.core.create_task import create_task |
| 12 | +from gaiaflow.core.operators import FromTask |
17 | 13 |
|
18 | 14 | # We use `task_factory`, a wrapper developed at BC on top of Airflow operators, to |
19 | 15 | # make it easy for users to create DAGs and switch between different environments. |
|
24 | 20 | # https://github.com/bcdev/gaiaflow/issues |
25 | 21 |
|
26 | 22 | # Define the environment here. It can either be `dev`, `prod` or `prod_local`. |
27 | | -ENVIRONMENT = "dev" |
| 23 | +MODE = "dev" |
28 | 24 |
|
29 | 25 | # TODO (User Action Required): |
30 | 26 | # Please look for change me's below and update them as needed. |
|
93 | 89 | # it, but of course you can. |
94 | 90 | with TaskGroup(group_id="change_group_id", |
95 | 91 | tooltip="Change what appears in the tooltip") as trainer: |
96 | | - preprocess = task_factory( |
| 92 | + preprocess = create_task( |
97 | 93 | # Unique ID of the task. Feel free to change it. |
98 | 94 | task_id="preprocess_data", |
99 | 95 | # This argument expects the path to your function that you want |
100 | 96 | # to execute. It should be available in the __init__.py of your package. |
101 | | - func_path="{{ cookiecutter.package_name }}.preprocess", |
| 97 | + func_path="{{ cookiecutter.package_name }}:preprocess", |
102 | 98 | # This argument expects that you provide all the arguments that your |
103 | 99 | # function as defined in `func_path` expects. |
104 | 100 | # If your function depends on another function (from a different task), you should use |
105 | 101 | # xcom_pull_tasks instead of func_kwargs as shown in the next task which depends on this one. |
106 | 102 | func_kwargs={ |
107 | 103 | "path": "dummmy_path" |
108 | 104 | }, |
109 | | - |
| 105 | + dag=dag, |
110 | 106 | # # For prod_local and prod mode only |
111 | 107 | # You must run the `python minikube_manager.py --build-only`, it will then |
112 | 108 | # create a docker image to run your package with all the dependencies included. |
113 | 109 | # Please update the image name below: |
114 | 110 | # TODO: Talk with Tejas to align on image naming. |
115 | | - image="<your-image-name>", |
| 111 | + # image="<your-image-name>", |
| 112 | + image="gaiaflow_test_pl:v17", |
116 | 113 |
|
117 | 114 | # TODO: Discuss with Tejas about a process for creating secrets |
118 | | - secrets=["my-minio-creds"], |
| 115 | + # secrets=["my-minio-creds"], |
119 | 116 |
|
120 | 117 | # The following argument can be used to pass in environment variables that your |
121 | 118 | # package might need. In the `dev` mode, you can use the .env file to pass your environment |
|
132 | 129 | # fast for testing and development. |
133 | 130 | # The image field is only required when you want to run the DAG in |
134 | 131 | # production (prod) or production-like (prod-like) setting. |
135 | | - env=ENVIRONMENT, |
| 132 | + mode=MODE, |
136 | 133 | ) |
137 | 134 |
|
138 | | - train = task_factory( |
| 135 | + train = create_task( |
139 | 136 | task_id="train", |
140 | | - func_path="{{ cookiecutter.package_name }}.train", |
141 | | - xcom_pull_tasks={ |
142 | | - "preprocessed_path": { |
143 | | - "task": "change_group_id.preprocess_data", |
144 | | - "key": "return_value", |
145 | | - }, |
146 | | - "bucket_name": { |
147 | | - "task": "change_group_id.preprocess_data", |
148 | | - "key": "return_value" |
149 | | - }, |
| 137 | + func_path="{{ cookiecutter.package_name }}:train", |
| 138 | + func_args=[ |
| 139 | + "meow", |
| 140 | + FromTask( |
| 141 | + task="change_group_id.preprocess_data", key="preprocessed_path" |
| 142 | + ), |
| 143 | + ], |
| 144 | + func_kwargs={ |
| 145 | + "preprocessed_path": FromTask( |
| 146 | + task="change_group_id.preprocess_data", key="preprocessed_path" |
| 147 | + ), |
| 148 | + "bucket_name": FromTask( |
| 149 | + task="change_group_id.preprocess_data", key="bucket_name" |
| 150 | + ), |
150 | 151 | }, |
151 | 152 |
|
152 | | - image="<your-image-name>", |
153 | | - secrets=["my-minio-creds"], |
| 153 | + # image="<your-image-name>", |
| 154 | + image="gaiaflow_test_pl:v17>", |
| 155 | + # secrets=["my-minio-creds"], |
154 | 156 | env_vars={ |
155 | 157 | "MLFLOW_TRACKING_URI": f"http://{MINIKUBE_GATEWAY}:5000", |
156 | 158 | "MLFLOW_S3_ENDPOINT_URL": f"http://{MINIKUBE_GATEWAY}:9000", |
157 | 159 | }, |
158 | 160 |
|
159 | | - env=ENVIRONMENT, |
| 161 | + mode=MODE, |
| 162 | + dag=dag, |
160 | 163 | ) |
161 | 164 |
|
162 | 165 | # This bit operator shows the task dependencies. |
163 | 166 | preprocess >> train |
164 | 167 |
|
165 | | - with TaskGroup(group_id="change_me_group_id_2", |
166 | | - tooltip="Change what appears in the tooltip 2") as predictor: |
167 | | - predict = task_factory( |
168 | | - task_id="predict", |
169 | | - func_path="{{ cookiecutter.package_name }}.predict", |
170 | | - # Pull model_uri output from the train task |
171 | | - xcom_pull_tasks={ |
172 | | - "model_uri": { |
173 | | - "task": "change_group_id.train", |
174 | | - "key": "return_value", |
175 | | - }, |
176 | | - }, |
177 | | - image="<your-image-name>", |
178 | | - secrets=["my-minio-creds"], |
179 | | - env_vars={ |
180 | | - "MLFLOW_TRACKING_URI": f"http://{MINIKUBE_GATEWAY}:5000", |
181 | | - "MLFLOW_S3_ENDPOINT_URL": f"http://{MINIKUBE_GATEWAY}:9000", |
182 | | - }, |
183 | | - |
184 | | - env=ENVIRONMENT, |
185 | | - ) |
186 | | - |
187 | | - trainer >> predictor |
| 168 | + # with TaskGroup(group_id="change_me_group_id_2", |
| 169 | + # tooltip="Change what appears in the tooltip 2") as predictor: |
| 170 | + # predict = task_factory( |
| 171 | + # task_id="predict", |
| 172 | + # func_path="{{ cookiecutter.package_name }}.predict", |
| 173 | + # # Pull model_uri output from the train task |
| 174 | + # xcom_pull_tasks={ |
| 175 | + # "model_uri": { |
| 176 | + # "task": "change_group_id.train", |
| 177 | + # "key": "return_value", |
| 178 | + # }, |
| 179 | + # }, |
| 180 | + # image="<your-image-name>", |
| 181 | + # secrets=["my-minio-creds"], |
| 182 | + # env_vars={ |
| 183 | + # "MLFLOW_TRACKING_URI": f"http://{MINIKUBE_GATEWAY}:5000", |
| 184 | + # "MLFLOW_S3_ENDPOINT_URL": f"http://{MINIKUBE_GATEWAY}:9000", |
| 185 | + # }, |
| 186 | + # |
| 187 | + # env=ENVIRONMENT, |
| 188 | + # ) |
| 189 | + # |
| 190 | + # trainer >> predictor |
188 | 191 |
|
189 | 192 |
|
190 | 193 |
|
0 commit comments