|
1 | | -# import json |
2 | | -# from enum import Enum |
3 | | -# from typing import Optional, List |
4 | | -# |
5 | | -# from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator |
6 | | -# from airflow.providers.standard.operators.python import PythonOperator |
7 | | -# from kubernetes.client import V1EnvFromSource, V1SecretReference |
8 | | -# |
9 | | -# |
10 | | -# class GaiaflowMode(Enum): |
11 | | -# DEV = "dev" |
12 | | -# PROD_LOCAL = "prod_local" |
13 | | -# PROD = "prod" |
14 | | -# |
15 | | -# |
16 | | -# # TASK_REGISTRY = { |
17 | | -# # GaiaflowMode.DEV: _create_python_task, |
18 | | -# # GaiaflowMode.PROD: _create_kubernetes_task, |
19 | | -# # GaiaflowMode.PROD_LOCAL: _create_kubernetes_task, |
20 | | -# # } |
21 | | -# |
22 | | -# |
23 | | -# class XComConfig: |
24 | | -# def __init__(self, task: str, key: str = "return_value"): |
25 | | -# self.task = task |
26 | | -# self.key = key |
27 | | -# |
28 | | -# def to_dict(self) -> dict: |
29 | | -# return {"task": self.task, "key": self.key} |
30 | | -# |
31 | | -# def is_xcom_config_dict(val): |
32 | | -# return isinstance(val, dict) and "task" in val and "key" in val |
33 | | -# |
34 | | -# |
35 | | -# def create_task( |
36 | | -# task_id: str, |
37 | | -# func_path: str, |
38 | | -# func_kwargs: dict = None, |
39 | | -# func_args: list = None, |
40 | | -# image: str = None, |
41 | | -# env: str = "dev", |
42 | | -# func_args_from_tasks: dict = None, |
43 | | -# func_kwargs_from_tasks: dict = None, |
44 | | -# secrets: list = None, |
45 | | -# env_vars: dict = None, |
46 | | -# retries: int = 3, |
47 | | -# dag = None, |
48 | | -# params = None, |
49 | | -# ): |
50 | | -# """ |
51 | | -# Create an Airflow task that can run in different environments. |
52 | | -# |
53 | | -# Args: |
54 | | -# task_id: Unique task identifier |
55 | | -# func_path: Python import path to the function (e.g.,'my.module:myfunction') |
56 | | -# func_args: Static positional arguments to pass to the function |
57 | | -# func_kwargs: Static keyword arguments to pass to the function |
58 | | -# image: Docker image for production and prod-like environments |
59 | | -# env: GaiaflowMode to run in ('dev', 'prod_local', 'prod') |
60 | | -# func_kwargs_from_tasks: Dynamic keyword arguments from other tasks via XCom |
61 | | -# secrets: List of Kubernetes secrets to mount |
62 | | -# env_vars: GaiaflowMode variables for the task for production and prod-like environments |
63 | | -# retries: Number of retries on failure |
64 | | -# func_args_from_tasks: |
65 | | -# """ |
66 | | -# |
67 | | -# try: |
68 | | -# environment = GaiaflowMode(env) |
69 | | -# except ValueError: |
70 | | -# raise ValueError( |
71 | | -# f"env must be one of {[e.value for e in GaiaflowMode]}, got '{env}'" |
72 | | -# ) |
73 | | -# |
74 | | -# if func_kwargs is None: |
75 | | -# func_kwargs = {} |
76 | | -# |
77 | | -# if func_args is None: |
78 | | -# func_args = [] |
79 | | -# |
80 | | -# if env_vars is None: |
81 | | -# env_vars = {} |
82 | | -# |
83 | | -# if dag and hasattr(dag, "params"): |
84 | | -# dag_params = dag.params or {} |
85 | | -# else: |
86 | | -# dag_params = {} |
87 | | -# |
88 | | -# combined_params = {**dag_params, **(params or {})} |
89 | | -# # if combined_params: |
90 | | -# # func_kwargs["params"] = combined_params |
91 | | -# |
92 | | -# func_kwargs_from_tasks_dict = {} |
93 | | -# if func_kwargs_from_tasks: |
94 | | -# func_kwargs_from_tasks_dict = { |
95 | | -# k: (v if is_xcom_config_dict(v) else XComConfig( |
96 | | -# v).to_dict()) |
97 | | -# for k, v in func_kwargs_from_tasks.items() |
98 | | -# } |
99 | | -# |
100 | | -# # func_args_from_tasks_dict = {} |
101 | | -# # if func_args_from_tasks: |
102 | | -# # func_args_from_tasks_dict = { |
103 | | -# # k: (v if is_xcom_config_dict(v) else XComConfig(v).to_dict()) |
104 | | -# # for k, v in func_args_from_tasks.items() |
105 | | -# # } |
106 | | -# |
107 | | -# |
108 | | -# if environment == GaiaflowMode.DEV: |
109 | | -# return _create_python_task( |
110 | | -# task_id=task_id, |
111 | | -# func_path=func_path, |
112 | | -# func_args=func_args, |
113 | | -# func_kwargs=func_kwargs, |
114 | | -# func_kwargs_from_tasks=func_kwargs_from_tasks_dict, |
115 | | -# func_args_from_tasks=func_args_from_tasks, |
116 | | -# xcom_push=True, |
117 | | -# retries=retries, |
118 | | -# params=combined_params, |
119 | | -# ) |
120 | | -# |
121 | | -# elif environment in [GaiaflowMode.PROD, GaiaflowMode.PROD_LOCAL]: |
122 | | -# if not image: |
123 | | -# raise ValueError(f"Docker image is required for {env} environment") |
124 | | -# |
125 | | -# return _create_kubernetes_task( |
126 | | -# task_id=task_id, |
127 | | -# func_path=func_path, |
128 | | -# func_args=func_args, |
129 | | -# func_kwargs=func_kwargs, |
130 | | -# func_kwargs_from_tasks=func_kwargs_from_tasks_dict, |
131 | | -# func_args_from_tasks=func_args_from_tasks, |
132 | | -# image=image, |
133 | | -# secrets=secrets, |
134 | | -# env_vars=env_vars, |
135 | | -# xcom_push=True, |
136 | | -# retries=retries, |
137 | | -# in_cluster=(environment == GaiaflowMode.PROD), |
138 | | -# params=combined_params |
139 | | -# ) |
140 | | -# |
141 | | -# def _create_python_task( |
142 | | -# task_id: str, |
143 | | -# func_path: str, |
144 | | -# func_args: list, |
145 | | -# func_kwargs: dict, |
146 | | -# func_args_from_tasks: dict, |
147 | | -# func_kwargs_from_tasks: dict, |
148 | | -# xcom_push: bool, |
149 | | -# retries: int, |
150 | | -# params: dict |
151 | | -# ) -> PythonOperator: |
152 | | -# from .runner import run |
153 | | -# |
154 | | -# full_kwargs = { |
155 | | -# "func_path": func_path, |
156 | | -# "args": func_args, |
157 | | -# "kwargs": func_kwargs, |
158 | | -# } |
159 | | -# |
160 | | -# if func_kwargs_from_tasks: |
161 | | -# full_kwargs["xcom_pull_kwargs"] = func_kwargs_from_tasks |
162 | | -# |
163 | | -# if func_args_from_tasks: |
164 | | -# full_kwargs["xcom_pull_args"] = func_args_from_tasks |
165 | | -# |
166 | | -# return PythonOperator( |
167 | | -# task_id=task_id, |
168 | | -# python_callable=run, |
169 | | -# op_kwargs=full_kwargs, |
170 | | -# do_xcom_push=xcom_push, |
171 | | -# retries=retries, |
172 | | -# params=params |
173 | | -# ) |
174 | | -# |
175 | | -# def _inject_params_as_env_vars(params: dict[str, str]) -> dict[str, str]: |
176 | | -# return { |
177 | | -# f"PARAMS_{k.upper()}": f"{{{{ params.{k} }}}}" |
178 | | -# for k in params |
179 | | -# } |
180 | | -# |
181 | | -# def _create_kubernetes_task( |
182 | | -# task_id: str, |
183 | | -# func_path: str, |
184 | | -# func_args: list, |
185 | | -# func_kwargs: dict, |
186 | | -# func_kwargs_from_tasks: dict, |
187 | | -# func_args_from_tasks: dict, |
188 | | -# image: str, |
189 | | -# secrets: Optional[List[str]], |
190 | | -# env_vars: dict, |
191 | | -# xcom_push: bool, |
192 | | -# retries: int, |
193 | | -# in_cluster: bool, |
194 | | -# params: dict, |
195 | | -# ) -> KubernetesPodOperator: |
196 | | -# |
197 | | -# if func_kwargs_from_tasks: |
198 | | -# xcom_kwargs_pull_results = _build_xcom_templates(func_kwargs_from_tasks) |
199 | | -# else: |
200 | | -# xcom_kwargs_pull_results= {} |
201 | | -# if func_args_from_tasks: |
202 | | -# xcom_args_pull_results = _build_args_xcom_templates(func_args_from_tasks) |
203 | | -# else: |
204 | | -# xcom_args_pull_results = {} |
205 | | -# |
206 | | -# task_env_vars = _build_env_vars( |
207 | | -# func_path=func_path, |
208 | | -# func_args=func_args, |
209 | | -# func_kwargs=func_kwargs, |
210 | | -# func_args_from_tasks=func_args_from_tasks, |
211 | | -# func_kwargs_from_tasks=func_kwargs_from_tasks, |
212 | | -# xcom_args_pull_results=xcom_args_pull_results, |
213 | | -# xcom_kwargs_pull_results=xcom_kwargs_pull_results, |
214 | | -# custom_env_vars=env_vars, |
215 | | -# ) |
216 | | -# |
217 | | -# env_from = _build_env_from_secrets(secrets) if secrets else None |
218 | | -# |
219 | | -# return KubernetesPodOperator( |
220 | | -# task_id=task_id, |
221 | | -# image=image, |
222 | | -# cmds=["python", "-m", "gaiaflow.core.runner"], |
223 | | -# env_vars={**_inject_params_as_env_vars(params), **task_env_vars}, |
224 | | -# params=params, |
225 | | -# env_from=env_from, |
226 | | -# get_logs=True, |
227 | | -# is_delete_operator_pod=True, |
228 | | -# log_events_on_failure=True, |
229 | | -# in_cluster=in_cluster, |
230 | | -# do_xcom_push=xcom_push, |
231 | | -# retries=retries, |
232 | | -# ) |
233 | | -# |
234 | | -# |
235 | | -# def _build_xcom_templates(func_from_tasks: dict) -> dict: |
236 | | -# xcom_pull_results = {} |
237 | | -# |
238 | | -# for arg_key, pull_config in func_from_tasks.items(): |
239 | | -# source_task = pull_config["task"] |
240 | | -# key = pull_config.get("key", "return_value") |
241 | | -# xcom_pull_results[source_task] = ( |
242 | | -# "{{ ti.xcom_pull(task_ids='" + source_task + "', key='" + key + "') }}" |
243 | | -# ) |
244 | | -# |
245 | | -# return xcom_pull_results |
246 | | -# |
247 | | -# def _build_args_xcom_templates(func_from_tasks: dict) -> dict: |
248 | | -# xcom_pull_results = {} |
249 | | -# |
250 | | -# for arg_key, pull_config in func_from_tasks.items(): |
251 | | -# source_task = pull_config["task"] |
252 | | -# |
253 | | -# key = "return_value" |
254 | | -# xcom_pull_results[source_task] = ( |
255 | | -# "{{ ti.xcom_pull(task_ids='" + source_task + "', key='" + key + "') }}" |
256 | | -# ) |
257 | | -# |
258 | | -# return xcom_pull_results |
259 | | -# |
260 | | -# |
261 | | -# def _build_env_vars( |
262 | | -# func_path: str, |
263 | | -# func_kwargs: dict, |
264 | | -# func_args: list, |
265 | | -# func_args_from_tasks: dict, |
266 | | -# func_kwargs_from_tasks: dict, |
267 | | -# xcom_kwargs_pull_results: dict, |
268 | | -# xcom_args_pull_results: dict, |
269 | | -# custom_env_vars: dict, |
270 | | -# ) -> dict: |
271 | | -# print("DEBUG: xcom_args_pull_results:", xcom_args_pull_results) |
272 | | -# print("DEBUG: xcom_kwargs_pull_results:", xcom_kwargs_pull_results) |
273 | | -# |
274 | | -# default_env_vars = { |
275 | | -# "FUNC_PATH": func_path, |
276 | | -# "FUNC_ARGS": json.dumps(func_args), |
277 | | -# "FUNC_KWARGS": json.dumps(func_kwargs), |
278 | | -# "XCOM_PULL_ARGS": json.dumps(func_args_from_tasks), |
279 | | -# "XCOM_PULL_KWARGS": json.dumps(func_kwargs_from_tasks), |
280 | | -# "XCOM_PULL_ARGS_RESULTS": json.dumps(xcom_args_pull_results), |
281 | | -# "XCOM_PULL_KWARGS_RESULTS": json.dumps(xcom_kwargs_pull_results), |
282 | | -# "ENV": "prod", |
283 | | -# } |
284 | | -# |
285 | | -# print( |
286 | | -# "DEBUG: XCOM_PULL_ARGS_RESULTS env var:", |
287 | | -# default_env_vars["XCOM_PULL_ARGS_RESULTS"], |
288 | | -# ) |
289 | | -# print( |
290 | | -# "DEBUG: XCOM_PULL_KWARGS_RESULTS env var:", |
291 | | -# default_env_vars["XCOM_PULL_KWARGS_RESULTS"], |
292 | | -# ) |
293 | | -# final_env_vars = {**default_env_vars, **custom_env_vars} |
294 | | -# |
295 | | -# return final_env_vars |
296 | | -# |
297 | | -# |
298 | | -# def _build_env_from_secrets(secrets: List[str]) -> List[V1EnvFromSource]: |
299 | | -# return [ |
300 | | -# V1EnvFromSource(secret_ref=V1SecretReference(name=secret)) for secret in secrets |
301 | | -# ] |
302 | | - |
303 | | - |
304 | 1 | from enum import Enum |
305 | 2 |
|
306 | 3 | from .operators import (DevTaskOperator, DockerTaskOperator, |
|
0 commit comments