|
| 1 | +# Transaction-aware Aggregation of CDC Events |
| 2 | + |
| 3 | +This is an _experimental_ Flink job showing how to aggregate CDC events while respecting transactional boundaries in the source database. It's sample code accompanying the discussion in this [blog post](https://decodable.co/blog/aggregating-change-data-capture-events-based-on-transactional-boundaries). |
| 4 | + |
| 5 | +## Infrastructure Setup |
| 6 | + |
| 7 | +There is a [Compose](./compose.yaml) file to locally spin up containers for the following systems by running `docker compose -f compose.yaml up` directly from within this example's base folder: |
| 8 | + |
| 9 | +* MySQL as source database |
| 10 | +* Apache Kafka |
| 11 | +* Kafka Connect for running the Debezium Source Connector for MySQL |
| 12 | + |
| 13 | +Checking with `docker ps` should thus show three containers being up and running fine: |
| 14 | + |
| 15 | +```text |
| 16 | +CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES |
| 17 | +8affed92db39 quay.io/debezium/connect:3.0.1.Final "/docker-entrypoint.…" 2 minutes ago Up 2 minutes 8778/tcp, 0.0.0.0:8083->8083/tcp, 9092/tcp flink-tx-cdc-aggregation-connect-1 |
| 18 | +891a791834fc quay.io/strimzi/kafka:0.43.0-kafka-3.8.0 "sh -c './bin/kafka-…" 2 minutes ago Up 2 minutes 0.0.0.0:9092->9092/tcp flink-tx-cdc-aggregation-kafka-1 |
| 19 | +6f9a4e43243a quay.io/debezium/example-mysql:3.0.1.Final "docker-entrypoint.s…" 2 minutes ago Up 2 minutes (healthy) 0.0.0.0:3306->3306/tcp, 33060/tcp flink-tx-cdc-aggregation-mysql-1 |
| 20 | +``` |
| 21 | + |
| 22 | +## Set up Debezium MySQL Source Connector |
| 23 | + |
| 24 | +The next step is to run an instance of the Debezium MySQL source connector in Kafka Connect which can be done by running the following connector configuration against the REST API of Connect like so: |
| 25 | + |
| 26 | +```bash |
| 27 | +curl --location 'localhost:8083/connectors' \ |
| 28 | +--header 'Content-Type: application/json' \ |
| 29 | +--data '{ |
| 30 | + "name": "demo-connector-001", |
| 31 | + "config": { |
| 32 | + "connector.class": "io.debezium.connector.mysql.MySqlConnector", |
| 33 | + "database.hostname": "mysql", |
| 34 | + "database.port": "3306", |
| 35 | + "database.user": "root", |
| 36 | + "database.password": "123456", |
| 37 | + "database.server.id": "12345", |
| 38 | + "topic.prefix": "demodb", |
| 39 | + "database.include.list": "inventory", |
| 40 | + "schema.history.internal.kafka.bootstrap.servers": "kafka:9092", |
| 41 | + "schema.history.internal.kafka.topic": "schemahistory.demodb.inventory", |
| 42 | + "include.schema.changes": "true", |
| 43 | + "provide.transaction.metadata": "true", |
| 44 | + "tombstones.on.delete": "false" |
| 45 | + } |
| 46 | +}' |
| 47 | +``` |
| 48 | + |
| 49 | +This configuration will capture any changes for all five tables which are found in the MySQL example database named `inventory`. Important is the configuration property `"provide.transaction.metadata": "true"` which instructs Debezium to expose all the necessary metadata related to transaction handling in the MySQL database. Without this additional information the transaction-aware aggregation of CDC events would not work. |
| 50 | + |
| 51 | +## Execute Database Transaction in MySQL |
| 52 | + |
| 53 | +To verify if the setup is working, a simple example transaction is executed in the `inventory` database which touches two tables, namely `customers` and `addresses`. Debezium will capture the changes and propagate them into the separate Kafka topics. Additionally, the transaction metadata is written into a dedicated Kafka topic. |
| 54 | + |
| 55 | + |
| 56 | + |
| 57 | +Run this command to enter a CLI session inside the MySQL container: |
| 58 | + |
| 59 | +```bash |
| 60 | +docker compose exec mysql mysql -u root -p123456 |
| 61 | +``` |
| 62 | + |
| 63 | +Then inside the MySQL CLI run the transaction which inserts a new customer together with a single address record: |
| 64 | + |
| 65 | +```sql |
| 66 | +-- INSERTS 1 customer with 1 address in a transaction |
| 67 | +START TRANSACTION; |
| 68 | +INSERT INTO inventory.customers VALUES |
| 69 | + (default, 'Issac', 'Fletcher', 'ifletcher@example.com'); |
| 70 | +SET @customer_id = LAST_INSERT_ID(); |
| 71 | +INSERT INTO inventory.addresses VALUES |
| 72 | + (default, @customer_id, '1234 Nowhere Street', 'Great City', 'SomeState', '12345', 'LIVING'); |
| 73 | +COMMIT; |
| 74 | +``` |
| 75 | + |
| 76 | +To verify the transaction-related metadata in the corresponding Kafka topic `demodb.transaction` run this command: |
| 77 | + |
| 78 | +```bash |
| 79 | +docker compose exec kafka ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic demodb.transaction --from-beginning |
| 80 | +``` |
| 81 | + |
| 82 | +which should show two metadata events, one for each transaction marker: |
| 83 | + |
| 84 | +* **BEGIN marker** |
| 85 | + |
| 86 | +```json5 |
| 87 | +{ |
| 88 | + "schema": { |
| 89 | + /* ... */ |
| 90 | + }, |
| 91 | + "payload": { |
| 92 | + "status": "BEGIN", |
| 93 | + "id": "file=binlog.000003,pos=236", |
| 94 | + "event_count": null, |
| 95 | + "data_collections": null, |
| 96 | + "ts_ms": 1733763172000 |
| 97 | + } |
| 98 | +} |
| 99 | +``` |
| 100 | + |
| 101 | +* **END marker** |
| 102 | + |
| 103 | +```json5 |
| 104 | +{ |
| 105 | + "schema": { |
| 106 | + /* ... */ |
| 107 | + }, |
| 108 | + "payload": { |
| 109 | + "status": "END", |
| 110 | + "id": "file=binlog.000003,pos=236", |
| 111 | + "event_count": 2, |
| 112 | + "data_collections": [ |
| 113 | + { |
| 114 | + "data_collection": "inventory.customers", |
| 115 | + "event_count": 1 |
| 116 | + }, |
| 117 | + { |
| 118 | + "data_collection": "inventory.addresses", |
| 119 | + "event_count": 1 |
| 120 | + } |
| 121 | + ], |
| 122 | + "ts_ms": 1733763168000 |
| 123 | + } |
| 124 | +} |
| 125 | +``` |
| 126 | + |
| 127 | +## Build & Run Apache Flink Job |
| 128 | + |
| 129 | +With all this in place it's time to build and run the Flink job and verify that the two CDC events which are part of this transaction are successfully aggregated and written into the Kafka output topic. |
| 130 | + |
| 131 | + |
| 132 | + |
| 133 | +> [!NOTE] |
| 134 | +> NOTE: Building the experimental Flink code requires that you have JDK 17 and installed locally on your machine !_ |
| 135 | +
|
| 136 | +From within the `flink-datastream-poc` folder run: |
| 137 | + |
| 138 | +```bash |
| 139 | +./mvnw clean package |
| 140 | +``` |
| 141 | + |
| 142 | +which should result in a successful build of a self-contained JAR file that you can run as is like so: |
| 143 | + |
| 144 | +```bash |
| 145 | +java --add-opens=java.base/java.util=ALL-UNNAMED -jar target/flink-datastream-tx-buffering-1.0-SNAPSHOT.jar |
| 146 | +``` |
| 147 | + |
| 148 | +With the job still running, you can switch into another terminal window to verify that the resulting transactional buffer has been written into the Kafka output topic named `cdc.tx.buffers`: |
| 149 | + |
| 150 | +```bash |
| 151 | +docker compose exec kafka ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic cdc.tx.buffers --from-beginning |
| 152 | +``` |
| 153 | + |
| 154 | +```json5 |
| 155 | +{ |
| 156 | + "beginMarker": { |
| 157 | + "id": "file=binlog.000003,pos=236", |
| 158 | + "status": "BEGIN", |
| 159 | + "event_count": 0, |
| 160 | + "data_collections": null, |
| 161 | + "ts_ms": 1733763172000 |
| 162 | + }, |
| 163 | + "endMarker": { |
| 164 | + "id": "file=binlog.000003,pos=236", |
| 165 | + "status": "END", |
| 166 | + "event_count": 2, |
| 167 | + "data_collections": [ |
| 168 | + { |
| 169 | + "data_collection": "inventory.customers", |
| 170 | + "event_count": 1 |
| 171 | + }, |
| 172 | + { |
| 173 | + "data_collection": "inventory.addresses", |
| 174 | + "event_count": 1 |
| 175 | + } |
| 176 | + ], |
| 177 | + "ts_ms": 1733763168000 |
| 178 | + }, |
| 179 | + "buffer": { |
| 180 | + "inventory.customers": [ |
| 181 | + { |
| 182 | + "key": "{\"schema\":{\"type\":\"struct\",\"fields\":[{\"type\":\"int32\",\"optional\":false,\"field\":\"id\"}],\"optional\":false,\"name\":\"demodb.inventory.customers.Key\"},\"payload\":{\"id\":1005}}", |
| 183 | + "value": "{\"schema\":{\"type\":\"struct\",\"fields\":[{\"type\":\"struct\",\"fields\":[{\"type\":\"int32\",\"optional\":false,\"field\":\"id\"},{\"type\":\"string\",\"optional\":false,\"field\":\"first_name\"},{\"type\":\"string\",\"optional\":false,\"field\":\"last_name\"},{\"type\":\"string\",\"optional\":false,\"field\":\"email\"}],\"optional\":true,\"name\":\"demodb.inventory.customers.Value\",\"field\":\"before\"},{\"type\":\"struct\",\"fields\":[{\"type\":\"int32\",\"optional\":false,\"field\":\"id\"},{\"type\":\"string\",\"optional\":false,\"field\":\"first_name\"},{\"type\":\"string\",\"optional\":false,\"field\":\"last_name\"},{\"type\":\"string\",\"optional\":false,\"field\":\"email\"}],\"optional\":true,\"name\":\"demodb.inventory.customers.Value\",\"field\":\"after\"},{\"type\":\"struct\",\"fields\":[{\"type\":\"string\",\"optional\":false,\"field\":\"version\"},{\"type\":\"string\",\"optional\":false,\"field\":\"connector\"},{\"type\":\"string\",\"optional\":false,\"field\":\"name\"},{\"type\":\"int64\",\"optional\":false,\"field\":\"ts_ms\"},{\"type\":\"string\",\"optional\":true,\"name\":\"io.debezium.data.Enum\",\"version\":1,\"parameters\":{\"allowed\":\"true,last,false,incremental\"},\"default\":\"false\",\"field\":\"snapshot\"},{\"type\":\"string\",\"optional\":false,\"field\":\"db\"},{\"type\":\"string\",\"optional\":true,\"field\":\"sequence\"},{\"type\":\"int64\",\"optional\":true,\"field\":\"ts_us\"},{\"type\":\"int64\",\"optional\":true,\"field\":\"ts_ns\"},{\"type\":\"string\",\"optional\":true,\"field\":\"table\"},{\"type\":\"int64\",\"optional\":false,\"field\":\"server_id\"},{\"type\":\"string\",\"optional\":true,\"field\":\"gtid\"},{\"type\":\"string\",\"optional\":false,\"field\":\"file\"},{\"type\":\"int64\",\"optional\":false,\"field\":\"pos\"},{\"type\":\"int32\",\"optional\":false,\"field\":\"row\"},{\"type\":\"int64\",\"optional\":true,\"field\":\"thread\"},{\"type\":\"string\",\"optional\":true,\"field\":\"query\"}],\"optional\":false,\"name\":\"io.debezium.connector.mysql.Source\",\"field\":\"source\"},{\"type\":\"struct\",\"fields\":[{\"type\":\"string\",\"optional\":false,\"field\":\"id\"},{\"type\":\"int64\",\"optional\":false,\"field\":\"total_order\"},{\"type\":\"int64\",\"optional\":false,\"field\":\"data_collection_order\"}],\"optional\":true,\"name\":\"event.block\",\"version\":1,\"field\":\"transaction\"},{\"type\":\"string\",\"optional\":false,\"field\":\"op\"},{\"type\":\"int64\",\"optional\":true,\"field\":\"ts_ms\"},{\"type\":\"int64\",\"optional\":true,\"field\":\"ts_us\"},{\"type\":\"int64\",\"optional\":true,\"field\":\"ts_ns\"}],\"optional\":false,\"name\":\"demodb.inventory.customers.Envelope\",\"version\":2},\"payload\":{\"before\":null,\"after\":{\"id\":1005,\"first_name\":\"Issac\",\"last_name\":\"Fletcher\",\"email\":\"ifletcher@example.com\"},\"source\":{\"version\":\"3.0.1.Final\",\"connector\":\"mysql\",\"name\":\"demodb\",\"ts_ms\":1733763168000,\"snapshot\":\"false\",\"db\":\"inventory\",\"sequence\":null,\"ts_us\":1733763168000000,\"ts_ns\":1733763168000000000,\"table\":\"customers\",\"server_id\":12345,\"gtid\":null,\"file\":\"binlog.000003\",\"pos\":383,\"row\":0,\"thread\":187,\"query\":null},\"transaction\":{\"id\":\"file=binlog.000003,pos=236\",\"total_order\":1,\"data_collection_order\":1},\"op\":\"c\",\"ts_ms\":1733763172327,\"ts_us\":1733763172327398,\"ts_ns\":1733763172327398676}}" |
| 184 | + } |
| 185 | + ], |
| 186 | + "inventory.addresses": [ |
| 187 | + { |
| 188 | + "key": "{\"schema\":{\"type\":\"struct\",\"fields\":[{\"type\":\"int32\",\"optional\":false,\"field\":\"id\"}],\"optional\":false,\"name\":\"demodb.inventory.addresses.Key\"},\"payload\":{\"id\":17}}", |
| 189 | + "value": "{\"schema\":{\"type\":\"struct\",\"fields\":[{\"type\":\"struct\",\"fields\":[{\"type\":\"int32\",\"optional\":false,\"field\":\"id\"},{\"type\":\"int32\",\"optional\":false,\"field\":\"customer_id\"},{\"type\":\"string\",\"optional\":false,\"field\":\"street\"},{\"type\":\"string\",\"optional\":false,\"field\":\"city\"},{\"type\":\"string\",\"optional\":false,\"field\":\"state\"},{\"type\":\"string\",\"optional\":false,\"field\":\"zip\"},{\"type\":\"string\",\"optional\":false,\"name\":\"io.debezium.data.Enum\",\"version\":1,\"parameters\":{\"allowed\":\"SHIPPING,BILLING,LIVING\"},\"field\":\"type\"}],\"optional\":true,\"name\":\"demodb.inventory.addresses.Value\",\"field\":\"before\"},{\"type\":\"struct\",\"fields\":[{\"type\":\"int32\",\"optional\":false,\"field\":\"id\"},{\"type\":\"int32\",\"optional\":false,\"field\":\"customer_id\"},{\"type\":\"string\",\"optional\":false,\"field\":\"street\"},{\"type\":\"string\",\"optional\":false,\"field\":\"city\"},{\"type\":\"string\",\"optional\":false,\"field\":\"state\"},{\"type\":\"string\",\"optional\":false,\"field\":\"zip\"},{\"type\":\"string\",\"optional\":false,\"name\":\"io.debezium.data.Enum\",\"version\":1,\"parameters\":{\"allowed\":\"SHIPPING,BILLING,LIVING\"},\"field\":\"type\"}],\"optional\":true,\"name\":\"demodb.inventory.addresses.Value\",\"field\":\"after\"},{\"type\":\"struct\",\"fields\":[{\"type\":\"string\",\"optional\":false,\"field\":\"version\"},{\"type\":\"string\",\"optional\":false,\"field\":\"connector\"},{\"type\":\"string\",\"optional\":false,\"field\":\"name\"},{\"type\":\"int64\",\"optional\":false,\"field\":\"ts_ms\"},{\"type\":\"string\",\"optional\":true,\"name\":\"io.debezium.data.Enum\",\"version\":1,\"parameters\":{\"allowed\":\"true,last,false,incremental\"},\"default\":\"false\",\"field\":\"snapshot\"},{\"type\":\"string\",\"optional\":false,\"field\":\"db\"},{\"type\":\"string\",\"optional\":true,\"field\":\"sequence\"},{\"type\":\"int64\",\"optional\":true,\"field\":\"ts_us\"},{\"type\":\"int64\",\"optional\":true,\"field\":\"ts_ns\"},{\"type\":\"string\",\"optional\":true,\"field\":\"table\"},{\"type\":\"int64\",\"optional\":false,\"field\":\"server_id\"},{\"type\":\"string\",\"optional\":true,\"field\":\"gtid\"},{\"type\":\"string\",\"optional\":false,\"field\":\"file\"},{\"type\":\"int64\",\"optional\":false,\"field\":\"pos\"},{\"type\":\"int32\",\"optional\":false,\"field\":\"row\"},{\"type\":\"int64\",\"optional\":true,\"field\":\"thread\"},{\"type\":\"string\",\"optional\":true,\"field\":\"query\"}],\"optional\":false,\"name\":\"io.debezium.connector.mysql.Source\",\"field\":\"source\"},{\"type\":\"struct\",\"fields\":[{\"type\":\"string\",\"optional\":false,\"field\":\"id\"},{\"type\":\"int64\",\"optional\":false,\"field\":\"total_order\"},{\"type\":\"int64\",\"optional\":false,\"field\":\"data_collection_order\"}],\"optional\":true,\"name\":\"event.block\",\"version\":1,\"field\":\"transaction\"},{\"type\":\"string\",\"optional\":false,\"field\":\"op\"},{\"type\":\"int64\",\"optional\":true,\"field\":\"ts_ms\"},{\"type\":\"int64\",\"optional\":true,\"field\":\"ts_us\"},{\"type\":\"int64\",\"optional\":true,\"field\":\"ts_ns\"}],\"optional\":false,\"name\":\"demodb.inventory.addresses.Envelope\",\"version\":2},\"payload\":{\"before\":null,\"after\":{\"id\":17,\"customer_id\":1005,\"street\":\"1234 Nowhere Street\",\"city\":\"Great City\",\"state\":\"SomeState\",\"zip\":\"12345\",\"type\":\"LIVING\"},\"source\":{\"version\":\"3.0.1.Final\",\"connector\":\"mysql\",\"name\":\"demodb\",\"ts_ms\":1733763168000,\"snapshot\":\"false\",\"db\":\"inventory\",\"sequence\":null,\"ts_us\":1733763168000000,\"ts_ns\":1733763168000000000,\"table\":\"addresses\",\"server_id\":12345,\"gtid\":null,\"file\":\"binlog.000003\",\"pos\":544,\"row\":0,\"thread\":187,\"query\":null},\"transaction\":{\"id\":\"file=binlog.000003,pos=236\",\"total_order\":2,\"data_collection_order\":1},\"op\":\"c\",\"ts_ms\":1733763172329,\"ts_us\":1733763172329898,\"ts_ns\":1733763172329898468}}" |
| 190 | + } |
| 191 | + ] |
| 192 | + } |
| 193 | +} |
| 194 | +``` |
| 195 | + |
| 196 | +And that's it! Feel free to try out other database transactions which could touch several rows spanning across any subset of the five tables found in the `inventory` database. |
0 commit comments