kafka-questdb-connector-samples/stocks/readme.md
Lines changed: 16 additions & 16 deletions
@@ -1,6 +1,6 @@
# Sample Project: Feeding changes from Postgres to QuestDB
## What does this sample do?
-
This sample project demonstrates how to feed changes from a Postgres table to QuestDB. It uses the [Debezium Postgres connector](https://debezium.io/documentation/reference/1.9/connectors/postgresql.html) to capture changes from a [Postgres database](https://www.postgresql.org/) and feed them to a [Kafka](https://kafka.apache.org/) topic. The [Kafka QuestDB connector](https://github.com/questdb/kafka-questdb-connector) then reads from the Kafka topic and writes the changes to a [QuestDB](questdb.io/) table. QuestDB is used for analytical queries on data and to feed the data to a Grafana dashboard for visualization.
+
This sample project demonstrates how to feed changes from a Postgres table to QuestDB. It uses the [Debezium Postgres connector](https://debezium.io/documentation/reference/stable/connectors/postgresql.html) to capture changes from a [Postgres database](https://www.postgresql.org/) and feed them to a [Kafka](https://kafka.apache.org/) topic. The [Kafka QuestDB connector](https://github.com/questdb/kafka-questdb-connector) then reads from the Kafka topic and writes the changes to a [QuestDB](https://questdb.io/) table. QuestDB is used for analytical queries on the data and to feed the data to a Grafana dashboard for visualization.
The project can be seen as a reference architecture for a data pipeline that feeds changes from a Postgres database to QuestDB. Postgres is an excellent [transaction/OLTP](https://en.wikipedia.org/wiki/Online_transaction_processing) database. It excels with simple short-running queries. Hence, the `stock` table contains only the most recent snapshot of the data. It stores no history at all.
@@ -18,13 +18,13 @@ Bear in mind the sample starts multiple containers. It's running fine on my mach
## Running the sample
1. Clone this repository via `git clone https://github.com/questdb/kafka-questdb-connector.git`
2. `cd kafka-questdb-connector/kafka-questdb-connector-samples/stocks/` to enter the directory with this sample.
-
3. Run `docker-compose build` to build docker images with the sample project. This will take a few minutes.
-
4. Run `docker-compose up` to start Postgres, Java stock price updater app, Apache Kafka, Kafka Connect with Debezium and QuestDB connectors, QuestDB and Grafana. This will take a few minutes.
+
3. Run `docker compose build` to build Docker images with the sample project. This will take a few minutes.
+
4. Run `docker compose up` to start Postgres, the Java stock price updater app, Apache Kafka, Kafka Connect with the Debezium and QuestDB connectors, QuestDB and Grafana. This will take a few minutes.
5. The previous command will generate a lot of log messages. Eventually logging should cease. This means all containers are running.
6. At this point all the infrastructure is running and the Java application keeps updating stock prices in Postgres. However, the rest of the pipeline is not yet running: we still need to start the Kafka Connect connectors. Kafka Connect has a REST API, so we can use `curl` to start them.
7. In a separate shell, execute the following command to start the Debezium connector:
```shell
-
curl -X POST -H "Content-Type: application/json" -d '{"name":"debezium_source","config":{"tasks.max":1,"database.hostname":"postgres","database.port":5432,"database.user":"postgres","database.password":"postgres","connector.class":"io.debezium.connector.postgresql.PostgresConnector","database.dbname":"postgres","database.server.name":"dbserver1"}} ' localhost:8083/connectors
+
curl -X POST -H "Content-Type: application/json" -d '{"name":"debezium_source","config":{"tasks.max":1,"database.hostname":"postgres","database.port":5432,"database.user":"postgres","database.password":"postgres","connector.class":"io.debezium.connector.postgresql.PostgresConnector","database.dbname":"postgres","topic.prefix":"dbserver1"}} ' localhost:8083/connectors
```
It starts the Debezium connector that will capture changes from Postgres and feed them to Kafka.
8. Execute the following command to start the QuestDB Kafka Connect sink:
@@ -49,7 +49,7 @@ Bear in mind the sample starts multiple containers. It's running fine on my mach
max(price)
FROM stock
where symbol = 'IBM'
-
SAMPLE by 1m align to calendar;
+
SAMPLE by 1m;
```
It returns the average, minimum and maximum stock price for IBM in each minute. You can change the `1m` to `1s` to get data aggregated by second. The `SAMPLE by` clause shows a bit of QuestDB syntactic sugar that makes time-related queries more readable.
13. Don't forget to stop the containers when you're done. The project generates a lot of data and you could run out of disk space.
@@ -64,15 +64,15 @@ If you like what you see and want to learn more about the internals of the proje
6. Grafana
### Postgres
-
The docker-compose start Postgres image. It's using a container image provided by the Debezium project as they maintain a Postgres which is preconfigured for Debezium.
+
Docker Compose starts a Postgres container. It uses a container image provided by the Debezium project, which maintains a Postgres image preconfigured for Debezium.
### Java stock price updater
It's a Spring Boot application which during startup creates a table in Postgres and populates it with initial data.
You can see the SQL executed in the [schema.sql](src/main/resources/schema.sql) file. The table always has exactly one row per stock symbol.
Once the application is started, it updates stock prices at regular intervals. The `price` and `last_update` columns are updated every time a new price is received for a stock symbol. It mimics a real-world scenario where you would have a Postgres table with the latest prices for each stock symbol. Such a table would typically be used by a transactional system to get the latest price for each stock symbol. In our case the transactional system is simulated by a [simple Java application](src/main/java/io/questdb/kafka/samples/StockService.java) which randomly updates prices for each stock symbol in the Postgres table. The application generates thousands of updates each second.
-
The application is built and packaged as a container image when executing `docker-compose build`. Inside the [docker-compose file](docker-compose.yml) you can see the container called `producer`. That's our Java application.
+
The application is built and packaged as a container image when executing `docker compose build`. Inside the [docker-compose file](docker-compose.yml) you can see the container called `producer`. That's our Java application.
-
It uses Maven to build the application and then copies the resulting JAR file to the container image. The container image is based on Zulu OpenJDK 11. The application is started with `java -jar /stocks.jar` command.
+
It uses Maven to build the application and then copies the resulting JAR file to the container image. The container image is based on Eclipse Temurin JDK 17. The application is started with the `java -jar /stocks.jar` command.
### Debezium Postgres connector
Debezium is an open source project which provides connectors for various databases. It is used to capture changes from a database and feed them to a Kafka topic. In other words: whenever there is a change in a database table, Debezium reads the change and feeds it to a Kafka topic. This way it translates operations such as INSERT or UPDATE into events which can be consumed by other systems. Debezium supports a wide range of databases. In this sample we use the Postgres connector.
-
The Debezium Postgres connector is implemented as a Kafka Connect source connector. Inside the [docker-compose file](docker-compose.yml) it's called `connect` and its container image is also built during `docker-compose build`. The [Dockerfile](../../Dockerfile-Samples) uses Debezium image. The Debezium image contains Kafka Connect runtime and Debezium connectors. Our Dockerfile amends it with Kafka Connect QuestDB Sink.
+
The Debezium Postgres connector is implemented as a Kafka Connect source connector. Inside the [docker-compose file](docker-compose.yml) it's called `connect` and its container image is also built during `docker compose build`. The [Dockerfile](../../Dockerfile-Samples) uses the Debezium image, which contains the Kafka Connect runtime and the Debezium connectors. Our Dockerfile adds the Kafka Connect QuestDB Sink to it.
What's important: when this container starts, it just connects to the Kafka broker, but it does not start any connectors. We need to start the connectors using the `curl` command. This is how we started the Debezium connector:
```shell
-
curl -X POST -H "Content-Type: application/json" -d '{"name":"debezium_source","config":{"tasks.max":1,"database.hostname":"postgres","database.port":5432,"database.user":"postgres","database.password":"postgres","connector.class":"io.debezium.connector.postgresql.PostgresConnector","database.dbname":"postgres","database.server.name":"dbserver1"}} ' localhost:8083/connectors
+
curl -X POST -H "Content-Type: application/json" -d '{"name":"debezium_source","config":{"tasks.max":1,"database.hostname":"postgres","database.port":5432,"database.user":"postgres","database.password":"postgres","connector.class":"io.debezium.connector.postgresql.PostgresConnector","database.dbname":"postgres","topic.prefix":"dbserver1"}} ' localhost:8083/connectors
```
It uses the Kafka Connect REST interface to start a new connector with a given configuration. Let's have a closer look at the configuration. This is what it looks like when formatted for readability:
```json
@@ -120,11 +120,11 @@ It uses Kafka Connect REST interface to start a new connector with a given confi
-
Most of the fields are self-explanatory. The only non-obvious one is `database.server.name`. It's a unique name of the database server. It's used by Kafka Connect to store offsets. It's important that it's unique for each database server. If you have multiple Postgres databases, you need to use different `database.server.name` for each of them. It's used by Debezium to generate Kafka topic names. The topic name is generated as `database.server.name`.`schema`.`table`. In our case it's `dbserver1.public.stock`.
+
Most of the fields are self-explanatory. The only non-obvious one is `topic.prefix`. It's used by Debezium to generate Kafka topic names. The topic name is generated as `topic.prefix`.`schema`.`table`. In our case it's `dbserver1.public.stock`. It's important that it's unique for each database server. If you have multiple Postgres databases, you need to use different `topic.prefix` for each of them.
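As a minimal illustration of the naming rule above, the following sketch assembles the topic name from its three parts. The shell variable names are made up for this example; only the `prefix.schema.table` pattern and the values `dbserver1`, `public` and `stock` come from this sample.

```shell
# Hypothetical variable names; the values mirror this sample's configuration.
topic_prefix="dbserver1"   # value of the `topic.prefix` connector property
schema="public"            # Postgres schema that holds the table
table="stock"              # table whose changes Debezium captures

# The three parts are joined with dots to form the Kafka topic name.
topic="${topic_prefix}.${schema}.${table}"
echo "${topic}"   # prints dbserver1.public.stock
```

A second Postgres server would need a different prefix, otherwise both servers would write their `public.stock` changes to the same topic.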
### Kafka QuestDB connector
The Kafka QuestDB connector re-uses the same Kafka Connect runtime as the Debezium connector. It's also started using a `curl` command. This is how we started the QuestDB connector:
@@ -172,7 +172,7 @@ Let's focus on the ExtractNewRecordState transform a bit more. Why is it needed
"last_update": 1666172978269856
},
"source": {
-
"version": "1.9.6.Final",
+
"version": "2.7.3.Final",
"connector": "postgresql",
"name": "dbserver1",
"ts_ms": 1666172978272,
@@ -207,7 +207,7 @@ This is the actual change in a table. It's a JSON object which contains the new
We cannot feed a full change object to the Kafka Connect QuestDB Sink, because the sink would create a column for each field in the change object, including all metadata, for example the source part of the JSON: