59 changes: 59 additions & 0 deletions docs/modules/demos/pages/airflow-scheduled-job.adoc
@@ -129,6 +129,65 @@ asynchronously - and another to poll the running job to report on its status.

image::airflow-scheduled-job/airflow_11.png[]

== Patching Airflow to stress-test DAG parsing using relevant environment variables

The demo also created a third DAG in the ConfigMap, called `dag_factory.py`, which is not mounted to the cluster and therefore does not appear in the UI.
This DAG generates a number of individual DAGs on the fly, allowing the DAG scan/register steps to be stress-tested to a degree (the generated DAGs themselves are trivial, so this approach will not significantly increase the burden of DAG _parsing_).
To include it in the list of DAGs (without removing the existing ones), an extra volumeMount is needed, as shown below.
The patch also sets some environment variables that change the frequency of certain operations; their descriptions can be found in the https://airflow.apache.org/docs/apache-airflow/stable/configurations-ref.html[Airflow configuration reference,window=_blank].

[source,yaml]
----
---
apiVersion: airflow.stackable.tech/v1alpha1
kind: AirflowCluster
metadata:
  name: airflow
spec:
  clusterConfig:
    volumeMounts:
      - name: airflow-dags
        mountPath: /dags/dag_factory.py
        subPath: dag_factory.py
      - name: airflow-dags
        mountPath: /dags/date_demo.py
        subPath: date_demo.py
      - name: airflow-dags
        mountPath: /dags/pyspark_pi.py
        subPath: pyspark_pi.py
      - name: airflow-dags
        mountPath: /dags/pyspark_pi.yaml
        subPath: pyspark_pi.yaml
  webservers:
    roleGroups:
      default:
        envOverrides: &envOverrides
          AIRFLOW__CORE__DAGS_FOLDER: "/dags"
          AIRFLOW__CORE__MIN_SERIALIZED_DAG_UPDATE_INTERVAL: "60"
          AIRFLOW__CORE__MIN_SERIALIZED_DAG_FETCH_INTERVAL: "60"
          AIRFLOW__DAG_PROCESSOR__MIN_FILE_PROCESS_INTERVAL: "60"
          AIRFLOW__DAG_PROCESSOR__PRINT_STATS_INTERVAL: "60"
          AIRFLOW_CONN_KUBERNETES_IN_CLUSTER: "kubernetes://?__extra__=%7B%22extra__kubernetes__in_cluster%22%3A+true%2C+%22extra__kubernetes__kube_config%22%3A+%22%22%2C+%22extra__kubernetes__kube_config_path%22%3A+%22%22%2C+%22extra__kubernetes__namespace%22%3A+%22%22%7D"
  kubernetesExecutors:
    envOverrides: *envOverrides
  schedulers:
    roleGroups:
      default:
        envOverrides: *envOverrides
----

The patch can be applied like this:

[source,console]
----
kubectl patch airflowcluster airflow --type="merge" --patch-file stacks/airflow/patch_airflow.yaml
----
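
To confirm that the patch has been merged into the cluster definition, the resulting spec can be inspected, for example by echoing back the mounted DAG files (a quick sanity check; the `jsonpath` expression simply reads the `volumeMounts` shown in the patch above):

[source,console]
----
kubectl get airflowcluster airflow \
  -o jsonpath='{.spec.clusterConfig.volumeMounts[*].subPath}'
----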

[NOTE]
====
The scheduled job runs every minute, so an instance of it may be running while the scheduler is restarted as a result of the patch; if so, that instance may fail.
====
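
Once the scheduler and DAG processor have picked up the changes, the generated DAGs (`stress_dag_0000`, `stress_dag_0001`, ...) should appear in the UI alongside the existing ones.
As a sketch, they can also be listed from the command line - assuming the webserver pod follows the usual `<cluster>-<role>-<rolegroup>-<index>` naming (here `airflow-webserver-default-0`) and the `airflow` CLI is available in that container:

[source,console]
----
kubectl exec airflow-webserver-default-0 -- airflow dags list | grep stress_dag_
----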

== Summary

This demo showed how DAGs can be made available to Airflow, scheduled, run, and then inspected with the Webserver UI.
14 changes: 0 additions & 14 deletions stacks/_templates/redis-airflow.yaml

This file was deleted.

45 changes: 35 additions & 10 deletions stacks/airflow/airflow.yaml
@@ -38,14 +38,12 @@ spec:
      gracefulShutdownTimeout: 30s
    roleGroups:
      default:
        envOverrides:
        envOverrides: &envOverrides
          AIRFLOW__CORE__DAGS_FOLDER: "/dags"
          AIRFLOW_CONN_KUBERNETES_IN_CLUSTER: "kubernetes://?__extra__=%7B%22extra__kubernetes__in_cluster%22%3A+true%2C+%22extra__kubernetes__kube_config%22%3A+%22%22%2C+%22extra__kubernetes__kube_config_path%22%3A+%22%22%2C+%22extra__kubernetes__namespace%22%3A+%22%22%7D"
        replicas: 1
  kubernetesExecutors:
    envOverrides:
      AIRFLOW__CORE__DAGS_FOLDER: "/dags"
      AIRFLOW_CONN_KUBERNETES_IN_CLUSTER: "kubernetes://?__extra__=%7B%22extra__kubernetes__in_cluster%22%3A+true%2C+%22extra__kubernetes__kube_config%22%3A+%22%22%2C+%22extra__kubernetes__kube_config_path%22%3A+%22%22%2C+%22extra__kubernetes__namespace%22%3A+%22%22%7D"
    envOverrides: *envOverrides
  schedulers:
    config:
      gracefulShutdownTimeout: 30s
@@ -57,9 +55,7 @@ spec:
          limit: 1Gi
    roleGroups:
      default:
        envOverrides:
          AIRFLOW__CORE__DAGS_FOLDER: "/dags"
          AIRFLOW_CONN_KUBERNETES_IN_CLUSTER: "kubernetes://?__extra__=%7B%22extra__kubernetes__in_cluster%22%3A+true%2C+%22extra__kubernetes__kube_config%22%3A+%22%22%2C+%22extra__kubernetes__kube_config_path%22%3A+%22%22%2C+%22extra__kubernetes__namespace%22%3A+%22%22%7D"
        envOverrides: *envOverrides
        replicas: 1
---
apiVersion: v1
@@ -83,11 +79,42 @@ data:
        tags=['example'],
        params={},
    ) as dag:

        run_this = BashOperator(
            task_id='run_every_minute',
            bash_command='date',
        )
  dag_factory.py: |
    from airflow import DAG
    from airflow.operators.empty import EmptyOperator
    from datetime import datetime, timedelta

    # Number of DAGs to generate
    NUM_DAGS = 10 # Increase for more stress
    DAG_PREFIX = "stress_dag_"

    default_args = {
        'owner': 'airflow',
        'start_date': datetime(2025, 1, 1),
        'retries': 1,
        'retry_delay': timedelta(seconds=5),
    }

    def create_dag(dag_id):
        with DAG(
            dag_id=dag_id,
            default_args=default_args,
            schedule=None,
            catchup=False,
            tags=["stress_test"],
        ) as dag:
            start = EmptyOperator(task_id='start')
            end = EmptyOperator(task_id='end')
            start >> end
        return dag

    for i in range(NUM_DAGS):
        dag_id = f"{DAG_PREFIX}{i:04d}"
        globals()[dag_id] = create_dag(dag_id)
  pyspark_pi.py: |
    """Example DAG demonstrating how to apply a Kubernetes Resource from Airflow running in-cluster"""
    from datetime import datetime, timedelta
@@ -318,5 +345,3 @@ stringData:
  adminUser.password: "{{ airflowAdminPassword }}"
  connections.secretKey: "{{ airflowSecretKey }}"
  connections.sqlalchemyDatabaseUri: postgresql+psycopg2://airflow:airflow@postgresql-airflow/airflow
  connections.celeryResultBackend: db+postgresql://airflow:airflow@postgresql-airflow/airflow
  connections.celeryBrokerUrl: redis://:airflow@redis-airflow-master:6379/0
36 changes: 36 additions & 0 deletions stacks/airflow/patch_airflow.yaml
@@ -0,0 +1,36 @@
---
apiVersion: airflow.stackable.tech/v1alpha1
kind: AirflowCluster
metadata:
  name: airflow
spec:
  clusterConfig:
    volumeMounts:
      - name: airflow-dags
        mountPath: /dags/dag_factory.py
        subPath: dag_factory.py
      - name: airflow-dags
        mountPath: /dags/date_demo.py
        subPath: date_demo.py
      - name: airflow-dags
        mountPath: /dags/pyspark_pi.py
        subPath: pyspark_pi.py
      - name: airflow-dags
        mountPath: /dags/pyspark_pi.yaml
        subPath: pyspark_pi.yaml
  webservers:
    roleGroups:
      default:
        envOverrides: &envOverrides
          AIRFLOW__CORE__DAGS_FOLDER: "/dags"
          AIRFLOW__CORE__MIN_SERIALIZED_DAG_UPDATE_INTERVAL: "60"
          AIRFLOW__CORE__MIN_SERIALIZED_DAG_FETCH_INTERVAL: "60"
          AIRFLOW__DAG_PROCESSOR__MIN_FILE_PROCESS_INTERVAL: "60"
          AIRFLOW__DAG_PROCESSOR__PRINT_STATS_INTERVAL: "60"
          AIRFLOW_CONN_KUBERNETES_IN_CLUSTER: "kubernetes://?__extra__=%7B%22extra__kubernetes__in_cluster%22%3A+true%2C+%22extra__kubernetes__kube_config%22%3A+%22%22%2C+%22extra__kubernetes__kube_config_path%22%3A+%22%22%2C+%22extra__kubernetes__namespace%22%3A+%22%22%7D"
  kubernetesExecutors:
    envOverrides: *envOverrides
  schedulers:
    roleGroups:
      default:
        envOverrides: *envOverrides
1 change: 0 additions & 1 deletion stacks/stacks-v2.yaml
@@ -140,7 +140,6 @@ stacks:
      - airflow
    manifests:
      - helmChart: https://raw.githubusercontent.com/stackabletech/demos/main/stacks/_templates/postgresql-airflow.yaml
      - helmChart: https://raw.githubusercontent.com/stackabletech/demos/main/stacks/_templates/redis-airflow.yaml
      - plainYaml: https://raw.githubusercontent.com/stackabletech/demos/main/stacks/airflow/airflow.yaml
    supportedNamespaces: []
    resourceRequests: