@@ -26,44 +26,101 @@ spec:
2626 - name : airflow-dags
2727 mountPath : /dags/pyspark_pi.yaml
2828 subPath : pyspark_pi.yaml
29+ - name : airflow-dags
30+ mountPath : /dags/kafka.py
31+ subPath : kafka.py
2932 webservers :
3033 roleConfig :
3134 listenerClass : external-unstable
3235 config :
3336 resources :
3437 cpu :
35- min : 400m
36- max : " 1 "
38+ min : " 2 "
39+ max : " 3 "
3740 memory :
38- limit : 2Gi
39- gracefulShutdownTimeout : 30s
41+ limit : 3Gi
42+ envOverrides : &envOverrides
43+ AIRFLOW__CORE__DAGS_FOLDER : " /dags"
44+ PYTHONPATH : " /stackable/app/log_config:/dags"
45+ AIRFLOW_CONN_KUBERNETES_IN_CLUSTER : " kubernetes://?__extra__=%7B%22extra__kubernetes__in_cluster%22%3A+true%2C+%22extra__kubernetes__kube_config%22%3A+%22%22%2C+%22extra__kubernetes__kube_config_path%22%3A+%22%22%2C+%22extra__kubernetes__namespace%22%3A+%22%22%7D"
46+ podOverrides : &podOverrides
47+ spec :
48+ containers :
49+ - name : airflow
50+ env :
51+ - name : KAFKA_BOOTSTRAP
52+ valueFrom :
53+ configMapKeyRef :
54+ name : kafka
55+ key : KAFKA
56+ - name : AIRFLOW_CONN_KAFKA_CONN # $(KAFKA_BOOTSTRAP)
57+ value : " {\" conn_type\" : \" kafka\" , \" extra\" : {\" bootstrap.servers\" : \" kafka-broker-default-0-listener-broker.demo.svc.cluster.local:9092\" , \" group.id\" : \" airflow_group\" , \" auto.offset.reset\" : \" latest\" }}"
4058 roleGroups :
4159 default :
42- envOverrides : &envOverrides
43- AIRFLOW__CORE__DAGS_FOLDER : " /dags"
44- AIRFLOW_CONN_KUBERNETES_IN_CLUSTER : " kubernetes://?__extra__=%7B%22extra__kubernetes__in_cluster%22%3A+true%2C+%22extra__kubernetes__kube_config%22%3A+%22%22%2C+%22extra__kubernetes__kube_config_path%22%3A+%22%22%2C+%22extra__kubernetes__namespace%22%3A+%22%22%7D"
4560 replicas : 1
4661 kubernetesExecutors :
4762 envOverrides : *envOverrides
63+ podOverrides : *podOverrides
4864 schedulers :
49- config :
50- gracefulShutdownTimeout : 30s
51- resources :
52- cpu :
53- min : 400m
54- max : " 1"
55- memory :
56- limit : 1Gi
65+ envOverrides : *envOverrides
66+ podOverrides : *podOverrides
67+ roleGroups :
68+ default :
69+ replicas : 1
70+ dagProcessors :
71+ envOverrides : *envOverrides
72+ podOverrides : *podOverrides
73+ roleGroups :
74+ default :
75+ replicas : 1
76+ triggerers :
77+ envOverrides : *envOverrides
78+ podOverrides : *podOverrides
5779 roleGroups :
5880 default :
59- envOverrides : *envOverrides
6081 replicas : 1
6182---
6283apiVersion : v1
6384kind : ConfigMap
6485metadata :
6586 name : airflow-dags
6687data :
88+ kafka.py : |
89+ from airflow.providers.apache.kafka.triggers.msg_queue import KafkaMessageQueueTrigger
90+ from airflow.providers.standard.operators.empty import EmptyOperator
91+ from airflow.sdk import DAG, Asset, AssetWatcher
92+
93+ import logging
94+ logger = logging.getLogger(__name__)
95+
96+ logger.info("✅ kafka.apply_function module imported")
97+
98+ def apply_function(message):
99+ try:
100+ logger.info("apply_function called")
101+ logger.info("message payload: %r", message.value())
102+ return True
103+ except Exception:
104+ logger.exception("apply_function failed")
105+ return False
106+
107+ # Define a trigger that listens to an Apache Kafka message queue
108+ trigger = KafkaMessageQueueTrigger(
109+ topics=["test-topic"],
110+ apply_function="kafka.apply_function",
111+ kafka_config_id="kafka_conn",
112+ apply_function_args=None,
113+ apply_function_kwargs=None,
114+ poll_timeout=1,
115+ poll_interval=5,
116+ )
117+
118+ # Define an asset that watches for messages on the queue
119+ asset = Asset("kafka_queue_asset", watchers=[AssetWatcher(name="kafka_watcher", trigger=trigger)])
120+
121+ with DAG(dag_id="example_kafka_watcher", schedule=[asset]) as dag:
122+ EmptyOperator(task_id="task")
123+
67124 date_demo.py : |
68125 """Example DAG returning the current date"""
69126 from datetime import datetime, timedelta
0 commit comments