diff --git a/src/adapter/src/client.rs b/src/adapter/src/client.rs index 0010f320e434d..3b85a9efeaea6 100644 --- a/src/adapter/src/client.rs +++ b/src/adapter/src/client.rs @@ -1019,7 +1019,8 @@ impl SessionClient { | Command::GetTransactionReadHoldsBundle { .. } | Command::StoreTransactionReadHolds { .. } | Command::ExecuteSlowPathPeek { .. } - | Command::ExecuteCopyTo { .. } => {} + | Command::ExecuteCopyTo { .. } + | Command::ExecuteSideEffectingFunc { .. } => {} }; cmd }); diff --git a/src/adapter/src/command.rs b/src/adapter/src/command.rs index 2aeded4153060..d6b13a3544323 100644 --- a/src/adapter/src/command.rs +++ b/src/adapter/src/command.rs @@ -33,7 +33,7 @@ use mz_repr::{CatalogItemId, ColumnIndex, GlobalId, RowIterator, SqlRelationType use mz_sql::ast::{FetchDirection, Raw, Statement}; use mz_sql::catalog::ObjectType; use mz_sql::optimizer_metrics::OptimizerMetrics; -use mz_sql::plan::{ExecuteTimeout, Plan, PlanKind}; +use mz_sql::plan::{ExecuteTimeout, Plan, PlanKind, SideEffectingFunc}; use mz_sql::session::user::User; use mz_sql::session::vars::{OwnedVarInput, SystemVars}; use mz_sql_parser::ast::{AlterObjectRenameStatement, AlterOwnerStatement, DropObjectsStatement}; @@ -221,6 +221,15 @@ pub enum Command { conn_id: ConnectionId, tx: oneshot::Sender>, }, + + /// Execute a side-effecting function from the frontend peek path. + ExecuteSideEffectingFunc { + plan: SideEffectingFunc, + conn_id: ConnectionId, + /// The current role of the session, used for RBAC checks. + current_role: RoleId, + tx: oneshot::Sender>, + }, } impl Command { @@ -247,7 +256,8 @@ impl Command { | Command::GetTransactionReadHoldsBundle { .. } | Command::StoreTransactionReadHolds { .. } | Command::ExecuteSlowPathPeek { .. } - | Command::ExecuteCopyTo { .. } => None, + | Command::ExecuteCopyTo { .. } + | Command::ExecuteSideEffectingFunc { .. } => None, } } @@ -274,7 +284,8 @@ impl Command { | Command::GetTransactionReadHoldsBundle { .. } | Command::StoreTransactionReadHolds { .. 
} | Command::ExecuteSlowPathPeek { .. } - | Command::ExecuteCopyTo { .. } => None, + | Command::ExecuteCopyTo { .. } + | Command::ExecuteSideEffectingFunc { .. } => None, } } } diff --git a/src/adapter/src/coord.rs b/src/adapter/src/coord.rs index 881cb199abe3a..fde461293e5a6 100644 --- a/src/adapter/src/coord.rs +++ b/src/adapter/src/coord.rs @@ -373,6 +373,7 @@ impl Message { Command::StoreTransactionReadHolds { .. } => "store-transaction-read-holds", Command::ExecuteSlowPathPeek { .. } => "execute-slow-path-peek", Command::ExecuteCopyTo { .. } => "execute-copy-to", + Command::ExecuteSideEffectingFunc { .. } => "execute-side-effecting-func", }, Message::ControllerReady { controller: ControllerReadiness::Compute, diff --git a/src/adapter/src/coord/appends.rs b/src/adapter/src/coord/appends.rs index d0919f66f67cc..1eaa8db8b7860 100644 --- a/src/adapter/src/coord/appends.rs +++ b/src/adapter/src/coord/appends.rs @@ -916,18 +916,21 @@ pub struct GroupCommitPermit { _permit: Option, } -/// When we start a [`Session`] we need to update some builtin tables, we don't want to wait for +/// When we start a [`Session`] we need to update some builtin tables, but we don't want to wait for /// these writes to complete for two reasons: /// /// 1. Doing a write can take a relatively long time. /// 2. Decoupling the write from the session start allows us to batch multiple writes together, if /// sessions are being created with a high frequency. /// -/// So as an optimization we do not wait for these writes to complete. But if a [`Session`] tries +/// So, as an optimization we do not wait for these writes to complete. But if a [`Session`] tries /// to query any of these builtin objects, we need to block that query on the writes completing to /// maintain linearizability. /// /// Warning: this already clears the wait flag (i.e., it calls `clear_builtin_table_updates`). 
+/// +/// TODO(peek-seq): After we delete the old peek sequencing, we can remove the first component of +/// the return tuple. pub(crate) fn waiting_on_startup_appends( catalog: &Catalog, session: &mut Session, diff --git a/src/adapter/src/coord/command_handler.rs b/src/adapter/src/coord/command_handler.rs index c9f752ecbddff..76a4fda6cba9b 100644 --- a/src/adapter/src/coord/command_handler.rs +++ b/src/adapter/src/coord/command_handler.rs @@ -382,6 +382,18 @@ impl Coordinator { ) .await; } + + Command::ExecuteSideEffectingFunc { + plan, + conn_id, + current_role, + tx, + } => { + let result = self + .execute_side_effecting_func(plan, conn_id, current_role) + .await; + let _ = tx.send(result); + } } } .instrument(debug_span!("handle_command")) diff --git a/src/adapter/src/coord/sequencer/inner.rs b/src/adapter/src/coord/sequencer/inner.rs index ff6a8dc314881..f414cc514ab94 100644 --- a/src/adapter/src/coord/sequencer/inner.rs +++ b/src/adapter/src/coord/sequencer/inner.rs @@ -2227,6 +2227,68 @@ impl Coordinator { } } + /// Execute a side-effecting function from the frontend peek path. + /// This is separate from `sequence_side_effecting_func` because + /// - It doesn't have an ExecuteContext. + /// - It needs to do its own RBAC check, because the `rbac::check_plan` call in the frontend + /// peek sequencing can't look at `active_conns`. + /// + /// TODO(peek-seq): Delete `sequence_side_effecting_func` after we delete the old peek + /// sequencing. + pub(crate) async fn execute_side_effecting_func( + &mut self, + plan: SideEffectingFunc, + conn_id: ConnectionId, + current_role: RoleId, + ) -> Result { + match plan { + SideEffectingFunc::PgCancelBackend { connection_id } => { + if conn_id.unhandled() == connection_id { + // As a special case, if we're canceling ourselves, we return + // a canceled response to the client issuing the query, + // and so we need to do no further processing of the cancel. 
+ return Err(AdapterError::Canceled); + } + + // Perform RBAC check: the current user must be a member of the role + // that owns the connection being cancelled. + if let Some((_id_handle, conn_meta)) = + self.active_conns.get_key_value(&connection_id) + { + let target_role = *conn_meta.authenticated_role_id(); + let role_membership = self + .catalog() + .state() + .collect_role_membership(&current_role); + if !role_membership.contains(&target_role) { + let target_role_name = self + .catalog() + .try_get_role(&target_role) + .map(|role| role.name().to_string()) + .unwrap_or_else(|| target_role.to_string()); + return Err(AdapterError::Unauthorized( + rbac::UnauthorizedError::RoleMembership { + role_names: vec![target_role_name], + }, + )); + } + + // RBAC check passed, proceed with cancellation. + let id_handle = self + .active_conns + .get_key_value(&connection_id) + .map(|(id, _)| id.clone()) + .expect("checked above"); + self.handle_privileged_cancel(id_handle).await; + Ok(Self::send_immediate_rows(Row::pack_slice(&[Datum::True]))) + } else { + // Connection not found, return false. + Ok(Self::send_immediate_rows(Row::pack_slice(&[Datum::False]))) + } + } + } + } + /// Inner method that performs the actual real-time recency timestamp determination. /// This is called by both the old peek sequencing code (via `determine_real_time_recent_timestamp`) /// and the new command handler for `Command::DetermineRealTimeRecentTimestamp`. 
diff --git a/src/adapter/src/frontend_peek.rs b/src/adapter/src/frontend_peek.rs index 130c4ed6eaee6..59fa675e9980d 100644 --- a/src/adapter/src/frontend_peek.rs +++ b/src/adapter/src/frontend_peek.rs @@ -19,7 +19,7 @@ use mz_expr::{CollectionPlan, ResultSpec}; use mz_ore::cast::{CastFrom, CastLossy}; use mz_ore::collections::CollectionExt; use mz_ore::now::EpochMillis; -use mz_ore::{soft_assert_eq_or_log, soft_assert_or_log}; +use mz_ore::{soft_assert_eq_or_log, soft_assert_or_log, soft_panic_or_log}; use mz_repr::optimize::{OptimizerFeatures, OverrideFrom}; use mz_repr::role_id::RoleId; use mz_repr::{Datum, GlobalId, IntoRowIterator, Timestamp}; @@ -28,7 +28,7 @@ use mz_sql::plan::{self, Plan, QueryWhen}; use mz_sql::rbac; use mz_sql::session::metadata::SessionMetadata; use mz_sql::session::vars::IsolationLevel; -use mz_sql_parser::ast::{CopyDirection, ExplainStage, Statement}; +use mz_sql_parser::ast::{CopyDirection, CopyRelation, ExplainStage, Statement}; use mz_transform::EmptyStatisticsOracle; use mz_transform::dataflow::DataflowMetainfo; use opentelemetry::trace::TraceContextExt; @@ -142,12 +142,12 @@ impl PeekClient { } } Statement::ExplainPushdown(explain_stmt) => { - // Only handle EXPLAIN FILTER PUSHDOWN for SELECT statements + // Only handle EXPLAIN FILTER PUSHDOWN for non-BROKEN SELECT statements match &explain_stmt.explainee { - mz_sql_parser::ast::Explainee::Select(..) 
=> {} + mz_sql_parser::ast::Explainee::Select(_, false) => {} _ => { debug!( - "Bailing out from try_frontend_peek_inner, because EXPLAIN FILTER PUSHDOWN is not for a SELECT query" + "Bailing out from try_frontend_peek_inner, because EXPLAIN FILTER PUSHDOWN is not for a (non-BROKEN) SELECT query" ); return Ok(None); } @@ -156,7 +156,14 @@ impl PeekClient { Statement::Copy(copy_stmt) => { match &copy_stmt.direction { CopyDirection::To => { - // This is COPY TO, continue + // Check for SUBSCRIBE inside COPY TO - we don't handle Plan::Subscribe + if matches!(&copy_stmt.relation, CopyRelation::Subscribe(_)) { + debug!( + "Bailing out from try_frontend_peek_inner, because COPY (SUBSCRIBE ...) TO is not supported" + ); + return Ok(None); + } + // This is COPY TO (SELECT), continue } CopyDirection::From => { debug!( @@ -186,6 +193,7 @@ impl PeekClient { let pcx = session.pcx(); let plan = mz_sql::plan::plan(Some(pcx), &conn_catalog, stmt, &params, &resolved_ids)?; + let (select_plan, explain_ctx, copy_to_ctx) = match &plan { Plan::Select(select_plan) => { let explain_ctx = if session.vars().emit_plan_insights_notice() { @@ -253,6 +261,12 @@ impl PeekClient { (plan, explain_ctx, None) } _ => { + // This shouldn't happen because we already checked for this at the AST + // level before calling `try_frontend_peek_inner`. + soft_panic_or_log!( + "unexpected EXPLAIN FILTER PUSHDOWN plan kind in frontend peek sequencing: {:?}", + explainee + ); debug!( "Bailing out from try_frontend_peek_inner, because EXPLAIN FILTER PUSHDOWN is not for a SELECT query or is EXPLAIN BROKEN" ); @@ -260,9 +274,29 @@ impl PeekClient { } } } + Plan::SideEffectingFunc(sef_plan) => { + // Side-effecting functions need Coordinator state (e.g., active_conns), + // so delegate to the Coordinator via a Command. + // The RBAC check is performed in the Coordinator where active_conns is available. 
+ let response = self + .call_coordinator(|tx| Command::ExecuteSideEffectingFunc { + plan: sef_plan.clone(), + conn_id: session.conn_id().clone(), + current_role: session.role_metadata().current_role, + tx, + }) + .await?; + return Ok(Some(response)); + } _ => { + // This shouldn't happen because we already checked for this at the AST + // level before calling `try_frontend_peek_inner`. + soft_panic_or_log!( + "Unexpected plan kind in frontend peek sequencing: {:?}", + plan + ); debug!( - "Bailing out from try_frontend_peek_inner, because the Plan is not a SELECT, EXPLAIN SELECT, EXPLAIN FILTER PUSHDOWN, or COPY TO" + "Bailing out from try_frontend_peek_inner, because the Plan is not a SELECT, side-effecting SELECT, EXPLAIN SELECT, EXPLAIN FILTER PUSHDOWN, or COPY TO S3" ); return Ok(None); } @@ -296,6 +330,8 @@ impl PeekClient { rbac::check_plan( &conn_catalog, + // We can't look at `active_conns` here, but that's ok, because this case was handled + // above already inside `Command::ExecuteSideEffectingFunc`. None:: Option>, session, &plan, @@ -303,22 +339,10 @@ impl PeekClient { &resolved_ids, )?; - // Check if we're still waiting for any of the builtin table appends from when we - // started the Session to complete. - // - // (This is done slightly earlier in the normal peek sequencing, but we have to be past the - // last use of `conn_catalog` here.) - if let Some(_) = coord::appends::waiting_on_startup_appends(&*catalog, session, &plan) { - // TODO(peek-seq): Don't fall back to the coordinator's peek sequencing here, but call - // `defer_op`. Needs `ExecuteContext`. - // This fallback is currently causing a bug: `waiting_on_startup_appends` has the - // side effect that it already clears the wait flag, and therefore the old peek - // sequencing that we fall back to here won't do waiting. This is tested by - // `test_mz_sessions` and `test_pg_cancel_dropped_role`, where I've disabled the - // frontend peek sequencing for now. 
This bug will just go away once we don't fall back - // to the old peek sequencing here, but properly call `defer_op` instead. - debug!("Bailing out from try_frontend_peek_inner, because waiting_on_startup_appends"); - return Ok(None); + if let Some((_, wait_future)) = + coord::appends::waiting_on_startup_appends(&*catalog, session, &plan) + { + wait_future.await; } let max_query_result_size = Some(session.vars().max_query_result_size()); diff --git a/src/environmentd/tests/sql.rs b/src/environmentd/tests/sql.rs index 9dc5454ed1a03..c805727275cfe 100644 --- a/src/environmentd/tests/sql.rs +++ b/src/environmentd/tests/sql.rs @@ -2604,11 +2604,6 @@ fn test_dont_drop_sinks_twice() { #[mz_ore::test] fn test_mz_sessions() { let server = test_util::TestHarness::default().start_blocking(); - // TODO(peek-seq): This would currently fail with the flag on, indicating a bug in the frontend - // peek sequencing. Re-enable this once we never fall back from the frontend peek sequencing to - // the old peek sequencing. See comment on the call to `waiting_on_startup_appends` in - // `frontend_peek.rs`. - server.disable_feature_flags(&["enable_frontend_peek_sequencing"]); let mut foo_client = server .pg_config() @@ -3169,13 +3164,6 @@ fn test_params() { #[mz_ore::test] fn test_pg_cancel_dropped_role() { let server = test_util::TestHarness::default().start_blocking(); - - // TODO(peek-seq): This would currently fail with the flag on, indicating a bug in the frontend - // peek sequencing. Re-enable this once we never fall back from the frontend peek sequencing to - // the old peek sequencing. See comment on the call to `waiting_on_startup_appends` in - // `frontend_peek.rs`. 
- server.disable_feature_flags(&["enable_frontend_peek_sequencing"]); - let dropped_role = "r1"; let mut query_client = server.connect(postgres::NoTls).unwrap(); diff --git a/src/sql/src/plan/side_effecting_func.rs b/src/sql/src/plan/side_effecting_func.rs index ffd216c1724f8..8fb4e4a9550bc 100644 --- a/src/sql/src/plan/side_effecting_func.rs +++ b/src/sql/src/plan/side_effecting_func.rs @@ -55,10 +55,10 @@ use crate::plan::{PlanError, QueryContext}; /// effects. /// /// See the module docs for details. -#[derive(Debug, EnumKind)] +#[derive(Debug, EnumKind, Clone)] #[enum_kind(SefKind)] pub enum SideEffectingFunc { - /// The `pg_cancel_backend` function, . + /// The `pg_cancel_backend` function. PgCancelBackend { // The ID of the connection to cancel. connection_id: u32, diff --git a/src/sql/src/rbac.rs b/src/sql/src/rbac.rs index be2e4997a1e8e..5819be2553a6d 100644 --- a/src/sql/src/rbac.rs +++ b/src/sql/src/rbac.rs @@ -338,6 +338,9 @@ pub fn check_plan( catalog: &impl SessionCatalog, // Function mapping a connection ID to an authenticated role. The roles may have been dropped concurrently. // Only required for Plan::SideEffectingFunc; can be None for other plan types. + // TODO(peek-seq): Remove this when deleting the old peek sequencing. The logic here that uses + // `active_conns` is mirrored in `execute_side_effecting_func`, which is what the frontend peek + // sequencing uses. 
active_conns: Option Option>, session: &dyn SessionMetadata, plan: &Plan, diff --git a/test/sqllogictest/explain/broken_statements.slt b/test/sqllogictest/explain/broken_statements.slt index add97ae05fa0f..862016da94b52 100644 --- a/test/sqllogictest/explain/broken_statements.slt +++ b/test/sqllogictest/explain/broken_statements.slt @@ -101,6 +101,8 @@ EXPLAIN PHYSICAL PLAN AS VERBOSE TEXT FOR CREATE MATERIALIZED VIEW mv AS SELECT pg_catalog.now(); +statement error db error: ERROR: EXPLAIN FILTER PUSHDOWN queries for this explainee type are not supported +EXPLAIN FILTER PUSHDOWN FOR BROKEN SELECT 1; # EXPLAIN ... BROKEN CREATE INDEX # -------------------------------