diff --git a/Cargo.lock b/Cargo.lock index e7b114c98..8fd4b6e7c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2385,6 +2385,7 @@ dependencies = [ "rustls-native-certs", "rustls-pki-types", "scram", + "semver", "serde", "serde_json", "sha1", @@ -2413,7 +2414,7 @@ dependencies = [ [[package]] name = "pgdog-macros" -version = "0.1.1" +version = "0.1.2" dependencies = [ "proc-macro2", "quote", @@ -2422,7 +2423,7 @@ dependencies = [ [[package]] name = "pgdog-plugin" -version = "0.1.8" +version = "0.1.9" dependencies = [ "bindgen 0.71.1", "libc", diff --git a/pgdog-macros/Cargo.toml b/pgdog-macros/Cargo.toml index 4bf671240..33f84d4f7 100644 --- a/pgdog-macros/Cargo.toml +++ b/pgdog-macros/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "pgdog-macros" -version = "0.1.1" +version = "0.1.2" edition = "2024" authors = ["Lev Kokotov "] license = "MIT" diff --git a/pgdog-macros/src/lib.rs b/pgdog-macros/src/lib.rs index 65df8ce16..e55a0a8fb 100644 --- a/pgdog-macros/src/lib.rs +++ b/pgdog-macros/src/lib.rs @@ -25,6 +25,14 @@ pub fn plugin(_input: TokenStream) -> TokenStream { } } + #[unsafe(no_mangle)] + pub unsafe extern "C" fn pgdog_plugin_lib_version(output: *mut pgdog_plugin::PdStr) { + let version = pgdog_plugin::comp::plugin_lib_version(); + unsafe { + *output = version; + } + } + #[unsafe(no_mangle)] pub unsafe extern "C" fn pgdog_pg_query_version(output: *mut pgdog_plugin::PdStr) { let version: pgdog_plugin::PdStr = option_env!("PGDOG_PGQUERY_VERSION") diff --git a/pgdog-plugin/Cargo.toml b/pgdog-plugin/Cargo.toml index 454c3026a..f551bab3c 100644 --- a/pgdog-plugin/Cargo.toml +++ b/pgdog-plugin/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "pgdog-plugin" -version = "0.1.8" +version = "0.1.9" edition = "2021" license = "MIT" authors = ["Lev Kokotov "] @@ -18,7 +18,7 @@ libloading = "0.8" libc = "0.2" tracing = "0.1" pg_query = "6.1.1" -pgdog-macros = { path = "../pgdog-macros", version = "0.1.1" } +pgdog-macros = { path = "../pgdog-macros", version = "0.1.2" } toml = "0.9" [build-dependencies] diff --git a/pgdog-plugin/include/types.h b/pgdog-plugin/include/types.h index e21b580b3..a5942b60b 100644 --- a/pgdog-plugin/include/types.h +++ b/pgdog-plugin/include/types.h @@ -61,10 +61,23 @@ typedef struct PdRouterContext { PdParameters params; } PdRouterContext; +/** + * ErrorResponse. + */ +typedef struct PdErrorResponse { + const char *severity; + const char *code; + const char *message; + const char *detail; + const char *context; + const char *file; + const char *routine; +} PdErrorResponse; + /** * Routing decision returned by the plugin. */ - typedef struct PdRoute { +typedef struct PdRoute { /** Which shard the query should go to. * * `-1` for all shards, `-2` for unknown, this setting is ignored. @@ -75,4 +88,8 @@ typedef struct PdRouterContext { * `1` for `true`, `0` for `false`, `2` for unknown, this setting is ignored. */ uint8_t read_write; + /** + * Specific error response to return, if blocking query. + */ + PdErrorResponse error_response; } PdRoute; diff --git a/pgdog-plugin/src/bindings.rs b/pgdog-plugin/src/bindings.rs index 561d24e5b..b7986c27f 100644 --- a/pgdog-plugin/src/bindings.rs +++ b/pgdog-plugin/src/bindings.rs @@ -308,6 +308,37 @@ const _: () = { ["Offset of field: PdRouterContext::params"] [::std::mem::offset_of!(PdRouterContext, params) - 40usize]; }; +#[doc = " ErrorResponse."] +#[repr(C)] +#[derive(Debug, Copy, Clone)] +pub struct PdErrorResponse { + pub severity: *const ::std::os::raw::c_char, + pub code: *const ::std::os::raw::c_char, + pub message: *const ::std::os::raw::c_char, + pub detail: *const ::std::os::raw::c_char, + pub context: *const ::std::os::raw::c_char, + pub file: *const ::std::os::raw::c_char, + pub routine: *const ::std::os::raw::c_char, +} +#[allow(clippy::unnecessary_operation, clippy::identity_op)] +const _: () = { + ["Size of PdErrorResponse"][::std::mem::size_of::() - 56usize]; + ["Alignment of PdErrorResponse"][::std::mem::align_of::() - 8usize]; + ["Offset of field: PdErrorResponse::severity"] + [::std::mem::offset_of!(PdErrorResponse, severity) - 0usize]; + ["Offset of field: PdErrorResponse::code"] + [::std::mem::offset_of!(PdErrorResponse, code) - 8usize]; + ["Offset of field: PdErrorResponse::message"] + [::std::mem::offset_of!(PdErrorResponse, message) - 16usize]; + ["Offset of field: PdErrorResponse::detail"] + [::std::mem::offset_of!(PdErrorResponse, detail) - 24usize]; + ["Offset of field: PdErrorResponse::context"] + [::std::mem::offset_of!(PdErrorResponse, context) - 32usize]; + ["Offset of field: PdErrorResponse::file"] + [::std::mem::offset_of!(PdErrorResponse, file) - 40usize]; + ["Offset of field: PdErrorResponse::routine"] + [::std::mem::offset_of!(PdErrorResponse, routine) - 48usize]; +}; #[doc = " Routing decision returned by the plugin."] #[repr(C)] #[derive(Debug, Copy, Clone)] @@ -316,11 +347,15 @@ pub struct PdRoute { pub shard: i64, #[doc = " Is the query a read and should go to a replica?\n\n `1` for `true`, `0` for `false`, `2` for unknown, this setting is ignored."] pub read_write: u8, + #[doc = " Specific error response to return, if blocking query."] + pub error_response: PdErrorResponse, } #[allow(clippy::unnecessary_operation, clippy::identity_op)] const _: () = { - ["Size of PdRoute"][::std::mem::size_of::() - 16usize]; + ["Size of PdRoute"][::std::mem::size_of::() - 72usize]; ["Alignment of PdRoute"][::std::mem::align_of::() - 8usize]; ["Offset of field: PdRoute::shard"][::std::mem::offset_of!(PdRoute, shard) - 0usize]; ["Offset of field: PdRoute::read_write"][::std::mem::offset_of!(PdRoute, read_write) - 8usize]; + ["Offset of field: PdRoute::error_response"] + [::std::mem::offset_of!(PdRoute, error_response) - 16usize]; }; diff --git a/pgdog-plugin/src/comp.rs b/pgdog-plugin/src/comp.rs index e49fe0380..53b41c935 100644 --- a/pgdog-plugin/src/comp.rs +++ b/pgdog-plugin/src/comp.rs @@ -6,3 +6,8 @@ use crate::PdStr; pub fn rustc_version() -> PdStr { env!("RUSTC_VERSION").into() } + +/// Version of pgdog-plugin itself. +pub fn plugin_lib_version() -> PdStr { + env!("CARGO_PKG_VERSION").into() +} diff --git a/pgdog-plugin/src/context.rs b/pgdog-plugin/src/context.rs index b05ef001a..8aff142c3 100644 --- a/pgdog-plugin/src/context.rs +++ b/pgdog-plugin/src/context.rs @@ -1,9 +1,10 @@ //! Context passed to and from the plugins. -use std::ops::Deref; +use std::{fmt::Debug, ops::Deref}; use crate::{ - bindings::PdRouterContext, parameters::Parameters, PdParameters, PdRoute, PdStatement, + bindings::PdRouterContext, error_response::ErrorResponse, parameters::Parameters, + PdErrorResponse, PdParameters, PdRoute, PdStatement, }; /// PostgreSQL statement, parsed by [`pg_query`]. @@ -449,6 +450,7 @@ impl Route { ffi: PdRoute { shard: shard.into(), read_write: read_write.into(), + error_response: PdErrorResponse::none(), }, } } @@ -462,6 +464,7 @@ impl Route { ffi: PdRoute { shard: -2, read_write: 2, + error_response: PdErrorResponse::none(), }, } } @@ -473,7 +476,48 @@ impl Route { ffi: PdRoute { shard: -3, read_write: 2, + error_response: PdErrorResponse::none(), }, } } + + /// Block the query from being sent to a database and return the provided error message. + /// + /// # Arguments + /// + /// * `error`: Error response to return to the client. + /// + pub fn error(error: ErrorResponse) -> Route { + Self { + ffi: PdRoute { + shard: -3, + read_write: 2, + error_response: error.into(), + }, + } + } + + /// Get the error set by the plugin on the route, if any. + pub fn get_error(&self) -> Option { + if self.error_response.is_none() { + None + } else { + Some(ErrorResponse::from(self.ffi.error_response)) + } + } +} + +impl Debug for Route { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("Route") + .field("shard", &self.ffi.shard) + .field("read_write", &self.ffi.read_write) + .finish() + } +} + +impl Drop for Route { + fn drop(&mut self) { + unsafe { self.ffi.error_response.deallocate() }; + } } diff --git a/pgdog-plugin/src/error_response.rs b/pgdog-plugin/src/error_response.rs new file mode 100644 index 000000000..ba32388b4 --- /dev/null +++ b/pgdog-plugin/src/error_response.rs @@ -0,0 +1,163 @@ +//! ErrorResponse message. +use std::ffi::{CStr, CString}; +use std::ptr; + +use crate::bindings::PdErrorResponse; + +/// Error response, returned to the client upon blocking a query +/// from executing. +#[derive(Debug, Default, Clone, PartialEq, Eq, PartialOrd, Ord)] +pub struct ErrorResponse { + /// Severity, e.g., INFO, WARNING, ERROR, etc. + pub severity: String, + /// Error code. See for list of well-known error codes. + pub code: String, + /// Error message as it will be shown to the client. + pub message: String, + /// Additional error details (optional). + pub detail: Option, + /// Error context (optional). + pub context: Option, + /// The file where the error occured. Helps with debugging. Optional. + pub file: Option, + /// The name of the function that returned the error. Helps with debugging. Optional. + pub routine: Option, +} + +impl PdErrorResponse { + /// Create a NULL ErrorResponse, i.e., no error. + pub fn none() -> Self { + let null = ptr::null(); + Self { + severity: null, + code: null, + message: null, + detail: null, + context: null, + file: null, + routine: null, + } + } +} + +impl From for ErrorResponse { + fn from(value: PdErrorResponse) -> Self { + fn cstr_to_string(ptr: *const std::os::raw::c_char) -> String { + if ptr.is_null() { + String::new() + } else { + unsafe { CStr::from_ptr(ptr).to_string_lossy().to_string() } + } + } + + fn cstr_to_option_string(ptr: *const std::os::raw::c_char) -> Option { + if ptr.is_null() { + None + } else { + unsafe { Some(CStr::from_ptr(ptr).to_string_lossy().to_string()) } + } + } + + Self { + severity: cstr_to_string(value.severity), + code: cstr_to_string(value.code), + message: cstr_to_string(value.message), + detail: cstr_to_option_string(value.detail), + context: cstr_to_option_string(value.context), + file: cstr_to_option_string(value.file), + routine: cstr_to_option_string(value.routine), + } + } +} + +impl From for PdErrorResponse { + fn from(value: ErrorResponse) -> Self { + fn string_to_ptr(s: String) -> *const std::os::raw::c_char { + CString::new(s).unwrap().into_raw() + } + + fn option_string_to_ptr(s: Option) -> *const std::os::raw::c_char { + match s { + Some(s) => string_to_ptr(s), + None => ptr::null(), + } + } + + Self { + severity: string_to_ptr(value.severity), + code: string_to_ptr(value.code), + message: string_to_ptr(value.message), + detail: option_string_to_ptr(value.detail), + context: option_string_to_ptr(value.context), + file: option_string_to_ptr(value.file), + routine: option_string_to_ptr(value.routine), + } + } +} + +impl PdErrorResponse { + /// Error response not set (is null). + pub fn is_none(&self) -> bool { + self.severity.is_null() + } + + /// Take ownership of the error response and deallocate its memory. + /// This consumes the PdErrorResponse and properly deallocates all C strings. + pub unsafe fn deallocate(&mut self) { + fn deallocate_cstring_ptr(ptr: &mut *const std::os::raw::c_char) { + unsafe { + if !ptr.is_null() { + let _ = CString::from_raw(*ptr as *mut std::os::raw::c_char); + } + } + + *ptr = ptr::null(); + } + + deallocate_cstring_ptr(&mut self.severity); + deallocate_cstring_ptr(&mut self.code); + deallocate_cstring_ptr(&mut self.message); + deallocate_cstring_ptr(&mut self.detail); + deallocate_cstring_ptr(&mut self.context); + deallocate_cstring_ptr(&mut self.file); + deallocate_cstring_ptr(&mut self.routine); + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_pd_error_response_deallocate() { + let error_response = ErrorResponse { + severity: "ERROR".to_string(), + code: "42P01".to_string(), + message: "table does not exist".to_string(), + detail: Some("The table 'users' was not found".to_string()), + context: None, + file: Some("executor.rs".to_string()), + routine: Some("execute_query".to_string()), + }; + + let mut pd_error: PdErrorResponse = error_response.into(); + + assert!(!pd_error.severity.is_null()); + assert!(!pd_error.code.is_null()); + assert!(!pd_error.message.is_null()); + assert!(!pd_error.detail.is_null()); + assert!(pd_error.context.is_null()); + assert!(!pd_error.file.is_null()); + assert!(!pd_error.routine.is_null()); + + unsafe { pd_error.deallocate() }; + } + + #[test] + fn test_pd_error_response_deallocate_none() { + let mut pd_error = PdErrorResponse::none(); + assert!(pd_error.is_none()); + + unsafe { pd_error.deallocate() }; + } +} diff --git a/pgdog-plugin/src/lib.rs b/pgdog-plugin/src/lib.rs index c691b2749..723b9a6fa 100644 --- a/pgdog-plugin/src/lib.rs +++ b/pgdog-plugin/src/lib.rs @@ -167,6 +167,7 @@ pub mod bindings; pub mod ast; pub mod comp; pub mod context; +pub mod error_response; pub mod parameters; pub mod plugin; pub mod prelude; @@ -174,6 +175,7 @@ pub mod string; pub use bindings::*; pub use context::*; +pub use error_response::*; pub use plugin::*; pub use libloading; diff --git a/pgdog-plugin/src/plugin.rs b/pgdog-plugin/src/plugin.rs index fbd03d38b..75d6e60e7 100644 --- a/pgdog-plugin/src/plugin.rs +++ b/pgdog-plugin/src/plugin.rs @@ -32,6 +32,8 @@ pub struct Plugin<'a> { rustc_version: Option>, /// Plugin version. plugin_version: Option>, + /// Library version. + lib_version: Option>, } impl<'a> Plugin<'a> { @@ -73,6 +75,7 @@ impl<'a> Plugin<'a> { let route = unsafe { library.get(b"pgdog_route\0") }.ok(); let rustc_version = unsafe { library.get(b"pgdog_rustc_version\0") }.ok(); let plugin_version = unsafe { library.get(b"pgdog_plugin_version\0") }.ok(); + let lib_version = unsafe { library.get(b"pgdog_plugin_lib_version\0") }.ok(); Self { name: name.to_owned(), @@ -81,6 +84,7 @@ impl<'a> Plugin<'a> { route, rustc_version, plugin_version, + lib_version, } } @@ -149,4 +153,13 @@ impl<'a> Plugin<'a> { output }) } + + /// `pgdog-plugin` version used by the plugin. + pub fn lib_version(&self) -> Option { + let mut output = PdStr::default(); + self.lib_version.as_ref().map(|func| unsafe { + func(&mut output as *mut PdStr); + output + }) + } } diff --git a/pgdog/Cargo.toml b/pgdog/Cargo.toml index 43c22272c..7cf654f7a 100644 --- a/pgdog/Cargo.toml +++ b/pgdog/Cargo.toml @@ -61,6 +61,7 @@ lru = "0.16" hickory-resolver = "0.25.2" lazy_static = "1" dashmap = "6" +semver = "1" [target.'cfg(not(target_env = "msvc"))'.dependencies] tikv-jemallocator = "0.6" diff --git a/pgdog/src/frontend/client/query_engine/mod.rs b/pgdog/src/frontend/client/query_engine/mod.rs index f04779828..a88cc1250 100644 --- a/pgdog/src/frontend/client/query_engine/mod.rs +++ b/pgdog/src/frontend/client/query_engine/mod.rs @@ -114,7 +114,7 @@ impl QueryEngine { } // Route transaction to the right servers. - if !self.route_transaction(context).await? { + if !self.route_query(context).await? { self.update_stats(context); debug!("transaction has nowhere to go"); return Ok(()); diff --git a/pgdog/src/frontend/client/query_engine/route_query.rs b/pgdog/src/frontend/client/query_engine/route_query.rs index e8369680a..e09045c45 100644 --- a/pgdog/src/frontend/client/query_engine/route_query.rs +++ b/pgdog/src/frontend/client/query_engine/route_query.rs @@ -1,9 +1,10 @@ use tracing::{error, trace}; use super::*; +use crate::frontend::router::{parser::Error as ParserError, Error as RouterError}; impl QueryEngine { - pub(super) async fn route_transaction( + pub(super) async fn route_query( &mut self, context: &mut QueryEngineContext<'_>, ) -> Result { @@ -29,6 +30,12 @@ impl QueryEngine { cmd ); } + // Query intercepted by plugin. + Err(RouterError::Parser(ParserError::ErrorResponse(err))) => { + self.stats + .sent(context.stream.error(err, context.in_transaction()).await?); + return Ok(false); + } Err(err) => { error!("{:?} [{:?}]", err, context.stream.peer_addr()); let bytes_sent = context diff --git a/pgdog/src/frontend/router/parser/error.rs b/pgdog/src/frontend/router/parser/error.rs index c4f9e422b..30ef65709 100644 --- a/pgdog/src/frontend/router/parser/error.rs +++ b/pgdog/src/frontend/router/parser/error.rs @@ -2,7 +2,7 @@ use thiserror::Error; -use crate::frontend::router::sharding; +use crate::{frontend::router::sharding, net::ErrorResponse}; #[derive(Debug, Error)] pub enum Error { @@ -68,4 +68,7 @@ pub enum Error { #[error("regex error")] RegexError, + + #[error("{0}")] + ErrorResponse(ErrorResponse), } diff --git a/pgdog/src/frontend/router/parser/query/plugins.rs b/pgdog/src/frontend/router/parser/query/plugins.rs index eecdd9bd4..3204643cd 100644 --- a/pgdog/src/frontend/router/parser/query/plugins.rs +++ b/pgdog/src/frontend/router/parser/query/plugins.rs @@ -49,6 +49,12 @@ impl QueryParser { for plugin in plugins { if let Some(route) = plugin.route(context) { + // SAFETY: This can be acquired only once. If you drop + // the route, it'll deallocate the error. + let route: pgdog_plugin::Route = route.into(); + if let Some(error) = route.get_error() { + return Err(Error::ErrorResponse(error.into())); + } match route.shard.try_into() { Ok(shard) => match shard { PdShard::All => self.plugin_output.shard = Some(Shard::All), diff --git a/pgdog/src/net/messages/error_response.rs b/pgdog/src/net/messages/error_response.rs index f9107aabf..4ba688e33 100644 --- a/pgdog/src/net/messages/error_response.rs +++ b/pgdog/src/net/messages/error_response.rs @@ -19,6 +19,20 @@ pub struct ErrorResponse { pub routine: Option, } +impl From for ErrorResponse { + fn from(value: pgdog_plugin::ErrorResponse) -> Self { + Self { + severity: value.severity, + code: value.code, + message: value.message, + detail: value.detail, + context: value.context, + file: value.file, + routine: value.routine, + } + } +} + impl Default for ErrorResponse { fn default() -> Self { Self { diff --git a/pgdog/src/plugin/mod.rs b/pgdog/src/plugin/mod.rs index 23cbbf6da..22aaa85a9 100644 --- a/pgdog/src/plugin/mod.rs +++ b/pgdog/src/plugin/mod.rs @@ -2,15 +2,17 @@ use std::ops::Deref; -use once_cell::sync::OnceCell; +use once_cell::sync::{Lazy, OnceCell}; use pgdog_plugin::libloading::Library; use pgdog_plugin::Plugin; use pgdog_plugin::{comp, libloading}; +use semver::Version; use tokio::time::Instant; use tracing::{debug, error, info, warn}; static LIBS: OnceCell> = OnceCell::new(); pub static PLUGINS: OnceCell> = OnceCell::new(); +static MIN_VERSION: Lazy = Lazy::new(|| Version::parse("0.1.9").unwrap()); /// Load plugins. /// @@ -60,6 +62,24 @@ pub fn load(names: &[&str]) -> Result<(), libloading::Error> { continue; } + // Check pgdog-plugin version. + if let Some(lib_version) = plugin.lib_version() { + let lib_version = lib_version.deref(); + let lib_version = Version::parse(lib_version).unwrap(); + if lib_version < *MIN_VERSION { + warn!("skipping plugin \"{}\" because it's using an unsupported version of pgdog-plugin crate ({})", + plugin.name(), + lib_version + ); + continue; + } + } else { + warn!("skipping plugin \"{}\" because it's using an old version of pgdog-plugin crate", + plugin.name(), + ); + continue; + } + if plugin.init() { debug!("plugin \"{}\" initialized", name); }