Commit ccbfcc1

adwk67 and xeniape authored
Airflow demo: add patch file for stress testing (#291)
* add patch file for stress testing
* added patch section
* updated docs
* Update docs/modules/demos/pages/airflow-scheduled-job.adoc
Co-authored-by: Xenia <[email protected]>
* Update docs/modules/demos/pages/airflow-scheduled-job.adoc
Co-authored-by: Xenia <[email protected]>
* review feedback comments
---------
Co-authored-by: Xenia <[email protected]>
1 parent 0edffd5 commit ccbfcc1

5 files changed: +130 -25 lines changed

docs/modules/demos/pages/airflow-scheduled-job.adoc

Lines changed: 59 additions & 0 deletions
@@ -129,6 +129,65 @@ asynchronously - and another to poll the running job to report on its status.

 image::airflow-scheduled-job/airflow_11.png[]

+== Patching Airflow to stress-test DAG parsing using relevant environment variables
+
+The demo also created a third DAG in the ConfigMap, called `dag_factory.py`, which was not mounted to the cluster and therefore does not appear in the UI.
+This DAG can be used to create a number of individual DAGs on-the-fly, thus allowing a certain degree of stress-testing of the DAG scan/register steps (the generated DAGs themselves are trivial and so this approach will not really increase the burden of DAG _parsing_).
+To include this in the list of DAGs (without removing the existing ones), an extra volumeMount is needed, as shown below.
+The patch also sets some environment variables that can be used to change the frequency of certain operations. The descriptions can be found here: https://airflow.apache.org/docs/apache-airflow/stable/configurations-ref.html[window=_blank].
+
+[source,yaml]
+----
+---
+apiVersion: airflow.stackable.tech/v1alpha1
+kind: AirflowCluster
+metadata:
+  name: airflow
+spec:
+  clusterConfig:
+    volumeMounts:
+      - name: airflow-dags
+        mountPath: /dags/dag_factory.py
+        subPath: dag_factory.py
+      - name: airflow-dags
+        mountPath: /dags/date_demo.py
+        subPath: date_demo.py
+      - name: airflow-dags
+        mountPath: /dags/pyspark_pi.py
+        subPath: pyspark_pi.py
+      - name: airflow-dags
+        mountPath: /dags/pyspark_pi.yaml
+        subPath: pyspark_pi.yaml
+  webservers:
+    roleGroups:
+      default:
+        envOverrides: &envOverrides
+          AIRFLOW__CORE__DAGS_FOLDER: "/dags"
+          AIRFLOW__CORE__MIN_SERIALIZED_DAG_UPDATE_INTERVAL: "60"
+          AIRFLOW__CORE__MIN_SERIALIZED_DAG_FETCH_INTERVAL: "60"
+          AIRFLOW__DAG_PROCESSOR__MIN_FILE_PROCESS_INTERVAL: "60"
+          AIRFLOW__DAG_PROCESSOR__PRINT_STATS_INTERVAL: "60"
+          AIRFLOW_CONN_KUBERNETES_IN_CLUSTER: "kubernetes://?__extra__=%7B%22extra__kubernetes__in_cluster%22%3A+true%2C+%22extra__kubernetes__kube_config%22%3A+%22%22%2C+%22extra__kubernetes__kube_config_path%22%3A+%22%22%2C+%22extra__kubernetes__namespace%22%3A+%22%22%7D"
+  kubernetesExecutors:
+    envOverrides: *envOverrides
+  schedulers:
+    roleGroups:
+      default:
+        envOverrides: *envOverrides
+----
+
+The patch can be applied like this:
+
+[source,console]
+----
+kubectl patch airflowcluster airflow --type="merge" --patch-file stacks/airflow/patch_airflow.yaml
+----
+
+[NOTE]
+====
+The scheduled job runs every minute, so an instance of it may be running while the scheduler is being restarted as a result of the patch, in which case that instance may fail.
+====
+
 == Summary

This demo showed how DAGs can be made available for Airflow, scheduled, run and then inspected with the Webserver UI.
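
The `--type="merge"` option used with `kubectl patch` in the section above requests a JSON merge patch (RFC 7386): objects are merged key by key, but a list in the patch replaces the existing list as a whole. That is presumably why the patch repeats the existing `volumeMounts` entries next to the new `dag_factory.py` one. The following is a minimal sketch of those semantics in plain Python, independent of kubectl; `merge_patch`, `someOtherSetting` and the shortened data are hypothetical and only serve as illustration.

```python
def merge_patch(target, patch):
    """Apply a JSON merge patch (RFC 7386 semantics) and return the result."""
    if not isinstance(patch, dict):
        # Scalars and lists replace the target value wholesale.
        return patch
    result = dict(target) if isinstance(target, dict) else {}
    for key, value in patch.items():
        if value is None:
            # A null value in the patch removes the key.
            result.pop(key, None)
        else:
            result[key] = merge_patch(result.get(key), value)
    return result


# Hypothetical, shortened stand-ins for the live resource and the patch.
current = {
    "spec": {
        "clusterConfig": {
            "volumeMounts": [
                {"name": "airflow-dags", "subPath": "date_demo.py"},
                {"name": "airflow-dags", "subPath": "pyspark_pi.py"},
            ],
            "someOtherSetting": "kept",
        }
    }
}
patch = {
    "spec": {
        "clusterConfig": {
            "volumeMounts": [
                {"name": "airflow-dags", "subPath": "dag_factory.py"},
            ]
        }
    }
}

merged = merge_patch(current, patch)
mounted = [m["subPath"] for m in merged["spec"]["clusterConfig"]["volumeMounts"]]
print(mounted)  # prints: ['dag_factory.py']
```

In this sketch only the new mount survives, while the sibling key `someOtherSetting` is untouched. A patch that listed just `dag_factory.py` would therefore drop the mounts it does not mention.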

stacks/_templates/redis-airflow.yaml

Lines changed: 0 additions & 14 deletions
This file was deleted.

stacks/airflow/airflow.yaml

Lines changed: 35 additions & 10 deletions
@@ -38,14 +38,12 @@ spec:
       gracefulShutdownTimeout: 30s
     roleGroups:
       default:
-        envOverrides:
+        envOverrides: &envOverrides
           AIRFLOW__CORE__DAGS_FOLDER: "/dags"
           AIRFLOW_CONN_KUBERNETES_IN_CLUSTER: "kubernetes://?__extra__=%7B%22extra__kubernetes__in_cluster%22%3A+true%2C+%22extra__kubernetes__kube_config%22%3A+%22%22%2C+%22extra__kubernetes__kube_config_path%22%3A+%22%22%2C+%22extra__kubernetes__namespace%22%3A+%22%22%7D"
         replicas: 1
   kubernetesExecutors:
-    envOverrides:
-      AIRFLOW__CORE__DAGS_FOLDER: "/dags"
-      AIRFLOW_CONN_KUBERNETES_IN_CLUSTER: "kubernetes://?__extra__=%7B%22extra__kubernetes__in_cluster%22%3A+true%2C+%22extra__kubernetes__kube_config%22%3A+%22%22%2C+%22extra__kubernetes__kube_config_path%22%3A+%22%22%2C+%22extra__kubernetes__namespace%22%3A+%22%22%7D"
+    envOverrides: *envOverrides
   schedulers:
     config:
       gracefulShutdownTimeout: 30s
@@ -57,9 +55,7 @@ spec:
           limit: 1Gi
     roleGroups:
       default:
-        envOverrides:
-          AIRFLOW__CORE__DAGS_FOLDER: "/dags"
-          AIRFLOW_CONN_KUBERNETES_IN_CLUSTER: "kubernetes://?__extra__=%7B%22extra__kubernetes__in_cluster%22%3A+true%2C+%22extra__kubernetes__kube_config%22%3A+%22%22%2C+%22extra__kubernetes__kube_config_path%22%3A+%22%22%2C+%22extra__kubernetes__namespace%22%3A+%22%22%7D"
+        envOverrides: *envOverrides
         replicas: 1
 ---
 apiVersion: v1
@@ -83,11 +79,42 @@ data:
         tags=['example'],
         params={},
     ) as dag:
-
         run_this = BashOperator(
             task_id='run_every_minute',
             bash_command='date',
         )
+  dag_factory.py: |
+    from airflow import DAG
+    from airflow.operators.empty import EmptyOperator
+    from datetime import datetime, timedelta
+
+    # Number of DAGs to generate
+    NUM_DAGS = 10 # Increase for more stress
+    DAG_PREFIX = "stress_dag_"
+
+    default_args = {
+        'owner': 'airflow',
+        'start_date': datetime(2025, 1, 1),
+        'retries': 1,
+        'retry_delay': timedelta(seconds=5),
+    }
+
+    def create_dag(dag_id):
+        with DAG(
+            dag_id=dag_id,
+            default_args=default_args,
+            schedule=None,
+            catchup=False,
+            tags=["stress_test"],
+        ) as dag:
+            start = EmptyOperator(task_id='start')
+            end = EmptyOperator(task_id='end')
+            start >> end
+        return dag
+
+    for i in range(NUM_DAGS):
+        dag_id = f"{DAG_PREFIX}{i:04d}"
+        globals()[dag_id] = create_dag(dag_id)
   pyspark_pi.py: |
     """Example DAG demonstrating how to apply a Kubernetes Resource from Airflow running in-cluster"""
     from datetime import datetime, timedelta
@@ -318,5 +345,3 @@ stringData:
   adminUser.password: "{{ airflowAdminPassword }}"
   connections.secretKey: "{{ airflowSecretKey }}"
   connections.sqlalchemyDatabaseUri: postgresql+psycopg2://airflow:airflow@postgresql-airflow/airflow
-  connections.celeryResultBackend: db+postgresql://airflow:airflow@postgresql-airflow/airflow
-  connections.celeryBrokerUrl: redis://:airflow@redis-airflow-master:6379/0
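
The loop at the end of `dag_factory.py` stores each generated DAG under a module-level name, which is how the file exposes more than one DAG to Airflow. The naming scheme can be checked without an Airflow installation. The sketch below reuses `NUM_DAGS` and `DAG_PREFIX` from the file; `create_dag_stub` and `registry` are hypothetical stand-ins for `create_dag` and `globals()`.

```python
NUM_DAGS = 10
DAG_PREFIX = "stress_dag_"


def create_dag_stub(dag_id):
    # Hypothetical stand-in: the real create_dag() returns an Airflow DAG object.
    return {"dag_id": dag_id, "tasks": ["start", "end"]}


registry = {}  # plays the role of globals() in the original module
for i in range(NUM_DAGS):
    # :04d zero-pads the index to four digits, e.g. 7 -> "0007".
    dag_id = f"{DAG_PREFIX}{i:04d}"
    registry[dag_id] = create_dag_stub(dag_id)

dag_ids = sorted(registry)
print(dag_ids[0], dag_ids[-1], len(dag_ids))
# prints: stress_dag_0000 stress_dag_0009 10
```

Because of the zero padding, the IDs sort in numeric order for up to 10000 generated DAGs, so raising `NUM_DAGS` keeps the listing readable.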

stacks/airflow/patch_airflow.yaml

Lines changed: 36 additions & 0 deletions
@@ -0,0 +1,36 @@
+---
+apiVersion: airflow.stackable.tech/v1alpha1
+kind: AirflowCluster
+metadata:
+  name: airflow
+spec:
+  clusterConfig:
+    volumeMounts:
+      - name: airflow-dags
+        mountPath: /dags/dag_factory.py
+        subPath: dag_factory.py
+      - name: airflow-dags
+        mountPath: /dags/date_demo.py
+        subPath: date_demo.py
+      - name: airflow-dags
+        mountPath: /dags/pyspark_pi.py
+        subPath: pyspark_pi.py
+      - name: airflow-dags
+        mountPath: /dags/pyspark_pi.yaml
+        subPath: pyspark_pi.yaml
+  webservers:
+    roleGroups:
+      default:
+        envOverrides: &envOverrides
+          AIRFLOW__CORE__DAGS_FOLDER: "/dags"
+          AIRFLOW__CORE__MIN_SERIALIZED_DAG_UPDATE_INTERVAL: "60"
+          AIRFLOW__CORE__MIN_SERIALIZED_DAG_FETCH_INTERVAL: "60"
+          AIRFLOW__DAG_PROCESSOR__MIN_FILE_PROCESS_INTERVAL: "60"
+          AIRFLOW__DAG_PROCESSOR__PRINT_STATS_INTERVAL: "60"
+          AIRFLOW_CONN_KUBERNETES_IN_CLUSTER: "kubernetes://?__extra__=%7B%22extra__kubernetes__in_cluster%22%3A+true%2C+%22extra__kubernetes__kube_config%22%3A+%22%22%2C+%22extra__kubernetes__kube_config_path%22%3A+%22%22%2C+%22extra__kubernetes__namespace%22%3A+%22%22%7D"
+  kubernetesExecutors:
+    envOverrides: *envOverrides
+  schedulers:
+    roleGroups:
+      default:
+        envOverrides: *envOverrides
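
Airflow reads configuration overrides from environment variables named `AIRFLOW__{SECTION}__{KEY}`, with a double underscore before and after the section name. The sketch below is not part of the commit. It splits the names used in this patch into section and key, which may make them easier to look up in the configuration reference. `split_airflow_env` is a hypothetical helper. Names such as `AIRFLOW_CONN_KUBERNETES_IN_CLUSTER`, with a single underscore after `AIRFLOW`, define connections rather than configuration options, so the helper returns `None` for them.

```python
def split_airflow_env(name):
    """Return (section, key) for AIRFLOW__SECTION__KEY names, otherwise None."""
    prefix = "AIRFLOW__"
    if not name.startswith(prefix):
        return None
    # The first double underscore after the prefix separates section and key.
    section, sep, key = name[len(prefix):].partition("__")
    if not sep or not section or not key:
        return None
    return section.lower(), key.lower()


names = [
    "AIRFLOW__CORE__DAGS_FOLDER",
    "AIRFLOW__CORE__MIN_SERIALIZED_DAG_UPDATE_INTERVAL",
    "AIRFLOW__DAG_PROCESSOR__MIN_FILE_PROCESS_INTERVAL",
    "AIRFLOW_CONN_KUBERNETES_IN_CLUSTER",
]
for name in names:
    print(name, "->", split_airflow_env(name))
```

For example, `AIRFLOW__DAG_PROCESSOR__MIN_FILE_PROCESS_INTERVAL` maps to the `min_file_process_interval` option in the `dag_processor` section.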

stacks/stacks-v2.yaml

Lines changed: 0 additions & 1 deletion
@@ -140,7 +140,6 @@ stacks:
       - airflow
     manifests:
       - helmChart: https://raw.githubusercontent.com/stackabletech/demos/main/stacks/_templates/postgresql-airflow.yaml
-      - helmChart: https://raw.githubusercontent.com/stackabletech/demos/main/stacks/_templates/redis-airflow.yaml
       - plainYaml: https://raw.githubusercontent.com/stackabletech/demos/main/stacks/airflow/airflow.yaml
     supportedNamespaces: []
     resourceRequests:
