Skip to content

Commit 874c044

Browse files
authored
Copy partitioned table data (#602)
1 parent cf3461c commit 874c044

File tree

9 files changed

+282
-31
lines changed

9 files changed

+282
-31
lines changed

integration/copy_data/pgdog.toml

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
[[databases]]
2+
name = "source"
3+
host = "127.0.0.1"
4+
database_name = "pgdog"
5+
6+
[[databases]]
7+
name = "destination"
8+
host = "127.0.0.1"
9+
database_name = "pgdog1"
10+
shard = 0
11+
12+
[[databases]]
13+
name = "destination"
14+
host = "127.0.0.1"
15+
database_name = "pgdog2"
16+
shard = 1
17+
18+
[[sharded_tables]]
19+
database = "destination"
20+
column = "tenant_id"
21+
data_type = "bigint"

integration/copy_data/setup.sql

Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
CREATE SCHEMA IF NOT EXISTS copy_data;
2+
3+
CREATE TABLE IF NOT EXISTS copy_data.users (
4+
id BIGINT NOT NULL,
5+
tenant_id BIGINT NOT NULL,
6+
email VARCHAR NOT NULL,
7+
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
8+
settings JSONB NOT NULL DEFAULT '{}'::jsonb
9+
) PARTITION BY HASH(tenant_id);
10+
11+
CREATE TABLE IF NOT EXISTS copy_data.users_0 PARTITION OF copy_data.users
12+
FOR VALUES WITH (MODULUS 2, REMAINDER 0);
13+
14+
CREATE TABLE IF NOT EXISTS copy_data.users_1 PARTITION OF copy_data.users
15+
FOR VALUES WITH (MODULUS 2, REMAINDER 1);
16+
17+
TRUNCATE TABLE copy_data.users;
18+
19+
INSERT INTO copy_data.users (id, tenant_id, email, created_at, settings)
20+
SELECT
21+
gs.id,
22+
((gs.id - 1) % 20) + 1 AS tenant_id, -- distribute across 20 tenants
23+
format('user_%s_tenant_%s@example.com', gs.id, ((gs.id - 1) % 20) + 1) AS email,
24+
NOW() - (random() * interval '365 days') AS created_at, -- random past date
25+
jsonb_build_object(
26+
'theme', CASE (random() * 3)::int
27+
WHEN 0 THEN 'light'
28+
WHEN 1 THEN 'dark'
29+
ELSE 'auto'
30+
END,
31+
'notifications', (random() > 0.5)
32+
) AS settings
33+
FROM generate_series(1, 10000) AS gs(id);
34+
35+
-- Drop both tables in one statement: order_items has a foreign key to orders,
-- so dropping orders alone fails once order_items exists. IF EXISTS keeps the
-- script runnable on a fresh database; both tables are recreated below.
DROP TABLE IF EXISTS copy_data.order_items, copy_data.orders;
36+
CREATE TABLE IF NOT EXISTS copy_data.orders (
37+
id BIGSERIAL PRIMARY KEY,
38+
user_id BIGINT NOT NULL,
39+
tenant_id BIGINT NOT NULL,
40+
amount DOUBLE PRECISION NOT NULL DEFAULT 0.0,
41+
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
42+
refunded_at TIMESTAMPTZ
43+
);
44+
45+
CREATE TABLE IF NOT EXISTS copy_data.order_items (
46+
user_id BIGINT NOT NULL,
47+
tenant_id BIGINT NOT NULL,
48+
order_id BIGINT NOT NULL REFERENCES copy_data.orders(id),
49+
amount DOUBLE PRECISION NOT NULL DEFAULT 0.0,
50+
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
51+
refunded_at TIMESTAMPTZ
52+
);
53+
54+
-- --- Reset order data before seeding (TRUNCATE makes this section safe to re-run) ---
55+
-- Adjust/drop statements as needed if the tables already exist.
56+
TRUNCATE TABLE copy_data.order_items CASCADE;
57+
TRUNCATE TABLE copy_data.orders CASCADE;
58+
59+
WITH u AS (
60+
-- Pull the 10k users we inserted earlier
61+
SELECT id AS user_id, tenant_id
62+
FROM copy_data.users
63+
WHERE id BETWEEN 1 AND 10000
64+
ORDER BY id
65+
),
66+
orders_base AS (
67+
-- One order per user (10k orders), deterministic order_id = user_id
68+
SELECT
69+
u.user_id AS order_id,
70+
u.user_id,
71+
u.tenant_id,
72+
-- random created_at in last 365 days
73+
NOW() - (random() * INTERVAL '365 days') AS created_at,
74+
-- ~10% refunded
75+
CASE WHEN random() < 0.10
76+
THEN NOW() - (random() * INTERVAL '180 days')
77+
ELSE NULL
78+
END AS refunded_at
79+
FROM u
80+
),
81+
items_raw AS (
82+
-- 1–5 items per order, random amounts $5–$200
83+
SELECT
84+
ob.order_id,
85+
ob.user_id,
86+
ob.tenant_id,
87+
-- position of the item within its order (1..N, with N drawn uniformly from 1..5)
88+
gs.i AS item_index,
89+
-- random item amount with cents
90+
ROUND((5 + random() * 195)::numeric, 2)::float8 AS item_amount,
91+
-- item created_at: on/after order created_at by up to 3 hours
92+
ob.created_at + (random() * INTERVAL '3 hours') AS item_created_at,
93+
-- if order refunded, item refunded too (optionally jitter within 2 hours)
94+
CASE WHEN ob.refunded_at IS NOT NULL
95+
THEN ob.refunded_at + (random() * INTERVAL '2 hours')
96+
ELSE NULL
97+
END AS item_refunded_at
98+
FROM orders_base ob
99+
CROSS JOIN LATERAL generate_series(1, 1 + (floor(random()*5))::int) AS gs(i)
100+
),
101+
order_totals AS (
102+
SELECT
103+
order_id,
104+
user_id,
105+
tenant_id,
106+
MIN(item_created_at) AS created_at,
107+
-- sum of item amounts per order
108+
ROUND(SUM(item_amount)::numeric, 2)::float8 AS order_amount,
109+
-- carry refund state from items_raw (same per order)
110+
MAX(item_refunded_at) AS refunded_at
111+
FROM items_raw
112+
GROUP BY order_id, user_id, tenant_id
113+
),
114+
ins_orders AS (
115+
INSERT INTO copy_data.orders (id, user_id, tenant_id, amount, created_at, refunded_at)
116+
SELECT
117+
ot.order_id, -- id = user_id = 1..10000
118+
ot.user_id,
119+
ot.tenant_id,
120+
ot.order_amount,
121+
ot.created_at,
122+
ot.refunded_at
123+
FROM order_totals ot
124+
RETURNING id
125+
)
126+
INSERT INTO copy_data.order_items (user_id, tenant_id, order_id, amount, created_at, refunded_at)
127+
SELECT
128+
ir.user_id,
129+
ir.tenant_id,
130+
ir.order_id,
131+
ir.item_amount,
132+
ir.item_created_at,
133+
ir.item_refunded_at
134+
FROM items_raw ir;
135+
136+
DROP PUBLICATION IF EXISTS pgdog;
137+
CREATE PUBLICATION pgdog FOR TABLES IN SCHEMA copy_data;

integration/copy_data/users.toml

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
[[users]]
2+
database = "source"
3+
name = "pgdog"
4+
password = "pgdog"
5+
schema_admin = true
6+
7+
[[users]]
8+
database = "destination"
9+
name = "pgdog"
10+
password = "pgdog"
11+
schema_admin = true

pgdog/src/backend/replication/logical/copy_statement.rs

Lines changed: 61 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,12 @@
22
//! Generate COPY statement for table synchronization.
33
//!
44
5+
use super::publisher::PublicationTable;
6+
57
/// COPY statement generator.
68
#[derive(Debug, Clone)]
79
pub struct CopyStatement {
8-
schema: String,
9-
table: String,
10+
table: PublicationTable,
1011
columns: Vec<String>,
1112
}
1213

@@ -19,10 +20,9 @@ impl CopyStatement {
1920
/// * `table`: Publication table to copy; carries its schema and name and, for a partition, the parent table's schema and name.
2021
/// * `columns`: Table column names.
2122
///
22-
pub fn new(schema: &str, table: &str, columns: &[String]) -> CopyStatement {
23+
pub fn new(table: &PublicationTable, columns: &[String]) -> CopyStatement {
2324
CopyStatement {
24-
schema: schema.to_owned(),
25-
table: table.to_owned(),
25+
table: table.clone(),
2626
columns: columns.to_vec(),
2727
}
2828
}
@@ -37,12 +37,36 @@ impl CopyStatement {
3737
self.copy(false)
3838
}
3939

40+
fn schema_name(&self, out: bool) -> &str {
41+
if out {
42+
&self.table.schema
43+
} else {
44+
if self.table.parent_schema.is_empty() {
45+
&self.table.schema
46+
} else {
47+
&self.table.parent_schema
48+
}
49+
}
50+
}
51+
52+
fn table_name(&self, out: bool) -> &str {
53+
if out {
54+
&self.table.name
55+
} else {
56+
if self.table.parent_name.is_empty() {
57+
&self.table.name
58+
} else {
59+
&self.table.parent_name
60+
}
61+
}
62+
}
63+
4064
// Generate the statement.
4165
fn copy(&self, out: bool) -> String {
4266
format!(
4367
r#"COPY "{}"."{}" ({}) {} WITH (FORMAT binary)"#,
44-
self.schema,
45-
self.table,
68+
self.schema_name(out),
69+
self.table_name(out),
4670
self.columns
4771
.iter()
4872
.map(|c| format!(r#""{}""#, c))
@@ -59,16 +83,42 @@ mod test {
5983

6084
#[test]
6185
fn test_copy_stmt() {
62-
let copy = CopyStatement::new("public", "test", &["id".into(), "email".into()]).copy_in();
86+
let table = PublicationTable {
87+
schema: "public".into(),
88+
name: "test".into(),
89+
..Default::default()
90+
};
91+
92+
let copy = CopyStatement::new(&table, &["id".into(), "email".into()]);
93+
let copy_in = copy.copy_in();
6394
assert_eq!(
64-
copy.to_string(),
95+
copy_in,
6596
r#"COPY "public"."test" ("id", "email") FROM STDIN WITH (FORMAT binary)"#
6697
);
6798

68-
let copy = CopyStatement::new("public", "test", &["id".into(), "email".into()]).copy_out();
6999
assert_eq!(
70-
copy.to_string(),
100+
copy.copy_out(),
71101
r#"COPY "public"."test" ("id", "email") TO STDOUT WITH (FORMAT binary)"#
72102
);
103+
104+
let table = PublicationTable {
105+
schema: "public".into(),
106+
name: "test_0".into(),
107+
parent_name: "test".into(),
108+
parent_schema: "public".into(),
109+
..Default::default()
110+
};
111+
112+
let copy = CopyStatement::new(&table, &["id".into(), "email".into()]);
113+
let copy_in = copy.copy_in();
114+
assert_eq!(
115+
copy_in,
116+
r#"COPY "public"."test" ("id", "email") FROM STDIN WITH (FORMAT binary)"#
117+
);
118+
119+
assert_eq!(
120+
copy.copy_out(),
121+
r#"COPY "public"."test_0" ("id", "email") TO STDOUT WITH (FORMAT binary)"#
122+
);
73123
}
74124
}

pgdog/src/backend/replication/logical/publisher/copy.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ use crate::{
22
backend::Server,
33
net::{CopyData, ErrorResponse, FromBytes, Protocol, Query, ToBytes},
44
};
5-
use tracing::trace;
5+
use tracing::{debug, trace};
66

77
use super::{
88
super::{CopyStatement, Error},
@@ -17,8 +17,7 @@ pub struct Copy {
1717
impl Copy {
1818
pub fn new(table: &Table) -> Self {
1919
let stmt = CopyStatement::new(
20-
&table.table.schema,
21-
&table.table.name,
20+
&table.table,
2221
&table
2322
.columns
2423
.iter()
@@ -34,9 +33,10 @@ impl Copy {
3433
return Err(Error::TransactionNotStarted);
3534
}
3635

37-
server
38-
.send(&vec![Query::new(self.stmt.copy_out()).into()].into())
39-
.await?;
36+
let query = Query::new(self.stmt.copy_out());
37+
debug!("{} [{}]", query.query(), server.addr());
38+
39+
server.send(&vec![query.into()].into()).await?;
4040
let result = server.read().await?;
4141
match result.code() {
4242
'E' => return Err(ErrorResponse::from_bytes(result.to_bytes()?)?.into()),

pgdog/src/backend/replication/logical/publisher/queries.rs

Lines changed: 21 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -13,21 +13,33 @@ use crate::{
1313
use super::super::Error;
1414

1515
/// Get list of tables in publication.
16-
static TABLES: &str = "SELECT DISTINCT n.nspname, c.relname, gpt.attrs
16+
static TABLES: &str = "SELECT DISTINCT
17+
n.nspname,
18+
c.relname,
19+
gpt.attrs,
20+
COALESCE(pn.nspname::text, '') AS parent_schema,
21+
COALESCE(p.relname::text, '') AS parent_table
1722
FROM pg_class c
1823
JOIN pg_namespace n ON n.oid = c.relnamespace
19-
JOIN ( SELECT (pg_get_publication_tables(VARIADIC array_agg(pubname::text))).*
20-
FROM pg_publication
21-
WHERE pubname IN ($1)) AS gpt
22-
ON gpt.relid = c.oid
23-
ORDER BY n.nspname, c.relname";
24+
JOIN (
25+
SELECT (pg_get_publication_tables(VARIADIC array_agg(pubname::text))).*
26+
FROM pg_publication
27+
WHERE pubname IN ($1)
28+
) AS gpt
29+
ON gpt.relid = c.oid
30+
LEFT JOIN pg_inherits i ON i.inhrelid = c.oid -- only present if c is a child partition
31+
LEFT JOIN pg_class p ON p.oid = i.inhparent -- immediate parent partitioned table
32+
LEFT JOIN pg_namespace pn ON pn.oid = p.relnamespace
33+
ORDER BY n.nspname, c.relname;";
2434

2535
/// Table included in a publication.
26-
#[derive(Debug, Clone, PartialEq)]
36+
#[derive(Debug, Clone, PartialEq, Default)]
2737
pub struct PublicationTable {
2838
pub schema: String,
2939
pub name: String,
3040
pub attributes: String,
41+
pub parent_schema: String,
42+
pub parent_name: String,
3143
}
3244

3345
impl Display for PublicationTable {
@@ -53,6 +65,8 @@ impl From<DataRow> for PublicationTable {
5365
schema: value.get(0, Format::Text).unwrap_or_default(),
5466
name: value.get(1, Format::Text).unwrap_or_default(),
5567
attributes: value.get(2, Format::Text).unwrap_or_default(),
68+
parent_schema: value.get(3, Format::Text).unwrap_or_default(),
69+
parent_name: value.get(4, Format::Text).unwrap_or_default(),
5670
}
5771
}
5872
}

0 commit comments

Comments
 (0)