I am trying to implement the Flink CDC connector on AWS EMR to write SQL Server data into S3 in Hudi format.
When I make changes to the SQL Server table, I can see the changes reflected in the source table. However, the sink table does not reflect those changes, and as a result the Parquet files on S3 are not updated either.
I referred to this AWS documentation: https://aws.amazon.com/blogs/big-data/build-a-unified-data-lake-with-apache-flink-on-amazon-emr/
Are there any additional configurations I need to add to see the incremental changes?
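For context, the sqlserver-cdc connector relies on SQL Server's own CDC feature, so CDC has to be enabled on the source database and table before the connector can see any changes. A minimal sketch of that setup (assuming default capture options):

-- Enable CDC on the database, then on the captured table.
-- @role_name = NULL means access to the change data is not gated by a role.
USE flink_cdc_test1;
EXEC sys.sp_cdc_enable_db;
EXEC sys.sp_cdc_enable_table
    @source_schema = N'dbo',
    @source_name   = N'Persons',
    @role_name     = NULL;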
Below are the steps I tried:
Start Flink on a YARN session
flink-yarn-session -d -jm 2048 -tm 4096 -s 2 \
  -D state.backend=rocksdb \
  -D state.backend.incremental=true \
  -D state.checkpoint-storage=filesystem \
  -D state.checkpoints.dir=s3://xxxxx/flink-checkpoints/ \
  -D state.checkpoints.num-retained=10 \
  -D execution.checkpointing.interval=10s \
  -D execution.checkpointing.mode=EXACTLY_ONCE \
  -D execution.checkpointing.externalized-checkpoint-retention=RETAIN_ON_CANCELLATION \
  -D execution.checkpointing.max-concurrent-checkpoints=1 \
  -D execution.runtime-mode=STREAMING
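As far as I understand, the Hudi sink only commits files when a Flink checkpoint completes, so checkpointing must be active or nothing ever lands in S3. To be safe, I also set the interval explicitly inside the SQL client session (a sketch; same value as in the session settings above):

SET 'execution.checkpointing.interval' = '10s';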
Start Flink SQL client
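(On EMR I launch it from the default Flink install location; the exact path below is an assumption based on my cluster layout.)

/usr/lib/flink/bin/sql-client.sh embedded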
CREATE CATALOG glue_catalog WITH (
'type' = 'hive',
'default-database' = 'default',
'hive-conf-dir' = '/etc/hive/conf.dist'
);
USE CATALOG glue_catalog;
CREATE DATABASE IF NOT EXISTS flink_cdc_db WITH ('hive.database.location-uri' = 's3://xxxx/flink-glue-for-hudi/warehouse/');
USE flink_cdc_db;
CREATE TABLE glue_catalog.flink_cdc_db.Persons (
  ID INT NOT NULL,
  FirstName STRING,
  Age INT,
  Dob TIMESTAMP,
  PRIMARY KEY (ID) NOT ENFORCED
) WITH (
  'connector' = 'sqlserver-cdc',
  'hostname' = 'xxx',
  'port' = '1433',
  'username' = 'xxx',
  'password' = 'xxx',
  'database-name' = 'flink_cdc_test1',
  'table-name' = 'dbo.Persons'
);
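This is how I verify that the source side works: a continuous query on the CDC table shows inserts, updates, and deletes from SQL Server as they happen.

-- Continuous changelog query on the CDC source (runs until cancelled):
SELECT * FROM glue_catalog.flink_cdc_db.Persons;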
CREATE CATALOG glue_catalog_for_hudi WITH (
'type' = 'hudi',
'mode' = 'hms',
'table.external' = 'true',
'default-database' = 'default',
'hive.conf.dir' = '/etc/hive/conf.dist',
'catalog.path' = 's3://xxxx/flink-glue-for-hudi/warehouse/'
);
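With 'mode' = 'hms' the Hudi catalog syncs table metadata through the Hive metastore, which EMR backs with Glue here. A quick sanity check that both catalogs are registered:

SHOW CATALOGS;
SHOW DATABASES;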
USE CATALOG glue_catalog_for_hudi;
CREATE DATABASE IF NOT EXISTS flink_glue_hudi_db;
USE flink_glue_hudi_db;
CREATE TABLE glue_catalog_for_hudi.flink_glue_hudi_db.Persons (
  ID INT NOT NULL,
  FirstName STRING,
  Age INT,
  PRIMARY KEY (ID) NOT ENFORCED
) WITH (
  'connector' = 'hudi',
  'write.tasks' = '2',
  'path' = 's3://xxxx/flink-glue-for-hudi/warehouse/Persons_0928_3',
  'table.type' = 'COPY_ON_WRITE',
  'read.streaming.enabled' = 'true',
  'read.streaming.check-interval' = '1'
);
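Note that 'read.streaming.enabled' and 'read.streaming.check-interval' only affect how this table is read, not how it is written; I keep them so I can watch the Hudi table from a second SQL client session as a continuous query (a sketch):

-- Streaming read of the Hudi sink; new commits should appear after each checkpoint.
SELECT * FROM glue_catalog_for_hudi.flink_glue_hudi_db.Persons;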
INSERT INTO glue_catalog_for_hudi.flink_glue_hudi_db.Persons
SELECT ID, FirstName, Age
FROM glue_catalog.flink_cdc_db.Persons;
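To narrow down where the changes stop flowing, the SQL Server side can also be checked directly: if rows show up in the change table, SQL Server CDC itself is capturing fine and the problem is downstream. (The table name below assumes the default capture instance dbo_Persons.)

-- Rows here confirm SQL Server CDC is capturing changes:
SELECT * FROM cdc.dbo_Persons_CT;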