@@ -38,14 +38,12 @@ spec:
       gracefulShutdownTimeout: 30s
     roleGroups:
       default:
-        envOverrides:
+        envOverrides: &envOverrides
           AIRFLOW__CORE__DAGS_FOLDER: "/dags"
           AIRFLOW_CONN_KUBERNETES_IN_CLUSTER: "kubernetes://?__extra__=%7B%22extra__kubernetes__in_cluster%22%3A+true%2C+%22extra__kubernetes__kube_config%22%3A+%22%22%2C+%22extra__kubernetes__kube_config_path%22%3A+%22%22%2C+%22extra__kubernetes__namespace%22%3A+%22%22%7D"
         replicas: 1
   kubernetesExecutors:
-    envOverrides:
-      AIRFLOW__CORE__DAGS_FOLDER: "/dags"
-      AIRFLOW_CONN_KUBERNETES_IN_CLUSTER: "kubernetes://?__extra__=%7B%22extra__kubernetes__in_cluster%22%3A+true%2C+%22extra__kubernetes__kube_config%22%3A+%22%22%2C+%22extra__kubernetes__kube_config_path%22%3A+%22%22%2C+%22extra__kubernetes__namespace%22%3A+%22%22%7D"
+    envOverrides: *envOverrides
   schedulers:
     config:
       gracefulShutdownTimeout: 30s
@@ -57,9 +55,7 @@ spec:
           limit: 1Gi
     roleGroups:
       default:
-        envOverrides:
-          AIRFLOW__CORE__DAGS_FOLDER: "/dags"
-          AIRFLOW_CONN_KUBERNETES_IN_CLUSTER: "kubernetes://?__extra__=%7B%22extra__kubernetes__in_cluster%22%3A+true%2C+%22extra__kubernetes__kube_config%22%3A+%22%22%2C+%22extra__kubernetes__kube_config_path%22%3A+%22%22%2C+%22extra__kubernetes__namespace%22%3A+%22%22%7D"
+        envOverrides: *envOverrides
         replicas: 1
 ---
 apiVersion: v1
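The two hunks above deduplicate three identical envOverrides maps with a YAML anchor (&envOverrides) and aliases (*envOverrides). Anchors are resolved by the YAML parser itself, so each alias expands to a full copy of the anchored map before the manifest reaches the operator. A minimal sketch of the mechanism using PyYAML (the role and key names here are illustrative, not taken from the manifest):

import yaml

manifest = """
webserver:
  env: &envOverrides              # anchor: names this map
    AIRFLOW__CORE__DAGS_FOLDER: "/dags"
scheduler:
  env: *envOverrides              # alias: expands to an identical copy
"""

parsed = yaml.safe_load(manifest)
# Both roles end up with the same resolved map after parsing.
assert parsed["scheduler"]["env"] == parsed["webserver"]["env"]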
@@ -83,11 +79,42 @@ data:
         tags=['example'],
         params={},
     ) as dag:
-
         run_this = BashOperator(
             task_id='run_every_minute',
             bash_command='date',
         )
+  dag_factory.py: |
+    from airflow import DAG
+    from airflow.operators.empty import EmptyOperator
+    from datetime import datetime, timedelta
+
+    # Number of DAGs to generate
+    NUM_DAGS = 10  # Increase for more stress
+    DAG_PREFIX = "stress_dag_"
+
+    default_args = {
+        'owner': 'airflow',
+        'start_date': datetime(2025, 1, 1),
+        'retries': 1,
+        'retry_delay': timedelta(seconds=5),
+    }
+
+    def create_dag(dag_id):
+        with DAG(
+            dag_id=dag_id,
+            default_args=default_args,
+            schedule=None,
+            catchup=False,
+            tags=["stress_test"],
+        ) as dag:
+            start = EmptyOperator(task_id='start')
+            end = EmptyOperator(task_id='end')
+            start >> end
+        return dag
+
+    for i in range(NUM_DAGS):
+        dag_id = f"{DAG_PREFIX}{i:04d}"
+        globals()[dag_id] = create_dag(dag_id)
   pyspark_pi.py: |
     """Example DAG demonstrating how to apply a Kubernetes Resource from Airflow running in-cluster"""
     from datetime import datetime, timedelta
@@ -319,4 +346,3 @@ stringData:
   connections.secretKey: "{{ airflowSecretKey }}"
   connections.sqlalchemyDatabaseUri: postgresql+psycopg2://airflow:airflow@postgresql-airflow/airflow
   connections.celeryResultBackend: db+postgresql://airflow:airflow@postgresql-airflow/airflow
-  connections.celeryBrokerUrl: redis://:airflow@redis-airflow-master:6379/0
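Because dag_factory.py registers each generated DAG in globals(), the scheduler's DAG file processor discovers all of them from the single mounted file. A quick way to sanity-check the factory before deploying is to parse the DAG folder with Airflow's DagBag; a minimal sketch, assuming Airflow is installed locally and the file sits in ./dags (a hypothetical path):

from airflow.models import DagBag

# Parse ./dags the same way the scheduler would, skipping Airflow's bundled examples.
bag = DagBag(dag_folder="./dags", include_examples=False)

# Import errors (e.g. a typo in the factory) surface here instead of in scheduler logs.
assert not bag.import_errors, bag.import_errors

# One DAG per loop iteration: stress_dag_0000 .. stress_dag_0009 for NUM_DAGS = 10.
print(sorted(d for d in bag.dag_ids if d.startswith("stress_dag_")))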