Postgres is one of the most popular relational database management systems. This verified source uses Postgres' replication functionality to efficiently process changes in tables (a process often referred to as Change Data Capture or CDC). It uses logical decoding and the standard built-in pgoutput output plugin.
Resources that can be loaded using this verified source are:
| Name | Description |
|---|---|
| replication_resource | Load published messages from a replication slot |
| init_replication | Initialize replication and optionally return snapshot resources for initial data load |
dlt init pg_replication duckdbThis uses duckdb as destination, but you can choose any of the supported destinations.
The Postgres user needs to have the LOGIN and REPLICATION attributes assigned:
CREATE ROLE replication_user WITH LOGIN REPLICATION;It also needs CREATE privilege on the database:
GRANT CREATE ON DATABASE dlt_data TO replication_user;If not a superuser, the user must have ownership of the tables that need to be replicated:
ALTER TABLE your_table OWNER TO replication_user; - You must enable replication for RDS Postgres instance via Parameter Group: https://docs.aws.amazon.com/AmazonRDS/latest/UserGuide/USER_PostgreSQL.Replication.ReadReplicas.html
WITH LOGIN REPLICATION;does not work on RDS, instead do:
GRANT rds_replication TO replication_user;- Do not fallback to non SSL connection by setting connection parameters:
sources.pg_replication.credentials="postgresql://loader:password@host.rds.amazonaws.com:5432/dlt_data?sslmode=require&connect_timeout=300"-
Open
.dlt/secrets.toml. -
Enter your Postgres credentials:
[sources.pg_replication] credentials="postgresql://replication_user:<<password>>@localhost:5432/dlt_data"
-
Enter credentials for your chosen destination as per the docs.
-
Install the necessary dependencies by running the following command:
pip install -r requirements.txt
-
Now the pipeline can be run by using the command:
python pg_replication_pipeline.py
-
To make sure that everything is loaded as expected, use the command:
dlt pipeline pg_replication_pipeline show