Skip to content

Commit 54da62c

Browse files
committed
working scheduled event with kafka/tls
1 parent 294f562 commit 54da62c

9 files changed

+161
-58
lines changed
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
---
2+
apiVersion: rbac.authorization.k8s.io/v1
3+
kind: ClusterRole
4+
metadata:
5+
name: airflow-demo-clusterrole
6+
rules:
7+
- apiGroups:
8+
- spark.stackable.tech
9+
resources:
10+
- sparkapplications
11+
verbs:
12+
- create
13+
- get
14+
- list
15+
- apiGroups:
16+
- apps
17+
resources:
18+
- statefulsets
19+
verbs:
20+
- get
21+
- watch
22+
- list
23+
- apiGroups:
24+
- ""
25+
resources:
26+
- persistentvolumeclaims
27+
verbs:
28+
- list
29+
- apiGroups:
30+
- ""
31+
resources:
32+
- pods
33+
verbs:
34+
- get
35+
- watch
36+
- list
37+
- apiGroups:
38+
- ""
39+
resources:
40+
- pods/exec
41+
verbs:
42+
- create

demos/airflow-scheduled-job/01-airflow-spark-clusterrole.yaml

Lines changed: 0 additions & 36 deletions
This file was deleted.

demos/airflow-scheduled-job/02-airflow-spark-clusterrolebinding.yaml renamed to demos/airflow-scheduled-job/02-airflow-demo-clusterrolebinding.yaml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,11 @@
22
apiVersion: rbac.authorization.k8s.io/v1
33
kind: ClusterRoleBinding
44
metadata:
5-
name: airflow-spark-clusterrole-binding
5+
name: airflow-demo-clusterrole-binding
66
roleRef:
77
apiGroup: rbac.authorization.k8s.io
88
kind: ClusterRole
9-
name: airflow-spark-clusterrole
9+
name: airflow-demo-clusterrole
1010
subjects:
1111
- apiGroup: rbac.authorization.k8s.io
1212
kind: Group

demos/airflow-scheduled-job/03-enable-and-run-spark-dag.yaml

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,12 @@ spec:
99
containers:
1010
- name: start-pyspark-job
1111
image: oci.stackable.tech/sdp/tools:1.0.0-stackable0.0.0-dev
12-
# N.B. it is possible for the scheduler to report that a DAG exists, only for the worker task to fail if a pod is unexpectedly
13-
# restarted. Additionally, the db-init job takes a few minutes to complete before the cluster is deployed. The wait/watch steps
14-
# below are not "water-tight" but add a layer of stability by at least ensuring that the db is initialized and ready and that
15-
# all pods are reachable (albeit independent of each other).
12+
# N.B. it is possible for the scheduler to report that a DAG exists,
13+
# only for the worker task to fail if a pod is unexpectedly
14+
# restarted. The wait/watch steps below are not "water-tight" but add
15+
# a layer of stability by at least ensuring that the cluster is
16+
# initialized and ready and that all pods are reachable (albeit
17+
# independent of each other).
1618
command:
1719
- bash
1820
- -euo

