Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/adapter/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1019,7 +1019,8 @@ impl SessionClient {
| Command::GetTransactionReadHoldsBundle { .. }
| Command::StoreTransactionReadHolds { .. }
| Command::ExecuteSlowPathPeek { .. }
| Command::ExecuteCopyTo { .. } => {}
| Command::ExecuteCopyTo { .. }
| Command::ExecuteSideEffectingFunc { .. } => {}
};
cmd
});
Expand Down
17 changes: 14 additions & 3 deletions src/adapter/src/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use mz_repr::{CatalogItemId, ColumnIndex, GlobalId, RowIterator, SqlRelationType
use mz_sql::ast::{FetchDirection, Raw, Statement};
use mz_sql::catalog::ObjectType;
use mz_sql::optimizer_metrics::OptimizerMetrics;
use mz_sql::plan::{ExecuteTimeout, Plan, PlanKind};
use mz_sql::plan::{ExecuteTimeout, Plan, PlanKind, SideEffectingFunc};
use mz_sql::session::user::User;
use mz_sql::session::vars::{OwnedVarInput, SystemVars};
use mz_sql_parser::ast::{AlterObjectRenameStatement, AlterOwnerStatement, DropObjectsStatement};
Expand Down Expand Up @@ -221,6 +221,15 @@ pub enum Command {
conn_id: ConnectionId,
tx: oneshot::Sender<Result<ExecuteResponse, AdapterError>>,
},

/// Execute a side-effecting function from the frontend peek path.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMO, we don't have to mention here that it's from the frontend peek path, eventually everything will come "from the frontend". But maybe you have plans to clean this up holistically already once the work is done

Copy link
Contributor Author

@ggevay ggevay Dec 16, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this is captured in the
"There are lots of refactorings that we could/should do after the old peek sequencing is deleted."
todo item in https://github.com/MaterializeInc/database-issues/issues/9593.

ExecuteSideEffectingFunc {
plan: SideEffectingFunc,
conn_id: ConnectionId,
/// The current role of the session, used for RBAC checks.
current_role: RoleId,
tx: oneshot::Sender<Result<ExecuteResponse, AdapterError>>,
},
}

impl Command {
Expand All @@ -247,7 +256,8 @@ impl Command {
| Command::GetTransactionReadHoldsBundle { .. }
| Command::StoreTransactionReadHolds { .. }
| Command::ExecuteSlowPathPeek { .. }
| Command::ExecuteCopyTo { .. } => None,
| Command::ExecuteCopyTo { .. }
| Command::ExecuteSideEffectingFunc { .. } => None,
}
}

Expand All @@ -274,7 +284,8 @@ impl Command {
| Command::GetTransactionReadHoldsBundle { .. }
| Command::StoreTransactionReadHolds { .. }
| Command::ExecuteSlowPathPeek { .. }
| Command::ExecuteCopyTo { .. } => None,
| Command::ExecuteCopyTo { .. }
| Command::ExecuteSideEffectingFunc { .. } => None,
}
}
}
Expand Down
1 change: 1 addition & 0 deletions src/adapter/src/coord.rs
Original file line number Diff line number Diff line change
Expand Up @@ -373,6 +373,7 @@ impl Message {
Command::StoreTransactionReadHolds { .. } => "store-transaction-read-holds",
Command::ExecuteSlowPathPeek { .. } => "execute-slow-path-peek",
Command::ExecuteCopyTo { .. } => "execute-copy-to",
Command::ExecuteSideEffectingFunc { .. } => "execute-side-effecting-func",
},
Message::ControllerReady {
controller: ControllerReadiness::Compute,
Expand Down
7 changes: 5 additions & 2 deletions src/adapter/src/coord/appends.rs
Original file line number Diff line number Diff line change
Expand Up @@ -916,18 +916,21 @@ pub struct GroupCommitPermit {
_permit: Option<OwnedSemaphorePermit>,
}

/// When we start a [`Session`] we need to update some builtin tables, we don't want to wait for
/// When we start a [`Session`] we need to update some builtin tables, but we don't want to wait for
/// these writes to complete for two reasons:
///
/// 1. Doing a write can take a relatively long time.
/// 2. Decoupling the write from the session start allows us to batch multiple writes together, if
/// sessions are being created with a high frequency.
///
/// So as an optimization we do not wait for these writes to complete. But if a [`Session`] tries
/// So, as an optimization we do not wait for these writes to complete. But if a [`Session`] tries
/// to query any of these builtin objects, we need to block that query on the writes completing to
/// maintain linearizability.
///
/// Warning: this already clears the wait flag (i.e., it calls `clear_builtin_table_updates`).
///
/// TODO(peek-seq): After we delete the old peek sequencing, we can remove the first component of
/// the return tuple.
pub(crate) fn waiting_on_startup_appends(
catalog: &Catalog,
session: &mut Session,
Expand Down
12 changes: 12 additions & 0 deletions src/adapter/src/coord/command_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -382,6 +382,18 @@ impl Coordinator {
)
.await;
}

Command::ExecuteSideEffectingFunc {
plan,
conn_id,
current_role,
tx,
} => {
let result = self
.execute_side_effecting_func(plan, conn_id, current_role)
.await;
let _ = tx.send(result);
}
}
}
.instrument(debug_span!("handle_command"))
Expand Down
62 changes: 62 additions & 0 deletions src/adapter/src/coord/sequencer/inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2227,6 +2227,68 @@ impl Coordinator {
}
}

/// Execute a side-effecting function from the frontend peek path.
/// This is separate from `sequence_side_effecting_func` because
/// - It doesn't have an ExecuteContext.
/// - It needs to do its own RBAC check, because the `rbac::check_plan` call in the frontend
/// peek sequencing can't look at `active_conns`.
///
/// TODO(peek-seq): Delete `sequence_side_effecting_func` after we delete the old peek
/// sequencing.
pub(crate) async fn execute_side_effecting_func(
&mut self,
plan: SideEffectingFunc,
conn_id: ConnectionId,
current_role: RoleId,
) -> Result<ExecuteResponse, AdapterError> {
match plan {
SideEffectingFunc::PgCancelBackend { connection_id } => {
if conn_id.unhandled() == connection_id {
// As a special case, if we're canceling ourselves, we return
// a canceled response to the client issuing the query,
// and so we need to do no further processing of the cancel.
return Err(AdapterError::Canceled);
}

// Perform RBAC check: the current user must be a member of the role
// that owns the connection being cancelled.
if let Some((_id_handle, conn_meta)) =
self.active_conns.get_key_value(&connection_id)
{
let target_role = *conn_meta.authenticated_role_id();
let role_membership = self
.catalog()
.state()
.collect_role_membership(&current_role);
if !role_membership.contains(&target_role) {
let target_role_name = self
.catalog()
.try_get_role(&target_role)
.map(|role| role.name().to_string())
.unwrap_or_else(|| target_role.to_string());
return Err(AdapterError::Unauthorized(
rbac::UnauthorizedError::RoleMembership {
role_names: vec![target_role_name],
},
));
}

// RBAC check passed, proceed with cancellation.
let id_handle = self
.active_conns
.get_key_value(&connection_id)
.map(|(id, _)| id.clone())
.expect("checked above");
self.handle_privileged_cancel(id_handle).await;
Ok(Self::send_immediate_rows(Row::pack_slice(&[Datum::True])))
} else {
// Connection not found, return false.
Ok(Self::send_immediate_rows(Row::pack_slice(&[Datum::False])))
}
}
}
}

