# We can't simply create the SparkApplication object here, as we have to wait for the Trino tables to exist, because
# * We currently don't restart failed Spark applications (see https://github.com/stackabletech/spark-k8s-operator/issues/157)
# * The report reads from tables that only exist once the create-tables-in-trino Job has completed
---
apiVersion: batch/v1
kind: Job
metadata:
  name: create-spark-report
spec:
  template:
    spec:
      serviceAccountName: demo-serviceaccount
      initContainers:
        - name: wait-for-trino-tables
          image: docker.stackable.tech/stackable/tools:1.0.0-stackable24.3.0
          command:
            - bash
            - -euo
            - pipefail
            - -c
            - |
              echo "Waiting for Job create-tables-in-trino to complete"
              kubectl wait --timeout=30m --for=condition=complete job/create-tables-in-trino
      containers:
        - name: create-spark-report
          image: docker.stackable.tech/stackable/tools:1.0.0-stackable24.3.0
          command:
            - bash
            - -euo
            - pipefail
            - -c
            - |
              echo "Submitting Spark report"
              kubectl apply -f /tmp/manifest/spark-report.yaml
          volumeMounts:
            - name: manifest
              mountPath: /tmp/manifest
      volumes:
        - name: manifest
          configMap:
            name: spark-report-manifest
      restartPolicy: OnFailure
  backoffLimit: 50
---
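# This ConfigMap holds the SparkApplication manifest. It is mounted into the Job above,
# which applies it with kubectl once the Trino tables are ready.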
apiVersion: v1
kind: ConfigMap
metadata:
  name: spark-report-manifest
data:
  spark-report.yaml: |
    ---
    apiVersion: spark.stackable.tech/v1alpha1
    kind: SparkApplication
    metadata:
      name: spark-report
    spec:
      sparkImage:
        productVersion: 3.5.1
      mode: cluster
      mainApplicationFile: local:///stackable/spark/jobs/spark-report.py
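      # The Iceberg runtime must match the Spark (3.5) and Scala (2.12) versions of the sparkImage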
      deps:
        packages:
          - org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.5.0
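      # Register the Hive metastore as the Iceberg catalog "lakehouse" and make it the
      # default, so unqualified table names resolve against it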
      sparkConf:
        spark.driver.extraClassPath: /stackable/config/hdfs
        spark.executor.extraClassPath: /stackable/config/hdfs
        spark.hadoop.hive.metastore.kerberos.principal: hive/[email protected]
        spark.hadoop.hive.metastore.sasl.enabled: "true"
        spark.kerberos.keytab: /stackable/kerberos/keytab
        spark.kerberos.principal: spark/[email protected]
        spark.sql.catalog.lakehouse: org.apache.iceberg.spark.SparkCatalog
        spark.sql.catalog.lakehouse.type: hive
        spark.sql.catalog.lakehouse.uri: thrift://hive-iceberg:9083
        spark.sql.defaultCatalog: lakehouse
        spark.sql.extensions: org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
      job:
        config:
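          # These volume mounts are shared with the driver and executor Pods below
          # via the &volumeMounts anchor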
          volumeMounts: &volumeMounts
            - name: script
              mountPath: /stackable/spark/jobs
            - name: hdfs-config
              mountPath: /stackable/config/hdfs
            - name: kerberos
              mountPath: /stackable/kerberos
            # Mounting krb5.conf at its default location saves us from having to pass
            # -Djava.security.krb5.conf=/example/path/krb5.conf as a JVM argument
            - name: kerberos
              mountPath: /etc/krb5.conf
              subPath: krb5.conf
        envOverrides: &envOverrides
          KERBEROS_REALM: CLUSTER.LOCAL
        # envOverrides are currently not applied to the spark-submit container,
        # so the variable is also set via podOverrides
        podOverrides:
          spec:
            containers:
              - name: spark-submit
                env:
                  - name: KERBEROS_REALM
                    value: CLUSTER.LOCAL
      driver:
        config:
          volumeMounts: *volumeMounts
          resources: # Keep the requests low, so that this stack also runs on a laptop
            cpu:
              min: 100m
        envOverrides: *envOverrides
        podOverrides: &podOverrides
          spec:
            containers:
              - name: spark
                # envOverrides are currently not applied, so the variable is set here as well
                env:
                  - name: KERBEROS_REALM
                    value: CLUSTER.LOCAL
      executor:
        replicas: 1
        config:
          volumeMounts: *volumeMounts
          resources: # Keep the requests low, so that this stack also runs on a laptop
            cpu:
              min: 250m
        envOverrides: *envOverrides
        podOverrides: *podOverrides
      volumes:
        - name: script
          configMap:
            name: spark-report-script
        - name: hdfs-config
          configMap:
            name: hdfs
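        # The Stackable secret-operator provisions this volume with a keytab and krb5.conf
        # for the "spark" service, based on the annotations below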
        - name: kerberos
          ephemeral:
            volumeClaimTemplate:
              metadata:
                annotations:
                  secrets.stackable.tech/class: kerberos
                  secrets.stackable.tech/kerberos.service.names: spark
                  secrets.stackable.tech/scope: service=spark
              spec:
                accessModes:
                  - ReadWriteOnce
                resources:
                  requests:
                    storage: "1"
                storageClassName: secrets.stackable.tech
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: spark-report-script
data:
  spark-report.py: |
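    # Waits until Trino has created the customer table, then materializes a per-country
    # customer count as a new Iceberg table in the lakehouse catalog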
    from pyspark.sql import SparkSession

    import time

    spark = SparkSession.builder.appName("spark-report").getOrCreate()

    spark.sql("show catalogs").show()
    spark.sql("show tables in lakehouse.default").show()

    customer_table = "lakehouse.customer_analytics.customer"
| 164 | + while not spark.catalog.tableExists(customer_table): |
| 165 | + print(f"Table {customer_table} not found, waiting for Trino to create it...") |
| 166 | + time.sleep(5) |
| 167 | +
|
| 168 | + print(f"Table {customer_table} found, starting report") |
| 169 | +
|
| 170 | + spark.sql(f"SELECT * FROM lakehouse.customer_analytics.customer").show() |
| 171 | + spark.sql(f"CREATE TABLE IF NOT EXISTS lakehouse.customer_analytics.spark_report AS SELECT c_birth_country, count(*) FROM {customer_table} group by c_birth_country order by c_birth_country").show() |
| 172 | +
|
| 173 | + print("Report written") |