Skip to content

Commit 1a7df2e

Browse files
authored
Remove pgdog.unique_id() from Postgres. Add default sticky routing for omnisharded tables (#727)
### Description - Removed `pgdog.unqiue_id()` inside Postgres introduced in #725. That can lead to data loss when used with omnisharded and sharded tables alike. - Added `omnisharded_sticky` setting to make omnisharded `SELECT` queries use the same shard for each client, to avoid schema drift from the schema cache (e.g. Rails) ```toml [general] omnisharded_sticky = true ```
1 parent a597083 commit 1a7df2e

File tree

13 files changed

+178
-89
lines changed

13 files changed

+178
-89
lines changed

integration/rust/tests/sqlx/unique_id.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ async fn test_unique_id_uniqueness() {
6363
/// Test that pgdog.unique_id() PL/pgSQL function produces IDs with the same
6464
/// bit layout as Rust's unique_id.rs implementation.
6565
#[tokio::test]
66+
#[ignore]
6667
async fn test_unique_id_bit_layout_matches_rust() {
6768
// Constants from Rust unique_id.rs - these must match the SQL implementation
6869
const SEQUENCE_BITS: u64 = 12;

pgdog-config/src/general.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -197,6 +197,9 @@ pub struct General {
197197
/// System catalogs are omnisharded?
198198
#[serde(default = "General::default_system_catalogs_omnisharded")]
199199
pub system_catalogs_omnisharded: bool,
200+
/// Omnisharded queries are sticky by default.
201+
#[serde(default)]
202+
pub omnisharded_sticky: bool,
200203
}
201204

202205
impl Default for General {
@@ -266,6 +269,7 @@ impl Default for General {
266269
lsn_check_delay: Self::lsn_check_delay(),
267270
unique_id_min: u64::default(),
268271
system_catalogs_omnisharded: Self::default_system_catalogs_omnisharded(),
272+
omnisharded_sticky: bool::default(),
269273
}
270274
}
271275
}

pgdog/src/backend/databases.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -445,7 +445,11 @@ pub(crate) fn new_pool(
445445
.get(&user.database)
446446
.cloned()
447447
.unwrap_or(vec![]);
448-
let sharded_tables = ShardedTables::new(sharded_tables, omnisharded_tables);
448+
let sharded_tables = ShardedTables::new(
449+
sharded_tables,
450+
omnisharded_tables,
451+
general.omnisharded_sticky,
452+
);
449453
let sharded_schemas = ShardedSchemas::new(sharded_schemas);
450454

451455
let cluster_config = ClusterConfig::new(

pgdog/src/backend/pool/cluster.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -666,6 +666,7 @@ mod test {
666666
sticky_routing: true,
667667
},
668668
],
669+
config.config.general.omnisharded_sticky,
669670
),
670671
sharded_schemas: ShardedSchemas::new(vec![
671672
ShardedSchema {

pgdog/src/backend/replication/sharded_tables.rs

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ struct Inner {
1919
/// across all tables, i.e., 3 tables with the same data type
2020
/// and list/range/hash function.
2121
common_mapping: Option<CommonMapping>,
22+
omnisharded_sticky: bool,
2223
}
2324

2425
#[derive(Debug)]
@@ -46,12 +47,16 @@ impl Default for ShardedTables {
4647

4748
impl From<&[ShardedTable]> for ShardedTables {
4849
fn from(value: &[ShardedTable]) -> Self {
49-
Self::new(value.to_vec(), vec![])
50+
Self::new(value.to_vec(), vec![], false)
5051
}
5152
}
5253

5354
impl ShardedTables {
54-
pub fn new(tables: Vec<ShardedTable>, omnisharded_tables: Vec<OmnishardedTable>) -> Self {
55+
pub fn new(
56+
tables: Vec<ShardedTable>,
57+
omnisharded_tables: Vec<OmnishardedTable>,
58+
omnisharded_sticky: bool,
59+
) -> Self {
5560
let mut common_mapping = HashSet::new();
5661
for table in &tables {
5762
common_mapping.insert((
@@ -79,6 +84,7 @@ impl ShardedTables {
7984
.map(|table| (table.name, table.sticky_routing))
8085
.collect(),
8186
common_mapping,
87+
omnisharded_sticky,
8288
}),
8389
}
8490
}
@@ -95,6 +101,10 @@ impl ShardedTables {
95101
self.omnishards().get(name).cloned()
96102
}
97103

104+
pub fn is_omnisharded_sticky_default(&self) -> bool {
105+
self.inner.omnisharded_sticky
106+
}
107+
98108
/// The deployment has only one sharded table.
99109
pub fn common_mapping(&self) -> &Option<CommonMapping> {
100110
&self.inner.common_mapping

pgdog/src/backend/schema/setup.sql

Lines changed: 79 additions & 79 deletions
Original file line numberDiff line numberDiff line change
@@ -322,82 +322,82 @@ $body$ LANGUAGE plpgsql;
322322
-- Bit allocation: 41 timestamp + 10 node + 12 sequence = 63 bits (keeps sign bit clear)
323323
-- The sequence stores (elapsed_ms << 12) | sequence_within_ms, allowing
324324
-- automatic reset of the sequence counter when the millisecond changes.
325-
CREATE SEQUENCE IF NOT EXISTS pgdog.unique_id_seq;
326-
327-
CREATE OR REPLACE FUNCTION pgdog.unique_id(id_offset BIGINT DEFAULT 0) RETURNS BIGINT AS $body$
328-
DECLARE
329-
sequence_bits CONSTANT INTEGER := 12;
330-
node_bits CONSTANT INTEGER := 10;
331-
max_node_id CONSTANT INTEGER := (1 << node_bits) - 1; -- 1023
332-
max_sequence CONSTANT INTEGER := (1 << sequence_bits) - 1; -- 4095
333-
max_timestamp CONSTANT BIGINT := (1::bigint << 41) - 1;
334-
pgdog_epoch CONSTANT BIGINT := 1764184395000; -- Wednesday, November 26, 2025 11:13:15 AM GMT-08:00
335-
node_shift CONSTANT INTEGER := sequence_bits; -- 12
336-
timestamp_shift CONSTANT INTEGER := sequence_bits + node_bits; -- 22
337-
338-
node_id INTEGER;
339-
now_ms BIGINT;
340-
elapsed BIGINT;
341-
min_combined BIGINT;
342-
combined_seq BIGINT;
343-
seq INTEGER;
344-
timestamp_part BIGINT;
345-
node_part BIGINT;
346-
base_id BIGINT;
347-
BEGIN
348-
-- Get node_id from pgdog.config.shard
349-
SELECT pgdog.config.shard INTO node_id FROM pgdog.config;
350-
351-
IF node_id IS NULL THEN
352-
RAISE EXCEPTION 'pgdog.config.shard not set';
353-
END IF;
354-
355-
IF node_id < 0 OR node_id > max_node_id THEN
356-
RAISE EXCEPTION 'shard must be between 0 and %', max_node_id;
357-
END IF;
358-
359-
LOOP
360-
-- Get next combined sequence value
361-
combined_seq := nextval('pgdog.unique_id_seq');
362-
363-
-- Get current time in milliseconds since Unix epoch
364-
now_ms := (EXTRACT(EPOCH FROM clock_timestamp()) * 1000)::bigint;
365-
elapsed := now_ms - pgdog_epoch;
366-
367-
IF elapsed < 0 THEN
368-
RAISE EXCEPTION 'Clock is before PgDog epoch (November 26, 2025)';
369-
END IF;
370-
371-
-- Minimum valid combined value for current millisecond
372-
min_combined := elapsed << 12;
373-
374-
-- If sequence is at or ahead of current time, we're good
375-
IF combined_seq >= min_combined THEN
376-
EXIT;
377-
END IF;
378-
379-
-- Sequence is behind current time, advance it
380-
PERFORM setval('pgdog.unique_id_seq', min_combined, false);
381-
END LOOP;
382-
383-
-- Decompose the combined sequence value
384-
seq := (combined_seq & max_sequence)::integer;
385-
elapsed := combined_seq >> 12;
386-
387-
IF elapsed > max_timestamp THEN
388-
RAISE EXCEPTION 'Timestamp overflow: % > %', elapsed, max_timestamp;
389-
END IF;
390-
391-
-- Compose the ID: timestamp | node | sequence
392-
timestamp_part := elapsed << timestamp_shift;
393-
node_part := node_id::bigint << node_shift;
394-
base_id := timestamp_part | node_part | seq;
395-
396-
RETURN base_id + id_offset;
397-
END;
398-
$body$ LANGUAGE plpgsql;
399-
400-
GRANT USAGE ON SEQUENCE pgdog.unique_id_seq TO PUBLIC;
401-
402-
-- Allow functions to be executed by anyone.
403-
GRANT EXECUTE ON ALL FUNCTIONS IN SCHEMA pgdog TO PUBLIC;
325+
-- CREATE SEQUENCE IF NOT EXISTS pgdog.unique_id_seq;
326+
327+
-- CREATE OR REPLACE FUNCTION pgdog.unique_id(id_offset BIGINT DEFAULT 0) RETURNS BIGINT AS $body$
328+
-- DECLARE
329+
-- sequence_bits CONSTANT INTEGER := 12;
330+
-- node_bits CONSTANT INTEGER := 10;
331+
-- max_node_id CONSTANT INTEGER := (1 << node_bits) - 1; -- 1023
332+
-- max_sequence CONSTANT INTEGER := (1 << sequence_bits) - 1; -- 4095
333+
-- max_timestamp CONSTANT BIGINT := (1::bigint << 41) - 1;
334+
-- pgdog_epoch CONSTANT BIGINT := 1764184395000; -- Wednesday, November 26, 2025 11:13:15 AM GMT-08:00
335+
-- node_shift CONSTANT INTEGER := sequence_bits; -- 12
336+
-- timestamp_shift CONSTANT INTEGER := sequence_bits + node_bits; -- 22
337+
338+
-- node_id INTEGER;
339+
-- now_ms BIGINT;
340+
-- elapsed BIGINT;
341+
-- min_combined BIGINT;
342+
-- combined_seq BIGINT;
343+
-- seq INTEGER;
344+
-- timestamp_part BIGINT;
345+
-- node_part BIGINT;
346+
-- base_id BIGINT;
347+
-- BEGIN
348+
-- -- Get node_id from pgdog.config.shard
349+
-- SELECT pgdog.config.shard INTO node_id FROM pgdog.config;
350+
351+
-- IF node_id IS NULL THEN
352+
-- RAISE EXCEPTION 'pgdog.config.shard not set';
353+
-- END IF;
354+
355+
-- IF node_id < 0 OR node_id > max_node_id THEN
356+
-- RAISE EXCEPTION 'shard must be between 0 and %', max_node_id;
357+
-- END IF;
358+
359+
-- LOOP
360+
-- -- Get next combined sequence value
361+
-- combined_seq := nextval('pgdog.unique_id_seq');
362+
363+
-- -- Get current time in milliseconds since Unix epoch
364+
-- now_ms := (EXTRACT(EPOCH FROM clock_timestamp()) * 1000)::bigint;
365+
-- elapsed := now_ms - pgdog_epoch;
366+
367+
-- IF elapsed < 0 THEN
368+
-- RAISE EXCEPTION 'Clock is before PgDog epoch (November 26, 2025)';
369+
-- END IF;
370+
371+
-- -- Minimum valid combined value for current millisecond
372+
-- min_combined := elapsed << 12;
373+
374+
-- -- If sequence is at or ahead of current time, we're good
375+
-- IF combined_seq >= min_combined THEN
376+
-- EXIT;
377+
-- END IF;
378+
379+
-- -- Sequence is behind current time, advance it
380+
-- PERFORM setval('pgdog.unique_id_seq', min_combined, false);
381+
-- END LOOP;
382+
383+
-- -- Decompose the combined sequence value
384+
-- seq := (combined_seq & max_sequence)::integer;
385+
-- elapsed := combined_seq >> 12;
386+
387+
-- IF elapsed > max_timestamp THEN
388+
-- RAISE EXCEPTION 'Timestamp overflow: % > %', elapsed, max_timestamp;
389+
-- END IF;
390+
391+
-- -- Compose the ID: timestamp | node | sequence
392+
-- timestamp_part := elapsed << timestamp_shift;
393+
-- node_part := node_id::bigint << node_shift;
394+
-- base_id := timestamp_part | node_part | seq;
395+
396+
-- RETURN base_id + id_offset;
397+
-- END;
398+
-- $body$ LANGUAGE plpgsql;
399+
400+
-- GRANT USAGE ON SEQUENCE pgdog.unique_id_seq TO PUBLIC;
401+
402+
-- -- Allow functions to be executed by anyone.
403+
-- GRANT EXECUTE ON ALL FUNCTIONS IN SCHEMA pgdog TO PUBLIC;

pgdog/src/frontend/router/parser/comment.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,7 @@ mod tests {
159159

160160
let schema = ShardingSchema {
161161
shards: 2,
162-
tables: ShardedTables::new(vec![], vec![]),
162+
tables: ShardedTables::new(vec![], vec![], false),
163163
..Default::default()
164164
};
165165

@@ -174,7 +174,7 @@ mod tests {
174174

175175
let schema = ShardingSchema {
176176
shards: 3,
177-
tables: ShardedTables::new(vec![], vec![]),
177+
tables: ShardedTables::new(vec![], vec![], false),
178178
..Default::default()
179179
};
180180

@@ -190,7 +190,7 @@ mod tests {
190190

191191
let schema = ShardingSchema {
192192
shards: 2,
193-
tables: ShardedTables::new(vec![], vec![]),
193+
tables: ShardedTables::new(vec![], vec![], false),
194194
..Default::default()
195195
};
196196

@@ -205,7 +205,7 @@ mod tests {
205205

206206
let schema = ShardingSchema {
207207
shards: 2,
208-
tables: ShardedTables::new(vec![], vec![]),
208+
tables: ShardedTables::new(vec![], vec![], false),
209209
..Default::default()
210210
};
211211

@@ -220,7 +220,7 @@ mod tests {
220220

221221
let schema = ShardingSchema {
222222
shards: 2,
223-
tables: ShardedTables::new(vec![], vec![]),
223+
tables: ShardedTables::new(vec![], vec![], false),
224224
..Default::default()
225225
};
226226

@@ -244,7 +244,7 @@ mod tests {
244244

245245
let schema = ShardingSchema {
246246
shards: 2,
247-
tables: ShardedTables::new(vec![], vec![]),
247+
tables: ShardedTables::new(vec![], vec![], false),
248248
schemas: ShardedSchemas::new(vec![sales_schema]),
249249
..Default::default()
250250
};

pgdog/src/frontend/router/parser/insert.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -252,6 +252,7 @@ mod test {
252252
},
253253
],
254254
vec![],
255+
false,
255256
),
256257
..Default::default()
257258
};
@@ -343,6 +344,7 @@ mod test {
343344
..Default::default()
344345
}],
345346
vec![],
347+
false,
346348
),
347349
..Default::default()
348350
};

pgdog/src/frontend/router/parser/query/select.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,12 @@ impl QueryParser {
145145
== Some(true)
146146
});
147147

148-
let (rr_index, explain) = if sticky {
148+
let (rr_index, explain) = if sticky
149+
|| context
150+
.sharding_schema
151+
.tables()
152+
.is_omnisharded_sticky_default()
153+
{
149154
(context.router_context.sticky.omni_index, "sticky")
150155
} else {
151156
(round_robin::next(), "round robin")

0 commit comments

Comments
 (0)