107 changes: 106 additions & 1 deletion docs/modules/demos/pages/nifi-kafka-druid-earthquake-data.adoc
@@ -27,7 +27,7 @@ Additionally, we have to use the FQDN service names (including the namespace), s
To run this demo, your system needs at least:

* 9 {k8s-cpu}[cpu units] (core/hyperthread)
-* 42GiB memory
+* 42GiB memory (minimum of 16GiB per node)
* 75GiB disk storage

== Overview
@@ -86,6 +86,111 @@ $ stackablectl stacklet list

include::partial$instance-hint.adoc[]

== Inspect the data in Kafka

Kafka is an event streaming platform used to move the data through the pipeline in near real-time.
All messages written to and read from Kafka are organized in dedicated queues called topics.
The test data is put into a topic called `earthquakes`.
The records are produced (written) by the test data generator and afterwards consumed (read) by Druid in the same order in which they were created (Kafka guarantees this ordering within each partition).

Kafka uses mutual TLS, so clients that want to connect to Kafka must present a valid TLS certificate.
The easiest way to obtain one is to shell into the `kafka-broker-default-0` Pod, as we do in the following section for demonstration purposes.
For a production setup, you should instead spin up a dedicated Pod, provisioned with a certificate, that acts as a Kafka client, rather than shelling into the Kafka Pod.
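A minimal sketch of such a dedicated client Pod is shown below. Note that the image tag and the `kafka-client-tls` Secret name are illustrative assumptions and not part of the demo; in a real setup the certificate provisioning would follow your cluster's TLS tooling:

```yaml
# Hypothetical Kafka client Pod sketch -- image tag and Secret name are assumptions.
apiVersion: v1
kind: Pod
metadata:
  name: kafka-client
spec:
  containers:
    - name: client
      image: docker.stackable.tech/stackable/kafka:3.7.1-stackable24.7.0  # assumed tag
      command: ["sleep", "infinity"]  # keep the Pod running for interactive use
      volumeMounts:
        - name: tls
          mountPath: /stackable/tls
  volumes:
    - name: tls
      secret:
        secretName: kafka-client-tls  # assumed Secret containing the client certificate
```

You could then `kubectl exec` into this Pod and point the Kafka CLI tools at the mounted certificate instead of entering the broker Pod.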

=== List the available Topics

You can execute a command on the Kafka broker to list the available topics as follows:

// In the following commands, the kcat-prober container is used instead of the kafka container to send requests to Kafka.
// This is necessary because kcat cannot use key- and truststore files with empty passwords, which are mounted into the kafka container.
// However, the kcat-prober container has TLS certificates mounted, which kcat can use to connect to Kafka.
[source,console]
----
$ kubectl exec kafka-broker-default-0 -c kafka -- \
  /stackable/kafka/bin/kafka-topics.sh \
  --describe \
  --bootstrap-server kafka-broker-default-headless.default.svc.cluster.local:9093 \
  --command-config /stackable/config/client.properties
...
Topic: earthquakes TopicId: ND51v_XcQPK4Ilm7A35Pag PartitionCount: 8 ReplicationFactor: 1 Configs: min.insync.replicas=1,segment.bytes=100000000,retention.bytes=900000000
Topic: earthquakes Partition: 0 Leader: 1243966388 Replicas: 1243966388 Isr: 1243966388 Elr: LastKnownElr:
Topic: earthquakes Partition: 1 Leader: 1243966388 Replicas: 1243966388 Isr: 1243966388 Elr: LastKnownElr:
Topic: earthquakes Partition: 2 Leader: 1243966388 Replicas: 1243966388 Isr: 1243966388 Elr: LastKnownElr:
Topic: earthquakes Partition: 3 Leader: 1243966388 Replicas: 1243966388 Isr: 1243966388 Elr: LastKnownElr:
Topic: earthquakes Partition: 4 Leader: 1243966388 Replicas: 1243966388 Isr: 1243966388 Elr: LastKnownElr:
Topic: earthquakes Partition: 5 Leader: 1243966388 Replicas: 1243966388 Isr: 1243966388 Elr: LastKnownElr:
Topic: earthquakes Partition: 6 Leader: 1243966388 Replicas: 1243966388 Isr: 1243966388 Elr: LastKnownElr:
Topic: earthquakes Partition: 7 Leader: 1243966388 Replicas: 1243966388 Isr: 1243966388 Elr: LastKnownElr:
----

You can see that the Kafka cluster consists of a single broker, and that the topic `earthquakes` has been created with eight partitions.
To see some of the records sent to Kafka, run the following command.
You can change the number of records to print via the `--max-messages` parameter.

[source,console]
----
$ kubectl exec kafka-broker-default-0 -c kafka -- \
  /stackable/kafka/bin/kafka-console-consumer.sh \
  --bootstrap-server kafka-broker-default-headless.default.svc.cluster.local:9093 \
  --consumer.config /stackable/config/client.properties \
  --topic earthquakes \
  --offset earliest \
  --partition 0 \
  --max-messages 1
----

Below is an example of one record:

[source,json]
----
{
  "time": "1950-02-07T10:37:29.240Z",
  "latitude": 45.949,
  "longitude": 151.59,
  "depth": 35.0,
  "mag": 5.94,
  "magType": "mw",
  "nst": null,
  "gap": null,
  "dmin": null,
  "rms": null,
  "net": "iscgem",
  "id": "iscgem895202",
  "updated": "2022-04-26T18:23:38.377Z",
  "place": "Kuril Islands",
  "type": "earthquake",
  "horizontalError": null,
  "depthError": 12.6,
  "magError": 0.55,
  "magNst": null,
  "status": "reviewed",
  "locationSource": "iscgem",
  "magSource": "iscgem"
}
----
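Each record is a plain JSON object, so it is easy to inspect programmatically. The following Python sketch (not part of the demo, using a few fields from the sample record above) parses one record and prints a one-line summary:

```python
import json

# Sample record as consumed from the earthquakes topic (abridged to a few fields)
raw = '''
{
  "time": "1950-02-07T10:37:29.240Z",
  "latitude": 45.949,
  "longitude": 151.59,
  "depth": 35.0,
  "mag": 5.94,
  "id": "iscgem895202",
  "place": "Kuril Islands",
  "type": "earthquake",
  "status": "reviewed"
}
'''

record = json.loads(raw)

# Build a one-line summary of the event
summary = f'{record["id"]}: M{record["mag"]} {record["type"]} at {record["place"]}'
print(summary)  # iscgem895202: M5.94 earthquake at Kuril Islands
```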

If you are interested in how many records have been produced to the Kafka topic so far, use the following command.

[source,console]
----
$ kubectl exec kafka-broker-default-0 -c kafka -- \
  /stackable/kafka/bin/kafka-get-offsets.sh \
  --bootstrap-server kafka-broker-default-headless.default.svc.cluster.local:9093 \
  --command-config /stackable/config/client.properties \
  --topic earthquakes
...
earthquakes:0:757379
earthquakes:1:759282
earthquakes:2:761924
earthquakes:3:761339
earthquakes:4:759059
earthquakes:5:767695
earthquakes:6:771457
earthquakes:7:768301
----

Taking roughly `765,000` records per partition times `8` partitions, you end up with approximately 6,120,000 records in total.
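The exact total can be computed by summing the per-partition end offsets from the `kafka-get-offsets.sh` output. A small Python sketch (offsets copied from the example output above) illustrates the arithmetic:

```python
# Per-partition end offsets as printed by kafka-get-offsets.sh (topic:partition:offset)
offsets_output = """\
earthquakes:0:757379
earthquakes:1:759282
earthquakes:2:761924
earthquakes:3:761339
earthquakes:4:759059
earthquakes:5:767695
earthquakes:6:771457
earthquakes:7:768301"""

# Assuming no records have been deleted by retention yet, the end offset of each
# partition equals the number of records produced to it, so the sum over all
# partitions is the total record count.
total = sum(int(line.rsplit(":", 1)[1]) for line in offsets_output.splitlines())
print(total)  # 6106436, i.e. roughly 6.1 million records
```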

== NiFi

NiFi is used to fetch earthquake data from the internet and ingest it into Kafka.
6 changes: 5 additions & 1 deletion stacks/nifi-kafka-druid-superset-s3/nifi.yaml
@@ -21,7 +21,7 @@ spec:
          min: "500m"
          max: "4"
        memory:
-         limit: '6Gi'
+         limit: "6Gi"
        storage:
          contentRepo:
            capacity: "10Gi"
@@ -33,6 +33,10 @@ spec:
            capacity: "4Gi"
          stateRepo:
            capacity: "1Gi"
    configOverrides:
      nifi.properties:
        nifi.web.https.sni.required: "false"
        nifi.web.https.sni.host.check: "false"
    roleGroups:
      default:
        replicas: 1