/// Inner method that performs the actual real-time recency timestamp determination.
/// This is called by both the old peek sequencing code (via `determine_real_time_recent_timestamp`)
/// and the new command handler for `Command::DetermineRealTimeRecentTimestamp`.
Expand Down
70 changes: 47 additions & 23 deletions src/adapter/src/frontend_peek.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use mz_expr::{CollectionPlan, ResultSpec};
use mz_ore::cast::{CastFrom, CastLossy};
use mz_ore::collections::CollectionExt;
use mz_ore::now::EpochMillis;
use mz_ore::{soft_assert_eq_or_log, soft_assert_or_log};
use mz_ore::{soft_assert_eq_or_log, soft_assert_or_log, soft_panic_or_log};
use mz_repr::optimize::{OptimizerFeatures, OverrideFrom};
use mz_repr::role_id::RoleId;
use mz_repr::{Datum, GlobalId, IntoRowIterator, Timestamp};
Expand All @@ -28,7 +28,7 @@ use mz_sql::plan::{self, Plan, QueryWhen};
use mz_sql::rbac;
use mz_sql::session::metadata::SessionMetadata;
use mz_sql::session::vars::IsolationLevel;
use mz_sql_parser::ast::{CopyDirection, ExplainStage, Statement};
use mz_sql_parser::ast::{CopyDirection, CopyRelation, ExplainStage, Statement};
use mz_transform::EmptyStatisticsOracle;
use mz_transform::dataflow::DataflowMetainfo;
use opentelemetry::trace::TraceContextExt;
Expand Down Expand Up @@ -142,12 +142,12 @@ impl PeekClient {
}
}
Statement::ExplainPushdown(explain_stmt) => {
// Only handle EXPLAIN FILTER PUSHDOWN for SELECT statements
// Only handle EXPLAIN FILTER PUSHDOWN for non-BROKEN SELECT statements
match &explain_stmt.explainee {
mz_sql_parser::ast::Explainee::Select(..) => {}
mz_sql_parser::ast::Explainee::Select(_, false) => {}
_ => {
debug!(
"Bailing out from try_frontend_peek_inner, because EXPLAIN FILTER PUSHDOWN is not for a SELECT query"
"Bailing out from try_frontend_peek_inner, because EXPLAIN FILTER PUSHDOWN is not for a (non-BROKEN) SELECT query"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe "is for BROKEN SELECT query" instead of is not for a non-broken? 😅

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll fix this in my next PR (statement logging: #34305), to avoid some merge conflicts, if that's ok.

);
return Ok(None);
}
Expand All @@ -156,7 +156,14 @@ impl PeekClient {
Statement::Copy(copy_stmt) => {
match &copy_stmt.direction {
CopyDirection::To => {
// This is COPY TO, continue
// Check for SUBSCRIBE inside COPY TO - we don't handle Plan::Subscribe
if matches!(&copy_stmt.relation, CopyRelation::Subscribe(_)) {
debug!(
"Bailing out from try_frontend_peek_inner, because COPY (SUBSCRIBE ...) TO is not supported"
);
return Ok(None);
}
// This is COPY TO (SELECT), continue
}
CopyDirection::From => {
debug!(
Expand Down Expand Up @@ -186,6 +193,7 @@ impl PeekClient {

let pcx = session.pcx();
let plan = mz_sql::plan::plan(Some(pcx), &conn_catalog, stmt, &params, &resolved_ids)?;

let (select_plan, explain_ctx, copy_to_ctx) = match &plan {
Plan::Select(select_plan) => {
let explain_ctx = if session.vars().emit_plan_insights_notice() {
Expand Down Expand Up @@ -253,16 +261,42 @@ impl PeekClient {
(plan, explain_ctx, None)
}
_ => {
// This shouldn't happen because we already checked for this at the AST
// level before calling `try_frontend_peek_inner`.
soft_panic_or_log!(
"unexpected EXPLAIN FILTER PUSHDOWN plan kind in frontend peek sequencing: {:?}",
explainee
);
debug!(
"Bailing out from try_frontend_peek_inner, because EXPLAIN FILTER PUSHDOWN is not for a SELECT query or is EXPLAIN BROKEN"
);
return Ok(None);
}
}
}
Plan::SideEffectingFunc(sef_plan) => {
// Side-effecting functions need Coordinator state (e.g., active_conns),
// so delegate to the Coordinator via a Command.
// The RBAC check is performed in the Coordinator where active_conns is available.
let response = self
.call_coordinator(|tx| Command::ExecuteSideEffectingFunc {
plan: sef_plan.clone(),
conn_id: session.conn_id().clone(),
current_role: session.role_metadata().current_role,
tx,
})
.await?;
return Ok(Some(response));
}
_ => {
// This shouldn't happen because we already checked for this at the AST
// level before calling `try_frontend_peek_inner`.
soft_panic_or_log!(
"Unexpected plan kind in frontend peek sequencing: {:?}",
plan
);
debug!(
"Bailing out from try_frontend_peek_inner, because the Plan is not a SELECT, EXPLAIN SELECT, EXPLAIN FILTER PUSHDOWN, or COPY TO"
"Bailing out from try_frontend_peek_inner, because the Plan is not a SELECT, side-effecting SELECT, EXPLAIN SELECT, EXPLAIN FILTER PUSHDOWN, or COPY TO S3"
);
return Ok(None);
}
Expand Down Expand Up @@ -296,29 +330,19 @@ impl PeekClient {

rbac::check_plan(
&conn_catalog,
// We can't look at `active_conns` here, but that's ok, because this case was handled
// above already inside `Command::ExecuteSideEffectingFunc`.
None::<fn(u32) -> Option<RoleId>>,
session,
&plan,
Some(target_cluster_id),
&resolved_ids,
)?;

// Check if we're still waiting for any of the builtin table appends from when we
// started the Session to complete.
//
// (This is done slightly earlier in the normal peek sequencing, but we have to be past the
// last use of `conn_catalog` here.)
if let Some(_) = coord::appends::waiting_on_startup_appends(&*catalog, session, &plan) {
// TODO(peek-seq): Don't fall back to the coordinator's peek sequencing here, but call
// `defer_op`. Needs `ExecuteContext`.
// This fallback is currently causing a bug: `waiting_on_startup_appends` has the
// side effect that it already clears the wait flag, and therefore the old peek
// sequencing that we fall back to here won't do waiting. This is tested by
// `test_mz_sessions` and `test_pg_cancel_dropped_role`, where I've disabled the
// frontend peek sequencing for now. This bug will just go away once we don't fall back
// to the old peek sequencing here, but properly call `defer_op` instead.
debug!("Bailing out from try_frontend_peek_inner, because waiting_on_startup_appends");
return Ok(None);
if let Some((_, wait_future)) =
coord::appends::waiting_on_startup_appends(&*catalog, session, &plan)
{
wait_future.await;
}

let max_query_result_size = Some(session.vars().max_query_result_size());
Expand Down
12 changes: 0 additions & 12 deletions src/environmentd/tests/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2604,11 +2604,6 @@ fn test_dont_drop_sinks_twice() {
#[mz_ore::test]
fn test_mz_sessions() {
let server = test_util::TestHarness::default().start_blocking();
// TODO(peek-seq): This would currently fail with the flag on, indicating a bug in the frontend
// peek sequencing. Re-enable this once we never fall back from the frontend peek sequencing to
// the old peek sequencing. See comment on the call to `waiting_on_startup_appends` in
// `frontend_peek.rs`.
server.disable_feature_flags(&["enable_frontend_peek_sequencing"]);

let mut foo_client = server
.pg_config()
Expand Down Expand Up @@ -3169,13 +3164,6 @@ fn test_params() {
#[mz_ore::test]
fn test_pg_cancel_dropped_role() {
let server = test_util::TestHarness::default().start_blocking();

// TODO(peek-seq): This would currently fail with the flag on, indicating a bug in the frontend
// peek sequencing. Re-enable this once we never fall back from the frontend peek sequencing to
// the old peek sequencing. See comment on the call to `waiting_on_startup_appends` in
// `frontend_peek.rs`.
server.disable_feature_flags(&["enable_frontend_peek_sequencing"]);

let dropped_role = "r1";

let mut query_client = server.connect(postgres::NoTls).unwrap();
Expand Down
4 changes: 2 additions & 2 deletions src/sql/src/plan/side_effecting_func.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,10 @@ use crate::plan::{PlanError, QueryContext};
/// effects.
///
/// See the module docs for details.
#[derive(Debug, EnumKind)]
#[derive(Debug, EnumKind, Clone)]
#[enum_kind(SefKind)]
pub enum SideEffectingFunc {
/// The `pg_cancel_backend` function, .
/// The `pg_cancel_backend` function.
PgCancelBackend {
// The ID of the connection to cancel.
connection_id: u32,
Expand Down
3 changes: 3 additions & 0 deletions src/sql/src/rbac.rs
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,9 @@ pub fn check_plan(
catalog: &impl SessionCatalog,
// Function mapping a connection ID to an authenticated role. The roles may have been dropped concurrently.
// Only required for Plan::SideEffectingFunc; can be None for other plan types.
// TODO(peek-seq): Remove this when deleting the old peek sequencing. The logic here that uses
// `active_conns` is mirrored in `execute_side_effecting_func`, which is what the frontend peek
// sequencing uses.
active_conns: Option<impl FnOnce(u32) -> Option<RoleId>>,
session: &dyn SessionMetadata,
plan: &Plan,
Expand Down
2 changes: 2 additions & 0 deletions test/sqllogictest/explain/broken_statements.slt
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,8 @@ EXPLAIN PHYSICAL PLAN AS VERBOSE TEXT FOR
CREATE MATERIALIZED VIEW mv AS
SELECT pg_catalog.now();

statement error db error: ERROR: EXPLAIN FILTER PUSHDOWN queries for this explainee type are not supported
EXPLAIN FILTER PUSHDOWN FOR BROKEN SELECT 1;

# EXPLAIN ... BROKEN CREATE INDEX
# -------------------------------
Expand Down