Skip to content

Commit 8482bf2

Browse files
authored
fix: refactor of DELETE removed schema sharding (#661)
Issue introduced in #657.
1 parent 2b431c7 commit 8482bf2

File tree

3 files changed

+361
-21
lines changed

3 files changed

+361
-21
lines changed

integration/python/test_asyncpg.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -483,3 +483,26 @@ async def test_schema_sharding_default():
483483
raise Exception("table shouldn't exist on shard 1")
484484
except Exception as e:
485485
assert "relation \"test_schema_sharding_default\" does not exist" == str(e)
486+
487+
488+
@pytest.mark.asyncio
489+
async def test_schema_sharding_search_path():
490+
import asyncio
491+
492+
async def run_test():
493+
conn = await schema_sharded_async()
494+
495+
for _ in range(10):
496+
for schema in ["shard_0", "shard_1"]:
497+
await conn.execute(f"SET search_path TO {schema}")
498+
499+
async with conn.transaction():
500+
await conn.execute(f"DROP SCHEMA IF EXISTS {schema} CASCADE")
501+
await conn.execute(f"CREATE SCHEMA IF NOT EXISTS {schema}")
502+
await conn.execute("SET LOCAL statement_timeout TO '10s'")
503+
await conn.execute(f"CREATE TABLE {schema}.test(id BIGINT, created_at TIMESTAMPTZ DEFAULT NOW())")
504+
await conn.fetch(f"SELECT * FROM {schema}.test WHERE id = $1", 1)
505+
506+
await conn.close()
507+
508+
await asyncio.gather(*[run_test() for _ in range(10)])

pgdog/src/frontend/router/parser/query/select.rs

Lines changed: 2 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
1-
use crate::frontend::router::{
2-
parser::{cache::CachedAst, from_clause::FromClause, where_clause::TablesSource},
3-
sharding::SchemaSharder,
1+
use crate::frontend::router::parser::{
2+
cache::CachedAst, from_clause::FromClause, where_clause::TablesSource,
43
};
54

65
use super::*;
@@ -63,22 +62,6 @@ impl QueryParser {
6362

6463
let from_clause = TablesSource::from(FromClause::new(&stmt.from_clause));
6564

66-
// Schema-based sharding.
67-
let mut schema_sharder = SchemaSharder::default();
68-
for table in cached_ast.tables() {
69-
let schema = table.schema();
70-
schema_sharder.resolve(schema, &context.sharding_schema.schemas);
71-
}
72-
if let Some((shard, schema)) = schema_sharder.get() {
73-
if let Some(recorder) = self.recorder_mut() {
74-
recorder.record_entry(
75-
Some(shard.clone()),
76-
format!("SELECT matched schema {}", schema),
77-
);
78-
}
79-
shards.insert(shard);
80-
}
81-
8265
// Shard by vector in ORDER BY clause.
8366
for order in &order_by {
8467
if let Some((vector, column_name)) = order.vector() {

0 commit comments

Comments
 (0)