diff --git a/Cargo.lock b/Cargo.lock index 3349c4ce5..77e82fa61 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -114,6 +114,15 @@ version = "1.0.98" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e16d2d3311acee920a9eb8d33b8cbc1787ce4a264e85f964c2404b969bdcd487" +[[package]] +name = "ar_archive_writer" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f0c269894b6fe5e9d7ada0cf69b5bf847ff35bc25fc271f08e1d080fce80339a" +dependencies = [ + "object 0.32.2", +] + [[package]] name = "arc-swap" version = "1.7.1" @@ -200,7 +209,7 @@ dependencies = [ "cfg-if", "libc", "miniz_oxide", - "object", + "object 0.36.7", "rustc-demangle", "windows-targets 0.52.6", ] @@ -433,10 +442,11 @@ dependencies = [ [[package]] name = "cc" -version = "1.2.22" +version = "1.2.51" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "32db95edf998450acc7881c932f94cd9b05c87b4b2599e8bab064753da4acfd1" +checksum = "7a0aeaff4ff1a90589618835a598e545176939b97874f7abc7851caa0618f203" dependencies = [ + "find-msvc-tools", "jobserver", "libc", "shlex", @@ -1037,6 +1047,12 @@ dependencies = [ "winapi", ] +[[package]] +name = "find-msvc-tools" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "645cbb3a84e60b7531617d5ae4e57f7e27308f6445f5abf653209ea76dec8dff" + [[package]] name = "finl_unicode" version = "1.3.0" @@ -1748,15 +1764,6 @@ dependencies = [ "either", ] -[[package]] -name = "itertools" -version = "0.14.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2b192c782037fadd9cfa75548310488aabdbf3d2da73885b31bd0abd03351285" -dependencies = [ - "either", -] - [[package]] name = "itoa" version = "1.0.15" @@ -2173,6 +2180,15 @@ dependencies = [ "libc", ] +[[package]] +name = "object" +version = "0.32.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"a6a622008b6e321afc04970976f62ee297fdbaa6f95318ca343e3eebb9648441" +dependencies = [ + "memchr", +] + [[package]] name = "object" version = "0.36.7" @@ -2365,7 +2381,7 @@ dependencies = [ [[package]] name = "pg_query" version = "6.1.1" -source = "git+https://github.com/pgdogdev/pg_query.rs.git#85a65482cd112f9702ef895fc793e9ae3d102d7f" +source = "git+https://github.com/pgdogdev/pg_query.rs.git?rev=55cd0ee0adb4f62edfb9c2afe7fbad526de25b0c#55cd0ee0adb4f62edfb9c2afe7fbad526de25b0c" dependencies = [ "bindgen 0.66.1", "cc", @@ -2376,6 +2392,7 @@ dependencies = [ "prost-build", "serde", "serde_json", + "stacker", "thiserror 1.0.69", ] @@ -2410,7 +2427,7 @@ dependencies = [ "pgdog-plugin", "pgdog-vector", "pin-project", - "rand 0.8.5", + "rand 0.9.2", "ratatui", "regex", "rmp-serde", @@ -2441,7 +2458,7 @@ name = "pgdog-config" version = "0.1.0" dependencies = [ "pgdog-vector", - "rand 0.8.5", + "rand 0.9.2", "serde", "serde_json", "tempfile", @@ -2715,7 +2732,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "be769465445e8c1474e9c5dac2018218498557af32d9ed057325ec9a41ae81bf" dependencies = [ "heck", - "itertools 0.14.0", + "itertools 0.13.0", "log", "multimap", "once_cell", @@ -2735,7 +2752,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8a56d757972c98b346a9b766e3f02746cde6dd1cd1d1d563472929fdd74bec4d" dependencies = [ "anyhow", - "itertools 0.14.0", + "itertools 0.13.0", "proc-macro2", "quote", "syn 2.0.101", @@ -2750,6 +2767,16 @@ dependencies = [ "prost", ] +[[package]] +name = "psm" +version = "0.1.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d11f2fedc3b7dafdc2851bc52f277377c5473d378859be234bc7ebb593144d01" +dependencies = [ + "ar_archive_writer", + "cc", +] + [[package]] name = "ptr_meta" version = "0.1.4" @@ -3831,6 +3858,20 @@ version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = 
"a8f112729512f8e442d81f95a8a7ddf2b7c6b8a1a6f509a95864142b30cab2d3" +[[package]] +name = "stacker" +version = "0.1.22" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e1f8b29fb42aafcea4edeeb6b2f2d7ecd0d969c48b4cf0d2e64aafc471dd6e59" +dependencies = [ + "cc", + "cfg-if", + "libc", + "psm", + "windows-sys 0.52.0", + "windows-sys 0.59.0", +] + [[package]] name = "static_assertions" version = "1.1.0" diff --git a/Cargo.toml b/Cargo.toml index 626fa9a38..c4d3b6477 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,6 +11,9 @@ resolver = "2" # [patch.crates-io] # tokio = { path = "../tokio/tokio" } +# [patch."https://github.com/pgdogdev/pg_query.rs.git"] +# pg_query = { path = "../pg_query.rs" } + [profile.release] codegen-units = 1 lto = true diff --git a/integration/pgdog.toml b/integration/pgdog.toml index e5774a472..01d789e2c 100644 --- a/integration/pgdog.toml +++ b/integration/pgdog.toml @@ -20,6 +20,7 @@ two_phase_commit = false healthcheck_port = 8080 tls_certificate = "integration/tls/cert.pem" tls_private_key = "integration/tls/key.pem" +query_parser_engine = "pg_query_raw" [memory] net_buffer = 8096 diff --git a/pgdog-config/src/core.rs b/pgdog-config/src/core.rs index dc7891feb..44d369347 100644 --- a/pgdog-config/src/core.rs +++ b/pgdog-config/src/core.rs @@ -7,7 +7,7 @@ use tracing::{info, warn}; use crate::sharding::ShardedSchema; use crate::{ EnumeratedDatabase, Memory, OmnishardedTable, PassthoughAuth, PreparedStatements, - QueryParserLevel, ReadWriteSplit, RewriteMode, Role, + QueryParserEngine, QueryParserLevel, ReadWriteSplit, RewriteMode, Role, }; use super::database::Database; @@ -426,6 +426,15 @@ impl Config { warn!(r#""query_parser_enabled" is deprecated, use "query_parser" = "on" instead"#); self.general.query_parser = QueryParserLevel::On; } + + if self.general.query_parser_engine == QueryParserEngine::PgQueryRaw { + if self.memory.stack_size < 32 * 1024 * 1024 { + self.memory.stack_size = 32 * 1024 * 1024; + warn!( + 
r#""pg_query_raw" parser engine requires a large thread stack, setting it to 32MiB for each Tokio worker"# + ); + } + } } /// Multi-tenancy is enabled. diff --git a/pgdog-config/src/general.rs b/pgdog-config/src/general.rs index ac2399e2d..dff1e092b 100644 --- a/pgdog-config/src/general.rs +++ b/pgdog-config/src/general.rs @@ -5,7 +5,7 @@ use std::path::PathBuf; use std::time::Duration; use crate::pooling::ConnectionRecovery; -use crate::QueryParserLevel; +use crate::{QueryParserEngine, QueryParserLevel}; use super::auth::{AuthType, PassthoughAuth}; use super::database::{LoadBalancingStrategy, ReadWriteSplit, ReadWriteStrategy}; @@ -100,6 +100,9 @@ pub struct General { /// Query parser. #[serde(default)] pub query_parser: QueryParserLevel, + /// Query parser engine. + #[serde(default)] + pub query_parser_engine: QueryParserEngine, /// Limit on the number of prepared statements in the server cache. #[serde(default = "General::prepared_statements_limit")] pub prepared_statements_limit: usize, @@ -227,6 +230,7 @@ impl Default for General { prepared_statements: Self::prepared_statements(), query_parser_enabled: Self::query_parser_enabled(), query_parser: QueryParserLevel::default(), + query_parser_engine: QueryParserEngine::default(), prepared_statements_limit: Self::prepared_statements_limit(), query_cache_limit: Self::query_cache_limit(), passthrough_auth: Self::default_passthrough_auth(), diff --git a/pgdog-config/src/sharding.rs b/pgdog-config/src/sharding.rs index 257f4a4e6..4dd177e40 100644 --- a/pgdog-config/src/sharding.rs +++ b/pgdog-config/src/sharding.rs @@ -306,3 +306,11 @@ pub enum QueryParserLevel { Auto, Off, } + +#[derive(Serialize, Deserialize, Debug, Copy, Clone, PartialEq, Eq, Hash, Default)] +#[serde(rename_all = "snake_case", deny_unknown_fields)] +pub enum QueryParserEngine { + #[default] + PgQueryProtobuf, + PgQueryRaw, +} diff --git a/pgdog-config/src/util.rs b/pgdog-config/src/util.rs index db20368a5..a7f3c61ec 100644 --- 
a/pgdog-config/src/util.rs +++ b/pgdog-config/src/util.rs @@ -1,6 +1,6 @@ use std::time::Duration; -use rand::{distributions::Alphanumeric, Rng}; +use rand::{distr::Alphanumeric, Rng}; pub fn human_duration_optional(duration: Option) -> String { if let Some(duration) = duration { @@ -46,7 +46,7 @@ pub fn human_duration(duration: Duration) -> String { /// Generate a random string of length n. pub fn random_string(n: usize) -> String { - rand::thread_rng() + rand::rng() .sample_iter(&Alphanumeric) .take(n) .map(char::from) diff --git a/pgdog-plugin/Cargo.toml b/pgdog-plugin/Cargo.toml index 41ec09861..638242bc8 100644 --- a/pgdog-plugin/Cargo.toml +++ b/pgdog-plugin/Cargo.toml @@ -17,7 +17,7 @@ crate-type = ["rlib", "cdylib"] libloading = "0.8" libc = "0.2" tracing = "0.1" -pg_query = { git = "https://github.com/pgdogdev/pg_query.rs.git" } +pg_query = { git = "https://github.com/pgdogdev/pg_query.rs.git", rev = "55cd0ee0adb4f62edfb9c2afe7fbad526de25b0c" } pgdog-macros = { path = "../pgdog-macros", version = "0.1.1" } toml = "0.9" diff --git a/pgdog/Cargo.lock b/pgdog/Cargo.lock index 35650a353..4bdb3d17d 100644 --- a/pgdog/Cargo.lock +++ b/pgdog/Cargo.lock @@ -546,7 +546,7 @@ dependencies = [ [[package]] name = "rand" -version = "0.8.5" +version = "0.9.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404" +checksum = "6db2770f06117d490610c7488547d543617b21bfa07796d7a12f6f1bd53850d1" dependencies = [ diff --git a/pgdog/Cargo.toml b/pgdog/Cargo.toml index 8e448353c..47560b081 100644 --- a/pgdog/Cargo.toml +++ b/pgdog/Cargo.toml @@ -28,7 +28,7 @@ clap = { version = "4", features = ["derive"] } serde = { version = "1", features = ["derive"] } serde_json = "1" async-trait = "0.1" -rand = "0.8" +rand = "0.9.2" once_cell = "1" tokio-rustls = "0.26" rustls-native-certs = "0.8" @@ -43,7 +43,7 @@ base64 = "0.22" md5 = "0.7" futures = "0.3" csv-core = "0.1" -pg_query = { git = "https://github.com/pgdogdev/pg_query.rs.git" } +pg_query = { git = 
"https://github.com/pgdogdev/pg_query.rs.git", rev = "55cd0ee0adb4f62edfb9c2afe7fbad526de25b0c" } regex = "1" uuid = { version = "1", features = ["v4", "serde"] } url = "2" diff --git a/pgdog/src/auth/md5.rs b/pgdog/src/auth/md5.rs index 51a917a0f..7c95ddf66 100644 --- a/pgdog/src/auth/md5.rs +++ b/pgdog/src/auth/md5.rs @@ -22,7 +22,7 @@ impl<'a> Client<'a> { Self { password, user, - salt: rand::thread_rng().gen(), + salt: rand::rng().random(), } } diff --git a/pgdog/src/auth/scram/server.rs b/pgdog/src/auth/scram/server.rs index 42d43713a..34fdeabe8 100644 --- a/pgdog/src/auth/scram/server.rs +++ b/pgdog/src/auth/scram/server.rs @@ -52,7 +52,7 @@ impl AuthenticationProvider for UserPassword { fn get_password_for(&self, _user: &str) -> Option { // TODO: This is slow. We should move it to its own thread pool. let iterations = 4096; - let salt = rand::thread_rng().gen::<[u8; 16]>().to_vec(); + let salt = rand::rng().random::<[u8; 16]>().to_vec(); let hash = hash_password(&self.password, NonZeroU32::new(iterations).unwrap(), &salt); Some(PasswordInfo::new(hash.to_vec(), iterations as u16, salt)) } diff --git a/pgdog/src/backend/pool/cluster.rs b/pgdog/src/backend/pool/cluster.rs index 7350bbc75..8fb2e2b1f 100644 --- a/pgdog/src/backend/pool/cluster.rs +++ b/pgdog/src/backend/pool/cluster.rs @@ -1,7 +1,7 @@ //! A collection of replicas and a primary. use parking_lot::{Mutex, RwLock}; -use pgdog_config::{PreparedStatements, QueryParserLevel, Rewrite, RewriteMode}; +use pgdog_config::{PreparedStatements, QueryParserEngine, QueryParserLevel, Rewrite, RewriteMode}; use std::{ sync::{ atomic::{AtomicBool, Ordering}, @@ -64,6 +64,7 @@ pub struct Cluster { pub_sub_channel_size: usize, query_parser: QueryParserLevel, connection_recovery: ConnectionRecovery, + query_parser_engine: QueryParserEngine, } /// Sharding configuration from the cluster. @@ -77,6 +78,8 @@ pub struct ShardingSchema { pub schemas: ShardedSchemas, /// Rewrite config. 
pub rewrite: Rewrite, + /// Query parser engine. + pub query_parser_engine: QueryParserEngine, } impl ShardingSchema { @@ -131,6 +134,7 @@ pub struct ClusterConfig<'a> { pub expanded_explain: bool, pub pub_sub_channel_size: usize, pub query_parser: QueryParserLevel, + pub query_parser_engine: QueryParserEngine, pub connection_recovery: ConnectionRecovery, pub lsn_check_interval: Duration, } @@ -177,6 +181,7 @@ impl<'a> ClusterConfig<'a> { expanded_explain: general.expanded_explain, pub_sub_channel_size: general.pub_sub_channel_size, query_parser: general.query_parser, + query_parser_engine: general.query_parser_engine, connection_recovery: general.connection_recovery, lsn_check_interval: Duration::from_millis(general.lsn_check_interval), } @@ -211,6 +216,7 @@ impl Cluster { query_parser, connection_recovery, lsn_check_interval, + query_parser_engine, } = config; let identifier = Arc::new(DatabaseUser { @@ -256,6 +262,7 @@ impl Cluster { pub_sub_channel_size, query_parser, connection_recovery, + query_parser_engine, } } @@ -449,6 +456,7 @@ impl Cluster { tables: self.sharded_tables.clone(), schemas: self.sharded_schemas.clone(), rewrite: self.rewrite.clone(), + query_parser_engine: self.query_parser_engine, } } diff --git a/pgdog/src/backend/pool/connection/mirror/handler.rs b/pgdog/src/backend/pool/connection/mirror/handler.rs index 92c2e406c..6b53ed0cd 100644 --- a/pgdog/src/backend/pool/connection/mirror/handler.rs +++ b/pgdog/src/backend/pool/connection/mirror/handler.rs @@ -68,7 +68,7 @@ impl MirrorHandler { } MirrorHandlerState::Idle => { let roll = if self.exposure < 1.0 { - thread_rng().gen_range(0.0..1.0) + rng().random_range(0.0..1.0) } else { 0.99 }; diff --git a/pgdog/src/backend/pool/connection/mirror/mod.rs b/pgdog/src/backend/pool/connection/mirror/mod.rs index 644f9df92..a162bbe1a 100644 --- a/pgdog/src/backend/pool/connection/mirror/mod.rs +++ b/pgdog/src/backend/pool/connection/mirror/mod.rs @@ -2,7 +2,7 @@ use std::time::Duration; -use 
rand::{thread_rng, Rng}; +use rand::{rng, Rng}; use tokio::select; use tokio::time::{sleep, Instant}; use tokio::{spawn, sync::mpsc::*}; diff --git a/pgdog/src/backend/pool/lb/mod.rs b/pgdog/src/backend/pool/lb/mod.rs index 761e77696..fabd54fa4 100644 --- a/pgdog/src/backend/pool/lb/mod.rs +++ b/pgdog/src/backend/pool/lb/mod.rs @@ -276,7 +276,7 @@ impl LoadBalancer { } match self.lb_strategy { - Random => candidates.shuffle(&mut rand::thread_rng()), + Random => candidates.shuffle(&mut rand::rng()), RoundRobin => { let first = self.round_robin.fetch_add(1, Ordering::Relaxed) % candidates.len(); let mut reshuffled = vec![]; diff --git a/pgdog/src/backend/pool/test/mod.rs b/pgdog/src/backend/pool/test/mod.rs index 08f553f56..61f48db13 100644 --- a/pgdog/src/backend/pool/test/mod.rs +++ b/pgdog/src/backend/pool/test/mod.rs @@ -98,7 +98,7 @@ async fn test_concurrency() { let pool = pool.clone(); tracker.spawn(async move { let _conn = pool.get(&Request::default()).await.unwrap(); - let duration = rand::thread_rng().gen_range(0..10); + let duration = rand::rng().random_range(0..10); sleep(Duration::from_millis(duration)).await; }); } @@ -131,7 +131,7 @@ async fn test_concurrency_with_gas() { let pool = pool.clone(); tracker.spawn(async move { let _conn = pool.get(&Request::default()).await.unwrap(); - let duration = rand::thread_rng().gen_range(0..10); + let duration = rand::rng().random_range(0..10); assert!(pool.lock().checked_out() > 0); assert!(pool.lock().total() <= 10); sleep(Duration::from_millis(duration)).await; diff --git a/pgdog/src/backend/replication/logical/publisher/publisher_impl.rs b/pgdog/src/backend/replication/logical/publisher/publisher_impl.rs index 13fe85e4b..d20aff42d 100644 --- a/pgdog/src/backend/replication/logical/publisher/publisher_impl.rs +++ b/pgdog/src/backend/replication/logical/publisher/publisher_impl.rs @@ -1,6 +1,7 @@ use std::collections::HashMap; use std::time::Duration; +use pgdog_config::QueryParserEngine; use tokio::{select, 
spawn}; use tracing::{debug, error, info}; @@ -27,15 +28,22 @@ pub struct Publisher { tables: HashMap>, /// Replication slots. slots: HashMap, + /// Query parser engine. + query_parser_engine: QueryParserEngine, } impl Publisher { - pub fn new(cluster: &Cluster, publication: &str) -> Self { + pub fn new( + cluster: &Cluster, + publication: &str, + query_parser_engine: QueryParserEngine, + ) -> Self { Self { cluster: cluster.clone(), publication: publication.to_string(), tables: HashMap::new(), slots: HashMap::new(), + query_parser_engine, } } @@ -44,7 +52,8 @@ impl Publisher { for (number, shard) in self.cluster.shards().iter().enumerate() { // Load tables from publication. let mut primary = shard.primary(&Request::default()).await?; - let tables = Table::load(&self.publication, &mut primary).await?; + let tables = + Table::load(&self.publication, &mut primary, self.query_parser_engine).await?; self.tables.insert(number, tables); } @@ -103,7 +112,7 @@ impl Publisher { .get(&number) .ok_or(Error::NoReplicationTables(number))?; // Handles the logical replication stream messages. - let mut stream = StreamSubscriber::new(dest, tables); + let mut stream = StreamSubscriber::new(dest, tables, self.query_parser_engine); // Take ownership of the slot for replication. 
let mut slot = self @@ -182,7 +191,8 @@ impl Publisher { for (number, shard) in self.cluster.shards().iter().enumerate() { let mut primary = shard.primary(&Request::default()).await?; - let tables = Table::load(&self.publication, &mut primary).await?; + let tables = + Table::load(&self.publication, &mut primary, self.query_parser_engine).await?; let include_primary = !shard.has_replicas(); let replicas = shard diff --git a/pgdog/src/backend/replication/logical/publisher/table.rs b/pgdog/src/backend/replication/logical/publisher/table.rs index 4dd973d47..c109b43e5 100644 --- a/pgdog/src/backend/replication/logical/publisher/table.rs +++ b/pgdog/src/backend/replication/logical/publisher/table.rs @@ -2,6 +2,8 @@ use std::time::Duration; +use pgdog_config::QueryParserEngine; + use crate::backend::pool::Address; use crate::backend::replication::publisher::progress::Progress; use crate::backend::replication::publisher::Lsn; @@ -26,10 +28,16 @@ pub struct Table { pub columns: Vec, /// Table data as of this LSN. pub lsn: Lsn, + /// Query parser engine. + pub query_parser_engine: QueryParserEngine, } impl Table { - pub async fn load(publication: &str, server: &mut Server) -> Result, Error> { + pub async fn load( + publication: &str, + server: &mut Server, + query_parser_engine: QueryParserEngine, + ) -> Result, Error> { let tables = PublicationTable::load(publication, server).await?; let mut results = vec![]; @@ -43,6 +51,7 @@ impl Table { identity, columns, lsn: Lsn::default(), + query_parser_engine, }); } @@ -181,7 +190,7 @@ impl Table { // Create new standalone connection for the copy. // let mut server = Server::connect(source, ServerOptions::new_replication()).await?; - let mut copy_sub = CopySubscriber::new(copy.statement(), dest)?; + let mut copy_sub = CopySubscriber::new(copy.statement(), dest, self.query_parser_engine)?; copy_sub.connect().await?; // Create sync slot. 
@@ -232,6 +241,7 @@ impl Table { mod test { use crate::backend::replication::logical::publisher::test::setup_publication; + use crate::config::config; use super::*; @@ -240,9 +250,13 @@ mod test { crate::logger(); let mut publication = setup_publication().await; - let tables = Table::load("publication_test", &mut publication.server) - .await - .unwrap(); + let tables = Table::load( + "publication_test", + &mut publication.server, + config().config.general.query_parser_engine, + ) + .await + .unwrap(); assert_eq!(tables.len(), 2); diff --git a/pgdog/src/backend/replication/logical/subscriber/copy.rs b/pgdog/src/backend/replication/logical/subscriber/copy.rs index 2a8331533..63e1b498b 100644 --- a/pgdog/src/backend/replication/logical/subscriber/copy.rs +++ b/pgdog/src/backend/replication/logical/subscriber/copy.rs @@ -1,7 +1,8 @@ //! Shard COPY stream from one source //! between N shards. -use pg_query::NodeEnum; +use pg_query::{parse_raw, NodeEnum}; +use pgdog_config::QueryParserEngine; use tracing::debug; use crate::{ @@ -34,8 +35,17 @@ impl CopySubscriber { /// 1. What kind of encoding we use. /// 2. Which column is used for sharding. 
/// - pub fn new(copy_stmt: &CopyStatement, cluster: &Cluster) -> Result { - let stmt = pg_query::parse(copy_stmt.clone().copy_in().as_str())?; + pub fn new( + copy_stmt: &CopyStatement, + cluster: &Cluster, + query_parser_engine: QueryParserEngine, + ) -> Result { + let stmt = match query_parser_engine { + QueryParserEngine::PgQueryProtobuf => { + pg_query::parse(copy_stmt.clone().copy_in().as_str()) + } + QueryParserEngine::PgQueryRaw => parse_raw(copy_stmt.clone().copy_in().as_str()), + }?; let stmt = stmt .protobuf .stmts @@ -228,7 +238,9 @@ mod test { .await .unwrap(); - let mut subscriber = CopySubscriber::new(©, &cluster).unwrap(); + let mut subscriber = + CopySubscriber::new(©, &cluster, config().config.general.query_parser_engine) + .unwrap(); subscriber.start_copy().await.unwrap(); let header = CopyData::new(&Header::new().to_bytes().unwrap()); diff --git a/pgdog/src/backend/replication/logical/subscriber/stream.rs b/pgdog/src/backend/replication/logical/subscriber/stream.rs index d6f011b76..5eb11a039 100644 --- a/pgdog/src/backend/replication/logical/subscriber/stream.rs +++ b/pgdog/src/backend/replication/logical/subscriber/stream.rs @@ -11,9 +11,11 @@ use std::{ use once_cell::sync::Lazy; use pg_query::{ + parse_raw, protobuf::{InsertStmt, ParseResult}, NodeEnum, }; +use pgdog_config::QueryParserEngine; use tracing::{debug, trace}; use super::super::{publisher::Table, Error}; @@ -74,8 +76,12 @@ impl Statement { &self.parse } - fn new(query: &str) -> Result { - let ast = pg_query::parse(query)?.protobuf; + fn new(query: &str, query_parser_engine: QueryParserEngine) -> Result { + let ast = match query_parser_engine { + QueryParserEngine::PgQueryProtobuf => pg_query::parse(query), + QueryParserEngine::PgQueryRaw => parse_raw(query), + }? + .protobuf; let name = statement_name(); Ok(Self { ast, @@ -138,10 +144,17 @@ pub struct StreamSubscriber { // Bytes sharded bytes_sharded: usize, + + // Query parser engine. 
+ query_parser_engine: QueryParserEngine, } impl StreamSubscriber { - pub fn new(cluster: &Cluster, tables: &[Table]) -> Self { + pub fn new( + cluster: &Cluster, + tables: &[Table], + query_parser_engine: QueryParserEngine, + ) -> Self { let cluster = cluster.logical_stream(); Self { cluster, @@ -165,6 +178,7 @@ impl StreamSubscriber { lsn: 0, // Unknown, bytes_sharded: 0, lsn_changed: true, + query_parser_engine, } } @@ -396,10 +410,10 @@ impl StreamSubscriber { return Ok(()); } - let insert = Statement::new(&table.insert(false))?; - let upsert = Statement::new(&table.insert(true))?; - let update = Statement::new(&table.update())?; - let delete = Statement::new(&table.delete())?; + let insert = Statement::new(&table.insert(false), self.query_parser_engine)?; + let upsert = Statement::new(&table.insert(true), self.query_parser_engine)?; + let update = Statement::new(&table.update(), self.query_parser_engine)?; + let delete = Statement::new(&table.delete(), self.query_parser_engine)?; for server in &mut self.connections { for stmt in &[&insert, &upsert, &update, &delete] { diff --git a/pgdog/src/backend/schema/sync/pg_dump.rs b/pgdog/src/backend/schema/sync/pg_dump.rs index 5dc99346c..fd94006cf 100644 --- a/pgdog/src/backend/schema/sync/pg_dump.rs +++ b/pgdog/src/backend/schema/sync/pg_dump.rs @@ -7,6 +7,7 @@ use pg_query::{ protobuf::{AlterTableType, ConstrType, ObjectType, ParseResult}, NodeEnum, }; +use pgdog_config::QueryParserEngine; use regex::Regex; use tracing::{info, trace, warn}; @@ -17,6 +18,20 @@ use crate::{ frontend::router::parser::{sequence::Sequence, Column, Table}, }; +fn deparse_node(node: NodeEnum) -> Result { + match config().config.general.query_parser_engine { + QueryParserEngine::PgQueryProtobuf => node.deparse(), + QueryParserEngine::PgQueryRaw => node.deparse_raw(), + } +} + +fn parse(query: &str) -> Result { + match config().config.general.query_parser_engine { + QueryParserEngine::PgQueryProtobuf => pg_query::parse(query), + 
QueryParserEngine::PgQueryRaw => pg_query::parse_raw(query), + } +} + use tokio::process::Command; #[derive(Debug, Clone)] @@ -117,7 +132,7 @@ impl PgDump { let cleaned = Self::clean(&original); trace!("[pg_dump (clean)] {}", cleaned); - let stmts = pg_query::parse(&cleaned)?.protobuf; + let stmts = parse(&cleaned)?.protobuf; Ok(PgDumpOutput { stmts, @@ -222,7 +237,7 @@ impl PgDumpOutput { let sql = { let mut stmt = stmt.clone(); stmt.if_not_exists = true; - NodeEnum::CreateStmt(stmt).deparse()? + deparse_node(NodeEnum::CreateStmt(stmt))? }; if state == SyncState::PreData { // CREATE TABLE is always good. @@ -235,7 +250,7 @@ impl PgDumpOutput { NodeEnum::CreateSeqStmt(stmt) => { let mut stmt = stmt.clone(); stmt.if_not_exists = true; - let sql = NodeEnum::CreateSeqStmt(stmt).deparse()?; + let sql = deparse_node(NodeEnum::CreateSeqStmt(stmt))?; if state == SyncState::PreData { // Bring sequences over. result.push(sql.into()); @@ -245,7 +260,7 @@ impl PgDumpOutput { NodeEnum::CreateExtensionStmt(stmt) => { let mut stmt = stmt.clone(); stmt.if_not_exists = true; - let sql = NodeEnum::CreateExtensionStmt(stmt).deparse()?; + let sql = deparse_node(NodeEnum::CreateExtensionStmt(stmt))?; if state == SyncState::PreData { result.push(sql.into()); } @@ -254,7 +269,7 @@ impl PgDumpOutput { NodeEnum::CreateSchemaStmt(stmt) => { let mut stmt = stmt.clone(); stmt.if_not_exists = true; - let sql = NodeEnum::CreateSchemaStmt(stmt).deparse()?; + let sql = deparse_node(NodeEnum::CreateSchemaStmt(stmt))?; if state == SyncState::PreData { result.push(sql.into()); } @@ -399,7 +414,7 @@ impl PgDumpOutput { stmt.replace = true; if state == SyncState::PreData { - result.push(NodeEnum::CreateTrigStmt(stmt).deparse()?.into()); + result.push(deparse_node(NodeEnum::CreateTrigStmt(stmt))?.into()); } } @@ -446,7 +461,7 @@ impl PgDumpOutput { .map(|relation| relation.inh) // ONLY used for partitioned tables, which can't be created concurrently. 
.unwrap_or(false); stmt.if_not_exists = true; - NodeEnum::IndexStmt(stmt).deparse()? + deparse_node(NodeEnum::IndexStmt(stmt))? }; let table = @@ -466,7 +481,7 @@ impl PgDumpOutput { if state == SyncState::PreData { result.push(Statement::Other { - sql: NodeEnum::ViewStmt(stmt).deparse()?, + sql: deparse_node(NodeEnum::ViewStmt(stmt))?, idempotent: true, }); } @@ -478,7 +493,7 @@ impl PgDumpOutput { if state == SyncState::PreData { result.push(Statement::Other { - sql: NodeEnum::CreateTableAsStmt(stmt).deparse()?, + sql: deparse_node(NodeEnum::CreateTableAsStmt(stmt))?, idempotent: true, }); } @@ -490,7 +505,7 @@ impl PgDumpOutput { if state == SyncState::PreData { result.push(Statement::Other { - sql: NodeEnum::CreateFunctionStmt(stmt).deparse()?, + sql: deparse_node(NodeEnum::CreateFunctionStmt(stmt))?, idempotent: true, }); } diff --git a/pgdog/src/backend/server.rs b/pgdog/src/backend/server.rs index 1a0fde8a9..4ed0d5a1f 100644 --- a/pgdog/src/backend/server.rs +++ b/pgdog/src/backend/server.rs @@ -2236,16 +2236,16 @@ pub mod test { #[tokio::test] async fn test_drain_chaos() { use crate::net::bind::Parameter; - use rand::{thread_rng, Rng}; + use rand::Rng; let mut server = test_server().await; - let mut rng = thread_rng(); + let mut rng = rand::rng(); for iteration in 0..1000 { let name = format!("chaos_test_{}", iteration); - let use_sync = rng.gen_bool(0.5); + let use_sync = rng.random_bool(0.5); - if rng.gen_bool(0.2) { + if rng.random_bool(0.2) { let bad_parse = Parse::named(&name, "SELECT invalid syntax"); server .send( @@ -2260,7 +2260,7 @@ pub mod test { .await .unwrap(); - let messages_to_read = rng.gen_range(0..=1); + let messages_to_read = rng.random_range(0..=1); for _ in 0..messages_to_read { let _ = server.read().await; } @@ -2304,7 +2304,7 @@ pub mod test { } else { vec!['1', '2', 'D', 'C'] }; - let messages_to_read = rng.gen_range(0..expected_messages.len()); + let messages_to_read = rng.random_range(0..expected_messages.len()); for i in 
0..messages_to_read { let msg = server.read().await.unwrap(); diff --git a/pgdog/src/cli.rs b/pgdog/src/cli.rs index 86801523d..11212c43c 100644 --- a/pgdog/src/cli.rs +++ b/pgdog/src/cli.rs @@ -10,7 +10,7 @@ use tracing::{error, info}; use crate::backend::schema::sync::config::ShardConfig; use crate::backend::schema::sync::pg_dump::{PgDump, SyncState}; use crate::backend::{databases::databases, replication::logical::Publisher}; -use crate::config::{Config, Users}; +use crate::config::{config, Config, Users}; use crate::frontend::router::cli::RouterCli; /// PgDog is a PostgreSQL pooler, proxy, load balancer and query router. @@ -252,7 +252,11 @@ pub async fn data_sync(commands: Commands) -> Result<(), Box(); + let val = rng().random::(); (format!("'val_{}'", val), val) }) .map(|tuple| format!("({}, {})", tuple.1, tuple.0)) diff --git a/pgdog/src/frontend/client/query_engine/multi_step/test/update.rs b/pgdog/src/frontend/client/query_engine/multi_step/test/update.rs index 80adcca46..89765a662 100644 --- a/pgdog/src/frontend/client/query_engine/multi_step/test/update.rs +++ b/pgdog/src/frontend/client/query_engine/multi_step/test/update.rs @@ -1,4 +1,4 @@ -use rand::{thread_rng, Rng}; +use rand::{rng, Rng}; use crate::{ expect_message, @@ -171,7 +171,7 @@ async fn test_row_same_shard_no_transaction() { #[tokio::test] async fn test_no_rows_updated() { let mut client = TestClient::new_rewrites(Parameters::default()).await; - let id = thread_rng().gen::(); + let id = rng().random::(); // Transaction not required because // it'll check for existing row first (on the same shard). 
diff --git a/pgdog/src/frontend/client/query_engine/two_pc/transaction.rs b/pgdog/src/frontend/client/query_engine/two_pc/transaction.rs index ae79e1b3c..f6f9042a6 100644 --- a/pgdog/src/frontend/client/query_engine/two_pc/transaction.rs +++ b/pgdog/src/frontend/client/query_engine/two_pc/transaction.rs @@ -1,4 +1,4 @@ -use rand::{thread_rng, Rng}; +use rand::{rng, Rng}; use std::fmt::Display; #[derive(Debug, Clone, Copy, PartialEq, Hash, Eq)] @@ -8,7 +8,7 @@ impl TwoPcTransaction { pub(crate) fn new() -> Self { // Transactions have random identifiers, // so multiple instances of PgDog don't create an identical transaction. - Self(thread_rng().gen()) + Self(rng().random_range(0..=usize::MAX)) } } diff --git a/pgdog/src/frontend/client/sticky.rs b/pgdog/src/frontend/client/sticky.rs index 9c165ecc3..1b274e7a0 100644 --- a/pgdog/src/frontend/client/sticky.rs +++ b/pgdog/src/frontend/client/sticky.rs @@ -2,7 +2,7 @@ //! default routing behavior determined by the query parser. use pgdog_config::Role; -use rand::{thread_rng, Rng}; +use rand::{rng, Rng}; use crate::net::{parameter::ParameterValue, Parameters}; @@ -49,7 +49,7 @@ impl Sticky { }); Self { - omni_index: thread_rng().gen_range(1..usize::MAX), + omni_index: rng().random_range(1..usize::MAX), role, } } diff --git a/pgdog/src/frontend/client/test/test_client.rs b/pgdog/src/frontend/client/test/test_client.rs index 159b94dc1..c828e5053 100644 --- a/pgdog/src/frontend/client/test/test_client.rs +++ b/pgdog/src/frontend/client/test/test_client.rs @@ -2,7 +2,7 @@ use std::{fmt::Debug, ops::Deref}; use bytes::{BufMut, Bytes, BytesMut}; use pgdog_config::RewriteMode; -use rand::{thread_rng, Rng}; +use rand::{rng, Rng}; use tokio::{ io::{AsyncReadExt, AsyncWriteExt}, net::{TcpListener, TcpStream}, @@ -199,7 +199,7 @@ impl TestClient { let cluster = self.engine.backend().cluster().unwrap().clone(); loop { - let id: i64 = thread_rng().gen(); + let id: i64 = rng().random(); let calc = 
ContextBuilder::new(cluster.sharded_tables().first().unwrap()) .data(id) .shards(cluster.shards().len()) diff --git a/pgdog/src/frontend/router/parser/cache/ast.rs b/pgdog/src/frontend/router/parser/cache/ast.rs index c141a5531..d04a8546c 100644 --- a/pgdog/src/frontend/router/parser/cache/ast.rs +++ b/pgdog/src/frontend/router/parser/cache/ast.rs @@ -1,4 +1,5 @@ -use pg_query::{parse, protobuf::ObjectType, NodeEnum, NodeRef, ParseResult}; +use pg_query::{parse, parse_raw, protobuf::ObjectType, NodeEnum, NodeRef, ParseResult}; +use pgdog_config::QueryParserEngine; use std::fmt::Debug; use std::{collections::HashSet, ops::Deref}; @@ -68,9 +69,14 @@ impl Ast { schema: &ShardingSchema, prepared_statements: &mut PreparedStatements, ) -> Result { - let mut ast = parse(query).map_err(Error::PgQuery)?; + let mut ast = match schema.query_parser_engine { + QueryParserEngine::PgQueryProtobuf => parse(query), + QueryParserEngine::PgQueryRaw => parse_raw(query), + } + .map_err(Error::PgQuery)?; let (comment_shard, comment_role) = comment(query, schema)?; - let fingerprint = Fingerprint::new(query).map_err(Error::PgQuery)?; + let fingerprint = + Fingerprint::new(query, schema.query_parser_engine).map_err(Error::PgQuery)?; // Don't rewrite statements that will be // sent to a direct shard. @@ -101,8 +107,12 @@ impl Ast { } /// Record new AST entry, without rewriting or comment-routing. 
- pub fn new_record(query: &str) -> Result { - let ast = parse(query).map_err(Error::PgQuery)?; + pub fn new_record(query: &str, query_parser_engine: QueryParserEngine) -> Result { + let ast = match query_parser_engine { + QueryParserEngine::PgQueryProtobuf => parse(query), + QueryParserEngine::PgQueryRaw => parse_raw(query), + } + .map_err(Error::PgQuery)?; Ok(Self { cached: true, diff --git a/pgdog/src/frontend/router/parser/cache/cache_impl.rs b/pgdog/src/frontend/router/parser/cache/cache_impl.rs index 87cc91d61..a08f9a51b 100644 --- a/pgdog/src/frontend/router/parser/cache/cache_impl.rs +++ b/pgdog/src/frontend/router/parser/cache/cache_impl.rs @@ -1,6 +1,7 @@ use lru::LruCache; use once_cell::sync::Lazy; use pg_query::normalize; +use pgdog_config::QueryParserEngine; use std::collections::HashMap; use parking_lot::Mutex; @@ -143,7 +144,12 @@ impl Cache { /// Used by dry run mode to keep stats on what queries are routed correctly, /// and which are not. /// - pub fn record_normalized(&self, query: &str, route: &Route) -> Result<(), Error> { + pub fn record_normalized( + &self, + query: &str, + route: &Route, + query_parser_engine: QueryParserEngine, + ) -> Result<(), Error> { let normalized = normalize(query).map_err(Error::PgQuery)?; { @@ -155,7 +161,7 @@ impl Cache { } } - let entry = Ast::new_record(&normalized)?; + let entry = Ast::new_record(&normalized, query_parser_engine)?; entry.update_stats(route); let mut guard = self.inner.lock(); diff --git a/pgdog/src/frontend/router/parser/cache/fingerprint.rs b/pgdog/src/frontend/router/parser/cache/fingerprint.rs index c7c6ef83b..7b7907a28 100644 --- a/pgdog/src/frontend/router/parser/cache/fingerprint.rs +++ b/pgdog/src/frontend/router/parser/cache/fingerprint.rs @@ -1,6 +1,7 @@ use std::{fmt::Debug, ops::Deref}; -use pg_query::fingerprint; +use pg_query::{fingerprint, fingerprint_raw}; +use pgdog_config::QueryParserEngine; /// Query fingerprint. 
pub struct Fingerprint { @@ -9,9 +10,12 @@ pub struct Fingerprint { impl Fingerprint { /// Fingerprint a query. - pub(crate) fn new(query: &str) -> Result { + pub(crate) fn new(query: &str, engine: QueryParserEngine) -> Result { Ok(Self { - fingerprint: fingerprint(query)?, + fingerprint: match engine { + QueryParserEngine::PgQueryProtobuf => fingerprint(query), + QueryParserEngine::PgQueryRaw => fingerprint_raw(query), + }?, }) } } diff --git a/pgdog/src/frontend/router/parser/comment.rs b/pgdog/src/frontend/router/parser/comment.rs index 57ef95fb5..85343c8c6 100644 --- a/pgdog/src/frontend/router/parser/comment.rs +++ b/pgdog/src/frontend/router/parser/comment.rs @@ -1,5 +1,7 @@ use once_cell::sync::Lazy; +use pg_query::scan_raw; use pg_query::{protobuf::Token, scan}; +use pgdog_config::QueryParserEngine; use regex::Regex; use crate::backend::ShardingSchema; @@ -33,7 +35,11 @@ pub fn comment( query: &str, schema: &ShardingSchema, ) -> Result<(Option, Option), Error> { - let tokens = scan(query).map_err(Error::PgQuery)?; + let tokens = match schema.query_parser_engine { + QueryParserEngine::PgQueryProtobuf => scan(query), + QueryParserEngine::PgQueryRaw => scan_raw(query), + } + .map_err(Error::PgQuery)?; let mut role = None; for token in tokens.tokens.iter() { diff --git a/pgdog/src/frontend/router/parser/prepare.rs b/pgdog/src/frontend/router/parser/prepare.rs index 7feaff225..f727677e5 100644 --- a/pgdog/src/frontend/router/parser/prepare.rs +++ b/pgdog/src/frontend/router/parser/prepare.rs @@ -1,5 +1,7 @@ -use super::Error; use pg_query::protobuf::PrepareStmt; +use pgdog_config::QueryParserEngine; + +use super::Error; #[derive(Debug, Clone, PartialEq)] pub struct Prepare { @@ -7,16 +9,17 @@ pub struct Prepare { statement: String, } -impl TryFrom<&PrepareStmt> for Prepare { - type Error = super::Error; - - fn try_from(value: &PrepareStmt) -> Result { - let statement = value - .query - .as_ref() - .ok_or(Error::EmptyQuery)? 
- .deparse() - .map_err(|_| Error::EmptyQuery)?; +impl Prepare { + pub fn from_stmt( + value: &PrepareStmt, + query_parser_engine: QueryParserEngine, + ) -> Result { + let query = value.query.as_ref().ok_or(Error::EmptyQuery)?; + let statement = match query_parser_engine { + QueryParserEngine::PgQueryProtobuf => query.deparse(), + QueryParserEngine::PgQueryRaw => query.deparse_raw(), + } + .map_err(|_| Error::EmptyQuery)?; Ok(Self { name: value.name.to_string(), @@ -44,7 +47,8 @@ mod test { .unwrap(); match ast.node.unwrap() { NodeEnum::PrepareStmt(stmt) => { - let prepare = Prepare::try_from(stmt.as_ref()).unwrap(); + let prepare = + Prepare::from_stmt(stmt.as_ref(), QueryParserEngine::PgQueryProtobuf).unwrap(); assert_eq!(prepare.name, "test"); assert_eq!(prepare.statement, "SELECT $1, $2"); } diff --git a/pgdog/src/frontend/router/parser/query/mod.rs b/pgdog/src/frontend/router/parser/query/mod.rs index 50e1078f2..e953b4aeb 100644 --- a/pgdog/src/frontend/router/parser/query/mod.rs +++ b/pgdog/src/frontend/router/parser/query/mod.rs @@ -439,7 +439,11 @@ impl QueryParser { // Record statement in cache with normalized parameters. 
if !statement.cached { let query = context.query()?.query(); - Cache::get().record_normalized(query, command.route())?; + Cache::get().record_normalized( + query, + command.route(), + context.sharding_schema.query_parser_engine, + )?; } Ok(command.dry_run()) } else { diff --git a/pgdog/src/frontend/router/parser/rewrite/statement/insert.rs b/pgdog/src/frontend/router/parser/rewrite/statement/insert.rs index 855a95dab..d1c4077c7 100644 --- a/pgdog/src/frontend/router/parser/rewrite/statement/insert.rs +++ b/pgdog/src/frontend/router/parser/rewrite/statement/insert.rs @@ -1,5 +1,5 @@ use pg_query::{Node, NodeEnum}; -use pgdog_config::RewriteMode; +use pgdog_config::{QueryParserEngine, RewriteMode}; use crate::frontend::router::parser::Cache; use crate::frontend::router::Ast; @@ -234,7 +234,11 @@ impl StatementRewrite<'_> { } } - let stmt = ast.deparse()?; + let stmt = match self.schema.query_parser_engine { + QueryParserEngine::PgQueryProtobuf => ast.deparse(), + QueryParserEngine::PgQueryRaw => ast.deparse_raw(), + }?; + Ok((params, stmt)) } @@ -295,7 +299,6 @@ mod tests { use pgdog_config::Rewrite; use super::*; - use crate::backend::replication::{ShardedSchemas, ShardedTables}; use crate::backend::ShardingSchema; use crate::frontend::router::parser::StatementRewriteContext; use crate::frontend::PreparedStatements; @@ -303,13 +306,12 @@ mod tests { fn default_schema() -> ShardingSchema { ShardingSchema { shards: 2, - tables: ShardedTables::default(), - schemas: ShardedSchemas::default(), rewrite: Rewrite { enabled: true, split_inserts: RewriteMode::Rewrite, ..Default::default() }, + ..Default::default() } } diff --git a/pgdog/src/frontend/router/parser/rewrite/statement/mod.rs b/pgdog/src/frontend/router/parser/rewrite/statement/mod.rs index 40ad11c05..2081643a6 100644 --- a/pgdog/src/frontend/router/parser/rewrite/statement/mod.rs +++ b/pgdog/src/frontend/router/parser/rewrite/statement/mod.rs @@ -2,6 +2,7 @@ use pg_query::protobuf::ParseResult; use 
pg_query::Node; +use pgdog_config::QueryParserEngine; use crate::backend::ShardingSchema; use crate::frontend::PreparedStatements; @@ -107,7 +108,10 @@ impl<'a> StatementRewrite<'a> { self.rewrite_aggregates(&mut plan)?; if self.rewritten { - plan.stmt = Some(self.stmt.deparse()?); + plan.stmt = Some(match self.schema.query_parser_engine { + QueryParserEngine::PgQueryProtobuf => self.stmt.deparse(), + QueryParserEngine::PgQueryRaw => self.stmt.deparse_raw(), + }?); } self.split_insert(&mut plan)?; diff --git a/pgdog/src/frontend/router/parser/rewrite/statement/simple_prepared.rs b/pgdog/src/frontend/router/parser/rewrite/statement/simple_prepared.rs index c6d6627cf..54b306a34 100644 --- a/pgdog/src/frontend/router/parser/rewrite/statement/simple_prepared.rs +++ b/pgdog/src/frontend/router/parser/rewrite/statement/simple_prepared.rs @@ -1,7 +1,8 @@ use pg_query::{Error as PgQueryError, NodeEnum}; +use pgdog_config::QueryParserEngine; -use crate::frontend::PreparedStatements; use crate::net::Parse; +use crate::{backend::ShardingSchema, frontend::PreparedStatements}; use super::{Error, StatementRewrite}; @@ -48,7 +49,7 @@ impl StatementRewrite<'_> { for stmt in &mut self.stmt.stmts { if let Some(ref mut node) = stmt.stmt { if let Some(ref mut inner) = node.node { - match rewrite_single_prepared(inner, self.prepared_statements)? { + match rewrite_single_prepared(inner, self.prepared_statements, self.schema)? { SimplePreparedRewrite::Prepared => { result.rewritten = true; } @@ -70,6 +71,7 @@ impl StatementRewrite<'_> { fn rewrite_single_prepared( node: &mut NodeEnum, prepared_statements: &mut PreparedStatements, + schema: &ShardingSchema, ) -> Result { match node { NodeEnum::PrepareStmt(stmt) => { @@ -78,9 +80,12 @@ fn rewrite_single_prepared( .as_ref() .ok_or(Error::PgQuery(PgQueryError::Parse( "missing query in PREPARE".into(), - )))? 
- .deparse() - .map_err(Error::PgQuery)?; + )))?; + let query = match schema.query_parser_engine { + QueryParserEngine::PgQueryProtobuf => query.deparse(), + QueryParserEngine::PgQueryRaw => query.deparse_raw(), + } + .map_err(Error::PgQuery)?; let mut parse = Parse::named(&stmt.name, &query); prepared_statements.insert_anyway(&mut parse); @@ -116,7 +121,6 @@ fn rewrite_single_prepared( mod tests { use super::super::{RewritePlan, StatementRewrite, StatementRewriteContext}; use super::*; - use crate::backend::replication::{ShardedSchemas, ShardedTables}; use crate::backend::ShardingSchema; use crate::config::PreparedStatements as PreparedStatementsLevel; use pg_query::parse; @@ -136,12 +140,11 @@ mod tests { ps, schema: ShardingSchema { shards: 1, - tables: ShardedTables::default(), - schemas: ShardedSchemas::default(), rewrite: Rewrite { enabled: true, ..Default::default() }, + ..Default::default() }, } } diff --git a/pgdog/src/frontend/router/parser/rewrite/statement/unique_id.rs b/pgdog/src/frontend/router/parser/rewrite/statement/unique_id.rs index 480d3eb97..054d72fa2 100644 --- a/pgdog/src/frontend/router/parser/rewrite/statement/unique_id.rs +++ b/pgdog/src/frontend/router/parser/rewrite/statement/unique_id.rs @@ -116,7 +116,6 @@ mod tests { use pgdog_config::Rewrite; use super::*; - use crate::backend::replication::{ShardedSchemas, ShardedTables}; use crate::backend::ShardingSchema; use crate::frontend::router::parser::StatementRewriteContext; use crate::frontend::PreparedStatements; @@ -124,12 +123,11 @@ mod tests { fn default_schema() -> ShardingSchema { ShardingSchema { shards: 1, - tables: ShardedTables::default(), - schemas: ShardedSchemas::default(), rewrite: Rewrite { enabled: true, ..Default::default() }, + ..Default::default() } } diff --git a/pgdog/src/frontend/router/parser/rewrite/statement/update.rs b/pgdog/src/frontend/router/parser/rewrite/statement/update.rs index f5a401c0e..c4fda855c 100644 --- 
a/pgdog/src/frontend/router/parser/rewrite/statement/update.rs +++ b/pgdog/src/frontend/router/parser/rewrite/statement/update.rs @@ -8,7 +8,7 @@ use pg_query::{ }, Node, NodeEnum, }; -use pgdog_config::RewriteMode; +use pgdog_config::{QueryParserEngine, RewriteMode}; use crate::{ frontend::{ @@ -313,7 +313,8 @@ impl<'a> StatementRewrite<'a> { if stmt.where_clause.is_none() { return Err(Error::WhereClauseMissing); } - plan.sharding_key_update = Some(create_stmts(stmt, value)?); + plan.sharding_key_update = + Some(create_stmts(stmt, value, self.schema.query_parser_engine)?); } Ok(()) @@ -364,7 +365,7 @@ impl<'a> StatementRewrite<'a> { let expr = res .val .as_ref() - .map(|node| deparse_expr(node)) + .map(|node| deparse_expr(node, self.schema.query_parser_engine)) .transpose()? .unwrap_or_else(|| "".to_string()); Err(Error::UnsupportedShardingKeyUpdate(format!( @@ -433,6 +434,7 @@ pub(super) enum UpdateValue { /// fn res_targets_to_insert_res_targets( stmt: &UpdateStmt, + query_parser_engine: QueryParserEngine, ) -> Result, Error> { let mut result = HashMap::new(); for target in &stmt.target_list { @@ -446,7 +448,10 @@ fn res_targets_to_insert_res_targets( let value = if valid { UpdateValue::Value(*target.val.clone().unwrap()) } else { - UpdateValue::Expr(deparse_expr(target.val.as_ref().unwrap())?) + UpdateValue::Expr(deparse_expr( + target.val.as_ref().unwrap(), + query_parser_engine, + )?) }; result.insert(target.name.clone(), value); } @@ -502,7 +507,7 @@ fn select_star() -> Vec { fn parse_result(node: NodeEnum) -> ParseResult { ParseResult { - version: 170005, + version: pg_query::PG_VERSION_NUM as i32, stmts: vec![RawStmt { stmt: Some(Box::new(Node { node: Some(node), @@ -515,18 +520,24 @@ fn parse_result(node: NodeEnum) -> ParseResult { } /// Deparse an expression node by wrapping it in a SELECT statement. 
-fn deparse_expr(node: &Node) -> Result { - Ok(deparse_list(&[Node { - node: Some(NodeEnum::ResTarget(Box::new(ResTarget { - val: Some(Box::new(node.clone())), - ..Default::default() - }))), - }])? +fn deparse_expr(node: &Node, query_parser_engine: QueryParserEngine) -> Result { + Ok(deparse_list( + &[Node { + node: Some(NodeEnum::ResTarget(Box::new(ResTarget { + val: Some(Box::new(node.clone())), + ..Default::default() + }))), + }], + query_parser_engine, + )? .unwrap()) // SAFETY: we are not passing in an empty list. } /// Deparse a list of expressions by wrapping them into a SELECT statement. -fn deparse_list(list: &[Node]) -> Result, Error> { +fn deparse_list( + list: &[Node], + query_parser_engine: QueryParserEngine, +) -> Result, Error> { if list.is_empty() { return Ok(None); } @@ -537,16 +548,23 @@ fn deparse_list(list: &[Node]) -> Result, Error> { op: SetOperation::SetopNone.into(), ..Default::default() }; - let string = parse_result(NodeEnum::SelectStmt(Box::new(stmt))) - .deparse()? 
- .strip_prefix("SELECT ") - .unwrap_or_default() - .to_string(); + let result = parse_result(NodeEnum::SelectStmt(Box::new(stmt))); + let string = match query_parser_engine { + QueryParserEngine::PgQueryProtobuf => result.deparse()?, + QueryParserEngine::PgQueryRaw => result.deparse_raw()?, + } + .strip_prefix("SELECT ") + .unwrap_or_default() + .to_string(); Ok(Some(string)) } -fn create_stmts(stmt: &UpdateStmt, new_value: &ResTarget) -> Result { +fn create_stmts( + stmt: &UpdateStmt, + new_value: &ResTarget, + query_parser_engine: QueryParserEngine, +) -> Result { let select = SelectStmt { target_list: select_star(), from_clause: vec![Node { @@ -564,7 +582,10 @@ fn create_stmts(stmt: &UpdateStmt, new_value: &ResTarget) -> Result select.deparse()?, + QueryParserEngine::PgQueryRaw => select.deparse_raw()?, + }, ast: Ast::from_parse_result(select), params, }; @@ -582,7 +603,10 @@ fn create_stmts(stmt: &UpdateStmt, new_value: &ResTarget) -> Result delete.deparse()?, + QueryParserEngine::PgQueryRaw => delete.deparse_raw()?, + }, ast: Ast::from_parse_result(delete), params, }; @@ -605,7 +629,10 @@ fn create_stmts(stmt: &UpdateStmt, new_value: &ResTarget) -> Result check.deparse()?, + QueryParserEngine::PgQueryRaw => check.deparse_raw()?, + }, ast: Ast::from_parse_result(check), params, }; @@ -617,9 +644,9 @@ fn create_stmts(stmt: &UpdateStmt, new_value: &ResTarget) -> Result StatementParser<'a, 'b, 'c> { #[cfg(test)] mod test { - use pgdog_config::{ - FlexibleType, Mapping, Rewrite, ShardedMapping, ShardedMappingKind, ShardedTable, - }; + use pgdog_config::{FlexibleType, Mapping, ShardedMapping, ShardedMappingKind, ShardedTable}; use crate::backend::ShardedTables; use crate::net::messages::{Bind, Parameter}; @@ -1824,7 +1822,7 @@ mod test { all: false, }, ]), - rewrite: Rewrite::default(), + ..Default::default() }; let raw = pg_query::parse(stmt) .unwrap() diff --git a/pgdog/src/frontend/router/sharding/test/mod.rs b/pgdog/src/frontend/router/sharding/test/mod.rs 
index 9a4fc86ae..a9194475a 100644 --- a/pgdog/src/frontend/router/sharding/test/mod.rs +++ b/pgdog/src/frontend/router/sharding/test/mod.rs @@ -1,6 +1,6 @@ use std::{collections::HashSet, str::from_utf8}; -use rand::{seq::SliceRandom, thread_rng}; +use rand::seq::SliceRandom; use crate::{ backend::server::test::test_server, @@ -17,7 +17,7 @@ async fn test_shard_varchar() { let mut server = test_server().await; let inserts = (0..100) .map(|i| { - words.shuffle(&mut thread_rng()); + words.shuffle(&mut rand::rng()); let word = words.first().unwrap(); Query::new(format!( diff --git a/pgdog/src/net/discovery/listener.rs b/pgdog/src/net/discovery/listener.rs index faa872fca..2f05b0917 100644 --- a/pgdog/src/net/discovery/listener.rs +++ b/pgdog/src/net/discovery/listener.rs @@ -42,7 +42,7 @@ impl Listener { /// Create new listener. fn new() -> Self { Self { - id: rand::thread_rng().gen(), + id: rand::rng().random(), inner: Arc::new(Mutex::new(Inner { peers: HashMap::new(), })), diff --git a/pgdog/src/net/messages/backend_key.rs b/pgdog/src/net/messages/backend_key.rs index 5fb676e50..9f07d903c 100644 --- a/pgdog/src/net/messages/backend_key.rs +++ b/pgdog/src/net/messages/backend_key.rs @@ -41,8 +41,8 @@ impl BackendKeyData { /// Create new random BackendKeyData (B) message. 
pub fn new() -> Self { Self { - pid: rand::thread_rng().gen(), - secret: rand::thread_rng().gen(), + pid: rand::rng().random(), + secret: rand::rng().random(), } } @@ -52,7 +52,7 @@ impl BackendKeyData { pub fn new_client() -> Self { Self { pid: next_counter(), - secret: rand::thread_rng().gen(), + secret: rand::rng().random(), } } } diff --git a/pgdog/src/net/messages/buffer.rs b/pgdog/src/net/messages/buffer.rs index 56d3bcac5..35a414874 100644 --- a/pgdog/src/net/messages/buffer.rs +++ b/pgdog/src/net/messages/buffer.rs @@ -179,15 +179,15 @@ mod test { spawn(async move { let mut conn = TcpStream::connect(addr).await.unwrap(); use rand::{rngs::StdRng, Rng, SeedableRng}; - let mut rng = StdRng::from_entropy(); + let mut rng = StdRng::from_os_rng(); for i in 0..5000 { let msg = Sync.to_bytes().unwrap(); conn.write_all(&msg).await.unwrap(); - let query_len = rng.gen_range(10..=1000); + let query_len = rng.random_range(10..=1000); let query: String = (0..query_len) - .map(|_| rng.sample(rand::distributions::Alphanumeric) as char) + .map(|_| rng.sample(rand::distr::Alphanumeric) as char) .collect(); let msg = Parse::named(format!("test_{}", i), &query) diff --git a/pgdog/src/net/messages/mod.rs b/pgdog/src/net/messages/mod.rs index c58908b4d..f080ac706 100644 --- a/pgdog/src/net/messages/mod.rs +++ b/pgdog/src/net/messages/mod.rs @@ -302,5 +302,3 @@ from_message!(ReadyForQuery); from_message!(RowDescription); from_message!(Sync); from_message!(Terminate); - -pub(crate) use from_message; diff --git a/pgdog/src/util.rs b/pgdog/src/util.rs index 7615ff31d..61407976e 100644 --- a/pgdog/src/util.rs +++ b/pgdog/src/util.rs @@ -3,7 +3,7 @@ use chrono::{DateTime, Local, Utc}; use once_cell::sync::Lazy; use pgdog_plugin::comp; -use rand::{distributions::Alphanumeric, Rng}; +use rand::{distr::Alphanumeric, Rng}; use std::{env, num::ParseIntError, ops::Deref, time::Duration}; use crate::net::Parameters; // 0.8 @@ -67,7 +67,7 @@ pub fn postgres_now() -> i64 { /// Generate a 
random string of length n. pub fn random_string(n: usize) -> String { - rand::thread_rng() + rand::rng() .sample_iter(&Alphanumeric) .take(n) .map(char::from) @@ -79,10 +79,10 @@ static INSTANCE_ID: Lazy = Lazy::new(|| { if let Ok(node_id) = env::var("NODE_ID") { node_id } else { - let mut rng = rand::thread_rng(); + let mut rng = rand::rng(); (0..8) .map(|_| { - let n: u8 = rng.gen_range(0..16); + let n: u8 = rng.random_range(0..16); format!("{:x}", n) }) .collect()