diff --git a/docs/modules/demos/pages/airflow-scheduled-job.adoc b/docs/modules/demos/pages/airflow-scheduled-job.adoc
index 03dc438c..54c38be0 100644
--- a/docs/modules/demos/pages/airflow-scheduled-job.adoc
+++ b/docs/modules/demos/pages/airflow-scheduled-job.adoc
@@ -129,6 +129,65 @@ asynchronously - and another to poll the running job to report on its status.
 
 image::airflow-scheduled-job/airflow_11.png[]
 
+== Patching Airflow to stress-test DAG parsing using relevant environment variables
+
+The demo also created a third DAG in the ConfigMap, called `dag_factory.py`, which was not mounted to the cluster and therefore does not appear in the UI.
+This DAG can be used to create a number of individual DAGs on the fly, allowing a degree of stress-testing of the DAG scan and register steps (the generated DAGs themselves are trivial, so this approach does not significantly increase the burden of DAG _parsing_).
+To include it in the list of DAGs (without removing the existing ones), an extra volumeMount is needed, as shown below.
+The patch below also sets some environment variables that change the frequency of certain operations; they are described in the https://airflow.apache.org/docs/apache-airflow/stable/configurations-ref.html[Airflow configuration reference,window=_blank].
+
+[source,yaml]
+----
+---
+apiVersion: airflow.stackable.tech/v1alpha1
+kind: AirflowCluster
+metadata:
+  name: airflow
+spec:
+  clusterConfig:
+    volumeMounts:
+      - name: airflow-dags
+        mountPath: /dags/dag_factory.py
+        subPath: dag_factory.py
+      - name: airflow-dags
+        mountPath: /dags/date_demo.py
+        subPath: date_demo.py
+      - name: airflow-dags
+        mountPath: /dags/pyspark_pi.py
+        subPath: pyspark_pi.py
+      - name: airflow-dags
+        mountPath: /dags/pyspark_pi.yaml
+        subPath: pyspark_pi.yaml
+  webservers:
+    roleGroups:
+      default:
+        envOverrides: &envOverrides
+          AIRFLOW__CORE__DAGS_FOLDER: "/dags"
+          AIRFLOW__CORE__MIN_SERIALIZED_DAG_UPDATE_INTERVAL: "60"
+          AIRFLOW__CORE__MIN_SERIALIZED_DAG_FETCH_INTERVAL: "60"
+          AIRFLOW__DAG_PROCESSOR__MIN_FILE_PROCESS_INTERVAL: "60"
+          AIRFLOW__DAG_PROCESSOR__PRINT_STATS_INTERVAL: "60"
+          AIRFLOW_CONN_KUBERNETES_IN_CLUSTER: "kubernetes://?__extra__=%7B%22extra__kubernetes__in_cluster%22%3A+true%2C+%22extra__kubernetes__kube_config%22%3A+%22%22%2C+%22extra__kubernetes__kube_config_path%22%3A+%22%22%2C+%22extra__kubernetes__namespace%22%3A+%22%22%7D"
+  kubernetesExecutors:
+    envOverrides: *envOverrides
+  schedulers:
+    roleGroups:
+      default:
+        envOverrides: *envOverrides
+----
+
+The patch can be applied like this:
+
+[source,console]
+----
+kubectl patch airflowcluster airflow --type="merge" --patch-file stacks/airflow/patch_airflow.yaml
+----
+
+[NOTE]
+====
+The scheduled job runs every minute, so an instance of it may be running while the scheduler is restarted as a result of the patch; that instance may then fail.
+====
+
 == Summary
 
 This demo showed how DAGs can be made available for Airflow, scheduled, run and then inspected with the Webserver UI.
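After applying the patch, one quick way to confirm that the generated DAGs have been registered is to list them with the Airflow CLI from inside the webserver pod. This is only a sketch: the pod name `airflow-webserver-default-0` assumes the usual Stackable naming convention and may differ in your cluster.

[source,console]
----
kubectl exec airflow-webserver-default-0 -- airflow dags list | grep stress_dag_
----

With the default `NUM_DAGS = 10`, the output should list `stress_dag_0000` through `stress_dag_0009`, one entry per DAG produced by `dag_factory.py`.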
diff --git a/stacks/_templates/redis-airflow.yaml b/stacks/_templates/redis-airflow.yaml
deleted file mode 100644
index 05c516aa..00000000
--- a/stacks/_templates/redis-airflow.yaml
+++ /dev/null
@@ -1,14 +0,0 @@
----
-releaseName: redis-airflow
-name: redis
-repo:
-  name: bitnami
-  url: https://charts.bitnami.com/bitnami/
-version: 21.2.10 # 8.0.3
-options:
-  commonLabels:
-    stackable.tech/vendor: Stackable
-  auth:
-    password: airflow
-  replica:
-    replicaCount: 1
diff --git a/stacks/airflow/airflow.yaml b/stacks/airflow/airflow.yaml
index 1a02b1ef..1eef20ce 100644
--- a/stacks/airflow/airflow.yaml
+++ b/stacks/airflow/airflow.yaml
@@ -38,14 +38,12 @@ spec:
       gracefulShutdownTimeout: 30s
     roleGroups:
       default:
-        envOverrides:
+        envOverrides: &envOverrides
           AIRFLOW__CORE__DAGS_FOLDER: "/dags"
           AIRFLOW_CONN_KUBERNETES_IN_CLUSTER: "kubernetes://?__extra__=%7B%22extra__kubernetes__in_cluster%22%3A+true%2C+%22extra__kubernetes__kube_config%22%3A+%22%22%2C+%22extra__kubernetes__kube_config_path%22%3A+%22%22%2C+%22extra__kubernetes__namespace%22%3A+%22%22%7D"
         replicas: 1
   kubernetesExecutors:
-    envOverrides:
-      AIRFLOW__CORE__DAGS_FOLDER: "/dags"
-      AIRFLOW_CONN_KUBERNETES_IN_CLUSTER: "kubernetes://?__extra__=%7B%22extra__kubernetes__in_cluster%22%3A+true%2C+%22extra__kubernetes__kube_config%22%3A+%22%22%2C+%22extra__kubernetes__kube_config_path%22%3A+%22%22%2C+%22extra__kubernetes__namespace%22%3A+%22%22%7D"
+    envOverrides: *envOverrides
   schedulers:
     config:
       gracefulShutdownTimeout: 30s
@@ -57,9 +55,7 @@
           limit: 1Gi
     roleGroups:
       default:
-        envOverrides:
-          AIRFLOW__CORE__DAGS_FOLDER: "/dags"
-          AIRFLOW_CONN_KUBERNETES_IN_CLUSTER: "kubernetes://?__extra__=%7B%22extra__kubernetes__in_cluster%22%3A+true%2C+%22extra__kubernetes__kube_config%22%3A+%22%22%2C+%22extra__kubernetes__kube_config_path%22%3A+%22%22%2C+%22extra__kubernetes__namespace%22%3A+%22%22%7D"
+        envOverrides: *envOverrides
         replicas: 1
 ---
 apiVersion: v1
@@ -83,11 +79,42 @@ data:
         tags=['example'],
         params={},
     ) as dag:
-
         run_this = BashOperator(
             task_id='run_every_minute',
             bash_command='date',
         )
+  dag_factory.py: |
+    from airflow import DAG
+    from airflow.operators.empty import EmptyOperator
+    from datetime import datetime, timedelta
+
+    # Number of DAGs to generate
+    NUM_DAGS = 10  # Increase for more stress
+    DAG_PREFIX = "stress_dag_"
+
+    default_args = {
+        'owner': 'airflow',
+        'start_date': datetime(2025, 1, 1),
+        'retries': 1,
+        'retry_delay': timedelta(seconds=5),
+    }
+
+    def create_dag(dag_id):
+        with DAG(
+            dag_id=dag_id,
+            default_args=default_args,
+            schedule=None,
+            catchup=False,
+            tags=["stress_test"],
+        ) as dag:
+            start = EmptyOperator(task_id='start')
+            end = EmptyOperator(task_id='end')
+            start >> end
+        return dag
+
+    for i in range(NUM_DAGS):
+        dag_id = f"{DAG_PREFIX}{i:04d}"
+        globals()[dag_id] = create_dag(dag_id)
   pyspark_pi.py: |
     """Example DAG demonstrating how to apply a Kubernetes Resource from Airflow running in-cluster"""
     from datetime import datetime, timedelta
@@ -318,5 +345,3 @@ stringData:
   adminUser.password: "{{ airflowAdminPassword }}"
   connections.secretKey: "{{ airflowSecretKey }}"
   connections.sqlalchemyDatabaseUri: postgresql+psycopg2://airflow:airflow@postgresql-airflow/airflow
-  connections.celeryResultBackend: db+postgresql://airflow:airflow@postgresql-airflow/airflow
-  connections.celeryBrokerUrl: redis://:airflow@redis-airflow-master:6379/0
diff --git a/stacks/airflow/patch_airflow.yaml b/stacks/airflow/patch_airflow.yaml
new file mode 100644
index 00000000..60889187
--- /dev/null
+++ b/stacks/airflow/patch_airflow.yaml
@@ -0,0 +1,36 @@
+---
+apiVersion: airflow.stackable.tech/v1alpha1
+kind: AirflowCluster
+metadata:
+  name: airflow
+spec:
+  clusterConfig:
+    volumeMounts:
+      - name: airflow-dags
+        mountPath: /dags/dag_factory.py
+        subPath: dag_factory.py
+      - name: airflow-dags
+        mountPath: /dags/date_demo.py
+        subPath: date_demo.py
+      - name: airflow-dags
+        mountPath: /dags/pyspark_pi.py
+        subPath: pyspark_pi.py
+      - name: airflow-dags
+        mountPath: /dags/pyspark_pi.yaml
+        subPath: pyspark_pi.yaml
+  webservers:
+    roleGroups:
+      default:
+        envOverrides: &envOverrides
+          AIRFLOW__CORE__DAGS_FOLDER: "/dags"
+          AIRFLOW__CORE__MIN_SERIALIZED_DAG_UPDATE_INTERVAL: "60"
+          AIRFLOW__CORE__MIN_SERIALIZED_DAG_FETCH_INTERVAL: "60"
+          AIRFLOW__DAG_PROCESSOR__MIN_FILE_PROCESS_INTERVAL: "60"
+          AIRFLOW__DAG_PROCESSOR__PRINT_STATS_INTERVAL: "60"
+          AIRFLOW_CONN_KUBERNETES_IN_CLUSTER: "kubernetes://?__extra__=%7B%22extra__kubernetes__in_cluster%22%3A+true%2C+%22extra__kubernetes__kube_config%22%3A+%22%22%2C+%22extra__kubernetes__kube_config_path%22%3A+%22%22%2C+%22extra__kubernetes__namespace%22%3A+%22%22%7D"
+  kubernetesExecutors:
+    envOverrides: *envOverrides
+  schedulers:
+    roleGroups:
+      default:
+        envOverrides: *envOverrides
diff --git a/stacks/stacks-v2.yaml b/stacks/stacks-v2.yaml
index 494cf7e5..09cbfef0 100644
--- a/stacks/stacks-v2.yaml
+++ b/stacks/stacks-v2.yaml
@@ -140,7 +140,6 @@ stacks:
       - airflow
     manifests:
       - helmChart: https://raw.githubusercontent.com/stackabletech/demos/main/stacks/_templates/postgresql-airflow.yaml
-      - helmChart: https://raw.githubusercontent.com/stackabletech/demos/main/stacks/_templates/redis-airflow.yaml
       - plainYaml: https://raw.githubusercontent.com/stackabletech/demos/main/stacks/airflow/airflow.yaml
     supportedNamespaces: []
     resourceRequests: