Skip to content

Commit b565243

Browse files
author
Sreekanth Iyer (Ushta Te Consultancy Services)
committed
Updated Flink 1.17
1 parent 8c16925 commit b565243

File tree

3 files changed

+72
-26
lines changed

3 files changed

+72
-26
lines changed

articles/hdinsight-aks/flink/monitor-changes-postgres-table-flink.md

Lines changed: 72 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ title: Change Data Capture (CDC) of PostgreSQL table using Apache Flink®
33
description: Learn how to perform CDC on PostgreSQL table using Apache Flink®
44
ms.service: hdinsight-aks
55
ms.topic: how-to
6-
ms.date: 10/27/2023
6+
ms.date: 03/28/2024
77
---
88

99
# Change Data Capture (CDC) of PostgreSQL table using Apache Flink®
@@ -12,7 +12,7 @@ ms.date: 10/27/2023
1212

1313
Change Data Capture (CDC) is a technique you can use to track row-level changes in database tables in response to create, update, and delete operations. In this article, we use [CDC Connectors for Apache Flink®](https://github.com/ververica/flink-cdc-connectors), which offer a set of source connectors for Apache Flink. The connectors integrate [Debezium®](https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/formats/debezium/#debezium-format) as the engine to capture the data changes.
1414

15-
Flink supports to interpret Debezium JSON and Avro messages as INSERT/UPDATE/DELETE messages into Apache Flink SQL system.
15+
Flink supports to interpret Debezium JSON and Avro messages as INSERT/UPDATE/DELETE messages into Apache Flink SQL system.
1616

1717
This support is useful in many cases to:
1818

@@ -91,7 +91,7 @@ Now, let's learn how to monitor changes on PostgreSQL table using Flink-SQL CDC.
9191
<dependency>
9292
<groupId>com.ververica</groupId>
9393
<artifactId>flink-sql-connector-postgres-cdc</artifactId>
94-
<version>2.3.0</version>
94+
<version>2.4.2</version>
9595
</dependency>
9696
</dependencies>
9797
</project>
@@ -113,41 +113,87 @@ Now, let's learn how to monitor changes on PostgreSQL table using Flink-SQL CDC.
113113
114114
```sql
115115
/opt/flink-webssh/bin/sql-client.sh -j
116-
/opt/flink-webssh/target/flink-sql-connector-postgres-cdc-2.3.0.jar -j
116+
/opt/flink-webssh/target/flink-sql-connector-postgres-cdc-2.4.2.jar -j
117117
/opt/flink-webssh/target/slf4j-api-1.7.15.jar -j
118118
/opt/flink-webssh/target/hamcrest-2.1.jar -j
119-
/opt/flink-webssh/target/flink-shaded-guava-30.1.1-jre-16.0.jar -j
119+
/opt/flink-webssh/target/flink-shaded-guava-31.1-jre-17.0.jar-j
120120
/opt/flink-webssh/target/awaitility-4.0.1.jar -j
121121
/opt/flink-webssh/target/jsr308-all-1.1.2.jar
122122
```
123123
These commands start the sql client with the dependencies as,
124124
125-
:::image type="content" source="./media/monitor-changes-postgres-table-flink/start-the-sql-client.png" alt-text="Screenshot showing start-the-sql-client." border="true" lightbox="./media/monitor-changes-postgres-table-flink/start-the-sql-client.png":::
126-
127-
:::image type="content" source="./media/monitor-changes-postgres-table-flink/sql-client-status.png" alt-text="Screenshot showing sql-client-status." border="true" lightbox="./media/monitor-changes-postgres-table-flink/sql-client-status.png":::
128-
125+
```
126+
user@sshnode-0 [ ~ ]$ bin/sql-client.sh -j flink-sql-connector-postgres-cdc-2.4.2.jar -j slf4j-api-1.7.15.jar -j hamcrest-2.1.jar -j flink-shaded-guava-31.1-jre-17.0.jar -j awaitility-4.0.1.jar -j jsr308-all-1.1.2.jar
127+
128+
????????
129+
????????????????
130+
??????? ??????? ?
131+
???? ????????? ?????
132+
??? ??????? ?????
133+
??? ??? ?????
134+
?? ???????????????
135+
?? ? ??? ?????? ?????
136+
????? ???? ????? ?????
137+
??????? ??? ??????? ???
138+
????????? ?? ?? ??????????
139+
???????? ?? ? ?? ???????
140+
???? ??? ? ?? ???????? ?????
141+
???? ? ?? ? ?? ???????? ???? ??
142+
???? ???? ?????????? ??? ?? ????
143+
???? ?? ??? ??????????? ???? ? ? ???
144+
??? ?? ??? ????????? ???? ???
145+
?? ? ??????? ???????? ??? ??
146+
??? ??? ???????????????????? ???? ?
147+
????? ??? ?????? ???????? ???? ??
148+
???????? ??????????????? ??
149+
?? ???? ??????? ??? ?????? ?? ???
150+
??? ??? ??? ??????? ???? ?????????????
151+
??? ????? ???? ?? ?? ???? ???
152+
?? ??? ? ?? ?? ??
153+
?? ?? ?? ?? ????????
154+
?? ????? ?? ??????????? ??
155+
?? ???? ? ??????? ??
156+
??? ????? ?? ???????????
157+
???? ???? ??????? ????????
158+
????? ?? ???? ?????
159+
????????????????????????????????? ?????
160+
161+
______ _ _ _ _____ ____ _ _____ _ _ _ BETA
162+
| ____| (_) | | / ____|/ __ \| | / ____| (_) | |
163+
| |__ | |_ _ __ | | __ | (___ | | | | | | | | |_ ___ _ __ | |_
164+
| __| | | | '_ \| |/ / \___ \| | | | | | | | | |/ _ \ '_ \| __|
165+
| | | | | | | | < ____) | |__| | |____ | |____| | | __/ | | | |_
166+
|_| |_|_|_| |_|_|\_\ |_____/ \___\_\______| \_____|_|_|\___|_| |_|\__|
167+
168+
Welcome! Enter 'HELP;' to list all available commands. 'QUIT;' to exit.
169+
170+
Command history file path: /home/xcao/.flink-sql-history
171+
172+
Flink SQL>
173+
```
129174
130175
- Create a Flink PostgreSQL CDC table using CDC connector
131176
132177
```
133178
CREATE TABLE shipments (
134-
shipment_id INT,
135-
order_id INT,
136-
origin STRING,
137-
destination STRING,
138-
is_arrived BOOLEAN,
139-
PRIMARY KEY (shipment_id) NOT ENFORCED
140-
) WITH (
141-
'connector' = 'postgres-cdc',
142-
'hostname' = 'flinkpostgres.postgres.database.azure.com',
143-
'port' = '5432',
144-
'username' = 'username',
145-
'password' = 'admin',
146-
'database-name' = 'postgres',
147-
'schema-name' = 'public',
148-
'table-name' = 'shipments',
149-
'decoding.plugin.name' = 'pgoutput'
150-
);
179+
shipment_id INT,
180+
order_id INT,
181+
origin STRING,
182+
destination STRING,
183+
is_arrived BOOLEAN,
184+
PRIMARY KEY (shipment_id) NOT ENFORCED
185+
) WITH (
186+
'connector' = 'postgres-cdc',
187+
'hostname' = 'flinkpostgres.postgres.database.azure.com',
188+
'port' = '5432',
189+
'username' = 'username',
190+
....
191+
'database-name' = 'postgres',
192+
'schema-name' = 'public',
193+
'table-name' = 'shipments',
194+
'decoding.plugin.name' = 'pgoutput',
195+
'slot.name' = 'flink'
196+
);
151197
```
152198
## Validation
153199

0 commit comments

Comments
 (0)