demos/airflow-scheduled-job/04-enable-and-run-date-dag.yaml

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,12 @@ spec:
99
containers:
1010
- name: start-date-job
1111
image: oci.stackable.tech/sdp/tools:1.0.0-stackable0.0.0-dev
12-
# N.B. it is possible for the scheduler to report that a DAG exists, only for the worker task to fail if a pod is unexpectedly
13-
# restarted. Additionally, the db-init job takes a few minutes to complete before the cluster is deployed. The wait/watch steps
14-
# below are not "water-tight" but add a layer of stability by at least ensuring that the db is initialized and ready and that
15-
# all pods are reachable (albeit independent of each other).
12+
# N.B. it is possible for the scheduler to report that a DAG exists,
13+
# only for the worker task to fail if a pod is unexpectedly
14+
# restarted. The wait/watch steps below are not "water-tight" but add
15+
# a layer of stability by at least ensuring that the cluster is
16+
# initialized and ready and that all pods are reachable (albeit
17+
# independent of each other).
1618
command:
1719
- bash
1820
- -euo
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
---
2+
apiVersion: batch/v1
3+
kind: Job
4+
metadata:
5+
name: start-kafka-job
6+
spec:
7+
template:
8+
spec:
9+
containers:
10+
- name: start-kafka-job
11+
image: oci.stackable.tech/sdp/tools:1.0.0-stackable0.0.0-dev
12+
env:
13+
- name: NAMESPACE
14+
valueFrom:
15+
fieldRef:
16+
fieldPath: metadata.namespace
17+
# N.B. it is possible for the scheduler to report that a DAG exists,
18+
# only for the worker task to fail if a pod is unexpectedly
19+
# restarted. The wait/watch steps below are not "water-tight" but add
20+
# a layer of stability by at least ensuring that the cluster is
21+
# initialized and ready and that all pods are reachable (albeit
22+
# independent of each other).
23+
command:
24+
- bash
25+
- -euo
26+
- pipefail
27+
- -c
28+
- |
29+
# Kafka: wait for cluster
30+
kubectl rollout status --watch statefulset/kafka-broker-default
31+
kubectl rollout status --watch statefulset/kafka-controller-default
32+
33+
# Kafka: create consumer offsets topics (required for group coordinator)
34+
kubectl exec kafka-broker-default-0 -c kafka -- \
35+
/stackable/kafka/bin/kafka-topics.sh \
36+
--bootstrap-server kafka-broker-default-0-listener-broker.$(NAMESPACE).svc.cluster.local:9093 \
37+
--create \
38+
--topic __consumer_offsets \
39+
--partitions 50 \
40+
--replication-factor 1 \
41+
--config cleanup.policy=compact \
42+
--command-config /stackable/config/client.properties
43+
44+
# Airflow: wait for cluster
45+
kubectl rollout status --watch statefulset/airflow-webserver-default
46+
kubectl rollout status --watch statefulset/airflow-scheduler-default
47+
48+
# Airflow: activate DAG
49+
AIRFLOW_ADMIN_PASSWORD=$(cat /airflow-credentials/adminUser.password)
50+
ACCESS_TOKEN=$(curl -XPOST http://airflow-webserver-default-headless:8080/auth/token -H 'Content-Type: application/json' -d '{"username": "admin", "password": "'$AIRFLOW_ADMIN_PASSWORD'"}' | jq -r .access_token)
51+
curl -H "Authorization: Bearer $ACCESS_TOKEN" -H 'Content-Type: application/json' -XPATCH http://airflow-webserver-default-headless:8080/api/v2/dags/kafka_watcher -d '{"is_paused": false}' | jq
52+
53+
# Kafka: produce a message to create the topic
54+
kubectl exec kafka-broker-default-0 -c kafka -- bash -c \
55+
'echo "Hello World at: $(date)" | /stackable/kafka/bin/kafka-console-producer.sh \
56+
--bootstrap-server kafka-broker-default-0-listener-broker.$(NAMESPACE).svc.cluster.local:9093 \
57+
--producer.config /stackable/config/client.properties \
58+
--topic test-topic'
59+
volumeMounts:
60+
- name: airflow-credentials
61+
mountPath: /airflow-credentials
62+
volumes:
63+
- name: airflow-credentials
64+
secret:
65+
secretName: airflow-credentials
66+
restartPolicy: OnFailure
67+
backoffLimit: 20 # give some time for the Airflow cluster to be available

demos/demos-v2.yaml

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,10 +46,11 @@ demos:
4646
- airflow
4747
- job-scheduling
4848
manifests:
49-
- plainYaml: https://raw.githubusercontent.com/stackabletech/demos/main/demos/airflow-scheduled-job/01-airflow-spark-clusterrole.yaml
50-
- plainYaml: https://raw.githubusercontent.com/stackabletech/demos/main/demos/airflow-scheduled-job/02-airflow-spark-clusterrolebinding.yaml
49+
- plainYaml: demos/airflow-scheduled-job/01-airflow-demo-clusterrole.yaml
50+
- plainYaml: demos/airflow-scheduled-job/02-airflow-demo-clusterrolebinding.yaml
5151
#- plainYaml: https://raw.githubusercontent.com/stackabletech/demos/main/demos/airflow-scheduled-job/03-enable-and-run-spark-dag.yaml
5252
- plainYaml: https://raw.githubusercontent.com/stackabletech/demos/main/demos/airflow-scheduled-job/04-enable-and-run-date-dag.yaml
53+
- plainYaml: demos/airflow-scheduled-job/05-enable-and-run-kafka-dag.yaml
5354
supportedNamespaces: []
5455
resourceRequests:
5556
cpu: 2401m

stacks/airflow/airflow.yaml

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,9 @@ spec:
1616
- name: airflow-dags
1717
configMap:
1818
name: airflow-dags
19+
- name: kafka-tls-pem
20+
configMap:
21+
name: truststore-pem
1922
volumeMounts:
2023
- name: airflow-dags
2124
mountPath: /dags/date_demo.py
@@ -29,6 +32,8 @@ spec:
2932
- name: airflow-dags
3033
mountPath: /dags/kafka.py
3134
subPath: kafka.py
35+
- name: kafka-tls-pem
36+
mountPath: /stackable/kafka-tls-pem
3237
webservers:
3338
roleConfig:
3439
listenerClass: external-unstable
@@ -43,24 +48,25 @@ spec:
4348
AIRFLOW__CORE__DAGS_FOLDER: "/dags"
4449
PYTHONPATH: "/stackable/app/log_config:/dags"
4550
AIRFLOW_CONN_KUBERNETES_IN_CLUSTER: "kubernetes://?__extra__=%7B%22extra__kubernetes__in_cluster%22%3A+true%2C+%22extra__kubernetes__kube_config%22%3A+%22%22%2C+%22extra__kubernetes__kube_config_path%22%3A+%22%22%2C+%22extra__kubernetes__namespace%22%3A+%22%22%7D"
51+
#AIRFLOW_CONN_KAFKA_CONN: "{\"conn_type\": \"kafka\", \"extra\": {\"bootstrap.servers\": \"kafka-broker-default-0-listener-broker.{{ NAMESPACE }}.svc.cluster.local:9093\", \"security.protocol\": \"SSL\", \"ssl.ca.location\": \"/stackable/kafka-tls-pem/ca.crt\", \"group.id\": \"airflow_group\", \"auto.offset.reset\": \"latest\"}}"
4652
podOverrides: &podOverrides
4753
spec:
4854
containers:
4955
- name: airflow
56+
image: oci.stackable.tech/sdp/airflow:3.0.6-stackable0.0.0-dev
57+
imagePullPolicy: IfNotPresent
5058
env:
51-
- name: KAFKA_BOOTSTRAP
52-
valueFrom:
53-
configMapKeyRef:
54-
name: kafka
55-
key: KAFKA
56-
- name: AIRFLOW_CONN_KAFKA_CONN # $(KAFKA_BOOTSTRAP)
57-
value: "{\"conn_type\": \"kafka\", \"extra\": {\"bootstrap.servers\": \"kafka-broker-default-0-listener-broker.demo.svc.cluster.local:9092\", \"group.id\": \"airflow_group\", \"auto.offset.reset\": \"latest\"}}"
59+
- name: NAMESPACE
60+
valueFrom:
61+
fieldRef:
62+
fieldPath: metadata.namespace
63+
- name: AIRFLOW_CONN_KAFKA_CONN
64+
value: "{\"conn_type\": \"kafka\", \"extra\": {\"bootstrap.servers\": \"kafka-broker-default-0-listener-broker.$(NAMESPACE).svc.cluster.local:9093\", \"security.protocol\": \"SSL\", \"ssl.ca.location\": \"/stackable/kafka-tls-pem/ca.crt\", \"group.id\": \"airflow_group\", \"auto.offset.reset\": \"latest\"}}"
5865
roleGroups:
5966
default:
6067
replicas: 1
6168
kubernetesExecutors:
6269
envOverrides: *envOverrides
63-
podOverrides: *podOverrides
6470
schedulers:
6571
envOverrides: *envOverrides
6672
podOverrides: *podOverrides
@@ -118,7 +124,7 @@ data:
118124
# Define an asset that watches for messages on the queue
119125
asset = Asset("kafka_queue_asset", watchers=[AssetWatcher(name="kafka_watcher", trigger=trigger)])
120126
121-
with DAG(dag_id="example_kafka_watcher", schedule=[asset]) as dag:
127+
with DAG(dag_id="kafka_watcher", schedule=[asset]) as dag:
122128
EmptyOperator(task_id="task")
123129
124130
date_demo.py: |

stacks/airflow/kafka.yaml

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,13 @@
11
---
2+
apiVersion: secrets.stackable.tech/v1alpha1
3+
kind: TrustStore
4+
metadata:
5+
name: truststore-pem
6+
spec:
7+
secretClassName: tls
8+
format: tls-pem
9+
targetKind: ConfigMap
10+
---
211
apiVersion: kafka.stackable.tech/v1alpha1
312
kind: KafkaCluster
413
metadata:
@@ -8,7 +17,7 @@ spec:
817
productVersion: 4.1.0
918
clusterConfig:
1019
tls:
11-
serverSecretClass: null
20+
serverSecretClass: tls
1221
controllers:
1322
roleGroups:
1423
default:
@@ -17,6 +26,16 @@ spec:
1726
config:
1827
bootstrapListenerClass: cluster-internal
1928
brokerListenerClass: cluster-internal
29+
podOverrides:
30+
spec:
31+
containers:
32+
- name: kafka
33+
env:
34+
- name: BOOTSTRAP_SERVER
35+
valueFrom:
36+
configMapKeyRef:
37+
name: kafka
38+
key: KAFKA
2039
roleGroups:
2140
default:
2241
replicas: 1

0 commit comments

Comments
 (0)