Skip to content

Commit 1e9b8be

Browse files
authored
Add update/delete support to logical replication (#603)
1 parent 6599967 commit 1e9b8be

File tree

20 files changed

+584
-238
lines changed

20 files changed

+584
-238
lines changed

.github/workflows/ci.yml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,7 @@ jobs:
113113
sudo -u postgres createdb $USER
114114
sudo -u postgres psql -c 'ALTER SYSTEM SET max_connections TO 1000;'
115115
sudo -u postgres psql -c 'ALTER SYSTEM SET max_prepared_transactions TO 1000;'
116+
sudo -u postgres psql -c 'ALTER SYSTEM SET wal_level TO logical;'
116117
sudo service postgresql restart
117118
bash integration/setup.sh
118119
sudo apt update && sudo apt install -y python3-virtualenv mold
@@ -163,6 +164,8 @@ jobs:
163164
run: bash integration/rust/run.sh
164165
- name: Stop shared PgDog
165166
run: bash -lc 'source integration/common.sh; stop_pgdog'
167+
- name: Data sync
168+
run: bash integration/copy_data/dev.sh
166169
- name: Python
167170
run: bash integration/python/run.sh
168171
- name: Load balancer
@@ -190,6 +193,7 @@ jobs:
190193
plugin-unit-tests:
191194
runs-on: blacksmith-4vcpu-ubuntu-2404
192195
continue-on-error: true
196+
timeout-minutes: 30
193197
steps:
194198
- uses: actions/checkout@v4
195199
- uses: actions-rs/toolchain@v1
@@ -212,6 +216,7 @@ jobs:
212216
plugin-integration-tests:
213217
runs-on: blacksmith-4vcpu-ubuntu-2404
214218
continue-on-error: true
219+
timeout-minutes: 30
215220
steps:
216221
- uses: actions/checkout@v4
217222
- uses: actions-rs/toolchain@v1

integration/copy_data/dev.sh

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
#!/bin/bash
2+
set -e
3+
SCRIPT_DIR=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd )
4+
DEFAULT_BIN="${SCRIPT_DIR}/../../target/release/pgdog"
5+
PGDOG_BIN=${PGDOG_BIN:-$DEFAULT_BIN}
6+
7+
export PGUSER=pgdog
8+
export PGDATABASE=pgdog
9+
export PGHOST=127.0.0.1
10+
export PGPORT=5432
11+
export PGPASSWORD=pgdog
12+
13+
pushd ${SCRIPT_DIR}
14+
15+
psql -f init.sql
16+
17+
${PGDOG_BIN} schema-sync --from-database source --to-database destination --publication pgdog
18+
${PGDOG_BIN} data-sync --sync-only --from-database source --to-database destination --publication pgdog --replication-slot copy_data
19+
popd

integration/copy_data/init.sql

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
\c pgdog1
2+
DROP SCHEMA IF EXISTS copy_data CASCADE;
3+
\c pgdog2
4+
DROP SCHEMA IF EXISTS copy_data CASCADE;
5+
\c pgdog
6+
DROP SCHEMA IF EXISTS copy_data CASCADE;
7+
SELECT pg_drop_replication_slot(slot_name) FROM pg_replication_slots;
8+
\i setup.sql

integration/copy_data/psql.sh

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
#!/bin/bash
2+
PGPASSWORD=pgdog psql -h 127.0.0.1 -p 5432 -U pgdog $1

integration/copy_data/setup.sql

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,8 @@ CREATE TABLE IF NOT EXISTS copy_data.users (
55
tenant_id BIGINT NOT NULL,
66
email VARCHAR NOT NULL,
77
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
8-
settings JSONB NOT NULL DEFAULT '{}'::jsonb
8+
settings JSONB NOT NULL DEFAULT '{}'::jsonb,
9+
PRIMARY KEY(id, tenant_id)
910
) PARTITION BY HASH(tenant_id);
1011

1112
CREATE TABLE IF NOT EXISTS copy_data.users_0 PARTITION OF copy_data.users

pgdog-config/src/core.rs

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use std::path::PathBuf;
55
use tracing::{info, warn};
66

77
use crate::sharding::ShardedSchema;
8-
use crate::{Memory, PassthoughAuth, PreparedStatements};
8+
use crate::{Memory, PassthoughAuth, PreparedStatements, RewriteMode};
99

1010
use super::database::Database;
1111
use super::error::Error;
@@ -326,6 +326,20 @@ impl Config {
326326
}
327327
_ => (),
328328
}
329+
330+
if !self.general.two_phase_commit {
331+
if self.rewrite.enabled {
332+
if self.rewrite.shard_key == RewriteMode::Rewrite {
333+
warn!("rewrite.shard_key=rewrite will apply non-atomic shard-key rewrites; enabling two_phase_commit is strongly recommended"
334+
);
335+
}
336+
337+
if self.rewrite.split_inserts == RewriteMode::Rewrite {
338+
warn!("rewrite.split_inserts=rewrite may commit partial multi-row INSERTs; enabling two_phase_commit is strongly recommended"
339+
);
340+
}
341+
}
342+
}
329343
}
330344

