Skip to content

Commit 50c5993

Browse files
committed
DBZ-313:Demo: Keep *all* change events in a downstream database [DBZ-2101] + Modyfying README.md and connector configuration for .add.fields.prefix
1 parent 3bec836 commit 50c5993

File tree

4 files changed

+104
-74
lines changed

4 files changed

+104
-74
lines changed

auditlog/README.md

Lines changed: 103 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -5,37 +5,37 @@ It accompanies the blog post [Building Audit Logs with Change Data Capture and S
55

66
There are two applications (based on [Quarkus](https://quarkus.io/)):
77

8-
* _vegetables-service_: a simple REST service for inserting and updating vegetable data into a Postgres database;
9-
as part of its processing, it will not only update its actual "business table" `vegetable`,
10-
but also insert some auditing metadata into a dedicated metadata table `transaction_context_data`:
11-
the user (as obtained from the passed JWT token), the client's date (as passed via the HTTP 1.1 `Date` header)
12-
and a use case identifier (as specified in an annotation on the REST API methods).
13-
* _log-enricher_: a Kafka Streams application,
14-
which joins the CDC topic holding the `vegetable` change events (`dbserver1.inventory.vegetable`) with the corresponding metadata in the `dbserver1.inventory.transaction_context_data` topic sourced from the `transaction_context_data` table;
15-
this table is keyed by transaction id, allowing for joining the vegetable `KStream` with the metadata `KTable`.
16-
The enriched vegetable change events are written to the `dbserver1.inventory.vegetable.enriched` topic.
8+
- _vegetables-service_: a simple REST service for inserting and updating vegetable data into a Postgres database;
9+
as part of its processing, it will not only update its actual "business table" `vegetable`,
10+
but also insert some auditing metadata into a dedicated metadata table `transaction_context_data`:
11+
the user (as obtained from the passed JWT token), the client's date (as passed via the HTTP 1.1 `Date` header)
12+
and a use case identifier (as specified in an annotation on the REST API methods).
13+
- _log-enricher_: a Kafka Streams application,
14+
which joins the CDC topic holding the `vegetable` change events (`dbserver1.inventory.vegetable`) with the corresponding metadata in the `dbserver1.inventory.transaction_context_data` topic sourced from the `transaction_context_data` table;
15+
this table is keyed by transaction id, allowing for joining the vegetable `KStream` with the metadata `KTable`.
16+
The enriched vegetable change events are written to the `dbserver1.inventory.vegetable.enriched` topic.
1717

1818
## Building the Demo
1919

2020
```console
21-
$ mvn clean package
21+
mvn clean package
2222
```
2323

2424
```console
25-
$ export DEBEZIUM_VERSION=1.8
26-
$ docker-compose up --build
25+
export DEBEZIUM_VERSION=1.8
26+
docker-compose up --build
2727
```
2828

2929
## Deploy the Debezium Postgres Connector
3030

3131
```console
32-
$ http PUT http://localhost:8083/connectors/inventory-connector/config < register-postgres.json
32+
http PUT http://localhost:8083/connectors/inventory-connector/config < register-postgres.json
3333
```
3434

3535
## Modifying Some Data and Observing the Audit Log
3636

3737
```console
38-
$ http POST http://localhost:8080/vegetables 'Authorization:Bearer eyJraWQiOiJqd3Qua2V5IiwidHlwIjoiSldUIiwiYWxnIjoiUlMyNTYifQ.eyJzdWIiOiJmYXJtZXJib2IiLCJ1cG4iOiJmYXJtZXJib2IiLCJhdXRoX3RpbWUiOjE1NjY0NTgxMTMsImlzcyI6ImZhcm1zaG9wIiwiZ3JvdXBzIjpbImZhcm1lcnMiLCJjdXN0b21lcnMiXSwiZXhwIjo0MTAyNDQ0Nzk5LCJpYXQiOjE1NjY0NTgxMTMsImp0aSI6IjQyIn0.CscbJN8amqKryYvnVO1184J8F67HN2iTEjVN2VOPodcnoeOd7_iQVKUjC3h-ye5apkJjvAsQKrjzlrGCHRfl-n6jC9F7IkOtjoWnJ4wQ9BBo1SAtPw_Czt1I_Ujm-Kb1p5-BWACCBCVVFgYZTWP_laz5JZS7dIvs6VqoNnw7A4VpA6iPfTVfYlNY3u86-k1FvEg_hW-N9Y9RuihMsPuTdpHK5xdjCrJiD0VJ7-0eRQ8RXpycHuHN4xfmV8MqXBYjYSYDOhbnYbdQVbf0YJoFFqfb75my5olN-97ITsi2MS62W_y-RNT0qZrbytqINA3fF3VQsSY6VcaqRAeygrKm_Q' 'Date:Thu, 22 Aug 2019 08:12:31 GMT' name=Tomatoe description=Yummy!
38+
http POST http://localhost:8080/vegetables 'Authorization:Bearer eyJraWQiOiJqd3Qua2V5IiwidHlwIjoiSldUIiwiYWxnIjoiUlMyNTYifQ.eyJzdWIiOiJmYXJtZXJib2IiLCJ1cG4iOiJmYXJtZXJib2IiLCJhdXRoX3RpbWUiOjE1NjY0NTgxMTMsImlzcyI6ImZhcm1zaG9wIiwiZ3JvdXBzIjpbImZhcm1lcnMiLCJjdXN0b21lcnMiXSwiZXhwIjo0MTAyNDQ0Nzk5LCJpYXQiOjE1NjY0NTgxMTMsImp0aSI6IjQyIn0.CscbJN8amqKryYvnVO1184J8F67HN2iTEjVN2VOPodcnoeOd7_iQVKUjC3h-ye5apkJjvAsQKrjzlrGCHRfl-n6jC9F7IkOtjoWnJ4wQ9BBo1SAtPw_Czt1I_Ujm-Kb1p5-BWACCBCVVFgYZTWP_laz5JZS7dIvs6VqoNnw7A4VpA6iPfTVfYlNY3u86-k1FvEg_hW-N9Y9RuihMsPuTdpHK5xdjCrJiD0VJ7-0eRQ8RXpycHuHN4xfmV8MqXBYjYSYDOhbnYbdQVbf0YJoFFqfb75my5olN-97ITsi2MS62W_y-RNT0qZrbytqINA3fF3VQsSY6VcaqRAeygrKm_Q' 'Date:Thu, 22 Aug 2019 08:12:31 GMT' name=Tomatoe description=Yummy!
3939
```
4040

4141
This uses a pre-generated JWT token (with expiration date set to 2099-12-31 and user set to "farmerbob").
@@ -44,13 +44,13 @@ To regenerate the token with different data, use the [Jwtenizr](https://github.c
4444
You can also update an existing vegetable record (this token is for "farmermargaret"):
4545

4646
```console
47-
$ http PUT http://localhost:8080/vegetables/10 'Authorization:Bearer eyJraWQiOiJqd3Qua2V5IiwidHlwIjoiSldUIiwiYWxnIjoiUlMyNTYifQ.eyJzdWIiOiJmYXJtZXJtYXJnYXJldCIsInVwbiI6ImZhcm1lcm1hcmdhcmV0IiwiYXV0aF90aW1lIjoxNTY5ODM1Mzk5LCJpc3MiOiJmYXJtc2hvcCIsImdyb3VwcyI6WyJmYXJtZXJzIiwiY3VzdG9tZXJzIl0sImV4cCI6NDEwMjQ0NDc5OSwiaWF0IjoxNTY5ODM1Mzk5LCJqdGkiOiI0MiJ9.DTEUA3p-xyK5nveoJIVhjfKNFdVszYIb55Qj4Xrm70DDbAXuOU2FMkffuUAUm2s7ACkp2KEmg6brRwSjvA-zhW61kDR9ZgEb9NWeDjr6Eue08xcSODKt7SGV-M7h3yhuDIhU7uaZrxRUAQTWqm1vxd2rmN_QH0frhKMUNFFsLIOGLG0zHcLosRcwZ4tAKXSSB9VE0fth6srIQCUebDkF7ucA_WSYjPRvahCBd8JvnV4VUGQxZW8zcRhTEwcaLq20ODO-dr85xgWI2Yr_1A7PDuDL4oUjCb90YyhtzaIzs2vQMjcxJ6TWmTcqJpgCfkjE-TeVwjaafcNJu0fBmcP8jA' 'Date:Thu, 22 Aug 2019 08:12:31 GMT' name=Tomatoe description=Tasty!
47+
http PUT http://localhost:8080/vegetables/10 'Authorization:Bearer eyJraWQiOiJqd3Qua2V5IiwidHlwIjoiSldUIiwiYWxnIjoiUlMyNTYifQ.eyJzdWIiOiJmYXJtZXJtYXJnYXJldCIsInVwbiI6ImZhcm1lcm1hcmdhcmV0IiwiYXV0aF90aW1lIjoxNTY5ODM1Mzk5LCJpc3MiOiJmYXJtc2hvcCIsImdyb3VwcyI6WyJmYXJtZXJzIiwiY3VzdG9tZXJzIl0sImV4cCI6NDEwMjQ0NDc5OSwiaWF0IjoxNTY5ODM1Mzk5LCJqdGkiOiI0MiJ9.DTEUA3p-xyK5nveoJIVhjfKNFdVszYIb55Qj4Xrm70DDbAXuOU2FMkffuUAUm2s7ACkp2KEmg6brRwSjvA-zhW61kDR9ZgEb9NWeDjr6Eue08xcSODKt7SGV-M7h3yhuDIhU7uaZrxRUAQTWqm1vxd2rmN_QH0frhKMUNFFsLIOGLG0zHcLosRcwZ4tAKXSSB9VE0fth6srIQCUebDkF7ucA_WSYjPRvahCBd8JvnV4VUGQxZW8zcRhTEwcaLq20ODO-dr85xgWI2Yr_1A7PDuDL4oUjCb90YyhtzaIzs2vQMjcxJ6TWmTcqJpgCfkjE-TeVwjaafcNJu0fBmcP8jA' 'Date:Thu, 22 Aug 2019 08:12:31 GMT' name=Tomatoe description=Tasty!
4848
```
4949

5050
Or delete a record (again using the "farmerbob" token):
5151

5252
```console
53-
$ http DELETE http://localhost:8080/vegetables/10 'Authorization:Bearer eyJraWQiOiJqd3Qua2V5IiwidHlwIjoiSldUIiwiYWxnIjoiUlMyNTYifQ.eyJzdWIiOiJmYXJtZXJib2IiLCJ1cG4iOiJmYXJtZXJib2IiLCJhdXRoX3RpbWUiOjE1NjY0NTgxMTMsImlzcyI6ImZhcm1zaG9wIiwiZ3JvdXBzIjpbImZhcm1lcnMiLCJjdXN0b21lcnMiXSwiZXhwIjo0MTAyNDQ0Nzk5LCJpYXQiOjE1NjY0NTgxMTMsImp0aSI6IjQyIn0.CscbJN8amqKryYvnVO1184J8F67HN2iTEjVN2VOPodcnoeOd7_iQVKUjC3h-ye5apkJjvAsQKrjzlrGCHRfl-n6jC9F7IkOtjoWnJ4wQ9BBo1SAtPw_Czt1I_Ujm-Kb1p5-BWACCBCVVFgYZTWP_laz5JZS7dIvs6VqoNnw7A4VpA6iPfTVfYlNY3u86-k1FvEg_hW-N9Y9RuihMsPuTdpHK5xdjCrJiD0VJ7-0eRQ8RXpycHuHN4xfmV8MqXBYjYSYDOhbnYbdQVbf0YJoFFqfb75my5olN-97ITsi2MS62W_y-RNT0qZrbytqINA3fF3VQsSY6VcaqRAeygrKm_Q' 'Date:Thu, 22 Aug 2019 08:12:31 GMT'
53+
http DELETE http://localhost:8080/vegetables/10 'Authorization:Bearer eyJraWQiOiJqd3Qua2V5IiwidHlwIjoiSldUIiwiYWxnIjoiUlMyNTYifQ.eyJzdWIiOiJmYXJtZXJib2IiLCJ1cG4iOiJmYXJtZXJib2IiLCJhdXRoX3RpbWUiOjE1NjY0NTgxMTMsImlzcyI6ImZhcm1zaG9wIiwiZ3JvdXBzIjpbImZhcm1lcnMiLCJjdXN0b21lcnMiXSwiZXhwIjo0MTAyNDQ0Nzk5LCJpYXQiOjE1NjY0NTgxMTMsImp0aSI6IjQyIn0.CscbJN8amqKryYvnVO1184J8F67HN2iTEjVN2VOPodcnoeOd7_iQVKUjC3h-ye5apkJjvAsQKrjzlrGCHRfl-n6jC9F7IkOtjoWnJ4wQ9BBo1SAtPw_Czt1I_Ujm-Kb1p5-BWACCBCVVFgYZTWP_laz5JZS7dIvs6VqoNnw7A4VpA6iPfTVfYlNY3u86-k1FvEg_hW-N9Y9RuihMsPuTdpHK5xdjCrJiD0VJ7-0eRQ8RXpycHuHN4xfmV8MqXBYjYSYDOhbnYbdQVbf0YJoFFqfb75my5olN-97ITsi2MS62W_y-RNT0qZrbytqINA3fF3VQsSY6VcaqRAeygrKm_Q' 'Date:Thu, 22 Aug 2019 08:12:31 GMT'
5454
```
5555

5656
Doing so, observe the contents of the `dbserver1.inventory.vegetable`, `dbserver1.inventory.transaction_context_data` and `dbserver1.inventory.vegetable.enriched` topics:
@@ -144,19 +144,102 @@ http POST http://localhost:8085/vegetables/{uuid}/auditData/{tuuid} audit:='{"us
144144

145145
This would then fix the missing event in the transaction context data topic and trigger the enricher to provide a new log entry.
146146

147-
## Stopping All Services
147+
## Fowarding Events to a Downstream Postgres Database
148148

149149
```console
150-
$ docker-compose down
150+
$ docker run -it --rm \
151+
--network auditlog_default \
152+
quay.io/debezium/tooling:1.2 \
153+
/bin/bash -c "kafkacat -b kafka:9092 \
154+
-C -o beginning -q -u -t dbserver1.inventory.vegetable.enriched | jq ."
155+
```
156+
157+
This will show you the enriched events fowarded to the `dbserver1.inventory.vegetable.enriched` topic.
158+
We need to pass this events to lightweight `postgres-sink` database defined in `docker-compose.yaml` using the [Debezium JDBC connector](https://debezium.io/documentation/reference/stable/connectors/jdbc.html).
159+
160+
Download the connector plugin `.tar.gz` from the Debzium [plugin archive](https://repo1.maven.org/maven2/io/debezium/debezium-connector-jdbc/) and save it to the `/config/plugins/debezium-connector-jdbc` directory.
161+
162+
Restart your Kafka connect container to pick up the new plugin
163+
164+
```console
165+
docker compose down -v connect
166+
docker compose up -d connect
167+
```
168+
169+
Create your debezium sink connector by running
170+
171+
```console
172+
http POST http://localhost:8083/connectors < connector-config/config/jdbc-connector-config.json
173+
```
174+
175+
The connector should read from the `dbserver1.inventory.vegetable.enriched` topic and populate the `dbserver1_inventory_vegetable` table.
176+
177+
Confirm the changes by running the `exec` command into your `postgres-sink` container
178+
179+
```console
180+
docker compose exec postgres-sink psql -U postgresuser -d postgres
181+
```
182+
183+
View the rows in your `dbserver1_inventory_vegetable` table:
184+
185+
```sql
186+
SELECT * FROM dbserver1_inventory_vegetable;
187+
```
188+
189+
An exmaple of the rows should be like this:
190+
191+
```console
192+
| __connect_topic | __connect_partition | __connect_offset | id | description | name | __deleted | op | lsn | ts_ms | tx_id | client_date | usecase | user_name |
193+
|------------------------------------------|---------------------|------------------|----|-------------|---------|-----------|----|----------|---------------|-------|------------------|-------------------|-----------------|
194+
| dbserver1.inventory.vegetable.enriched | 0 | 0 | 10 | Yummy! | Tomatoe | false | c | 36689976 | 1773921791527 | 769 | 1566461551000000 | CREATE VEGETABLE | farmerbob |
195+
| dbserver1.inventory.vegetable.enriched | 0 | 2 | 10 | Yummy! | Tomatoe | false | c | 36689976 | 1773921791527 | 769 | 1566461551000000 | CREATE VEGETABLE | farmerbob |
196+
| dbserver1.inventory.vegetable.enriched | 0 | 4 | 10 | Tasty! | Tomatoe | false | u | 36690432 | 1773921804910 | 770 | 1566461551000000 | UPDATE VEGETABLE | farmermargaret |
197+
| dbserver1.inventory.vegetable.enriched | 0 | 5 | 10 | | | true | d | 36690800 | 1773921809082 | 771 | 1566461551000000 | DELETE VEGETABLE | farmerbob |
198+
```
199+
200+
With this our audit log events have been successfully propagated to our downstream database.
201+
202+
The table `dbserver_inventory_vegetable` can be defined as:
203+
204+
```sql
205+
-- This version is used with the Debezium JDBC Sink Connector
206+
CREATE TABLE dbserver1_inventory_vegetable_enriched (
207+
-- Business Data
208+
id INTEGER,
209+
name VARCHAR(255) NULL,
210+
description TEXT NULL,
211+
212+
-- Audit Metadata
213+
tx_id VARCHAR(255),
214+
user_name VARCHAR(255) NULL,
215+
usecase VARCHAR(255) NULL,
216+
client_date BIGINT NULL,
217+
218+
-- Change Metadata (for uniqueness)
219+
op VARCHAR(1),
220+
lsn BIGINT,
221+
ts_ms BIGINT,
222+
223+
-- Kafka Metadata to prevent squashing of events
224+
__connect_partition INTEGER,
225+
__connect_offset BIGINT,
226+
__connect_topic VARCHAR(255),
227+
228+
-- To ensure that all events are kept we will use kafka metadata to
229+
-- To ensure that all events are kept we will use kafka metadata to
230+
-- identify unique rows
231+
PRIMARY KEY (__connect_offset, __connect_partition, __connect_topic)
232+
);
233+
151234
```
152235

153-
## Running the Quarkus Applications Locally
236+
## Running the Quarkus applications locally
154237

155238
Set `ADVERTISED_HOST_NAME` of the `kafka` service in _docker-compose.yaml_ to the IP address of your host machine.
156239
Start all services except the `vegetables-service` and the `log-enricher`:
157240

158241
```console
159-
$ docker-compose up --scale vegetables-service=0 --scale log-enricher=0
242+
docker-compose up --scale vegetables-service=0 --scale log-enricher=0
160243
```
161244

162245
Then start the three services via the Quarkus dev mode:

auditlog/connector-config/config/camel-sink-connector-config.json

Lines changed: 0 additions & 26 deletions
This file was deleted.

auditlog/connector-config/config/jdbc-connector-config.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
"transforms": "unwrap",
1616
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
1717
"transforms.unwrap.add.fields": "op,lsn,source.ts_ms:ts_ms,txId:tx_id,audit.client_date:client_date,audit.usecase:usecase,audit.user_name:user_name",
18+
"transforms.unwrap.add.fields.prefix": "",
1819
"transforms.unwrap.delete.tombstone.handling.mode": "rewrite"
1920
}
2021
}

auditlog/connector-config/scripts/init.sql

Lines changed: 0 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -26,31 +26,3 @@ CREATE TABLE dbserver1_inventory_vegetable_enriched (
2626
PRIMARY KEY (__connect_offset, __connect_partition, __connect_topic)
2727
);
2828

29-
30-
-- This version is used with the Camel JDBC Sink Connector
31-
CREATE TABLE enriched_vegetables (
32-
-- Business Data
33-
id INTEGER,
34-
name VARCHAR(255) NULL,
35-
description TEXT NULL,
36-
37-
-- Audit Metadata
38-
tx_id VARCHAR(255),
39-
user_name VARCHAR(255) NULL,
40-
use_case VARCHAR(255) NULL,
41-
client_date BIGINT NULL,
42-
43-
-- Change Metadata (for uniqueness)
44-
op VARCHAR(1),
45-
lsn BIGINT,
46-
ts_ms BIGINT,
47-
48-
-- Kafka Metadata to prevent squashing of events
49-
kafka_partion INTEGER,
50-
kafka_offset BIGINT,
51-
kafka_topic VARCHAR(255),
52-
53-
-- To ensure that all events are kept we will use kafka metadata to
54-
--
55-
PRIMARY KEY (kafka_topic, kafka_partition, kafka_offset)
56-
);

0 commit comments

Comments
 (0)