
Commit 7888242

chore: update demos to Kafka 4 (#313)
* bump kafka version
* docs: remove documentation referencing kcat commands
1 parent f9a1525 commit 7888242

File tree

6 files changed: +12 additions, -295 deletions

demos/data-lakehouse-iceberg-trino-spark/create-spark-ingestion-job.yaml

Lines changed: 1 addition & 1 deletion

@@ -26,7 +26,7 @@ spec:
           echo 'Waiting for all nifi instances to be ready'
           kubectl wait --for=condition=ready --timeout=30m pod -l app.kubernetes.io/name=nifi,app.kubernetes.io/instance=nifi
       - name: wait-for-kafka-topics
-        image: oci.stackable.tech/sdp/kafka:3.9.1-stackable0.0.0-dev
+        image: oci.stackable.tech/sdp/kafka:4.1.0-stackable0.0.0-dev
         command:
           - bash
           - -euo
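The hunk only shows the image bump for the `wait-for-kafka-topics` init container; its actual script lies outside the diff. For orientation, here is a minimal sketch of what such a wait loop can look like with the CLI bundled in the Kafka image; the bootstrap address, port, topic name, and the omitted TLS client settings are placeholders, not values taken from the demo:

[source,bash]
----
#!/usr/bin/env bash
# Minimal sketch of a topic-wait loop; NOT the script used by the demo.
# kafka-bootstrap:9092 and my-demo-topic are placeholders, and a TLS-protected
# listener would additionally need a --command-config client properties file.
set -euo pipefail

until /stackable/kafka/bin/kafka-topics.sh \
        --bootstrap-server kafka-bootstrap:9092 \
        --list | grep -q '^my-demo-topic$'; do
  echo "Waiting for Kafka topic my-demo-topic to be created"
  sleep 2
done
----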
One of the changed files is binary; its diff is not shown.

docs/modules/demos/pages/nifi-kafka-druid-earthquake-data.adoc

Lines changed: 0 additions & 97 deletions

@@ -86,103 +86,6 @@ $ stackablectl stacklet list

 include::partial$instance-hint.adoc[]

-== Inspect the data in Kafka
-
-Kafka is an event streaming platform to stream the data in near real-time.
-All the messages put in and read from Kafka are structured in dedicated queues called topics.
-The test data will be put into a topic called earthquakes.
-The records are produced (written) by the test data generator and consumed (read) by Druid afterwards in the same order they were created.
-
-As Kafka has no web interface, you must use a Kafka client like {kcat}[kcat].
-Kafka uses mutual TLS, so clients wanting to connect to Kafka must present a valid TLS certificate.
-The easiest way to obtain this is to shell into the `kafka-broker-default-0` Pod, as we will do in the following section for demonstration purposes.
-For a production setup, you should spin up a dedicated Pod provisioned with a certificate acting as a Kafka client instead of shell-ing into the Kafka Pod.
-
-=== List the available Topics
-
-You can execute a command on the Kafka broker to list the available topics as follows:
-
-// In the following commands the kcat-prober container instead of the kafka container is used to send requests to Kafka.
-// This is necessary because kcat cannot use key- and truststore files with empty passwords, which are mounted here to the kafka container.
-// However, the kcat-prober container has TLS certificates mounted, which can be used by kcat to connect to Kafka.
-[source,console]
-----
-$ kubectl exec -it kafka-broker-default-0 -c kcat-prober -- /bin/bash -c "/stackable/kcat -b localhost:9093 -X security.protocol=SSL -X ssl.key.location=/stackable/tls-kcat/tls.key -X ssl.certificate.location=/stackable/tls-kcat/tls.crt -X ssl.ca.location=/stackable/tls-kcat/ca.crt -L"
-Metadata for all topics (from broker -1: ssl://localhost:9093/bootstrap):
- 1 brokers:
-  broker 1001 at 172.19.0.4:32321 (controller)
- 1 topics:
-  topic "earthquakes" with 8 partitions:
-    partition 0, leader 1001, replicas: 1001, isrs: 1001
-    partition 1, leader 1001, replicas: 1001, isrs: 1001
-    partition 2, leader 1001, replicas: 1001, isrs: 1001
-    partition 3, leader 1001, replicas: 1001, isrs: 1001
-    partition 4, leader 1001, replicas: 1001, isrs: 1001
-    partition 5, leader 1001, replicas: 1001, isrs: 1001
-    partition 6, leader 1001, replicas: 1001, isrs: 1001
-    partition 7, leader 1001, replicas: 1001, isrs: 1001
-----
-
-You can see that Kafka consists of one broker, and the topic `earthquakes` with eight partitions has been created. To
-see some records sent to Kafka, run the following command. You can change the number of records to print via the `-c`
-parameter.
-
-[source,console]
-----
-$ kubectl exec -it kafka-broker-default-0 -c kcat-prober -- /bin/bash -c "/stackable/kcat -b localhost:9093 -X security.protocol=SSL -X ssl.key.location=/stackable/tls-kcat/tls.key -X ssl.certificate.location=/stackable/tls-kcat/tls.crt -X ssl.ca.location=/stackable/tls-kcat/ca.crt -C -t earthquakes -c 1"
-----
-
-Below is an example of the output of one record:
-
-[source,json]
-----
-{
-  "time":"1950-02-07T10:37:29.240Z",
-  "latitude":45.949,
-  "longitude":151.59,
-  "depth":35.0,
-  "mag":5.94,
-  "magType":"mw",
-  "nst":null,
-  "gap":null,
-  "dmin":null,
-  "rms":null,
-  "net":"iscgem",
-  "id":"iscgem895202",
-  "updated":"2022-04-26T18:23:38.377Z",
-  "place":"Kuril Islands",
-  "type":"earthquake",
-  "horizontalError":null,
-  "depthError":12.6,
-  "magError":0.55,
-  "magNst":null,
-  "status":"reviewed",
-  "locationSource":"iscgem",
-  "magSource":"iscgem"
-}
-----
-
-If you are interested in how many records have been produced to the Kafka topic so far, use the following command.
-It will print the last record produced to the topic partition, formatted with the pattern specified in the `-f` parameter.
-The given pattern will print some metadata of the record.
-
-[source,console]
-----
-$ kubectl exec -it kafka-broker-default-0 -c kcat-prober -- /bin/bash -c "/stackable/kcat -b localhost:9093 -X security.protocol=SSL -X ssl.key.location=/stackable/tls-kcat/tls.key -X ssl.certificate.location=/stackable/tls-kcat/tls.crt -X ssl.ca.location=/stackable/tls-kcat/ca.crt -C -t earthquakes -o -8 -c 8 -f 'Topic %t / Partition %p / Offset: %o / Timestamp: %T\n'"
-Topic earthquakes / Partition 0 / Offset: 378859 / Timestamp: 1752584024936
-Topic earthquakes / Partition 0 / Offset: 378860 / Timestamp: 1752584024936
-Topic earthquakes / Partition 0 / Offset: 378861 / Timestamp: 1752584024936
-Topic earthquakes / Partition 0 / Offset: 378862 / Timestamp: 1752584024936
-Topic earthquakes / Partition 0 / Offset: 378863 / Timestamp: 1752584024936
-Topic earthquakes / Partition 0 / Offset: 378864 / Timestamp: 1752584024936
-Topic earthquakes / Partition 0 / Offset: 378865 / Timestamp: 1752584024936
-Topic earthquakes / Partition 0 / Offset: 378866 / Timestamp: 1752584024936
-----
-
-If you calculate `379,000` records * `8` partitions, you end up with ~ 3,032,000 records.
-The output also shows that the last measurement record was produced at the timestamp `1752584024936`, which translates to `Tuesday, 15 July 2025 14:53:44.936 GMT+02:00`
-(using e.g. the command `date -d @1752584024`).
-
 == NiFi

 NiFi is used to fetch earthquake data from the internet and ingest it into Kafka.
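With the kcat walkthrough above removed, the demo documentation no longer shows a way to peek into the `earthquakes` topic from inside the cluster. A rough, kcat-free substitute (not part of this commit) is the CLI that ships with the Kafka image. The container name, the `/stackable/kafka/bin` path, and the certificate locations under `/stackable/tls-kcat/` are assumptions carried over from the deleted text and may not match the updated demo; whether the mounted PEM key can be used unmodified also depends on the secret-operator setup.

[source,console]
----
# All paths below are assumptions; adjust to the certificates actually mounted in the Pod.
$ kubectl exec -it kafka-broker-default-0 -c kafka -- bash
$ cat /stackable/tls-kcat/tls.key /stackable/tls-kcat/tls.crt > /tmp/client.pem
$ cat > /tmp/client.properties << EOF
security.protocol=SSL
ssl.keystore.type=PEM
ssl.keystore.location=/tmp/client.pem
ssl.truststore.type=PEM
ssl.truststore.location=/stackable/tls-kcat/ca.crt
EOF
$ /stackable/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9093 \
    --command-config /tmp/client.properties --list
$ /stackable/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9093 \
    --consumer.config /tmp/client.properties --topic earthquakes \
    --from-beginning --max-messages 1
----

Kafka clients have accepted PEM key material directly since 2.7 (KIP-651), so a small properties file is enough and no Java keystore conversion is needed.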

docs/modules/demos/pages/nifi-kafka-druid-water-level-data.adoc

Lines changed: 0 additions & 176 deletions

@@ -92,182 +92,6 @@ $ stackablectl stacklet list

 include::partial$instance-hint.adoc[]

-== Inspect the data in Kafka
-
-Kafka is an event streaming platform to stream the data in near real-time. All the messages put in and read from Kafka
-are structured in dedicated queues called topics. The test data will be put into topics called stations and measurements. The records
-are produced (put in) by the test data generator and consumed (read) by Druid afterwards in the same order they were
-created.
-
-As Kafka has no web interface, you must use a Kafka client like {kcat}[kcat]. Kafka uses mutual TLS, so clients
-wanting to connect to Kafka must present a valid TLS certificate. The easiest way to obtain this is to shell into the
-`kafka-broker-default-0` Pod, as we will do in the following section for demonstration purposes. For a production setup,
-you should spin up a dedicated Pod provisioned with a certificate acting as a Kafka client instead of shell-ing into the
-Kafka Pod.
-
-=== List the available Topics
-
-You can execute a command on the Kafka broker to list the available topics as follows:
-
-// In the following commands the kcat-prober container instead of the kafka container is used to send requests to Kafka.
-// This is necessary because kcat cannot use key- and truststore files with empty passwords, which are mounted here to the kafka container.
-// However, the kcat-prober container has TLS certificates mounted, which can be used by kcat to connect to Kafka.
-[source,console]
-----
-$ kubectl exec -it kafka-broker-default-0 -c kcat-prober -- /bin/bash -c "/stackable/kcat -b localhost:9093 -X security.protocol=SSL -X ssl.key.location=/stackable/tls-kcat/tls.key -X ssl.certificate.location=/stackable/tls-kcat/tls.crt -X ssl.ca.location=/stackable/tls-kcat/ca.crt -L"
-Metadata for all topics (from broker -1: ssl://localhost:9093/bootstrap):
- 1 brokers:
-  broker 1001 at 172.19.0.3:31041 (controller)
- 2 topics:
-  topic "stations" with 8 partitions:
-    partition 0, leader 1001, replicas: 1001, isrs: 1001
-    partition 1, leader 1001, replicas: 1001, isrs: 1001
-    partition 2, leader 1001, replicas: 1001, isrs: 1001
-    partition 3, leader 1001, replicas: 1001, isrs: 1001
-    partition 4, leader 1001, replicas: 1001, isrs: 1001
-    partition 5, leader 1001, replicas: 1001, isrs: 1001
-    partition 6, leader 1001, replicas: 1001, isrs: 1001
-    partition 7, leader 1001, replicas: 1001, isrs: 1001
-  topic "measurements" with 8 partitions:
-    partition 0, leader 1001, replicas: 1001, isrs: 1001
-    partition 1, leader 1001, replicas: 1001, isrs: 1001
-    partition 2, leader 1001, replicas: 1001, isrs: 1001
-    partition 3, leader 1001, replicas: 1001, isrs: 1001
-    partition 4, leader 1001, replicas: 1001, isrs: 1001
-    partition 5, leader 1001, replicas: 1001, isrs: 1001
-    partition 6, leader 1001, replicas: 1001, isrs: 1001
-    partition 7, leader 1001, replicas: 1001, isrs: 1001
-----
-
-You can see that Kafka consists of one broker, and the topics `stations` and `measurements` have been created with eight
-partitions each.
-
-=== Show Sample Records
-
-To see some records sent to Kafka, run the following commands. You can change the number of records to
-print via the `-c` parameter.
-
-[source,console]
-----
-$ kubectl exec -it kafka-broker-default-0 -c kcat-prober -- /bin/bash -c "/stackable/kcat -b localhost:9093 -X security.protocol=SSL -X ssl.key.location=/stackable/tls-kcat/tls.key -X ssl.certificate.location=/stackable/tls-kcat/tls.crt -X ssl.ca.location=/stackable/tls-kcat/ca.crt -C -t stations -c 2"
-----
-
-Below is an example of the output of two records:
-
-[source,json]
-----
-{
-  "uuid": "47174d8f-1b8e-4599-8a59-b580dd55bc87",
-  "number": 48900237,
-  "shortname": "EITZE",
-  "longname": "EITZE",
-  "km": 9.56,
-  "agency": "VERDEN",
-  "longitude": 9.2767694354,
-  "latitude": 52.9040654474,
-  "water": {
-    "shortname": "ALLER",
-    "longname": "ALLER"
-  }
-}
-{
-  "uuid": "5aaed954-de4e-4528-8f65-f3f530bc8325",
-  "number": 48900204,
-  "shortname": "RETHEM",
-  "longname": "RETHEM",
-  "km": 34.22,
-  "agency": "VERDEN",
-  "longitude": 9.3828408101,
-  "latitude": 52.7890975921,
-  "water": {
-    "shortname": "ALLER",
-    "longname": "ALLER"
-  }
-}
-----
-
-[source,console]
-----
-$ kubectl exec -it kafka-broker-default-0 -c kcat-prober -- /bin/bash -c "/stackable/kcat -b localhost:9093 -X security.protocol=SSL -X ssl.key.location=/stackable/tls-kcat/tls.key -X ssl.certificate.location=/stackable/tls-kcat/tls.crt -X ssl.ca.location=/stackable/tls-kcat/ca.crt -C -t measurements -c 3"
-----
-
-Below is an example of the output of three records:
-
-[source,json]
-----
-{
-  "timestamp": 1658151900000,
-  "value": 221,
-  "station_uuid": "47174d8f-1b8e-4599-8a59-b580dd55bc87"
-}
-{
-  "timestamp": 1658152800000,
-  "value": 220,
-  "station_uuid": "47174d8f-1b8e-4599-8a59-b580dd55bc87"
-}
-{
-  "timestamp": 1658153700000,
-  "value": 220,
-  "station_uuid": "47174d8f-1b8e-4599-8a59-b580dd55bc87"
-}
-----
-
-The records of the two topics only contain the needed data. The measurement records contain a `station_uuid` for the
-measuring station. The relationship is illustrated below.
-
-image::nifi-kafka-druid-water-level-data/topics.png[]
-
-The reason for splitting the data up into two different topics is the improved performance. One more straightforward
-solution would be to use a single topic and produce records like the following:
-
-[source,json]
-----
-{
-  "uuid": "47174d8f-1b8e-4599-8a59-b580dd55bc87",
-  "number": 48900237,
-  "shortname": "EITZE",
-  "longname": "EITZE",
-  "km": 9.56,
-  "agency": "VERDEN",
-  "longitude": 9.2767694354,
-  "latitude": 52.9040654474,
-  "water": {
-    "shortname": "ALLER",
-    "longname": "ALLER"
-  },
-  "timestamp": 1658151900000,
-  "value": 221
-}
-----
-
-Notice the two last attributes that differ from the previously shown `stations` records. The obvious downside is that
-every measurement (multiple millions of it) has to contain all the data known about the station it was measured at. This
-often leads to transmitting and storing duplicated information, e.g., the longitude of a station, resulting in increased
-network traffic and storage usage. The solution is only to send a station's known/needed data or measurement data. This
-process is called data normalization. The downside is that when analyzing the data, you need to combine the records from
-multiple tables in Druid (`stations` and `measurements`).
-
-If you are interested in how many records have been produced to the Kafka topic so far, use the following command. It
-will print the last record produced to the topic partition, formatted with the pattern specified in the `-f` parameter.
-The given pattern will print some metadata of the record.
-
-[source,console]
-----
-$ kubectl exec -it kafka-broker-default-0 -c kcat-prober -- /bin/bash -c "/stackable/kcat -b localhost:9093 -X security.protocol=SSL -X ssl.key.location=/stackable/tls-kcat/tls.key -X ssl.certificate.location=/stackable/tls-kcat/tls.crt -X ssl.ca.location=/stackable/tls-kcat/ca.crt -C -t measurements -o -8 -c 8 -f 'Topic %t / Partition %p / Offset: %o / Timestamp: %T\n'"
-Topic measurements / Partition 0 / Offset: 1324098 / Timestamp: 1680606104652
-Topic measurements / Partition 1 / Offset: 1346816 / Timestamp: 1680606100462
-Topic measurements / Partition 2 / Offset: 1339363 / Timestamp: 1680606100461
-Topic measurements / Partition 3 / Offset: 1352787 / Timestamp: 1680606104652
-Topic measurements / Partition 4 / Offset: 1330144 / Timestamp: 1680606098368
-Topic measurements / Partition 5 / Offset: 1340226 / Timestamp: 1680606104652
-Topic measurements / Partition 6 / Offset: 1320125 / Timestamp: 1680606100462
-Topic measurements / Partition 7 / Offset: 1317719 / Timestamp: 1680606098368
-----
-
-If you calculate `1,324,098` records * `8` partitions, you end up with ~ 10,592,784 records. The output also shows that
-the last measurement record was produced at the timestamp `1680606104652`, translating to
-`Di 4. Apr 13:01:44 CEST 2023` (using the command `date -d @1680606104`).
-
 == NiFi

 NiFi fetches water-level data from the internet and ingests it into Kafka in real time. This demo includes a workflow
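The deleted record-count check relied on kcat's `-f` formatting. A hedged, kcat-free equivalent (not part of this commit) is `kafka-get-offsets.sh`, which should print the latest offset per partition as `topic:partition:offset`; the script path and the `client.properties` file reusing the TLS settings sketched for the earthquake demo above are assumptions.

[source,console]
----
# Latest offset per partition of the measurements topic; paths are assumptions.
$ /stackable/kafka/bin/kafka-get-offsets.sh --bootstrap-server localhost:9093 \
    --command-config /tmp/client.properties --topic measurements --time -1
----

Summing the printed offsets over all eight partitions gives roughly the record count that the removed text computed by hand.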

stacks/data-lakehouse-iceberg-trino-spark/kafka.yaml

Lines changed: 5 additions & 10 deletions

@@ -7,9 +7,12 @@ spec:
   image:
     productVersion: 3.9.1
   clusterConfig:
-    zookeeperConfigMapName: kafka-znode
     authentication:
       - authenticationClass: kafka-client-tls
+  controllers:
+    roleGroups:
+      default:
+        replicas: 1
   brokers:
     config:
       resources:
@@ -25,7 +28,7 @@ spec:
       default:
         replicas: 5
     configOverrides:
-      server.properties:
+      broker.properties:
         num.partitions: "27"
         log.segment.bytes: "50000000" # 0.5GB
         log.retention.bytes: "2000000000" # 2 GB. Should keep between 2.0 and 2.5GB
@@ -38,11 +41,3 @@ spec:
     provider:
       tls:
         clientCertSecretClass: tls
----
-apiVersion: zookeeper.stackable.tech/v1alpha1
-kind: ZookeeperZnode
-metadata:
-  name: kafka-znode
-spec:
-  clusterRef:
-    name: zookeeper
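These stack changes follow the ZooKeeper-to-KRaft migration that Kafka 4 requires: the `ZookeeperZnode` and `zookeeperConfigMapName` are dropped, a dedicated `controllers` role provides the KRaft quorum, and broker overrides move from `server.properties` to `broker.properties`. A hedged way to confirm that a running cluster is using a KRaft controller quorum (not part of this commit; script path, port, and `client.properties` are assumptions about the demo environment) is:

[source,console]
----
# Describe the KRaft metadata quorum from inside a broker Pod; paths are assumptions.
$ /stackable/kafka/bin/kafka-metadata-quorum.sh --bootstrap-server localhost:9093 \
    --command-config /tmp/client.properties describe --status
----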

stacks/nifi-kafka-druid-superset-s3/kafka.yaml

Lines changed: 6 additions & 11 deletions

@@ -1,23 +1,18 @@
 ---
-apiVersion: zookeeper.stackable.tech/v1alpha1
-kind: ZookeeperZnode
-metadata:
-  name: kafka-znode
-spec:
-  clusterRef:
-    name: zookeeper
----
 apiVersion: kafka.stackable.tech/v1alpha1
 kind: KafkaCluster
 metadata:
   name: kafka
 spec:
   image:
-    productVersion: 3.9.1
+    productVersion: 4.1.0
   clusterConfig:
-    zookeeperConfigMapName: kafka-znode
     authentication:
       - authenticationClass: kafka-client-tls
+  controllers:
+    roleGroups:
+      default:
+        replicas: 1
   brokers:
     config:
       bootstrapListenerClass: external-stable
@@ -35,7 +30,7 @@ spec:
       default:
         replicas: 1
     configOverrides:
-      server.properties:
+      broker.properties:
         num.partitions: "8"
         # We have
         # 1 brokers
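Since the overrides now target `broker.properties`, one way to double-check that they are picked up is to read back the effective configuration of a running broker. This is a hedged sketch, not part of the commit; the broker id, script path, and `client.properties` are assumptions about the demo environment.

[source,console]
----
# Read back effective broker configs (including statically set defaults such as
# num.partitions); broker id and paths are assumptions.
$ /stackable/kafka/bin/kafka-configs.sh --bootstrap-server localhost:9093 \
    --command-config /tmp/client.properties \
    --entity-type brokers --entity-name 1001 --describe --all
----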