331345
/// Multi-tenancy is enabled.

pgdog/src/backend/databases.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -430,6 +430,7 @@ pub(crate) fn new_pool(
430430
sharded_tables,
431431
config.multi_tenant(),
432432
sharded_schemas,
433+
&config.rewrite,
433434
);
434435

435436
Some((

pgdog/src/backend/pool/cluster.rs

Lines changed: 78 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
//! A collection of replicas and a primary.
22
33
use parking_lot::{Mutex, RwLock};
4+
use pgdog_config::{PreparedStatements, Rewrite};
45
use std::sync::{
56
atomic::{AtomicBool, Ordering},
67
Arc,
@@ -52,6 +53,12 @@ pub struct Cluster {
5253
two_phase_commit: bool,
5354
two_phase_commit_auto: bool,
5455
online: Arc<AtomicBool>,
56+
rewrite: Rewrite,
57+
prepared_statements: PreparedStatements,
58+
dry_run: bool,
59+
expanded_explain: bool,
60+
pub_sub_channel_size: usize,
61+
query_parser_enabled: bool,
5562
}
5663

5764
/// Sharding configuration from the cluster.
@@ -94,6 +101,12 @@ pub struct ClusterConfig<'a> {
94101
pub two_pc: bool,
95102
pub two_pc_auto: bool,
96103
pub sharded_schemas: ShardedSchemas,
104+
pub rewrite: &'a Rewrite,
105+
pub prepared_statements: &'a PreparedStatements,
106+
pub dry_run: bool,
107+
pub expanded_explain: bool,
108+
pub pub_sub_channel_size: usize,
109+
pub query_parser_enabled: bool,
97110
}
98111

99112
impl<'a> ClusterConfig<'a> {
@@ -104,6 +117,7 @@ impl<'a> ClusterConfig<'a> {
104117
sharded_tables: ShardedTables,
105118
multi_tenant: &'a Option<MultiTenant>,
106119
sharded_schemas: ShardedSchemas,
120+
rewrite: &'a Rewrite,
107121
) -> Self {
108122
Self {
109123
name: &user.database,
@@ -126,6 +140,12 @@ impl<'a> ClusterConfig<'a> {
126140
.two_phase_commit_auto
127141
.unwrap_or(general.two_phase_commit_auto.unwrap_or(false)), // Disable by default.
128142
sharded_schemas,
143+
rewrite,
144+
prepared_statements: &general.prepared_statements,
145+
dry_run: general.dry_run,
146+
expanded_explain: general.expanded_explain,
147+
pub_sub_channel_size: general.pub_sub_channel_size,
148+
query_parser_enabled: general.query_parser_enabled,
129149
}
130150
}
131151
}
@@ -150,6 +170,12 @@ impl Cluster {
150170
two_pc,
151171
two_pc_auto,
152172
sharded_schemas,
173+
rewrite,
174+
prepared_statements,
175+
dry_run,
176+
expanded_explain,
177+
pub_sub_channel_size,
178+
query_parser_enabled,
153179
} = config;
154180

155181
Self {
@@ -175,9 +201,23 @@ impl Cluster {
175201
two_phase_commit: two_pc && shards.len() > 1,
176202
two_phase_commit_auto: two_pc_auto && shards.len() > 1,
177203
online: Arc::new(AtomicBool::new(false)),
204+
rewrite: rewrite.clone(),
205+
prepared_statements: prepared_statements.clone(),
206+
dry_run,
207+
expanded_explain,
208+
pub_sub_channel_size,
209+
query_parser_enabled,
178210
}
179211
}
180212

213+
/// Change config to work with logical replication streaming.
214+
pub fn logical_stream(&self) -> Self {
215+
let mut cluster = self.clone();
216+
// Disable rewrites, we are only sending valid statements.
217+
cluster.rewrite.enabled = false;
218+
cluster
219+
}
220+
181221
/// Get a connection to a primary of the given shard.
182222
pub async fn primary(&self, shard: usize, request: &Request) -> Result<Guard, Error> {
183223
let shard = self.shards.get(shard).ok_or(Error::NoShard(shard))?;
@@ -251,6 +291,31 @@ impl Cluster {
251291
self.sharded_tables.tables()
252292
}
253293

294+
/// Get query rewrite config.
295+
pub fn rewrite(&self) -> &Rewrite {
296+
&self.rewrite
297+
}
298+
299+
pub fn query_parser_enabled(&self) -> bool {
300+
self.query_parser_enabled
301+
}
302+
303+
pub fn prepared_statements(&self) -> &PreparedStatements {
304+
&self.prepared_statements
305+
}
306+
307+
pub fn dry_run(&self) -> bool {
308+
self.dry_run
309+
}
310+
311+
pub fn expanded_explain(&self) -> bool {
312+
self.expanded_explain
313+
}
314+
315+
pub fn pub_sub_enabled(&self) -> bool {
316+
self.pub_sub_channel_size > 0
317+
}
318+
254319
/// Find sharded column position, if the table and columns match the configuration.
255320
pub fn sharded_column(&self, table: &str, columns: &[&str]) -> Option<ShardedColumn> {
256321
self.sharded_tables.sharded_column(table, columns)
@@ -412,10 +477,12 @@ mod test {
412477
use std::sync::Arc;
413478

414479
use crate::{
415-
backend::pool::{Address, Config, PoolConfig},
416-
backend::{Shard, ShardedTables},
480+
backend::{
481+
pool::{Address, Config, PoolConfig},
482+
Shard, ShardedTables,
483+
},
417484
config::{
418-
DataType, Hasher, LoadBalancingStrategy, ReadWriteSplit, ReadWriteStrategy,
485+
config, DataType, Hasher, LoadBalancingStrategy, ReadWriteSplit, ReadWriteStrategy,
419486
ShardedTable,
420487
},
421488
};
@@ -424,6 +491,7 @@ mod test {
424491

425492
impl Cluster {
426493
pub fn new_test() -> Self {
494+
let config = config();
427495
Cluster {
428496
sharded_tables: ShardedTables::new(
429497
vec![ShardedTable {
@@ -470,6 +538,13 @@ mod test {
470538
user: "pgdog".into(),
471539
database: "pgdog".into(),
472540
}),
541+
prepared_statements: config.config.general.prepared_statements,
542+
dry_run: config.config.general.dry_run,
543+
expanded_explain: config.config.general.expanded_explain,
544+
query_parser_enabled: config.config.general.query_parser_enabled,
545+
rewrite: config.config.rewrite.clone(),
546+
two_phase_commit: config.config.general.two_phase_commit,
547+
two_phase_commit_auto: config.config.general.two_phase_commit_auto.unwrap_or(false),
473548
..Default::default()
474549
}
475550
}

pgdog/src/backend/replication/logical/error.rs

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,16 @@ use crate::{backend::replication::publisher::PublicationTable, net::ErrorRespons
66

77
#[derive(Debug, Error)]
88
pub enum Error {
9-
#[error("{0}")]
9+
#[error("backend: {0}")]
1010
Backend(#[from] crate::backend::Error),
1111

12-
#[error("{0}")]
12+
#[error("pool: {0}")]
1313
Pool(#[from] crate::backend::pool::Error),
1414

15-
#[error("{0}")]
15+
#[error("router: {0}")]
16+
Router(#[from] crate::frontend::router::Error),
17+
18+
#[error("net: {0}")]
1619
Net(#[from] crate::net::Error),
1720

1821
#[error("transaction not started")]
@@ -21,6 +24,12 @@ pub enum Error {
2124
#[error("out of sync, got {0}")]
2225
OutOfSync(char),
2326

27+
#[error("out of sync during commit, got {0}")]
28+
CommitOutOfSync(char),
29+
30+
#[error("out of sync during relation prepare, got {0}")]
31+
RelationOutOfSync(char),
32+
2433
#[error("missing data")]
2534
MissingData,
2635

@@ -30,7 +39,7 @@ pub enum Error {
3039
#[error("copy error")]
3140
Copy,
3241

33-
#[error("{0}")]
42+
#[error("pg_error: {0}")]
3443
PgError(Box<ErrorResponse>),
3544

3645
#[error("table \"{0}\".\"{1}\" has no replica identity")]
@@ -39,13 +48,16 @@ pub enum Error {
3948
#[error("lsn decode")]
4049
LsnDecode,
4150

51+
#[error("replication slot \"{0}\" doesn't exist, but it should")]
52+
MissingReplicationSlot(String),
53+
4254
#[error("parse int")]
4355
ParseInt(#[from] ParseIntError),
4456

4557
#[error("shard has no primary")]
4658
NoPrimary,
4759

48-
#[error("{0}")]
60+
#[error("parser: {0}")]
4961
Parser(#[from] crate::frontend::router::parser::Error),
5062

5163
#[error("not connected")]
@@ -68,6 +80,9 @@ pub enum Error {
6880

6981
#[error("table {0} doesn't have a primary key")]
7082
NoPrimaryKey(PublicationTable),
83+
84+
#[error("router returned incorrect command")]
85+
IncorrectCommand,
7186
}
7287

7388
impl From<ErrorResponse> for Error {

0 commit comments

Comments
 (0)