From 3c5bd7b603a7a16eb1838ce1e5419e8e302b1341 Mon Sep 17 00:00:00 2001 From: Andrew Kenworthy Date: Thu, 7 Aug 2025 16:27:23 +0200 Subject: [PATCH 1/6] add patch file for stress testing --- stacks/airflow/airflow.yaml | 44 ++++++++++++++++++++++++------- stacks/airflow/patch_airflow.yaml | 37 ++++++++++++++++++++++++++ stacks/stacks-v2.yaml | 1 - 3 files changed, 72 insertions(+), 10 deletions(-) create mode 100644 stacks/airflow/patch_airflow.yaml diff --git a/stacks/airflow/airflow.yaml b/stacks/airflow/airflow.yaml index 1a02b1ef..5d22120f 100644 --- a/stacks/airflow/airflow.yaml +++ b/stacks/airflow/airflow.yaml @@ -38,14 +38,12 @@ spec: gracefulShutdownTimeout: 30s roleGroups: default: - envOverrides: + envOverrides: &envOverrides AIRFLOW__CORE__DAGS_FOLDER: "/dags" AIRFLOW_CONN_KUBERNETES_IN_CLUSTER: "kubernetes://?__extra__=%7B%22extra__kubernetes__in_cluster%22%3A+true%2C+%22extra__kubernetes__kube_config%22%3A+%22%22%2C+%22extra__kubernetes__kube_config_path%22%3A+%22%22%2C+%22extra__kubernetes__namespace%22%3A+%22%22%7D" replicas: 1 kubernetesExecutors: - envOverrides: - AIRFLOW__CORE__DAGS_FOLDER: "/dags" - AIRFLOW_CONN_KUBERNETES_IN_CLUSTER: "kubernetes://?__extra__=%7B%22extra__kubernetes__in_cluster%22%3A+true%2C+%22extra__kubernetes__kube_config%22%3A+%22%22%2C+%22extra__kubernetes__kube_config_path%22%3A+%22%22%2C+%22extra__kubernetes__namespace%22%3A+%22%22%7D" + envOverrides: *envOverrides schedulers: config: gracefulShutdownTimeout: 30s @@ -57,9 +55,7 @@ spec: limit: 1Gi roleGroups: default: - envOverrides: - AIRFLOW__CORE__DAGS_FOLDER: "/dags" - AIRFLOW_CONN_KUBERNETES_IN_CLUSTER: "kubernetes://?__extra__=%7B%22extra__kubernetes__in_cluster%22%3A+true%2C+%22extra__kubernetes__kube_config%22%3A+%22%22%2C+%22extra__kubernetes__kube_config_path%22%3A+%22%22%2C+%22extra__kubernetes__namespace%22%3A+%22%22%7D" + envOverrides: *envOverrides replicas: 1 --- apiVersion: v1 @@ -83,11 +79,42 @@ data: tags=['example'], params={}, ) as dag: - run_this = 
BashOperator( task_id='run_every_minute', bash_command='date', ) + dag_factory.py: | + from airflow import DAG + from airflow.operators.empty import EmptyOperator + from datetime import datetime, timedelta + + # Number of DAGs to generate + NUM_DAGS = 10 # Increase for more stress + DAG_PREFIX = "stress_dag_" + + default_args = { + 'owner': 'airflow', + 'start_date': datetime(2025, 1, 1), + 'retries': 1, + 'retry_delay': timedelta(seconds=5), + } + + def create_dag(dag_id): + with DAG( + dag_id=dag_id, + default_args=default_args, + schedule=None, + catchup=False, + tags=["stress_test"], + ) as dag: + start = EmptyOperator(task_id='start') + end = EmptyOperator(task_id='end') + start >> end + return dag + + for i in range(NUM_DAGS): + dag_id = f"{DAG_PREFIX}{i:04d}" + globals()[dag_id] = create_dag(dag_id) pyspark_pi.py: | """Example DAG demonstrating how to apply a Kubernetes Resource from Airflow running in-cluster""" from datetime import datetime, timedelta @@ -319,4 +346,3 @@ stringData: connections.secretKey: "{{ airflowSecretKey }}" connections.sqlalchemyDatabaseUri: postgresql+psycopg2://airflow:airflow@postgresql-airflow/airflow connections.celeryResultBackend: db+postgresql://airflow:airflow@postgresql-airflow/airflow - connections.celeryBrokerUrl: redis://:airflow@redis-airflow-master:6379/0 diff --git a/stacks/airflow/patch_airflow.yaml b/stacks/airflow/patch_airflow.yaml new file mode 100644 index 00000000..09a7c52f --- /dev/null +++ b/stacks/airflow/patch_airflow.yaml @@ -0,0 +1,37 @@ +--- +apiVersion: airflow.stackable.tech/v1alpha1 +kind: AirflowCluster +metadata: + name: airflow +spec: + clusterConfig: + dbInit: false + volumeMounts: + - name: airflow-dags + mountPath: /dags/dag_factory.py + subPath: dag_factory.py + - name: airflow-dags + mountPath: /dags/date_demo.py + subPath: date_demo.py + - name: airflow-dags + mountPath: /dags/pyspark_pi.py + subPath: pyspark_pi.py + - name: airflow-dags + mountPath: /dags/pyspark_pi.yaml + subPath: 
pyspark_pi.yaml + webservers: + roleGroups: + default: + envOverrides: &envOverrides + AIRFLOW__CORE__DAGS_FOLDER: "/dags" + AIRFLOW__CORE__MIN_SERIALIZED_DAG_UPDATE_INTERVAL: "60" + AIRFLOW__CORE__MIN_SERIALIZED_DAG_FETCH_INTERVAL: "60" + AIRFLOW__DAG_PROCESSOR__MIN_FILE_PROCESS_INTERVAL: "60" + AIRFLOW__DAG_PROCESSOR__PRINT_STATS_INTERVAL: "60" + AIRFLOW_CONN_KUBERNETES_IN_CLUSTER: "kubernetes://?__extra__=%7B%22extra__kubernetes__in_cluster%22%3A+true%2C+%22extra__kubernetes__kube_config%22%3A+%22%22%2C+%22extra__kubernetes__kube_config_path%22%3A+%22%22%2C+%22extra__kubernetes__namespace%22%3A+%22%22%7D" + kubernetesExecutors: + envOverrides: *envOverrides + schedulers: + roleGroups: + default: + envOverrides: *envOverrides diff --git a/stacks/stacks-v2.yaml b/stacks/stacks-v2.yaml index 9dd32287..31f3746d 100644 --- a/stacks/stacks-v2.yaml +++ b/stacks/stacks-v2.yaml @@ -99,7 +99,6 @@ stacks: - airflow manifests: - helmChart: https://raw.githubusercontent.com/stackabletech/demos/main/stacks/_templates/postgresql-airflow.yaml - - helmChart: https://raw.githubusercontent.com/stackabletech/demos/main/stacks/_templates/redis-airflow.yaml - plainYaml: https://raw.githubusercontent.com/stackabletech/demos/main/stacks/airflow/airflow.yaml supportedNamespaces: [] resourceRequests: From 2a0bbab97dc5eb1f7220ac2d970c82a12c0c3883 Mon Sep 17 00:00:00 2001 From: Andrew Kenworthy Date: Fri, 8 Aug 2025 11:40:34 +0200 Subject: [PATCH 2/6] added patch section --- .../demos/pages/airflow-scheduled-job.adoc | 69 +++++++++++++++++++ 1 file changed, 69 insertions(+) diff --git a/docs/modules/demos/pages/airflow-scheduled-job.adoc b/docs/modules/demos/pages/airflow-scheduled-job.adoc index 03dc438c..10b230dc 100644 --- a/docs/modules/demos/pages/airflow-scheduled-job.adoc +++ b/docs/modules/demos/pages/airflow-scheduled-job.adoc @@ -129,6 +129,75 @@ asynchronously - and another to poll the running job to report on its status. 
image::airflow-scheduled-job/airflow_11.png[] +== Patching Airflow to deactivate database initialization + +By default, Airflow runs database intialization routines on start-up. +These check that an Admin user exists and that the database schema is up-to-date. +Since they are idempotent and invoke little overhead, they can be run safely each time for most environments. +If, however, it makes sense to deactivate this, it can be turned off by patching the running cluster with a resource definition such as this: + +[source,yaml] +---- +--- +apiVersion: airflow.stackable.tech/v1alpha1 +kind: AirflowCluster +metadata: + name: airflow +spec: + clusterConfig: + dbInit: false # <1> +---- +<1> Turn off the initialization routine by setting `dbInit` to `false` + +NOTE: The field `dbInit` is `true` by default to be backwards-compatible. +A fresh Airflow cluster cannot be created with this field set to `false` as this results in missing metadata in the Airflow database. + +The demo also created a third DAG in the ConfigMap, called `dag_factory.py`, which was not mounted to the cluster and therefore does not appear in the UI. +This DAG can be used to create a number of individual DAGs on-the-fly, thus allowing a certain degree of stress-testing of the DAG scan/register steps (the generated DAGs themselves are trivial and so this approach will not really increase the burden of DAG _parsing_). +To include this is in the list of DAGs (without removing the existing ones), an extra volumeMount is needed, as shown below. +The patch also sets some environment variables that can used to change the frequency of certain operations. The descriptions can be found here: https://airflow.apache.org/docs/apache-airflow/stable/configurations-ref.html. 
+ +[source,yaml] +---- +--- +apiVersion: airflow.stackable.tech/v1alpha1 +kind: AirflowCluster +metadata: + name: airflow +spec: + clusterConfig: + dbInit: false + volumeMounts: + - name: airflow-dags + mountPath: /dags/dag_factory.py + subPath: dag_factory.py + - name: airflow-dags + mountPath: /dags/date_demo.py + subPath: date_demo.py + - name: airflow-dags + mountPath: /dags/pyspark_pi.py + subPath: pyspark_pi.py + - name: airflow-dags + mountPath: /dags/pyspark_pi.yaml + subPath: pyspark_pi.yaml + webservers: + roleGroups: + default: + envOverrides: &envOverrides + AIRFLOW__CORE__DAGS_FOLDER: "/dags" + AIRFLOW__CORE__MIN_SERIALIZED_DAG_UPDATE_INTERVAL: "60" + AIRFLOW__CORE__MIN_SERIALIZED_DAG_FETCH_INTERVAL: "60" + AIRFLOW__DAG_PROCESSOR__MIN_FILE_PROCESS_INTERVAL: "60" + AIRFLOW__DAG_PROCESSOR__PRINT_STATS_INTERVAL: "60" + AIRFLOW_CONN_KUBERNETES_IN_CLUSTER: "kubernetes://?__extra__=%7B%22extra__kubernetes__in_cluster%22%3A+true%2C+%22extra__kubernetes__kube_config%22%3A+%22%22%2C+%22extra__kubernetes__kube_config_path%22%3A+%22%22%2C+%22extra__kubernetes__namespace%22%3A+%22%22%7D" + kubernetesExecutors: + envOverrides: *envOverrides + schedulers: + roleGroups: + default: + envOverrides: *envOverrides +---- + == Summary This demo showed how DAGs can be made available for Airflow, scheduled, run and then inspected with the Webserver UI. 
From e6c7573d8d9f3e0f947c5e79f326b2ee5cd55972 Mon Sep 17 00:00:00 2001 From: Andrew Kenworthy Date: Mon, 11 Aug 2025 11:46:40 +0200 Subject: [PATCH 3/6] updated docs --- .../demos/pages/airflow-scheduled-job.adoc | 15 ++++++++++----- stacks/airflow/patch_airflow.yaml | 3 ++- 2 files changed, 12 insertions(+), 6 deletions(-) diff --git a/docs/modules/demos/pages/airflow-scheduled-job.adoc b/docs/modules/demos/pages/airflow-scheduled-job.adoc index 10b230dc..bc625cf3 100644 --- a/docs/modules/demos/pages/airflow-scheduled-job.adoc +++ b/docs/modules/demos/pages/airflow-scheduled-job.adoc @@ -129,7 +129,7 @@ asynchronously - and another to poll the running job to report on its status. image::airflow-scheduled-job/airflow_11.png[] -== Patching Airflow to deactivate database initialization +== Patching Airflow to stress-test DAG parsing using relevant environment variables By default, Airflow runs database intialization routines on start-up. These check that an Admin user exists and that the database schema is up-to-date. @@ -145,13 +145,17 @@ metadata: name: airflow spec: clusterConfig: - dbInit: false # <1> + databaseInitialization: + enabled: false # <1> ---- -<1> Turn off the initialization routine by setting `dbInit` to `false` +<1> Turn off the initialization routine by setting `databaseInitialization.enabled` to `false` -NOTE: The field `dbInit` is `true` by default to be backwards-compatible. +NOTE: The field `databaseInitialization.enabled` is `true` by default to be backwards-compatible. A fresh Airflow cluster cannot be created with this field set to `false` as this results in missing metadata in the Airflow database. +WARNING: Setting `databaseInitialization.enabled` to `false` is an unsupported operation as subsequent updates to a running Airflow cluster can result in broken behaviour due to inconsistent metadata. +Only set `databaseInitialization.enabled` to `false` if you know what you are doing! 
+ The demo also created a third DAG in the ConfigMap, called `dag_factory.py`, which was not mounted to the cluster and therefore does not appear in the UI. This DAG can be used to create a number of individual DAGs on-the-fly, thus allowing a certain degree of stress-testing of the DAG scan/register steps (the generated DAGs themselves are trivial and so this approach will not really increase the burden of DAG _parsing_). To include this is in the list of DAGs (without removing the existing ones), an extra volumeMount is needed, as shown below. @@ -166,7 +170,8 @@ metadata: name: airflow spec: clusterConfig: - dbInit: false + databaseInitialization: + enabled: false volumeMounts: - name: airflow-dags mountPath: /dags/dag_factory.py diff --git a/stacks/airflow/patch_airflow.yaml b/stacks/airflow/patch_airflow.yaml index 09a7c52f..3142dfd7 100644 --- a/stacks/airflow/patch_airflow.yaml +++ b/stacks/airflow/patch_airflow.yaml @@ -5,7 +5,8 @@ metadata: name: airflow spec: clusterConfig: - dbInit: false + databaseInitialization: + enabled: false volumeMounts: - name: airflow-dags mountPath: /dags/dag_factory.py From 405b04ce720e04522a953910e0a215eb6002ede6 Mon Sep 17 00:00:00 2001 From: Andrew Kenworthy <1712947+adwk67@users.noreply.github.com> Date: Tue, 12 Aug 2025 15:06:34 +0200 Subject: [PATCH 4/6] Update docs/modules/demos/pages/airflow-scheduled-job.adoc Co-authored-by: Xenia --- docs/modules/demos/pages/airflow-scheduled-job.adoc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/modules/demos/pages/airflow-scheduled-job.adoc b/docs/modules/demos/pages/airflow-scheduled-job.adoc index bc625cf3..33a68c8e 100644 --- a/docs/modules/demos/pages/airflow-scheduled-job.adoc +++ b/docs/modules/demos/pages/airflow-scheduled-job.adoc @@ -158,7 +158,7 @@ Only set `databaseInitialization.enabled` to `false` if you know what you are do The demo also created a third DAG in the ConfigMap, called `dag_factory.py`, which was not mounted to the cluster and 
therefore does not appear in the UI. This DAG can be used to create a number of individual DAGs on-the-fly, thus allowing a certain degree of stress-testing of the DAG scan/register steps (the generated DAGs themselves are trivial and so this approach will not really increase the burden of DAG _parsing_). -To include this is in the list of DAGs (without removing the existing ones), an extra volumeMount is needed, as shown below. +To include this in the list of DAGs (without removing the existing ones), an extra volumeMount is needed, as shown below. The patch also sets some environment variables that can used to change the frequency of certain operations. The descriptions can be found here: https://airflow.apache.org/docs/apache-airflow/stable/configurations-ref.html. [source,yaml] From a601a41c01314ad783a56fb6b1ce6b7634c1d0ca Mon Sep 17 00:00:00 2001 From: Andrew Kenworthy <1712947+adwk67@users.noreply.github.com> Date: Tue, 12 Aug 2025 15:07:04 +0200 Subject: [PATCH 5/6] Update docs/modules/demos/pages/airflow-scheduled-job.adoc Co-authored-by: Xenia --- docs/modules/demos/pages/airflow-scheduled-job.adoc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/modules/demos/pages/airflow-scheduled-job.adoc b/docs/modules/demos/pages/airflow-scheduled-job.adoc index 33a68c8e..962419c3 100644 --- a/docs/modules/demos/pages/airflow-scheduled-job.adoc +++ b/docs/modules/demos/pages/airflow-scheduled-job.adoc @@ -159,7 +159,7 @@ Only set `databaseInitialization.enabled` to `false` if you know what you are do The demo also created a third DAG in the ConfigMap, called `dag_factory.py`, which was not mounted to the cluster and therefore does not appear in the UI. This DAG can be used to create a number of individual DAGs on-the-fly, thus allowing a certain degree of stress-testing of the DAG scan/register steps (the generated DAGs themselves are trivial and so this approach will not really increase the burden of DAG _parsing_). 
To include this in the list of DAGs (without removing the existing ones), an extra volumeMount is needed, as shown below. -The patch also sets some environment variables that can used to change the frequency of certain operations. The descriptions can be found here: https://airflow.apache.org/docs/apache-airflow/stable/configurations-ref.html. +The patch also sets some environment variables that can be used to change the frequency of certain operations. The descriptions can be found here: https://airflow.apache.org/docs/apache-airflow/stable/configurations-ref.html[window=_blank]. [source,yaml] ---- From 67a7c7b1ce9eb78905c42753a2d82342a09c42ee Mon Sep 17 00:00:00 2001 From: Andrew Kenworthy Date: Tue, 12 Aug 2025 16:33:41 +0200 Subject: [PATCH 6/6] review feedback comments --- .../demos/pages/airflow-scheduled-job.adoc | 39 ++++++------------- stacks/_templates/redis-airflow.yaml | 14 ------- stacks/airflow/airflow.yaml | 1 - stacks/airflow/patch_airflow.yaml | 2 - 4 files changed, 12 insertions(+), 44 deletions(-) delete mode 100644 stacks/_templates/redis-airflow.yaml diff --git a/docs/modules/demos/pages/airflow-scheduled-job.adoc b/docs/modules/demos/pages/airflow-scheduled-job.adoc index 962419c3..54c38be0 100644 --- a/docs/modules/demos/pages/airflow-scheduled-job.adoc +++ b/docs/modules/demos/pages/airflow-scheduled-job.adoc @@ -131,31 +131,6 @@ image::airflow-scheduled-job/airflow_11.png[] == Patching Airflow to stress-test DAG parsing using relevant environment variables -By default, Airflow runs database intialization routines on start-up. -These check that an Admin user exists and that the database schema is up-to-date. -Since they are idempotent and invoke little overhead, they can be run safely each time for most environments. 
-If, however, it makes sense to deactivate this, it can be turned off by patching the running cluster with a resource definition such as this: - -[source,yaml] ----- ---- -apiVersion: airflow.stackable.tech/v1alpha1 -kind: AirflowCluster -metadata: - name: airflow -spec: - clusterConfig: - databaseInitialization: - enabled: false # <1> ----- -<1> Turn off the initialization routine by setting `databaseInitialization.enabled` to `false` - -NOTE: The field `databaseInitialization.enabled` is `true` by default to be backwards-compatible. -A fresh Airflow cluster cannot be created with this field set to `false` as this results in missing metadata in the Airflow database. - -WARNING: Setting `databaseInitialization.enabled` to `false` is an unsupported operation as subsequent updates to a running Airflow cluster can result in broken behaviour due to inconsistent metadata. -Only set `databaseInitialization.enabled` to `false` if you know what you are doing! - The demo also created a third DAG in the ConfigMap, called `dag_factory.py`, which was not mounted to the cluster and therefore does not appear in the UI. This DAG can be used to create a number of individual DAGs on-the-fly, thus allowing a certain degree of stress-testing of the DAG scan/register steps (the generated DAGs themselves are trivial and so this approach will not really increase the burden of DAG _parsing_). To include this in the list of DAGs (without removing the existing ones), an extra volumeMount is needed, as shown below. 
@@ -170,8 +145,6 @@ metadata: name: airflow spec: clusterConfig: - databaseInitialization: - enabled: false volumeMounts: - name: airflow-dags mountPath: /dags/dag_factory.py @@ -203,6 +176,18 @@ spec: envOverrides: *envOverrides ---- +The patch can be applied like this: + +[source,console] +---- +kubectl patch airflowcluster airflow --type="merge" --patch-file stacks/airflow/patch_airflow.yaml +---- + +[NOTE] +==== +The scheduled job runs every minute and so an instance of it may be running while the scheduler is being re-started as a result of the patch, in which case that instance may fail. +==== + == Summary This demo showed how DAGs can be made available for Airflow, scheduled, run and then inspected with the Webserver UI. diff --git a/stacks/_templates/redis-airflow.yaml b/stacks/_templates/redis-airflow.yaml deleted file mode 100644 index 05c516aa..00000000 --- a/stacks/_templates/redis-airflow.yaml +++ /dev/null @@ -1,14 +0,0 @@ ---- -releaseName: redis-airflow -name: redis -repo: - name: bitnami - url: https://charts.bitnami.com/bitnami/ -version: 21.2.10 # 8.0.3 -options: - commonLabels: - stackable.tech/vendor: Stackable - auth: - password: airflow - replica: - replicaCount: 1 diff --git a/stacks/airflow/airflow.yaml b/stacks/airflow/airflow.yaml index 5d22120f..1eef20ce 100644 --- a/stacks/airflow/airflow.yaml +++ b/stacks/airflow/airflow.yaml @@ -345,4 +345,3 @@ stringData: adminUser.password: "{{ airflowAdminPassword }}" connections.secretKey: "{{ airflowSecretKey }}" connections.sqlalchemyDatabaseUri: postgresql+psycopg2://airflow:airflow@postgresql-airflow/airflow - connections.celeryResultBackend: db+postgresql://airflow:airflow@postgresql-airflow/airflow diff --git a/stacks/airflow/patch_airflow.yaml b/stacks/airflow/patch_airflow.yaml index 3142dfd7..60889187 100644 --- a/stacks/airflow/patch_airflow.yaml +++ b/stacks/airflow/patch_airflow.yaml @@ -5,8 +5,6 @@ metadata: name: airflow spec: clusterConfig: - databaseInitialization: - enabled: false
volumeMounts: - name: airflow-dags mountPath: /dags/dag_factory.py