74 changes: 74 additions & 0 deletions docs/modules/demos/pages/airflow-scheduled-job.adoc
@@ -129,6 +129,80 @@ asynchronously - and another to poll the running job to report on its status.

image::airflow-scheduled-job/airflow_11.png[]

== Patching Airflow to stress-test DAG parsing using relevant environment variables

By default, Airflow runs database initialization routines on start-up.
These check that an Admin user exists and that the database schema is up-to-date.
Since they are idempotent and incur little overhead, they can safely be run on every start-up in most environments.
If it nonetheless makes sense to skip them, they can be turned off by patching the running cluster with a resource definition such as this:

[source,yaml]
----
---
apiVersion: airflow.stackable.tech/v1alpha1
kind: AirflowCluster
metadata:
  name: airflow
spec:
  clusterConfig:
    databaseInitialization:
      enabled: false # <1>
----
<1> Turn off the initialization routine by setting `databaseInitialization.enabled` to `false`

NOTE: The field `databaseInitialization.enabled` defaults to `true` for backwards compatibility.
A fresh Airflow cluster cannot be created with this field set to `false`, as this would result in missing metadata in the Airflow database.

WARNING: Setting `databaseInitialization.enabled` to `false` is an unsupported operation as subsequent updates to a running Airflow cluster can result in broken behaviour due to inconsistent metadata.
Only set `databaseInitialization.enabled` to `false` if you know what you are doing!
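
One way to apply such a patch, assuming the resource definition above has been saved to a local file called `patch.yaml` (a name chosen here purely for illustration) and that the cluster named `airflow` runs in the current namespace, is a merge patch via `kubectl`:

[source,console]
----
$ kubectl patch airflowcluster airflow --type merge --patch-file patch.yaml
----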

The demo also created a third DAG in the ConfigMap, called `dag_factory.py`, which was not mounted to the cluster and therefore does not appear in the UI.
This DAG can be used to create a number of individual DAGs on-the-fly, thus allowing a certain degree of stress-testing of the DAG scan/register steps (the generated DAGs themselves are trivial and so this approach will not really increase the burden of DAG _parsing_).
To include it in the list of DAGs (without removing the existing ones), an extra volumeMount is needed, as shown below.
The patch also sets some environment variables that can be used to change the frequency of certain operations; their descriptions can be found in the https://airflow.apache.org/docs/apache-airflow/stable/configurations-ref.html[Airflow configuration reference].

[source,yaml]
----
---
apiVersion: airflow.stackable.tech/v1alpha1
kind: AirflowCluster
metadata:
  name: airflow
spec:
  clusterConfig:
    databaseInitialization:
      enabled: false
    volumeMounts:
      - name: airflow-dags
        mountPath: /dags/dag_factory.py
        subPath: dag_factory.py
      - name: airflow-dags
        mountPath: /dags/date_demo.py
        subPath: date_demo.py
      - name: airflow-dags
        mountPath: /dags/pyspark_pi.py
        subPath: pyspark_pi.py
      - name: airflow-dags
        mountPath: /dags/pyspark_pi.yaml
        subPath: pyspark_pi.yaml
  webservers:
    roleGroups:
      default:
        envOverrides: &envOverrides
          AIRFLOW__CORE__DAGS_FOLDER: "/dags"
          AIRFLOW__CORE__MIN_SERIALIZED_DAG_UPDATE_INTERVAL: "60"
          AIRFLOW__CORE__MIN_SERIALIZED_DAG_FETCH_INTERVAL: "60"
          AIRFLOW__DAG_PROCESSOR__MIN_FILE_PROCESS_INTERVAL: "60"
          AIRFLOW__DAG_PROCESSOR__PRINT_STATS_INTERVAL: "60"
          AIRFLOW_CONN_KUBERNETES_IN_CLUSTER: "kubernetes://?__extra__=%7B%22extra__kubernetes__in_cluster%22%3A+true%2C+%22extra__kubernetes__kube_config%22%3A+%22%22%2C+%22extra__kubernetes__kube_config_path%22%3A+%22%22%2C+%22extra__kubernetes__namespace%22%3A+%22%22%7D"
  kubernetesExecutors:
    envOverrides: *envOverrides
  schedulers:
    roleGroups:
      default:
        envOverrides: *envOverrides
----
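
Once the patch has been applied and the DAG processor has re-scanned the `/dags` folder, the generated DAGs should appear alongside the existing ones.
As a rough check, the Airflow CLI inside the webserver pod can list them; the pod name below follows the usual `<cluster>-<role>-<role group>-<index>` naming and may differ in your setup (you may also need to select the right container with `-c`):

[source,console]
----
$ kubectl exec airflow-webserver-default-0 -- airflow dags list | grep stress_dag_
----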

== Summary

This demo showed how DAGs can be made available for Airflow, scheduled, run and then inspected with the Webserver UI.
44 changes: 35 additions & 9 deletions stacks/airflow/airflow.yaml
@@ -38,14 +38,12 @@ spec:
gracefulShutdownTimeout: 30s
roleGroups:
default:
envOverrides:
envOverrides: &envOverrides
AIRFLOW__CORE__DAGS_FOLDER: "/dags"
AIRFLOW_CONN_KUBERNETES_IN_CLUSTER: "kubernetes://?__extra__=%7B%22extra__kubernetes__in_cluster%22%3A+true%2C+%22extra__kubernetes__kube_config%22%3A+%22%22%2C+%22extra__kubernetes__kube_config_path%22%3A+%22%22%2C+%22extra__kubernetes__namespace%22%3A+%22%22%7D"
replicas: 1
kubernetesExecutors:
envOverrides:
AIRFLOW__CORE__DAGS_FOLDER: "/dags"
AIRFLOW_CONN_KUBERNETES_IN_CLUSTER: "kubernetes://?__extra__=%7B%22extra__kubernetes__in_cluster%22%3A+true%2C+%22extra__kubernetes__kube_config%22%3A+%22%22%2C+%22extra__kubernetes__kube_config_path%22%3A+%22%22%2C+%22extra__kubernetes__namespace%22%3A+%22%22%7D"
envOverrides: *envOverrides
schedulers:
config:
gracefulShutdownTimeout: 30s
@@ -57,9 +55,7 @@ spec:
limit: 1Gi
roleGroups:
default:
envOverrides:
AIRFLOW__CORE__DAGS_FOLDER: "/dags"
AIRFLOW_CONN_KUBERNETES_IN_CLUSTER: "kubernetes://?__extra__=%7B%22extra__kubernetes__in_cluster%22%3A+true%2C+%22extra__kubernetes__kube_config%22%3A+%22%22%2C+%22extra__kubernetes__kube_config_path%22%3A+%22%22%2C+%22extra__kubernetes__namespace%22%3A+%22%22%7D"
envOverrides: *envOverrides
replicas: 1
---
apiVersion: v1
@@ -83,11 +79,42 @@ data:
        tags=['example'],
        params={},
    ) as dag:

        run_this = BashOperator(
            task_id='run_every_minute',
            bash_command='date',
        )
  dag_factory.py: |
    from airflow import DAG
    from airflow.operators.empty import EmptyOperator
    from datetime import datetime, timedelta

    # Number of DAGs to generate
    NUM_DAGS = 10 # Increase for more stress
    DAG_PREFIX = "stress_dag_"

    default_args = {
        'owner': 'airflow',
        'start_date': datetime(2025, 1, 1),
        'retries': 1,
        'retry_delay': timedelta(seconds=5),
    }

    def create_dag(dag_id):
        with DAG(
            dag_id=dag_id,
            default_args=default_args,
            schedule=None,
            catchup=False,
            tags=["stress_test"],
        ) as dag:
            start = EmptyOperator(task_id='start')
            end = EmptyOperator(task_id='end')
            start >> end
        return dag

    for i in range(NUM_DAGS):
        dag_id = f"{DAG_PREFIX}{i:04d}"
        globals()[dag_id] = create_dag(dag_id)
  pyspark_pi.py: |
    """Example DAG demonstrating how to apply a Kubernetes Resource from Airflow running in-cluster"""
    from datetime import datetime, timedelta
@@ -319,4 +346,3 @@ stringData:
connections.secretKey: "{{ airflowSecretKey }}"
connections.sqlalchemyDatabaseUri: postgresql+psycopg2://airflow:airflow@postgresql-airflow/airflow
connections.celeryResultBackend: db+postgresql://airflow:airflow@postgresql-airflow/airflow
connections.celeryBrokerUrl: redis://:airflow@redis-airflow-master:6379/0
38 changes: 38 additions & 0 deletions stacks/airflow/patch_airflow.yaml
@@ -0,0 +1,38 @@
---
apiVersion: airflow.stackable.tech/v1alpha1
kind: AirflowCluster
metadata:
  name: airflow
spec:
  clusterConfig:
    databaseInitialization:
      enabled: false
    volumeMounts:
      - name: airflow-dags
        mountPath: /dags/dag_factory.py
        subPath: dag_factory.py
      - name: airflow-dags
        mountPath: /dags/date_demo.py
        subPath: date_demo.py
      - name: airflow-dags
        mountPath: /dags/pyspark_pi.py
        subPath: pyspark_pi.py
      - name: airflow-dags
        mountPath: /dags/pyspark_pi.yaml
        subPath: pyspark_pi.yaml
  webservers:
    roleGroups:
      default:
        envOverrides: &envOverrides
          AIRFLOW__CORE__DAGS_FOLDER: "/dags"
          AIRFLOW__CORE__MIN_SERIALIZED_DAG_UPDATE_INTERVAL: "60"
          AIRFLOW__CORE__MIN_SERIALIZED_DAG_FETCH_INTERVAL: "60"
          AIRFLOW__DAG_PROCESSOR__MIN_FILE_PROCESS_INTERVAL: "60"
          AIRFLOW__DAG_PROCESSOR__PRINT_STATS_INTERVAL: "60"
          AIRFLOW_CONN_KUBERNETES_IN_CLUSTER: "kubernetes://?__extra__=%7B%22extra__kubernetes__in_cluster%22%3A+true%2C+%22extra__kubernetes__kube_config%22%3A+%22%22%2C+%22extra__kubernetes__kube_config_path%22%3A+%22%22%2C+%22extra__kubernetes__namespace%22%3A+%22%22%7D"
  kubernetesExecutors:
    envOverrides: *envOverrides
  schedulers:
    roleGroups:
      default:
        envOverrides: *envOverrides
1 change: 0 additions & 1 deletion stacks/stacks-v2.yaml
@@ -140,7 +140,6 @@ stacks:
- airflow
manifests:
- helmChart: https://raw.githubusercontent.com/stackabletech/demos/main/stacks/_templates/postgresql-airflow.yaml
- helmChart: https://raw.githubusercontent.com/stackabletech/demos/main/stacks/_templates/redis-airflow.yaml
- plainYaml: https://raw.githubusercontent.com/stackabletech/demos/main/stacks/airflow/airflow.yaml
supportedNamespaces: []
resourceRequests: