Skip to content

Commit f25f092

Browse files
authored
Handle NULL sharding key in query router (#624)
- feat: handle null sharding key in query router - fix: `TupleData` to `Bind` conversion incorrectly handled nulls - fix: handle null sharding key in `data-sync` (consequence of query router fix)
1 parent eb777eb commit f25f092

File tree

5 files changed

+127
-29
lines changed

5 files changed

+127
-29
lines changed

integration/copy_data/setup.sql

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,5 +134,20 @@ SELECT
134134
ir.item_refunded_at
135135
FROM items_raw ir;
136136

137+
CREATE TABLE copy_data.log_actions (
138+
id BIGSERIAL PRIMARY KEY,
139+
tenant_id BIGINT,
140+
action VARCHAR,
141+
created_at TIMESTAMPTZ NOT NULL DEFAULT now()
142+
);
143+
144+
INSERT INTO copy_data.log_actions (tenant_id, action)
145+
SELECT
146+
CASE WHEN random() < 0.2 THEN NULL ELSE (floor(random() * 10000) + 1)::bigint END AS tenant_id,
147+
(ARRAY['login', 'logout', 'click', 'purchase', 'view', 'error'])[
148+
floor(random() * 6 + 1)::int
149+
] AS action
150+
FROM generate_series(1, 10000);
151+
137152
DROP PUBLICATION IF EXISTS pgdog;
138153
CREATE PUBLICATION pgdog FOR TABLES IN SCHEMA copy_data;

pgdog/src/backend/pool/cluster.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
//! A collection of replicas and a primary.
22
33
use parking_lot::{Mutex, RwLock};
4-
use pgdog_config::{PreparedStatements, Rewrite};
4+
use pgdog_config::{PreparedStatements, Rewrite, RewriteMode};
55
use std::{
66
sync::{
77
atomic::{AtomicBool, Ordering},
@@ -265,6 +265,8 @@ impl Cluster {
265265
let mut cluster = self.clone();
266266
// Disable rewrites, we are only sending valid statements.
267267
cluster.rewrite.enabled = false;
268+
cluster.rewrite.shard_key = RewriteMode::Ignore;
269+
cluster.rewrite.split_inserts = RewriteMode::Ignore;
268270
cluster
269271
}
270272

pgdog/src/frontend/router/parser/insert.rs

Lines changed: 41 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -116,13 +116,17 @@ impl<'a> Insert<'a> {
116116
if let Some(key) = key {
117117
if let Some(bind) = bind {
118118
if let Ok(Some(param)) = bind.parameter(key.position) {
119-
// Arrays not supported as sharding keys at the moment.
120-
let value = ShardingValue::from_param(&param, key.table.data_type)?;
121-
let ctx = ContextBuilder::new(key.table)
122-
.value(value)
123-
.shards(schema.shards)
124-
.build()?;
125-
return Ok(InsertRouting::Routed(ctx.apply()?));
119+
if param.is_null() {
120+
return Ok(InsertRouting::Routed(Shard::All));
121+
} else {
122+
// Arrays not supported as sharding keys at the moment.
123+
let value = ShardingValue::from_param(&param, key.table.data_type)?;
124+
let ctx = ContextBuilder::new(key.table)
125+
.value(value)
126+
.shards(schema.shards)
127+
.build()?;
128+
return Ok(InsertRouting::Routed(ctx.apply()?));
129+
}
126130
}
127131
}
128132

@@ -845,4 +849,34 @@ mod test {
845849
_ => panic!("not an insert"),
846850
}
847851
}
852+
853+
#[test]
854+
fn test_null_sharding_key_routes_to_all() {
855+
let query = parse("INSERT INTO sharded (id, value) VALUES ($1, 'test')").unwrap();
856+
let select = query.protobuf.stmts.first().unwrap().stmt.as_ref().unwrap();
857+
let schema = ShardingSchema {
858+
shards: 3,
859+
tables: ShardedTables::new(
860+
vec![ShardedTable {
861+
name: Some("sharded".into()),
862+
column: "id".into(),
863+
..Default::default()
864+
}],
865+
vec![],
866+
),
867+
..Default::default()
868+
};
869+
870+
match &select.node {
871+
Some(NodeEnum::InsertStmt(stmt)) => {
872+
let insert = Insert::new(stmt);
873+
let bind = Bind::new_params("", &[Parameter::new_null()]);
874+
let routing = insert
875+
.shard(&schema, Some(&bind), false, RewriteMode::Error)
876+
.unwrap();
877+
assert!(matches!(routing.shard(), Shard::All));
878+
}
879+
_ => panic!("not an insert"),
880+
}
881+
}
848882
}

pgdog/src/frontend/router/parser/query/shared.rs

Lines changed: 38 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -105,26 +105,44 @@ impl QueryParser {
105105
break;
106106
} else if let Some(params) = params {
107107
if let Some(param) = params.parameter(pos)? {
108-
let value = ShardingValue::from_param(&param, table.data_type)?;
109-
let ctx = ContextBuilder::new(table)
110-
.value(value)
111-
.shards(sharding_schema.shards)
112-
.build()?;
113-
let shard = ctx.apply()?;
114-
record_column(
115-
recorder,
116-
Some(shard.clone()),
117-
table_name,
118-
&table.column,
119-
|col| {
120-
format!(
121-
"matched sharding key {} using parameter ${}",
122-
col,
123-
pos + 1
124-
)
125-
},
126-
);
127-
shards.insert(shard);
108+
if param.is_null() {
109+
let shard = Shard::All;
110+
shards.insert(shard.clone());
111+
record_column(
112+
recorder,
113+
Some(shard),
114+
table_name,
115+
&table.column,
116+
|col| {
117+
format!(
118+
"sharding key {} (parameter ${}) is null",
119+
col,
120+
pos + 1
121+
)
122+
},
123+
);
124+
} else {
125+
let value = ShardingValue::from_param(&param, table.data_type)?;
126+
let ctx = ContextBuilder::new(table)
127+
.value(value)
128+
.shards(sharding_schema.shards)
129+
.build()?;
130+
let shard = ctx.apply()?;
131+
record_column(
132+
recorder,
133+
Some(shard.clone()),
134+
table_name,
135+
&table.column,
136+
|col| {
137+
format!(
138+
"matched sharding key {} using parameter ${}",
139+
col,
140+
pos + 1
141+
)
142+
},
143+
);
144+
shards.insert(shard);
145+
}
128146
}
129147
}
130148
}

pgdog/src/net/messages/replication/logical/tuple_data.rs

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ impl TupleData {
7474
.columns
7575
.iter()
7676
.map(|c| {
77-
if c.data.is_empty() {
77+
if c.identifier == Identifier::Null {
7878
Parameter::new_null()
7979
} else {
8080
Parameter::new(&c.data)
@@ -128,3 +128,32 @@ impl FromBytes for TupleData {
128128
Self::from_buffer(&mut bytes)
129129
}
130130
}
131+
132+
#[cfg(test)]
133+
mod test {
134+
use super::*;
135+
136+
#[test]
137+
fn test_null_conversion() {
138+
let data = TupleData {
139+
columns: vec![
140+
Column {
141+
identifier: Identifier::Null,
142+
len: 0,
143+
data: Bytes::new(),
144+
},
145+
Column {
146+
identifier: Identifier::Format(Format::Text),
147+
len: 4,
148+
data: Bytes::from(String::from("1234")),
149+
},
150+
],
151+
};
152+
153+
let bind = data.to_bind("__pgdog_1");
154+
assert_eq!(bind.statement(), "__pgdog_1");
155+
assert!(bind.parameter(0).unwrap().unwrap().is_null());
156+
assert!(!bind.parameter(1).unwrap().unwrap().is_null());
157+
assert_eq!(bind.parameter(1).unwrap().unwrap().bigint().unwrap(), 1234);
158+
}
159+
}

0 commit comments

Comments
 (0)