107 changes: 106 additions & 1 deletion docs/modules/demos/pages/nifi-kafka-druid-earthquake-data.adoc
@@ -27,7 +27,7 @@ Additionally, we have to use the FQDN service names (including the namespace), s
To run this demo, your system needs at least:

* 9 {k8s-cpu}[cpu units] (core/hyperthread)
* 42GiB memory
* 42GiB memory (minimum of 16GiB per node)
* 75GiB disk storage

== Overview
@@ -86,6 +86,111 @@ $ stackablectl stacklet list

include::partial$instance-hint.adoc[]

== Inspect the data in Kafka

Kafka is an event streaming platform to stream the data in near real-time.
All the messages put in and read from Kafka are structured in dedicated queues called topics.
The test data will be put into a topic called `earthquakes`.
The records are produced (written) by the test data generator and consumed (read) by Druid afterwards in the same order they were created.

Kafka uses mutual TLS, so clients wanting to connect to Kafka must present a valid TLS certificate.
The easiest way to obtain this is to shell into the `kafka-broker-default-0` Pod, as we will do in the following section for demonstration purposes.
For a production setup, you should spin up a dedicated Pod provisioned with a certificate acting as a Kafka client instead of shell-ing into the Kafka Pod.
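
The TLS settings used by the Kafka CLI tools below are kept in a client configuration file mounted into the `kafka` container.
If you are curious, you can print it as follows (the exact contents vary between releases):

[source,console]
----
$ kubectl exec kafka-broker-default-0 -c kafka -- \
  cat /stackable/config/client.properties
----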

=== List the available Topics

You can execute a command on the Kafka broker to list the available topics as follows:

// The kafka CLI tools used below connect via the client configuration in /stackable/config/client.properties,
// which takes care of the TLS setup using the key- and truststore files mounted into the kafka container.
// Alternatively, the kcat-prober container has TLS certificates mounted, which kcat can use to connect to Kafka.
[source,console]
----
$ kubectl exec kafka-broker-default-0 -c kafka -- \
  /stackable/kafka/bin/kafka-topics.sh \
  --describe \
  --bootstrap-server kafka-broker-default-headless.default.svc.cluster.local:9093 \
  --command-config /stackable/config/client.properties
...
Topic: earthquakes TopicId: ND51v_XcQPK4Ilm7A35Pag PartitionCount: 8 ReplicationFactor: 1 Configs: min.insync.replicas=1,segment.bytes=100000000,retention.bytes=900000000
Topic: earthquakes Partition: 0 Leader: 1243966388 Replicas: 1243966388 Isr: 1243966388 Elr: LastKnownElr:
Topic: earthquakes Partition: 1 Leader: 1243966388 Replicas: 1243966388 Isr: 1243966388 Elr: LastKnownElr:
Topic: earthquakes Partition: 2 Leader: 1243966388 Replicas: 1243966388 Isr: 1243966388 Elr: LastKnownElr:
Topic: earthquakes Partition: 3 Leader: 1243966388 Replicas: 1243966388 Isr: 1243966388 Elr: LastKnownElr:
Topic: earthquakes Partition: 4 Leader: 1243966388 Replicas: 1243966388 Isr: 1243966388 Elr: LastKnownElr:
Topic: earthquakes Partition: 5 Leader: 1243966388 Replicas: 1243966388 Isr: 1243966388 Elr: LastKnownElr:
Topic: earthquakes Partition: 6 Leader: 1243966388 Replicas: 1243966388 Isr: 1243966388 Elr: LastKnownElr:
Topic: earthquakes Partition: 7 Leader: 1243966388 Replicas: 1243966388 Isr: 1243966388 Elr: LastKnownElr:
----
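
If you only need the topic names rather than the full description, `kafka-topics.sh` also supports a plain listing using the same connection settings:

[source,console]
----
$ kubectl exec kafka-broker-default-0 -c kafka -- \
  /stackable/kafka/bin/kafka-topics.sh \
  --list \
  --bootstrap-server kafka-broker-default-headless.default.svc.cluster.local:9093 \
  --command-config /stackable/config/client.properties
----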

You can see that Kafka consists of one broker, and the topic `earthquakes` has been created with eight partitions.
To see some records sent to Kafka, run the following command.
You can change the number of records to print via the `--max-messages` parameter.

[source,console]
----
$ kubectl exec kafka-broker-default-0 -c kafka -- \
  /stackable/kafka/bin/kafka-console-consumer.sh \
  --bootstrap-server kafka-broker-default-headless.default.svc.cluster.local:9093 \
  --consumer.config /stackable/config/client.properties \
  --topic earthquakes \
  --offset earliest \
  --partition 0 \
  --max-messages 1
----

Below is an example of the output of one record:

[source,json]
----
{
  "time": "1950-02-07T10:37:29.240Z",
  "latitude": 45.949,
  "longitude": 151.59,
  "depth": 35.0,
  "mag": 5.94,
  "magType": "mw",
  "nst": null,
  "gap": null,
  "dmin": null,
  "rms": null,
  "net": "iscgem",
  "id": "iscgem895202",
  "updated": "2022-04-26T18:23:38.377Z",
  "place": "Kuril Islands",
  "type": "earthquake",
  "horizontalError": null,
  "depthError": 12.6,
  "magError": 0.55,
  "magNst": null,
  "status": "reviewed",
  "locationSource": "iscgem",
  "magSource": "iscgem"
}
----
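
The console consumer prints each record exactly as it was produced, typically as one line of JSON; the example above was pretty-printed for readability.
If you have `jq` installed on your local machine, you can format the output on the fly:

[source,console]
----
$ kubectl exec kafka-broker-default-0 -c kafka -- \
  /stackable/kafka/bin/kafka-console-consumer.sh \
  --bootstrap-server kafka-broker-default-headless.default.svc.cluster.local:9093 \
  --consumer.config /stackable/config/client.properties \
  --topic earthquakes \
  --offset earliest \
  --partition 0 \
  --max-messages 1 | jq .
----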

If you are interested in how many records have been produced to the Kafka topic so far, use the following command.

[source,console]
----
$ kubectl exec kafka-broker-default-0 -c kafka -- \
  /stackable/kafka/bin/kafka-get-offsets.sh \
  --bootstrap-server kafka-broker-default-headless.default.svc.cluster.local:9093 \
  --command-config /stackable/config/client.properties \
  --topic earthquakes
...
earthquakes:0:757379
earthquakes:1:759282
earthquakes:2:761924
earthquakes:3:761339
earthquakes:4:759059
earthquakes:5:767695
earthquakes:6:771457
earthquakes:7:768301
----

Multiplying roughly `765,000` records per partition by `8` partitions, you end up with approximately 6,120,000 records.
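
Rather than estimating, you can sum the per-partition offsets exactly.
For example, pipe the output through a small `awk` script on your local machine (any line-based tool works just as well):

[source,console]
----
$ kubectl exec kafka-broker-default-0 -c kafka -- \
  /stackable/kafka/bin/kafka-get-offsets.sh \
  --bootstrap-server kafka-broker-default-headless.default.svc.cluster.local:9093 \
  --command-config /stackable/config/client.properties \
  --topic earthquakes \
  | awk -F: '{sum += $3} END {print sum}'  # the third colon-separated field is the offset
----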

== NiFi

NiFi is used to fetch earthquake data from the internet and ingest it into Kafka.
209 changes: 209 additions & 0 deletions docs/modules/demos/pages/nifi-kafka-druid-water-level-data.adoc
@@ -92,6 +92,215 @@ $ stackablectl stacklet list

include::partial$instance-hint.adoc[]

== Inspect the data in Kafka

Kafka is an event streaming platform to stream the data in near real-time. All the messages put in and read from Kafka
are structured in dedicated queues called topics. The test data will be put into topics called `stations` and
`measurements`. The records are produced (written) by the test data generator and consumed (read) by Druid afterwards
in the same order they were created.

To interact with Kafka you will use the client scripts shipped with the Kafka image. Kafka uses mutual TLS, so clients
wanting to connect to Kafka must present a valid TLS certificate. The easiest way to obtain this is to shell into the
`kafka-broker-default-0` Pod, as we will do in the following section for demonstration purposes. For a production setup,
you should spin up a dedicated Pod provisioned with a certificate acting as a Kafka client instead of shell-ing into the
Kafka Pod.
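
For quick ad-hoc checks, the broker Pod also contains a `kcat-prober` container with TLS certificates mounted that kcat
can use to connect to Kafka. The sketch below lists the cluster metadata this way; the certificate paths are
assumptions and may differ in your release, so check the Pod spec for the actual mount paths:

[source,console]
----
# NOTE: the certificate paths below are assumptions; look them up in the Pod spec first
$ kubectl exec kafka-broker-default-0 -c kcat-prober -- \
  kcat -L \
  -b kafka-broker-default-headless.default.svc.cluster.local:9093 \
  -X security.protocol=ssl \
  -X ssl.key.location=/stackable/tls-kcat/tls.key \
  -X ssl.certificate.location=/stackable/tls-kcat/tls.crt \
  -X ssl.ca.location=/stackable/tls-kcat/ca.crt
----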

=== List the available Topics

You can execute a command on the Kafka broker to list the available topics as follows:

[source,console]
----
$ kubectl exec kafka-broker-default-0 -c kafka -- \
  /stackable/kafka/bin/kafka-topics.sh \
  --describe \
  --bootstrap-server kafka-broker-default-headless.default.svc.cluster.local:9093 \
  --command-config /stackable/config/client.properties
...
Topic: measurements TopicId: w9qYb3GaTvCMZj4G8pkPPQ PartitionCount: 8 ReplicationFactor: 1 Configs: min.insync.replicas=1,segment.bytes=100000000,retention.bytes=900000000
Topic: measurements Partition: 0 Leader: 1243966388 Replicas: 1243966388 Isr: 1243966388 Elr: LastKnownElr:
Topic: measurements Partition: 1 Leader: 1243966388 Replicas: 1243966388 Isr: 1243966388 Elr: LastKnownElr:
Topic: measurements Partition: 2 Leader: 1243966388 Replicas: 1243966388 Isr: 1243966388 Elr: LastKnownElr:
Topic: measurements Partition: 3 Leader: 1243966388 Replicas: 1243966388 Isr: 1243966388 Elr: LastKnownElr:
Topic: measurements Partition: 4 Leader: 1243966388 Replicas: 1243966388 Isr: 1243966388 Elr: LastKnownElr:
Topic: measurements Partition: 5 Leader: 1243966388 Replicas: 1243966388 Isr: 1243966388 Elr: LastKnownElr:
Topic: measurements Partition: 6 Leader: 1243966388 Replicas: 1243966388 Isr: 1243966388 Elr: LastKnownElr:
Topic: measurements Partition: 7 Leader: 1243966388 Replicas: 1243966388 Isr: 1243966388 Elr: LastKnownElr:
Topic: stations TopicId: QkKmvOagQkG4QbeS0IZ_Tg PartitionCount: 8 ReplicationFactor: 1 Configs: min.insync.replicas=1,segment.bytes=100000000,retention.bytes=900000000
Topic: stations Partition: 0 Leader: 1243966388 Replicas: 1243966388 Isr: 1243966388 Elr: LastKnownElr:
Topic: stations Partition: 1 Leader: 1243966388 Replicas: 1243966388 Isr: 1243966388 Elr: LastKnownElr:
Topic: stations Partition: 2 Leader: 1243966388 Replicas: 1243966388 Isr: 1243966388 Elr: LastKnownElr:
Topic: stations Partition: 3 Leader: 1243966388 Replicas: 1243966388 Isr: 1243966388 Elr: LastKnownElr:
Topic: stations Partition: 4 Leader: 1243966388 Replicas: 1243966388 Isr: 1243966388 Elr: LastKnownElr:
Topic: stations Partition: 5 Leader: 1243966388 Replicas: 1243966388 Isr: 1243966388 Elr: LastKnownElr:
Topic: stations Partition: 6 Leader: 1243966388 Replicas: 1243966388 Isr: 1243966388 Elr: LastKnownElr:
Topic: stations Partition: 7 Leader: 1243966388 Replicas: 1243966388 Isr: 1243966388 Elr: LastKnownElr:
----

You can see that Kafka consists of one broker, and the topics `stations` and `measurements` have been created with eight
partitions each.
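
You can also describe a single topic by additionally passing `--topic` to the same command:

[source,console]
----
$ kubectl exec kafka-broker-default-0 -c kafka -- \
  /stackable/kafka/bin/kafka-topics.sh \
  --describe \
  --topic stations \
  --bootstrap-server kafka-broker-default-headless.default.svc.cluster.local:9093 \
  --command-config /stackable/config/client.properties
----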

=== Show Sample Records

To see some records sent to Kafka, run the following commands. You can change the number of records to
print via the `--max-messages` parameter.

[source,console]
----
$ kubectl exec kafka-broker-default-0 -c kafka -- \
  /stackable/kafka/bin/kafka-console-consumer.sh \
  --bootstrap-server kafka-broker-default-headless.default.svc.cluster.local:9093 \
  --consumer.config /stackable/config/client.properties \
  --topic stations \
  --offset earliest \
  --partition 0 \
  --max-messages 2
----

Below is an example of the output of two records:

[source,json]
----
{
  "uuid": "47174d8f-1b8e-4599-8a59-b580dd55bc87",
  "number": 48900237,
  "shortname": "EITZE",
  "longname": "EITZE",
  "km": 9.56,
  "agency": "VERDEN",
  "longitude": 9.2767694354,
  "latitude": 52.9040654474,
  "water": {
    "shortname": "ALLER",
    "longname": "ALLER"
  }
}
{
  "uuid": "5aaed954-de4e-4528-8f65-f3f530bc8325",
  "number": 48900204,
  "shortname": "RETHEM",
  "longname": "RETHEM",
  "km": 34.22,
  "agency": "VERDEN",
  "longitude": 9.3828408101,
  "latitude": 52.7890975921,
  "water": {
    "shortname": "ALLER",
    "longname": "ALLER"
  }
}
----

[source,console]
----
$ kubectl exec kafka-broker-default-0 -c kafka -- \
  /stackable/kafka/bin/kafka-console-consumer.sh \
  --bootstrap-server kafka-broker-default-headless.default.svc.cluster.local:9093 \
  --consumer.config /stackable/config/client.properties \
  --topic measurements \
  --offset earliest \
  --partition 0 \
  --max-messages 3
----

Below is an example of the output of three records:

[source,json]
----
{
  "timestamp": 1658151900000,
  "value": 221,
  "station_uuid": "47174d8f-1b8e-4599-8a59-b580dd55bc87"
}
{
  "timestamp": 1658152800000,
  "value": 220,
  "station_uuid": "47174d8f-1b8e-4599-8a59-b580dd55bc87"
}
{
  "timestamp": 1658153700000,
  "value": 220,
  "station_uuid": "47174d8f-1b8e-4599-8a59-b580dd55bc87"
}
----
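
Besides the payload, every Kafka record carries a timestamp, typically the time it was produced. The console consumer
can print it alongside the value via a standard formatter property:

[source,console]
----
$ kubectl exec kafka-broker-default-0 -c kafka -- \
  /stackable/kafka/bin/kafka-console-consumer.sh \
  --bootstrap-server kafka-broker-default-headless.default.svc.cluster.local:9093 \
  --consumer.config /stackable/config/client.properties \
  --topic measurements \
  --offset earliest \
  --partition 0 \
  --max-messages 1 \
  --property print.timestamp=true
----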

The records in the two topics contain only the data that is actually needed there. Each measurement record references
its measuring station via the `station_uuid` field. The relationship is illustrated below.

image::nifi-kafka-druid-water-level-data/topics.png[]

The data is split into two topics for better performance. A more straightforward solution would be to use a single
topic and produce records like the following:

[source,json]
----
{
  "uuid": "47174d8f-1b8e-4599-8a59-b580dd55bc87",
  "number": 48900237,
  "shortname": "EITZE",
  "longname": "EITZE",
  "km": 9.56,
  "agency": "VERDEN",
  "longitude": 9.2767694354,
  "latitude": 52.9040654474,
  "water": {
    "shortname": "ALLER",
    "longname": "ALLER"
  },
  "timestamp": 1658151900000,
  "value": 221
}
----

Notice the last two attributes, which differ from the previously shown `stations` records. The obvious downside of this
approach is that every measurement (many millions of them) has to contain all the data known about the station it was
measured at. This leads to transmitting and storing duplicated information, e.g., the longitude of a station, resulting
in increased network traffic and storage usage. The solution is to keep station data and measurement data separate and
send each only once; this process is called data normalization. The downside is that when analyzing the data, you need
to join the records from multiple tables in Druid (`stations` and `measurements`).

If you are interested in how many records have been produced to the Kafka topic so far, use the following command. It
prints the latest offset of every partition of the topic.

[source,console]
----
$ kubectl exec kafka-broker-default-0 -c kafka -- \
  /stackable/kafka/bin/kafka-get-offsets.sh \
  --bootstrap-server kafka-broker-default-headless.default.svc.cluster.local:9093 \
  --command-config /stackable/config/client.properties \
  --topic measurements
...
measurements:0:1366665
measurements:1:1364930
measurements:2:1395607
measurements:3:1390762
measurements:4:1368829
measurements:5:1362539
measurements:6:1344362
measurements:7:1369651
----

Multiplying roughly `1,370,000` records per partition by `8` partitions, we end up with almost 11,000,000 records.
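
The `stations` topic should be considerably smaller, since stations change far less often than measurements arrive. You
can verify this with the same command:

[source,console]
----
$ kubectl exec kafka-broker-default-0 -c kafka -- \
  /stackable/kafka/bin/kafka-get-offsets.sh \
  --bootstrap-server kafka-broker-default-headless.default.svc.cluster.local:9093 \
  --command-config /stackable/config/client.properties \
  --topic stations
----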

To inspect the most recently produced records, use the following command. Here, we consume three records from partition
`0` of the `measurements` topic, starting at the latest offset.

[source,console]
----
$ kubectl exec kafka-broker-default-0 -c kafka -- \
  /stackable/kafka/bin/kafka-console-consumer.sh \
  --bootstrap-server kafka-broker-default-headless.default.svc.cluster.local:9093 \
  --consumer.config /stackable/config/client.properties \
  --topic measurements \
  --offset latest \
  --partition 0 \
  --max-messages 3
...
{"timestamp":"2025-10-21T11:00:00+02:00","value":369.54,"station_uuid":"5cdc6555-87d7-4fcd-834d-cbbe24c9d08b"}
{"timestamp":"2025-10-21T11:15:00+02:00","value":369.54,"station_uuid":"5cdc6555-87d7-4fcd-834d-cbbe24c9d08b"}
{"timestamp":"2025-10-21T11:00:00+02:00","value":8.0,"station_uuid":"7deedc21-2878-40cc-ab47-f6da0d9002f1"}
----

== NiFi

NiFi fetches water-level data from the internet and ingests it into Kafka in real time. This demo includes a workflow
6 changes: 5 additions & 1 deletion stacks/nifi-kafka-druid-superset-s3/nifi.yaml
@@ -21,7 +21,7 @@ spec:
          min: "500m"
          max: "4"
        memory:
          limit: '6Gi'
          limit: "6Gi"
        storage:
          contentRepo:
            capacity: "10Gi"
@@ -33,6 +33,10 @@ spec:
            capacity: "4Gi"
          stateRepo:
            capacity: "1Gi"
    configOverrides:
      nifi.properties:
        nifi.web.https.sni.required: "false"
        nifi.web.https.sni.host.check: "false"
    roleGroups:
      default:
        replicas: 1