From 48e6cb050a555affafe7756d7807d7eb47e5e3a8 Mon Sep 17 00:00:00 2001 From: Gabor Gevay Date: Wed, 17 Dec 2025 13:51:45 +0100 Subject: [PATCH 1/2] tests: Remove the disabling of frontend peeks in statement logging tests --- .../materialize/checks/all_checks/statement_logging.py | 4 ---- test/cluster/mzcompose.py | 10 ---------- 2 files changed, 14 deletions(-) diff --git a/misc/python/materialize/checks/all_checks/statement_logging.py b/misc/python/materialize/checks/all_checks/statement_logging.py index a054f6bdaae02..17597ac49711e 100644 --- a/misc/python/materialize/checks/all_checks/statement_logging.py +++ b/misc/python/materialize/checks/all_checks/statement_logging.py @@ -17,13 +17,9 @@ class StatementLogging(Check): def initialize(self) -> Testdrive: return Testdrive( dedent( - # TODO(peek-seq): enable_frontend_peek_sequencing when it supports statement logging. """ $ postgres-execute connection=postgres://mz_system@${testdrive.materialize-internal-sql-addr} ALTER SYSTEM SET statement_logging_max_sample_rate TO 1.0 - - $[version>=2600200] postgres-execute connection=postgres://mz_system@${testdrive.materialize-internal-sql-addr} - ALTER SYSTEM SET enable_frontend_peek_sequencing = false; """ ) ) diff --git a/test/cluster/mzcompose.py b/test/cluster/mzcompose.py index 4b40b999f23c6..6bbfa970043ab 100644 --- a/test/cluster/mzcompose.py +++ b/test/cluster/mzcompose.py @@ -3710,16 +3710,6 @@ def workflow_statement_logging(c: Composition, parser: WorkflowArgumentParser) - user="mz_system", ) - # TODO(peek-seq): enable_frontend_peek_sequencing when it supports statement logging. - c.testdrive( - input=dedent( - """ - $ postgres-execute connection=postgres://mz_system@${testdrive.materialize-internal-sql-addr} - ALTER SYSTEM SET enable_frontend_peek_sequencing = false; - """ - ) - ) - c.run_testdrive_files("statement-logging/statement-logging.td") From fbc92bef2e7486526aab2fdfd1062623ce1bb03d Mon Sep 17 00:00:00 2001 From: Gabor Gevay Date: Thu, 27 Nov 2025 13:17:53 +0100 Subject: [PATCH 2/2] Frontend peek: statement logging --- src/adapter/src/client.rs | 34 +- src/adapter/src/command.rs | 51 +- src/adapter/src/coord.rs | 36 +- src/adapter/src/coord/catalog_implications.rs | 3 +- src/adapter/src/coord/command_handler.rs | 102 ++- src/adapter/src/coord/peek.rs | 52 +- src/adapter/src/coord/sequencer/inner.rs | 2 +- src/adapter/src/coord/sequencer/inner/peek.rs | 54 +- src/adapter/src/coord/statement_logging.rs | 593 ++++++------- src/adapter/src/frontend_peek.rs | 319 +++++-- src/adapter/src/metrics.rs | 24 +- src/adapter/src/peek_client.rs | 160 +++- src/adapter/src/session.rs | 2 +- src/adapter/src/statement_logging.rs | 793 +++++++++++++++++- src/compute-client/src/controller/instance.rs | 4 + src/controller/src/lib.rs | 27 +- src/environmentd/src/test_util.rs | 5 + src/environmentd/tests/server.rs | 156 +++- src/pgwire/src/protocol.rs | 2 + src/sql/src/session/vars/definitions.rs | 2 +- .../statement-logging/statement-logging.td | 21 + 21 files changed, 1847 insertions(+), 595 deletions(-) diff --git a/src/adapter/src/client.rs b/src/adapter/src/client.rs index 3b85a9efeaea6..52a4ad8fb2bee 100644 --- a/src/adapter/src/client.rs +++ b/src/adapter/src/client.rs @@ -264,6 +264,7 @@ impl Client { transient_id_gen, optimizer_metrics, persist_client, + statement_logging_frontend, } = response; let peek_client = PeekClient::new( @@ -272,6 +273,7 @@ impl Client { transient_id_gen, optimizer_metrics, persist_client, + statement_logging_frontend, ); let mut client = SessionClient { @@ -692,6 
+694,9 @@ impl SessionClient { /// Executes a previously-bound portal. /// /// Note: the provided `cancel_future` must be cancel-safe as it's polled in a `select!` loop. + /// + /// `outer_ctx_extra` is Some when we are executing as part of an outer statement, e.g., a FETCH + /// triggering the execution of the underlying query. #[mz_ore::instrument(level = "debug")] pub async fn execute( &mut self, @@ -704,11 +709,21 @@ impl SessionClient { // Attempt peek sequencing in the session task. // If unsupported, fall back to the Coordinator path. // TODO(peek-seq): wire up cancel_future - if let Some(resp) = self.try_frontend_peek(&portal_name).await? { + let mut outer_ctx_extra = outer_ctx_extra; + if let Some(resp) = self + .try_frontend_peek(&portal_name, &mut outer_ctx_extra) + .await? + { debug!("frontend peek succeeded"); + // Frontend peek handled the execution and retired outer_ctx_extra if it existed. + // No additional work needed here. return Ok((resp, execute_started)); } else { - debug!("frontend peek did not happen"); + debug!("frontend peek did not happen, falling back to `Command::Execute`"); + // If we bailed out, outer_ctx_extra is still present (if it was originally). + // `Command::Execute` will handle it. + // (This is not true if we bailed out _after_ the frontend peek sequencing has already + // begun its own statement logging. That case would be a bug.) } let response = self @@ -1020,7 +1035,10 @@ impl SessionClient { | Command::StoreTransactionReadHolds { .. } | Command::ExecuteSlowPathPeek { .. } | Command::ExecuteCopyTo { .. } - | Command::ExecuteSideEffectingFunc { .. } => {} + | Command::ExecuteSideEffectingFunc { .. } + | Command::RegisterFrontendPeek { .. } + | Command::UnregisterFrontendPeek { .. } + | Command::FrontendStatementLogging(..) => {} }; cmd }); @@ -1105,16 +1123,20 @@ impl SessionClient { /// Attempt to sequence a peek from the session task. /// - /// Returns Some(response) if we handled the peek, or None to fall back to the Coordinator's - /// peek sequencing. + /// Returns `Ok(Some(response))` if we handled the peek, or `Ok(None)` to fall back to the + /// Coordinator's sequencing. If it returns an error, it should be returned to the user. + /// + /// `outer_ctx_extra` is Some when we are executing as part of an outer statement, e.g., a FETCH + /// triggering the execution of the underlying query. 
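As a reading aid for the fallback logic above, here is a minimal, self-contained sketch of the three-way contract that `execute` relies on. The `PeekOutcome` enum and the closures are hypothetical stand-ins, not the adapter's actual types; the real `try_frontend_peek` returns `Result<Option<ExecuteResponse>, AdapterError>`.

```rust
/// Hypothetical stand-in for `Result<Option<ExecuteResponse>, AdapterError>`:
/// the frontend either handled the peek itself, or asks the caller to fall back.
enum PeekOutcome<Resp> {
    /// The session task sequenced the peek; any statement-logging context
    /// (`outer_ctx_extra`) has already been retired on this path.
    Handled(Resp),
    /// Bail out before any frontend statement logging has begun; the coordinator's
    /// `Command::Execute` path takes over and owns `outer_ctx_extra`.
    FallBack,
}

/// Sketch of the dispatch performed by `SessionClient::execute`.
fn execute<Resp, Err>(
    try_frontend_peek: impl FnOnce() -> Result<PeekOutcome<Resp>, Err>,
    coordinator_execute: impl FnOnce() -> Result<Resp, Err>,
) -> Result<Resp, Err> {
    match try_frontend_peek()? {
        PeekOutcome::Handled(resp) => Ok(resp),
        PeekOutcome::FallBack => coordinator_execute(),
    }
}
```

The invariant spelled out in the comments is that a bail-out must happen before frontend statement logging starts, so that exactly one side ever retires the execution record.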
pub(crate) async fn try_frontend_peek( &mut self, portal_name: &str, + outer_ctx_extra: &mut Option, ) -> Result, AdapterError> { if self.enable_frontend_peek_sequencing { let session = self.session.as_mut().expect("SessionClient invariant"); self.peek_client - .try_frontend_peek_inner(portal_name, session) + .try_frontend_peek(portal_name, session, outer_ctx_extra) .await } else { Ok(None) diff --git a/src/adapter/src/command.rs b/src/adapter/src/command.rs index d6b13a3544323..fb0d95eb8115e 100644 --- a/src/adapter/src/command.rs +++ b/src/adapter/src/command.rs @@ -50,7 +50,11 @@ use crate::coord::peek::{PeekDataflowPlan, PeekResponseUnary}; use crate::coord::timestamp_selection::TimestampDetermination; use crate::error::AdapterError; use crate::session::{EndTransactionAction, RowBatchStream, Session}; -use crate::statement_logging::{StatementEndedExecutionReason, StatementExecutionStrategy}; +use crate::statement_logging::WatchSetCreation; +use crate::statement_logging::{ + FrontendStatementLoggingEvent, StatementEndedExecutionReason, StatementExecutionStrategy, + StatementLoggingFrontend, +}; use crate::util::Transmittable; use crate::webhook::AppendWebhookResponse; use crate::{AdapterNotice, AppendWebhookError, ReadHolds}; @@ -210,6 +214,9 @@ pub enum Command { conn_id: ConnectionId, max_result_size: u64, max_query_result_size: Option, + /// If statement logging is enabled, contains all info needed for installing watch sets + /// and logging the statement execution. + watch_set: Option, tx: oneshot::Sender>, }, @@ -219,6 +226,9 @@ pub enum Command { target_replica: Option, source_ids: BTreeSet, conn_id: ConnectionId, + /// If statement logging is enabled, contains all info needed for installing watch sets + /// and logging the statement execution. + watch_set: Option, tx: oneshot::Sender>, }, @@ -230,6 +240,34 @@ pub enum Command { current_role: RoleId, tx: oneshot::Sender>, }, + + /// Register a pending peek initiated by frontend sequencing. This is needed for: + /// - statement logging + /// - query cancellation + RegisterFrontendPeek { + uuid: Uuid, + conn_id: ConnectionId, + cluster_id: mz_controller_types::ClusterId, + depends_on: BTreeSet, + is_fast_path: bool, + /// If statement logging is enabled, contains all info needed for installing watch sets + /// and logging the statement execution. + watch_set: Option, + tx: oneshot::Sender>, + }, + + /// Unregister a pending peek that was registered but failed to issue. + /// This is used for cleanup when `client.peek()` fails after `RegisterFrontendPeek` succeeds. + /// The `ExecuteContextExtra` is dropped without logging the statement retirement, because the + /// frontend will log the error. + UnregisterFrontendPeek { + uuid: Uuid, + tx: oneshot::Sender<()>, + }, + + /// Statement logging event from frontend peek sequencing. + /// No response channel needed - this is fire-and-forget. + FrontendStatementLogging(FrontendStatementLoggingEvent), } impl Command { @@ -257,7 +295,10 @@ impl Command { | Command::StoreTransactionReadHolds { .. } | Command::ExecuteSlowPathPeek { .. } | Command::ExecuteCopyTo { .. } - | Command::ExecuteSideEffectingFunc { .. } => None, + | Command::ExecuteSideEffectingFunc { .. } + | Command::RegisterFrontendPeek { .. } + | Command::UnregisterFrontendPeek { .. } + | Command::FrontendStatementLogging(..) => None, } } @@ -285,7 +326,10 @@ impl Command { | Command::StoreTransactionReadHolds { .. } | Command::ExecuteSlowPathPeek { .. } | Command::ExecuteCopyTo { .. } - | Command::ExecuteSideEffectingFunc { .. 
} => None, + | Command::ExecuteSideEffectingFunc { .. } + | Command::RegisterFrontendPeek { .. } + | Command::UnregisterFrontendPeek { .. } + | Command::FrontendStatementLogging(..) => None, } } } @@ -318,6 +362,7 @@ pub struct StartupResponse { pub transient_id_gen: Arc, pub optimizer_metrics: OptimizerMetrics, pub persist_client: PersistClient, + pub statement_logging_frontend: StatementLoggingFrontend, } /// The response to [`Client::authenticate`](crate::Client::authenticate). diff --git a/src/adapter/src/coord.rs b/src/adapter/src/coord.rs index 2aae0dd3bbd46..752982fe2b308 100644 --- a/src/adapter/src/coord.rs +++ b/src/adapter/src/coord.rs @@ -104,7 +104,9 @@ use mz_catalog::memory::objects::{ }; use mz_cloud_resources::{CloudResourceController, VpcEndpointConfig, VpcEndpointEvent}; use mz_compute_client::as_of_selection; -use mz_compute_client::controller::error::{DataflowCreationError, InstanceMissing}; +use mz_compute_client::controller::error::{ + CollectionLookupError, DataflowCreationError, InstanceMissing, +}; use mz_compute_types::ComputeInstanceId; use mz_compute_types::dataflows::DataflowDescription; use mz_compute_types::plan::Plan; @@ -190,7 +192,7 @@ use crate::coord::cluster_scheduling::SchedulingDecision; use crate::coord::id_bundle::CollectionIdBundle; use crate::coord::introspection::IntrospectionSubscribe; use crate::coord::peek::PendingPeek; -use crate::coord::statement_logging::{StatementLogging, StatementLoggingId}; +use crate::coord::statement_logging::StatementLogging; use crate::coord::timeline::{TimelineContext, TimelineState}; use crate::coord::timestamp_selection::{TimestampContext, TimestampDetermination}; use crate::coord::validity::PlanValidity; @@ -203,7 +205,9 @@ use crate::optimize::dataflows::{ }; use crate::optimize::{self, Optimize, OptimizerConfig}; use crate::session::{EndTransactionAction, Session}; -use crate::statement_logging::{StatementEndedExecutionReason, StatementLifecycleEvent}; +use crate::statement_logging::{ + StatementEndedExecutionReason, StatementLifecycleEvent, StatementLoggingId, +}; use crate::util::{ClientTransmitter, ResultExt}; use crate::webhook::{WebhookAppenderInvalidator, WebhookConcurrencyLimiter}; use crate::{AdapterNotice, ReadHolds, flags}; @@ -374,6 +378,9 @@ impl Message { Command::ExecuteSlowPathPeek { .. } => "execute-slow-path-peek", Command::ExecuteCopyTo { .. } => "execute-copy-to", Command::ExecuteSideEffectingFunc { .. } => "execute-side-effecting-func", + Command::RegisterFrontendPeek { .. } => "register-frontend-peek", + Command::UnregisterFrontendPeek { .. } => "unregister-frontend-peek", + Command::FrontendStatementLogging(..) => "frontend-statement-logging", }, Message::ControllerReady { controller: ControllerReadiness::Compute, @@ -1332,7 +1339,7 @@ impl ExecuteContextExtra { /// called from code that knows what to do to finish up logging /// based on the inner value. #[must_use] - fn retire(mut self) -> Option { + pub(crate) fn retire(mut self) -> Option { let Self { statement_uuid } = &mut self; statement_uuid.take() } @@ -1356,14 +1363,13 @@ impl Drop for ExecuteContextExtra { /// /// This struct collects a bundle of state that needs to be threaded /// through various functions as part of statement execution. -/// Currently, it is only used to finalize execution, by calling one -/// of the methods `retire` or `retire_aysnc`. Finalizing execution +/// It is used to finalize execution, by calling `retire`. 
Finalizing execution /// involves sending the session back to the pgwire layer so that it -/// may be used to process further commands. In the future, it will -/// also involve performing some work on the main coordinator thread +/// may be used to process further commands. It also involves +/// performing some work on the main coordinator thread /// (e.g., recording the time at which the statement finished -/// executing) the state necessary to perform this work is bundled in -/// the `ExecuteContextExtra` object (today, it is simply empty). +/// executing). The state necessary to perform this work is bundled in +/// the `ExecuteContextExtra` object. #[derive(Debug)] pub struct ExecuteContext { inner: Box, @@ -3762,13 +3768,14 @@ impl Coordinator { objects: BTreeSet, t: Timestamp, state: WatchSetResponse, - ) { - let ws_id = self.controller.install_compute_watch_set(objects, t); + ) -> Result<(), CollectionLookupError> { + let ws_id = self.controller.install_compute_watch_set(objects, t)?; self.connection_watch_sets .entry(conn_id.clone()) .or_default() .insert(ws_id); self.installed_watch_sets.insert(ws_id, (conn_id, state)); + Ok(()) } /// Install a _watch set_ in the controller that is automatically associated with the given @@ -3780,13 +3787,14 @@ impl Coordinator { objects: BTreeSet, t: Timestamp, state: WatchSetResponse, - ) { - let ws_id = self.controller.install_storage_watch_set(objects, t); + ) -> Result<(), CollectionLookupError> { + let ws_id = self.controller.install_storage_watch_set(objects, t)?; self.connection_watch_sets .entry(conn_id.clone()) .or_default() .insert(ws_id); self.installed_watch_sets.insert(ws_id, (conn_id, state)); + Ok(()) } /// Cancels pending watchsets associated with the provided connection id. diff --git a/src/adapter/src/coord/catalog_implications.rs b/src/adapter/src/coord/catalog_implications.rs index 5a7c4bc305f50..71a77097bd3e6 100644 --- a/src/adapter/src/coord/catalog_implications.rs +++ b/src/adapter/src/coord/catalog_implications.rs @@ -62,9 +62,8 @@ use crate::coord::Coordinator; use crate::coord::catalog_implications::parsed_state_updates::{ ParsedStateUpdate, ParsedStateUpdateKind, }; -use crate::coord::statement_logging::StatementLoggingId; use crate::coord::timeline::TimelineState; -use crate::statement_logging::StatementEndedExecutionReason; +use crate::statement_logging::{StatementEndedExecutionReason, StatementLoggingId}; use crate::{AdapterError, CollectionIdBundle, ExecuteContext, ResultExt}; pub mod parsed_state_updates; diff --git a/src/adapter/src/coord/command_handler.rs b/src/adapter/src/coord/command_handler.rs index 76a4fda6cba9b..d5d72982abe64 100644 --- a/src/adapter/src/coord/command_handler.rs +++ b/src/adapter/src/coord/command_handler.rs @@ -29,7 +29,7 @@ use mz_ore::task; use mz_ore::tracing::OpenTelemetryContext; use mz_ore::{instrument, soft_panic_or_log}; use mz_repr::role_id::RoleId; -use mz_repr::{Diff, SqlScalarType, Timestamp}; +use mz_repr::{Diff, GlobalId, SqlScalarType, Timestamp}; use mz_sql::ast::{ AlterConnectionAction, AlterConnectionStatement, AlterSinkAction, AlterSourceAction, AstInfo, ConstantVisitor, CopyRelation, CopyStatement, CreateSourceOptionName, Raw, Statement, @@ -60,12 +60,14 @@ use opentelemetry::trace::TraceContextExt; use tokio::sync::{mpsc, oneshot}; use tracing::{Instrument, debug_span, info, warn}; use tracing_opentelemetry::OpenTelemetrySpanExt; +use uuid::Uuid; use crate::command::{ AuthResponse, CatalogSnapshot, Command, ExecuteResponse, SASLChallengeResponse, 
SASLVerifyProofResponse, StartupResponse, }; use crate::coord::appends::PendingWriteTxn; +use crate::coord::peek::PendingPeek; use crate::coord::{ ConnMeta, Coordinator, DeferredPlanStatement, Message, PendingTxn, PlanStatement, PlanValidity, PurifiedStatementReady, validate_ip_with_policy_rules, @@ -73,6 +75,7 @@ use crate::coord::{ use crate::error::{AdapterError, AuthenticationError}; use crate::notice::AdapterNotice; use crate::session::{Session, TransactionOps, TransactionStatus}; +use crate::statement_logging::WatchSetCreation; use crate::util::{ClientTransmitter, ResultExt}; use crate::webhook::{ AppendWebhookResponse, AppendWebhookValidator, WebhookAppender, WebhookAppenderInvalidator, @@ -341,6 +344,7 @@ impl Coordinator { conn_id, max_result_size, max_query_result_size, + watch_set, tx, } => { let result = self @@ -355,9 +359,9 @@ impl Coordinator { conn_id, max_result_size, max_query_result_size, + watch_set, ) .await; - let _ = tx.send(result); } @@ -367,6 +371,7 @@ impl Coordinator { target_replica, source_ids, conn_id, + watch_set, tx, } => { // implement_copy_to spawns a background task that sends the response @@ -378,6 +383,7 @@ impl Coordinator { target_replica, source_ids, conn_id, + watch_set, tx, ) .await; @@ -394,6 +400,31 @@ impl Coordinator { .await; let _ = tx.send(result); } + Command::RegisterFrontendPeek { + uuid, + conn_id, + cluster_id, + depends_on, + is_fast_path, + watch_set, + tx, + } => { + self.handle_register_frontend_peek( + uuid, + conn_id, + cluster_id, + depends_on, + is_fast_path, + watch_set, + tx, + ); + } + Command::UnregisterFrontendPeek { uuid, tx } => { + self.handle_unregister_frontend_peek(uuid, tx); + } + Command::FrontendStatementLogging(event) => { + self.handle_frontend_statement_logging_event(event); + } } } .instrument(debug_span!("handle_command")) @@ -656,15 +687,24 @@ impl Coordinator { } let notify = self.builtin_table_update().background(updates); + let catalog = self.owned_catalog(); + let build_info_human_version = + catalog.state().config().build_info.human_version(None); + + let statement_logging_frontend = self + .statement_logging + .create_frontend(build_info_human_version); + let resp = Ok(StartupResponse { role_id, write_notify: notify, session_defaults, - catalog: self.owned_catalog(), + catalog, storage_collections: Arc::clone(&self.controller.storage_collections), transient_id_gen: Arc::clone(&self.transient_id_gen), optimizer_metrics: self.optimizer_metrics.clone(), persist_client: self.persist_client.clone(), + statement_logging_frontend, }); if tx.send(resp).is_err() { // Failed to send to adapter, but everything is setup so we can terminate @@ -1823,4 +1863,60 @@ impl Coordinator { }); let _ = tx.send(response); } + + /// Handle registration of a frontend peek, for statement logging and query cancellation + /// handling. 
+ fn handle_register_frontend_peek( + &mut self, + uuid: Uuid, + conn_id: ConnectionId, + cluster_id: mz_controller_types::ClusterId, + depends_on: BTreeSet, + is_fast_path: bool, + watch_set: Option, + tx: oneshot::Sender>, + ) { + let statement_logging_id = watch_set.as_ref().map(|ws| ws.logging_id); + if let Some(ws) = watch_set { + if let Err(e) = self.install_peek_watch_sets(conn_id.clone(), ws) { + let _ = tx.send(Err( + AdapterError::concurrent_dependency_drop_from_collection_lookup_error( + e, cluster_id, + ), + )); + return; + } + } + + // Store the peek in pending_peeks for later retrieval when results arrive + self.pending_peeks.insert( + uuid, + PendingPeek { + conn_id: conn_id.clone(), + cluster_id, + depends_on, + ctx_extra: ExecuteContextExtra::new(statement_logging_id), + is_fast_path, + }, + ); + + // Also track it by connection ID for cancellation support + self.client_pending_peeks + .entry(conn_id) + .or_default() + .insert(uuid, cluster_id); + + let _ = tx.send(Ok(())); + } + + /// Handle unregistration of a frontend peek that was registered but failed to issue. + /// This is used for cleanup when `client.peek()` fails after `RegisterFrontendPeek` succeeds. + fn handle_unregister_frontend_peek(&mut self, uuid: Uuid, tx: oneshot::Sender<()>) { + // Remove from pending_peeks (this also removes from client_pending_peeks) + if let Some(pending_peek) = self.remove_pending_peek(&uuid) { + // Retire `ExecuteContextExtra`, because the frontend will log the peek's error result. + let _ = pending_peek.ctx_extra.retire(); + } + let _ = tx.send(()); + } } diff --git a/src/adapter/src/coord/peek.rs b/src/adapter/src/coord/peek.rs index 659f2b91f5109..16266713cd753 100644 --- a/src/adapter/src/coord/peek.rs +++ b/src/adapter/src/coord/peek.rs @@ -58,6 +58,7 @@ use uuid::Uuid; use crate::active_compute_sink::{ActiveComputeSink, ActiveCopyTo}; use crate::coord::timestamp_selection::TimestampDetermination; use crate::optimize::OptimizerError; +use crate::statement_logging::WatchSetCreation; use crate::statement_logging::{StatementEndedExecutionReason, StatementExecutionStrategy}; use crate::util::ResultExt; use crate::{AdapterError, ExecuteContextExtra, ExecuteResponse}; @@ -888,6 +889,8 @@ impl crate::coord::Coordinator { } /// Creates an async stream that processes peek responses and yields rows. + /// + /// TODO(peek-seq): Move this out of `coord` once we delete the old peek sequencing. #[mz_ore::instrument(level = "debug")] pub(crate) fn create_peek_response_stream( rows_rx: tokio::sync::oneshot::Receiver, @@ -1195,7 +1198,7 @@ impl crate::coord::Coordinator { /// This is called from the command handler for ExecuteSlowPathPeek. /// /// (For now, this method simply delegates to implement_peek_plan by constructing - /// the necessary PlannedPeek structure and a minimal ExecuteContext.) + /// the necessary PlannedPeek structure.) pub(crate) async fn implement_slow_path_peek( &mut self, dataflow_plan: PeekDataflowPlan, @@ -1208,7 +1211,23 @@ impl crate::coord::Coordinator { conn_id: ConnectionId, max_result_size: u64, max_query_result_size: Option, + watch_set: Option, ) -> Result { + // Install watch sets for statement lifecycle logging if enabled. + // This must happen _before_ creating ExecuteContextExtra, so that if it fails, + // we don't have an ExecuteContextExtra that needs to be retired (the frontend + // will handle logging for the error case). 
+ let statement_logging_id = watch_set.as_ref().map(|ws| ws.logging_id); + if let Some(ws) = watch_set { + self.install_peek_watch_sets(conn_id.clone(), ws) + .map_err(|e| { + AdapterError::concurrent_dependency_drop_from_collection_lookup_error( + e, + compute_instance, + ) + })?; + } + let source_arity = intermediate_result_type.arity(); let planned_peek = PlannedPeek { @@ -1220,16 +1239,12 @@ impl crate::coord::Coordinator { source_ids, }; - // Create a minimal ExecuteContext - // TODO(peek-seq): Use the real context once we have statement logging. - let mut ctx_extra = ExecuteContextExtra::default(); - // Call the old peek sequencing's implement_peek_plan for now. // TODO(peek-seq): After the old peek sequencing is completely removed, we should merge the // relevant parts of the old `implement_peek_plan` into this method, and remove the old // `implement_peek_plan`. self.implement_peek_plan( - &mut ctx_extra, + &mut ExecuteContextExtra::new(statement_logging_id), planned_peek, finishing, compute_instance, @@ -1240,8 +1255,8 @@ impl crate::coord::Coordinator { .await } - /// Implements a COPY TO command by validating S3 connection, shipping the dataflow, - /// and spawning a background task to wait for completion. + /// Implements a COPY TO command by installing peek watch sets, validating S3 connection, + /// shipping the dataflow, and spawning a background task to wait for completion. /// This is called from the command handler for ExecuteCopyTo. /// /// This method inlines the logic from peek_copy_to_preflight and peek_copy_to_dataflow @@ -1257,6 +1272,7 @@ impl crate::coord::Coordinator { target_replica: Option, source_ids: BTreeSet, conn_id: ConnectionId, + watch_set: Option, tx: oneshot::Sender>, ) { // Helper to send error and return early @@ -1265,6 +1281,23 @@ impl crate::coord::Coordinator { let _ = tx.send(Err(e)); }; + // Install watch sets for statement lifecycle logging if enabled. + // If this fails, we just send the error back. The frontend will handle logging + // for the error case (no ExecuteContextExtra is created here). + if let Some(ws) = watch_set { + if let Err(e) = self.install_peek_watch_sets(conn_id.clone(), ws) { + let err = AdapterError::concurrent_dependency_drop_from_collection_lookup_error( + e, + compute_instance, + ); + send_err(tx, err); + return; + } + } + + // Note: We don't create an ExecuteContextExtra here because the frontend handles + // all statement logging for COPY TO operations. + let sink_id = df_desc.sink_id(); // # Inlined from peek_copy_to_preflight @@ -1338,7 +1371,7 @@ impl crate::coord::Coordinator { // created. If we don't do this, the sink_id remains in drop_sinks but no collection // exists in the compute controller, causing a panic when the connection terminates. 
self.remove_active_compute_sink(sink_id).await; - let _ = tx.send(Err(e)); + send_err(tx, e); return; } @@ -1355,6 +1388,7 @@ impl crate::coord::Coordinator { Ok(res) => res, Err(_) => Err(AdapterError::Internal("copy to sender dropped".into())), }; + let _ = tx.send(result); } .instrument(span), diff --git a/src/adapter/src/coord/sequencer/inner.rs b/src/adapter/src/coord/sequencer/inner.rs index 6108ee89c77ea..53259d08bfdd9 100644 --- a/src/adapter/src/coord/sequencer/inner.rs +++ b/src/adapter/src/coord/sequencer/inner.rs @@ -3339,7 +3339,7 @@ impl Coordinator { plan_validity, read_hold, }), - ); + ).expect("plan validity verified above; we are on the coordinator main task, so they couldn't have gone away since then"); } #[instrument] diff --git a/src/adapter/src/coord/sequencer/inner/peek.rs b/src/adapter/src/coord/sequencer/inner/peek.rs index 156150a4362f4..9ceafa4f604e9 100644 --- a/src/adapter/src/coord/sequencer/inner/peek.rs +++ b/src/adapter/src/coord/sequencer/inner/peek.rs @@ -12,7 +12,6 @@ use std::sync::Arc; use itertools::Either; use mz_adapter_types::dyncfgs::PLAN_INSIGHTS_NOTICE_FAST_PATH_CLUSTERS_OPTIMIZE_DURATION; -use mz_catalog::memory::objects::CatalogItem; use mz_compute_types::sinks::ComputeSinkConnection; use mz_controller_types::ClusterId; use mz_expr::{CollectionPlan, ResultSpec}; @@ -45,7 +44,7 @@ use crate::coord::{ Coordinator, CopyToContext, ExecuteContext, ExplainContext, ExplainPlanContext, Message, PeekStage, PeekStageCopyTo, PeekStageExplainPlan, PeekStageExplainPushdown, PeekStageFinish, PeekStageLinearizeTimestamp, PeekStageOptimize, PeekStageRealTimeRecency, - PeekStageTimestampReadHold, PlanValidity, StageResult, Staged, TargetCluster, WatchSetResponse, + PeekStageTimestampReadHold, PlanValidity, StageResult, Staged, TargetCluster, }; use crate::error::AdapterError; use crate::explain::insights::PlanInsightsContext; @@ -54,6 +53,7 @@ use crate::notice::AdapterNotice; use crate::optimize::{self, Optimize}; use crate::session::{RequireLinearization, Session, TransactionOps, TransactionStatus}; use crate::statement_logging::StatementLifecycleEvent; +use crate::statement_logging::WatchSetCreation; impl Staged for PeekStage { type Ctx = ExecuteContext; @@ -845,50 +845,14 @@ impl Coordinator { } } - if let Some(uuid) = ctx.extra().contents() { - let ts = determination.timestamp_context.timestamp_or_default(); - let mut transitive_storage_deps = BTreeSet::new(); - let mut transitive_compute_deps = BTreeSet::new(); - for item_id in id_bundle - .iter() - .map(|gid| self.catalog.state().get_entry_by_global_id(&gid).id()) - .flat_map(|id| self.catalog.state().transitive_uses(id)) - { - let entry = self.catalog.state().get_entry(&item_id); - match entry.item() { - // TODO(alter_table): Adding all of the GlobalIds for an object is incorrect. - // For example, this peek may depend on just a single version of a table, but - // we would add dependencies on all versions of said table. Doing this is okay - // for now since we can't yet version tables, but should get fixed. - CatalogItem::Table(_) | CatalogItem::Source(_) => { - transitive_storage_deps.extend(entry.global_ids()); - } - // Each catalog item is computed by at most one compute collection at a time, - // which is also the most recent one. 
- CatalogItem::MaterializedView(_) | CatalogItem::Index(_) => { - transitive_compute_deps.insert(entry.latest_global_id()); - } - _ => {} - } - } - self.install_storage_watch_set( - conn_id.clone(), - transitive_storage_deps, - ts, - WatchSetResponse::StatementDependenciesReady( - uuid, - StatementLifecycleEvent::StorageDependenciesFinished, - ), + if let Some(logging_id) = ctx.extra().contents() { + let watch_set = WatchSetCreation::new( + logging_id, + self.catalog.state(), + &id_bundle, + determination.timestamp_context.timestamp_or_default(), ); - self.install_compute_watch_set( - conn_id, - transitive_compute_deps, - ts, - WatchSetResponse::StatementDependenciesReady( - uuid, - StatementLifecycleEvent::ComputeDependenciesFinished, - ), - ) + self.install_peek_watch_sets(conn_id.clone(), watch_set).expect("the old peek sequencing re-verifies the dependencies' existence before installing the new watch sets"); } let max_result_size = self.catalog().system_config().max_result_size(); diff --git a/src/adapter/src/coord/statement_logging.rs b/src/adapter/src/coord/statement_logging.rs index bb0624353f3a3..4f8a9d165a324 100644 --- a/src/adapter/src/coord/statement_logging.rs +++ b/src/adapter/src/coord/statement_logging.rs @@ -8,120 +8,40 @@ // by the Apache License, Version 2.0. use std::collections::BTreeMap; -use std::sync::Arc; +use std::sync::{Arc, Mutex}; +use std::time::Duration; -use bytes::BytesMut; +use mz_adapter_types::connection::ConnectionId; +use mz_compute_client::controller::error::CollectionLookupError; use mz_controller_types::ClusterId; -use mz_ore::now::{NowFn, epoch_to_uuid_v7, to_datetime}; +use mz_ore::now::{EpochMillis, NowFn, epoch_to_uuid_v7, to_datetime}; use mz_ore::task::spawn; -use mz_ore::{cast::CastFrom, cast::CastInto, now::EpochMillis}; -use mz_repr::adt::array::ArrayDimension; +use mz_ore::{cast::CastFrom, cast::CastInto}; use mz_repr::adt::timestamp::TimestampLike; -use mz_repr::{Datum, Diff, GlobalId, Row, RowPacker, Timestamp}; -use mz_sql::ast::display::AstDisplay; -use mz_sql::ast::{AstInfo, Statement}; +use mz_repr::{Datum, Diff, GlobalId, Row, Timestamp}; use mz_sql::plan::Params; use mz_sql::session::metadata::SessionMetadata; -use mz_sql_parser::ast::{StatementKind, statement_kind_label_value}; use mz_storage_client::controller::IntrospectionType; use qcell::QCell; use rand::SeedableRng; -use rand::distr::{Bernoulli, Distribution}; use sha2::{Digest, Sha256}; use tokio::time::MissedTickBehavior; -use tracing::debug; use uuid::Uuid; -use crate::coord::{ConnMeta, Coordinator}; +use crate::coord::{ConnMeta, Coordinator, WatchSetResponse}; use crate::session::{LifecycleTimestamps, Session}; use crate::statement_logging::{ + FrontendStatementLoggingEvent, PreparedStatementEvent, PreparedStatementLoggingInfo, SessionHistoryEvent, StatementBeganExecutionRecord, StatementEndedExecutionReason, - StatementEndedExecutionRecord, StatementLifecycleEvent, StatementPreparedRecord, + StatementEndedExecutionRecord, StatementLifecycleEvent, StatementLoggingFrontend, + StatementLoggingId, StatementPreparedRecord, ThrottlingState, WatchSetCreation, + create_began_execution_record, effective_sample_rate, pack_statement_began_execution_update, + pack_statement_execution_inner, pack_statement_prepared_update, should_sample_statement, }; use super::Message; -/// Metadata required for logging a prepared statement. 
-#[derive(Debug)] -pub enum PreparedStatementLoggingInfo { - /// The statement has already been logged; we don't need to log it - /// again if a future execution hits the sampling rate; we merely - /// need to reference the corresponding UUID. - AlreadyLogged { uuid: Uuid }, - /// The statement has not yet been logged; if a future execution - /// hits the sampling rate, we need to log it at that point. - StillToLog { - /// The SQL text of the statement. - sql: String, - /// The SQL text of the statement, redacted to follow our data management - /// policy - redacted_sql: String, - /// When the statement was prepared - prepared_at: EpochMillis, - /// The name with which the statement was prepared - name: String, - /// The ID of the session that prepared the statement - session_id: Uuid, - /// Whether we have already recorded this in the "would have logged" metric - accounted: bool, - /// The top-level kind of the statement (e.g., `Select`), or `None` for an empty statement - kind: Option, - - /// Private type that forces use of the [`PreparedStatementLoggingInfo::still_to_log`] - /// constructor. - _sealed: sealed::Private, - }, -} - -impl PreparedStatementLoggingInfo { - /// Constructor for the [`PreparedStatementLoggingInfo::StillToLog`] variant that ensures SQL - /// statements are properly redacted. - pub fn still_to_log( - raw_sql: String, - stmt: Option<&Statement>, - prepared_at: EpochMillis, - name: String, - session_id: Uuid, - accounted: bool, - ) -> Self { - let kind = stmt.map(StatementKind::from); - let sql = match kind { - // Always redact SQL statements that may contain sensitive information. - // CREATE SECRET and ALTER SECRET statements can contain secret values, so we redact them. - // INSERT, UPDATE, and EXECUTE statements can include large amounts of user data, so we redact them for both - // data privacy and to avoid logging excessive data. - Some( - StatementKind::CreateSecret - | StatementKind::AlterSecret - | StatementKind::Insert - | StatementKind::Update - | StatementKind::Execute, - ) => stmt.map(|s| s.to_ast_string_redacted()).unwrap_or_default(), - _ => raw_sql, - }; - - PreparedStatementLoggingInfo::StillToLog { - sql, - redacted_sql: stmt.map(|s| s.to_ast_string_redacted()).unwrap_or_default(), - prepared_at, - name, - session_id, - accounted, - kind, - _sealed: sealed::Private, - } - } -} - -#[derive(Copy, Clone, Debug, Ord, Eq, PartialOrd, PartialEq)] -pub struct StatementLoggingId(Uuid); - -#[derive(Debug)] -pub(crate) struct PreparedStatementEvent { - prepared_statement: Row, - sql_text: Row, -} - +/// Statement logging state in the Coordinator. #[derive(Debug)] pub(crate) struct StatementLogging { /// Information about statement executions that have been logged @@ -140,87 +60,146 @@ pub(crate) struct StatementLogging { /// A reproducible RNG for deciding whether to sample statement executions. /// Only used by tests; otherwise, `rand::rng()` is used. /// Controlled by the system var `statement_logging_use_reproducible_rng`. - reproducible_rng: rand_chacha::ChaCha8Rng, + /// This same instance will be used by all frontend tasks. + reproducible_rng: Arc>, + /// Events to be persisted periodically. pending_statement_execution_events: Vec<(Row, Diff)>, pending_prepared_statement_events: Vec, pending_session_events: Vec, pending_statement_lifecycle_events: Vec, - now: NowFn, + /// Shared throttling state for rate-limiting statement logging. 
+ pub(crate) throttling_state: Arc, - /// The number of bytes that we are allowed to emit for statement logging without being throttled. - /// Increases at a rate of [`mz_sql::session::vars::STATEMENT_LOGGING_TARGET_DATA_RATE`] per second, - /// up to a max value of [`mz_sql::session::vars::STATEMENT_LOGGING_MAX_DATA_CREDIT`]. - tokens: u64, - /// The last time at which a statement was logged. - last_logged_ts_seconds: u64, - /// The number of statements that have been throttled since the last successfully logged statement. - throttled_count: usize, + /// Function to get the current time. + pub(crate) now: NowFn, } impl StatementLogging { + const REPRODUCIBLE_RNG_SEED: u64 = 42; + pub(crate) fn new(now: NowFn) -> Self { - let last_logged_ts_seconds = (now)() / 1000; Self { executions_begun: BTreeMap::new(), unlogged_sessions: BTreeMap::new(), - reproducible_rng: rand_chacha::ChaCha8Rng::seed_from_u64(42), + reproducible_rng: Arc::new(Mutex::new(rand_chacha::ChaCha8Rng::seed_from_u64( + Self::REPRODUCIBLE_RNG_SEED, + ))), pending_statement_execution_events: Vec::new(), pending_prepared_statement_events: Vec::new(), pending_session_events: Vec::new(), pending_statement_lifecycle_events: Vec::new(), - tokens: 0, - last_logged_ts_seconds, - now: now.clone(), - throttled_count: 0, + throttling_state: Arc::new(ThrottlingState::new(&now)), + now, } } - /// Check if we need to drop a statement - /// due to throttling, and update the number of available tokens appropriately. + /// Create a `StatementLoggingFrontend` for use by frontend peek sequencing. /// - /// Returns `false` if we must throttle this statement, and `true` otherwise. - fn throttling_check( - &mut self, - cost: u64, - target_data_rate: u64, - max_data_credit: Option, - ) -> bool { - let ts = (self.now)() / 1000; - // We use saturating_sub here because system time isn't monotonic, causing cases - // when last_logged_ts_seconds is greater than ts. - let elapsed = ts.saturating_sub(self.last_logged_ts_seconds); - self.last_logged_ts_seconds = ts; - self.tokens = self - .tokens - .saturating_add(target_data_rate.saturating_mul(elapsed)); - if let Some(max_data_credit) = max_data_credit { - self.tokens = self.tokens.min(max_data_credit); - } - if let Some(remaining) = self.tokens.checked_sub(cost) { - debug!("throttling check passed. tokens remaining: {remaining}; cost: {cost}"); - self.tokens = remaining; - true - } else { - debug!( - "throttling check failed. tokens available: {}; cost: {cost}", - self.tokens - ); - false + /// This provides the frontend with all the state it needs to perform statement + /// logging without direct access to the Coordinator. + pub(crate) fn create_frontend( + &self, + build_info_human_version: String, + ) -> StatementLoggingFrontend { + StatementLoggingFrontend { + throttling_state: Arc::clone(&self.throttling_state), + reproducible_rng: Arc::clone(&self.reproducible_rng), + build_info_human_version, + now: self.now.clone(), } } } impl Coordinator { + /// Helper to write began execution events to pending buffers. + /// Can be called from both old and new peek sequencing. 
+ fn write_began_execution_events( + &mut self, + record: StatementBeganExecutionRecord, + mseh_update: Row, + prepared_statement: Option, + ) { + // `mz_statement_execution_history` + self.statement_logging + .pending_statement_execution_events + .push((mseh_update, Diff::ONE)); + + // Track the execution for later updates + self.statement_logging + .executions_begun + .insert(record.id, record); + + // If we have a prepared statement, log it and possibly its session + if let Some(ps_event) = prepared_statement { + let session_id = ps_event.session_id; + self.statement_logging + .pending_prepared_statement_events + .push(ps_event); + + // Check if we need to log the session for this prepared statement + if let Some(sh) = self.statement_logging.unlogged_sessions.remove(&session_id) { + let sh_update = Self::pack_session_history_update(&sh); + self.statement_logging + .pending_session_events + .push(sh_update); + } + } + } + + /// Handle a statement logging event from frontend peek sequencing. + pub(crate) fn handle_frontend_statement_logging_event( + &mut self, + event: FrontendStatementLoggingEvent, + ) { + match event { + FrontendStatementLoggingEvent::BeganExecution { + record, + mseh_update, + prepared_statement, + } => { + self.record_statement_lifecycle_event( + &StatementLoggingId(record.id), + &StatementLifecycleEvent::ExecutionBegan, + record.began_at, + ); + self.write_began_execution_events(record, mseh_update, prepared_statement); + } + FrontendStatementLoggingEvent::EndedExecution(ended_record) => { + self.end_statement_execution( + StatementLoggingId(ended_record.id), + ended_record.reason, + ); + } + FrontendStatementLoggingEvent::SetCluster { id, cluster_id } => { + self.set_statement_execution_cluster(id, cluster_id); + } + FrontendStatementLoggingEvent::SetTimestamp { id, timestamp } => { + self.set_statement_execution_timestamp(id, timestamp); + } + FrontendStatementLoggingEvent::SetTransientIndex { + id, + transient_index_id, + } => { + self.set_transient_index_id(id, transient_index_id); + } + FrontendStatementLoggingEvent::Lifecycle { id, event, when } => { + self.record_statement_lifecycle_event(&id, &event, when); + } + } + } + + // TODO[btv] make this configurable via LD? + // Although... Logging every 5 seconds seems like it + // should have acceptable cost for now, since we do a + // group commit for tables every 1s anyway. + const STATEMENT_LOGGING_WRITE_INTERVAL: Duration = Duration::from_secs(5); + pub(crate) fn spawn_statement_logging_task(&self) { let internal_cmd_tx = self.internal_cmd_tx.clone(); spawn(|| "statement_logging", async move { - // TODO[btv] make this configurable via LD? - // Although... Logging every 5 seconds seems like it - // should have acceptable cost for now, since we do a - // group commit for tables every 1s anyway. - let mut interval = tokio::time::interval(std::time::Duration::from_secs(5)); + let mut interval = tokio::time::interval(Coordinator::STATEMENT_LOGGING_WRITE_INTERVAL); interval.set_missed_tick_behavior(MissedTickBehavior::Skip); loop { interval.tick().await; @@ -242,6 +221,7 @@ impl Coordinator { |PreparedStatementEvent { prepared_statement, sql_text, + .. }| { ((prepared_statement, Diff::ONE), (sql_text, Diff::ONE)) }, @@ -278,7 +258,7 @@ impl Coordinator { /// byte lengths of the rows. /// /// Returns `false` if we must throttle this statement, and `true` otherwise. 
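The throttling check delegated to `ThrottlingState` follows the token-bucket rule that the removed inline implementation used: credit accrues at `target_data_rate` bytes per second, is capped at `max_data_credit`, and a statement is dropped when its cost exceeds the available credit. Below is a simplified, self-contained sketch of that behavior behind a `Mutex`; the real `ThrottlingState` (and its `increment_throttled_count`/`reset_throttled_count` helpers used elsewhere in this patch) may be structured differently.

```rust
use std::sync::Mutex;

/// Simplified stand-in for the shared throttling state.
struct ThrottlingStateSketch {
    inner: Mutex<TokenBucket>,
}

struct TokenBucket {
    /// Bytes we may still log without being throttled.
    tokens: u64,
    /// Seconds-since-epoch at the last check.
    last_logged_ts_seconds: u64,
}

impl ThrottlingStateSketch {
    /// Returns `false` if the statement must be throttled, and `true` otherwise.
    fn throttling_check(
        &self,
        cost: u64,
        target_data_rate: u64,
        max_data_credit: Option<u64>,
        now_seconds: u64,
    ) -> bool {
        let mut bucket = self.inner.lock().expect("throttling state poisoned");
        // System time is not monotonic, so guard against it going backwards.
        let elapsed = now_seconds.saturating_sub(bucket.last_logged_ts_seconds);
        bucket.last_logged_ts_seconds = now_seconds;
        bucket.tokens = bucket
            .tokens
            .saturating_add(target_data_rate.saturating_mul(elapsed));
        if let Some(max) = max_data_credit {
            bucket.tokens = bucket.tokens.min(max);
        }
        match bucket.tokens.checked_sub(cost) {
            Some(remaining) => {
                bucket.tokens = remaining;
                true
            }
            None => false,
        }
    }
}
```

On a `false` result the caller records the drop in the shared throttled-statement counter, which is surfaced in `mz_prepared_statement_history` the next time a statement is successfully logged.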
- fn statement_logging_throttling_check<'a, I>(&mut self, rows: I) -> bool + fn statement_logging_throttling_check<'a, I>(&self, rows: I) -> bool where I: IntoIterator>, { @@ -298,10 +278,12 @@ impl Coordinator { .catalog .system_config() .statement_logging_max_data_credit(); - self.statement_logging.throttling_check( + + self.statement_logging.throttling_state.throttling_check( cost.cast_into(), target_data_rate.cast_into(), max_data_credit.map(CastInto::cast_into), + &self.statement_logging.now, ) } @@ -320,16 +302,16 @@ impl Coordinator { } /// Returns any statement logging events needed for a particular - /// prepared statement. Possibly mutates the `PreparedStatementLoggingInfo` metadata. + /// prepared statement. This is a read-only operation that does not mutate + /// the `PreparedStatementLoggingInfo` metadata. /// /// This function does not do a sampling check, and assumes we did so in a higher layer. + /// It also does not do a throttling check - that is done separately in `begin_statement_execution`. /// - /// - /// Returns A tuple containing: + /// Returns a tuple containing: /// - `Option<(StatementPreparedRecord, PreparedStatementEvent)>`: If the prepared statement - /// has not yet been logged, returns the prepared statement record, the packed row of the - /// prepared statement record, and a row for the SQL text. - /// - `Uuid`: The UUID of the prepared statement if the prepared statement has been logged + /// has not yet been logged, returns the prepared statement record and the packed rows. + /// - `Uuid`: The UUID of the prepared statement. pub(crate) fn get_prepared_statement_info( &self, session: &Session, @@ -366,9 +348,17 @@ impl Coordinator { prepared_at: *prepared_at, kind: *kind, }; + + // `mz_prepared_statement_history` let mut mpsh_row = Row::default(); let mut mpsh_packer = mpsh_row.packer(); - Self::pack_statement_prepared_update(&record, &mut mpsh_packer); + pack_statement_prepared_update(&record, &mut mpsh_packer); + let throttled_count = self + .statement_logging + .throttling_state + .get_throttled_count(); + mpsh_packer.push(Datum::UInt64(CastFrom::cast_from(throttled_count))); + let sql_row = Row::pack([ Datum::TimestampTz( to_datetime(*prepared_at) @@ -381,15 +371,13 @@ impl Coordinator { Datum::String(redacted_sql.as_str()), ]); - let throttled_count = self.statement_logging.throttled_count; - mpsh_packer.push(Datum::UInt64(throttled_count.try_into().expect("must fit"))); - ( Some(( record, PreparedStatementEvent { prepared_statement: mpsh_row, sql_text: sql_row, + session_id: *session_id, }, )), uuid, @@ -397,30 +385,13 @@ impl Coordinator { } } } - /// The rate at which statement execution should be sampled. - /// This is the value of the session var `statement_logging_sample_rate`, - /// constrained by the system var `statement_logging_max_sample_rate`. - pub fn statement_execution_sample_rate(&self, session: &Session) -> f64 { - let system: f64 = self - .catalog() - .system_config() - .statement_logging_max_sample_rate() - .try_into() - .expect("value constrained to be convertible to f64"); - let user: f64 = session - .vars() - .get_statement_logging_sample_rate() - .try_into() - .expect("value constrained to be convertible to f64"); - f64::min(system, user) - } /// Record the end of statement execution for a statement whose beginning was logged. /// It is an error to call this function for a statement whose beginning was not logged /// (because it was not sampled). 
Requiring the opaque `StatementLoggingId` type, /// which is only instantiated by `begin_statement_execution` if the statement is actually logged, /// should prevent this. - pub fn end_statement_execution( + pub(crate) fn end_statement_execution( &mut self, id: StatementLoggingId, reason: StatementEndedExecutionReason, @@ -454,109 +425,6 @@ impl Coordinator { ); } - fn pack_statement_execution_inner( - record: &StatementBeganExecutionRecord, - packer: &mut RowPacker, - ) { - let StatementBeganExecutionRecord { - id, - prepared_statement_id, - sample_rate, - params, - began_at, - cluster_id, - cluster_name, - database_name, - search_path, - application_name, - transaction_isolation, - execution_timestamp, - transaction_id, - transient_index_id, - mz_version, - } = record; - - let cluster = cluster_id.map(|id| id.to_string()); - let transient_index_id = transient_index_id.map(|id| id.to_string()); - packer.extend([ - Datum::Uuid(*id), - Datum::Uuid(*prepared_statement_id), - Datum::Float64((*sample_rate).into()), - match &cluster { - None => Datum::Null, - Some(cluster_id) => Datum::String(cluster_id), - }, - Datum::String(&*application_name), - cluster_name.as_ref().map(String::as_str).into(), - Datum::String(database_name), - ]); - packer.push_list(search_path.iter().map(|s| Datum::String(s))); - packer.extend([ - Datum::String(&*transaction_isolation), - (*execution_timestamp).into(), - Datum::UInt64(*transaction_id), - match &transient_index_id { - None => Datum::Null, - Some(transient_index_id) => Datum::String(transient_index_id), - }, - ]); - packer - .try_push_array( - &[ArrayDimension { - lower_bound: 1, - length: params.len(), - }], - params - .iter() - .map(|p| Datum::from(p.as_ref().map(String::as_str))), - ) - .expect("correct array dimensions"); - packer.push(Datum::from(mz_version.as_str())); - packer.push(Datum::TimestampTz( - to_datetime(*began_at).try_into().expect("Sane system time"), - )); - } - - fn pack_statement_began_execution_update(record: &StatementBeganExecutionRecord) -> Row { - let mut row = Row::default(); - let mut packer = row.packer(); - Self::pack_statement_execution_inner(record, &mut packer); - packer.extend([ - // finished_at - Datum::Null, - // finished_status - Datum::Null, - // error_message - Datum::Null, - // result_size - Datum::Null, - // rows_returned - Datum::Null, - // execution_status - Datum::Null, - ]); - row - } - - fn pack_statement_prepared_update(record: &StatementPreparedRecord, packer: &mut RowPacker) { - let StatementPreparedRecord { - id, - session_id, - name, - sql_hash, - prepared_at, - kind, - } = record; - packer.extend([ - Datum::Uuid(*id), - Datum::Uuid(*session_id), - Datum::String(name.as_str()), - Datum::Bytes(sql_hash.as_slice()), - Datum::TimestampTz(to_datetime(*prepared_at).try_into().expect("must fit")), - kind.map(statement_kind_label_value).into(), - ]); - } - fn pack_session_history_update(event: &SessionHistoryEvent) -> Row { let SessionHistoryEvent { id, @@ -566,11 +434,7 @@ impl Coordinator { } = event; Row::pack_slice(&[ Datum::Uuid(*id), - Datum::TimestampTz( - mz_ore::now::to_datetime(*connected_at) - .try_into() - .expect("must fit"), - ), + Datum::TimestampTz(to_datetime(*connected_at).try_into().expect("must fit")), Datum::String(&*application_name), Datum::String(&*authenticated_user), ]) @@ -584,17 +448,17 @@ impl Coordinator { Row::pack_slice(&[ Datum::Uuid(*uuid), Datum::String(event.as_str()), - Datum::TimestampTz(mz_ore::now::to_datetime(when).try_into().expect("must fit")), + 
Datum::TimestampTz(to_datetime(when).try_into().expect("must fit")), ]) } - pub fn pack_full_statement_execution_update( + fn pack_full_statement_execution_update( began_record: &StatementBeganExecutionRecord, ended_record: &StatementEndedExecutionRecord, ) -> Row { let mut row = Row::default(); let mut packer = row.packer(); - Self::pack_statement_execution_inner(began_record, &mut packer); + pack_statement_execution_inner(began_record, &mut packer); let (status, error_message, result_size, rows_returned, execution_strategy) = match &ended_record.reason { StatementEndedExecutionReason::Success { @@ -629,11 +493,11 @@ impl Coordinator { row } - pub fn pack_statement_ended_execution_updates( + fn pack_statement_ended_execution_updates( began_record: &StatementBeganExecutionRecord, ended_record: &StatementEndedExecutionRecord, ) -> [(Row, Diff); 2] { - let retraction = Self::pack_statement_began_execution_update(began_record); + let retraction = pack_statement_began_execution_update(began_record); let new = Self::pack_full_statement_execution_update(began_record, ended_record); [(retraction, Diff::MINUS_ONE), (new, Diff::ONE)] } @@ -649,19 +513,22 @@ impl Coordinator { .executions_begun .get_mut(&id) .expect("mutate_record must not be called after execution ends"); - let retraction = Self::pack_statement_began_execution_update(record); + let retraction = pack_statement_began_execution_update(record); self.statement_logging .pending_statement_execution_events .push((retraction, Diff::MINUS_ONE)); f(record); - let update = Self::pack_statement_began_execution_update(record); + let update = pack_statement_began_execution_update(record); self.statement_logging .pending_statement_execution_events .push((update, Diff::ONE)); } /// Set the `cluster_id` for a statement, once it's known. - pub fn set_statement_execution_cluster( + /// + /// TODO(peek-seq): We could do cluster resolution and packing in the frontend task, and just + /// send over the rows. + pub(crate) fn set_statement_execution_cluster( &mut self, id: StatementLoggingId, cluster_id: ClusterId, @@ -674,7 +541,7 @@ impl Coordinator { } /// Set the `execution_timestamp` for a statement, once it's known - pub fn set_statement_execution_timestamp( + pub(crate) fn set_statement_execution_timestamp( &mut self, id: StatementLoggingId, timestamp: Timestamp, @@ -684,7 +551,11 @@ impl Coordinator { }); } - pub fn set_transient_index_id(&mut self, id: StatementLoggingId, transient_index_id: GlobalId) { + pub(crate) fn set_transient_index_id( + &mut self, + id: StatementLoggingId, + transient_index_id: GlobalId, + ) { self.mutate_record(id, |record| { record.transient_index_id = Some(transient_index_id) }); @@ -696,7 +567,7 @@ impl Coordinator { /// /// `lifecycle_timestamps` has timestamps that come from the Adapter frontend (`mz-pgwire`) part /// of the lifecycle. 
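Sampling in `begin_statement_execution` (and in the frontend path via the shared `reproducible_rng`) comes down to clamping the session's sample rate by the system maximum and then drawing from a Bernoulli distribution, seeded deterministically only when `statement_logging_use_reproducible_rng` is set. The helpers below are approximations of the patch's `effective_sample_rate` and `should_sample_statement` (whose exact signatures differ), shown as a self-contained sketch:

```rust
use rand::SeedableRng;
use rand::distr::{Bernoulli, Distribution};

/// The effective rate is the session's requested rate clamped by the system-wide maximum.
fn effective_sample_rate_sketch(session_rate: f64, system_max_rate: f64) -> f64 {
    f64::min(system_max_rate, session_rate)
}

/// Decide whether to sample one execution. Tests pass a seeded RNG for reproducibility;
/// otherwise the thread-local RNG is used.
fn should_sample_statement_sketch(
    sample_rate: f64,
    reproducible_rng: Option<&mut rand_chacha::ChaCha8Rng>,
) -> bool {
    let distribution = Bernoulli::new(sample_rate).expect("rate must be in range [0, 1]");
    match reproducible_rng {
        Some(rng) => distribution.sample(rng),
        None => distribution.sample(&mut rand::rng()),
    }
}

fn main() {
    let rate = effective_sample_rate_sketch(1.0, 0.5);
    let mut rng = rand_chacha::ChaCha8Rng::seed_from_u64(42);
    assert!(rate <= 0.5);
    let _sampled = should_sample_statement_sketch(rate, Some(&mut rng));
}
```

Keeping the seeded RNG behind a shared handle is what lets the frontend tasks and the coordinator draw from the same reproducible sequence in tests.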
- pub fn begin_statement_execution( + pub(crate) fn begin_statement_execution( &mut self, session: &mut Session, params: &Params, @@ -710,17 +581,22 @@ impl Coordinator { if session.user().is_internal() && !enable_internal_statement_logging { return None; } - let sample_rate = self.statement_execution_sample_rate(session); - let distribution = Bernoulli::new(sample_rate).expect("rate must be in range [0, 1]"); - let sample = if self + let sample_rate = effective_sample_rate(session, self.catalog().system_config()); + let use_reproducible_rng = self .catalog() .system_config() - .statement_logging_use_reproducible_rng() - { - distribution.sample(&mut self.statement_logging.reproducible_rng) + .statement_logging_use_reproducible_rng(); + // Only lock the RNG when we actually need reproducible sampling (tests only) + let sample = if use_reproducible_rng { + let mut rng = self + .statement_logging + .reproducible_rng + .lock() + .expect("rng lock poisoned"); + should_sample_statement(sample_rate, Some(&mut *rng)) } else { - distribution.sample(&mut rand::rng()) + should_sample_statement(sample_rate, None) }; // Figure out the cost of everything before we log. @@ -764,49 +640,24 @@ impl Coordinator { let now = self.now(); let execution_uuid = epoch_to_uuid_v7(&now); - let params = std::iter::zip(params.execute_types.iter(), params.datums.iter()) - .map(|(r#type, datum)| { - mz_pgrepr::Value::from_datum(datum, r#type).map(|val| { - let mut buf = BytesMut::new(); - val.encode_text(&mut buf); - String::from_utf8(Into::>::into(buf)) - .expect("Serialization shouldn't produce non-UTF-8 strings.") - }) - }) - .collect(); - let record = StatementBeganExecutionRecord { - id: execution_uuid, - prepared_statement_id: ps_uuid, + let build_info_version = self + .catalog() + .state() + .config() + .build_info + .human_version(None); + let record = create_began_execution_record( + execution_uuid, + ps_uuid, sample_rate, params, + session, began_at, - application_name: session.application_name().to_string(), - transaction_isolation: session.vars().transaction_isolation().to_string(), - transaction_id: session - .transaction() - .inner() - .expect("Every statement runs in an explicit or implicit transaction") - .id, - mz_version: self - .catalog() - .state() - .config() - .build_info - .human_version(None), - // These are not known yet; we'll fill them in later. - cluster_id: None, - cluster_name: None, - execution_timestamp: None, - transient_index_id: None, - database_name: session.vars().database().into(), - search_path: session - .vars() - .search_path() - .iter() - .map(|s| s.as_str().to_string()) - .collect(), - }; - let mseh_update = Self::pack_statement_began_execution_update(&record); + build_info_version, + ); + + // `mz_statement_execution_history` + let mseh_update = pack_statement_began_execution_update(&record); let (maybe_ps_event, maybe_sh_event) = if let Some((ps_record, ps_event)) = maybe_ps { if let Some(sh) = self @@ -834,7 +685,10 @@ impl Coordinator { maybe_ps_sql_text, maybe_sh_event.as_ref().map(|(row, _)| row), ]) { - self.statement_logging.throttled_count += 1; + // Increment throttled_count in shared state + self.statement_logging + .throttling_state + .increment_throttled_count(); return None; } // When we successfully log the first instance of a prepared statement @@ -842,7 +696,9 @@ impl Coordinator { // throttled statement executions in the builtin prepared statement history table above, // and then reset the throttled count for future tracking. 
else if let PreparedStatementLoggingInfo::StillToLog { .. } = session.qcell_ro(logging) { - self.statement_logging.throttled_count = 0; + self.statement_logging + .throttling_state + .reset_throttled_count(); } self.record_prepared_statement_as_logged(ps_uuid, session, logging); @@ -872,11 +728,12 @@ impl Coordinator { .pending_prepared_statement_events .push(ps_event); } + Some(StatementLoggingId(execution_uuid)) } /// Record a new connection event - pub fn begin_session_for_statement_logging(&mut self, session: &ConnMeta) { + pub(crate) fn begin_session_for_statement_logging(&mut self, session: &ConnMeta) { let id = session.uuid(); let session_role = session.authenticated_role_id(); let event = SessionHistoryEvent { @@ -888,11 +745,11 @@ impl Coordinator { self.statement_logging.unlogged_sessions.insert(id, event); } - pub fn end_session_for_statement_logging(&mut self, uuid: Uuid) { + pub(crate) fn end_session_for_statement_logging(&mut self, uuid: Uuid) { self.statement_logging.unlogged_sessions.remove(&uuid); } - pub fn record_statement_lifecycle_event( + pub(crate) fn record_statement_lifecycle_event( &mut self, id: &StatementLoggingId, event: &StatementLifecycleEvent, @@ -907,11 +764,43 @@ impl Coordinator { .push(row); } } -} -mod sealed { - /// A struct that is purposefully private so folks are forced to use the constructor of an - /// enum. - #[derive(Debug, Copy, Clone)] - pub struct Private; + /// Install watch sets for statement lifecycle logging. + /// + /// This installs both storage and compute watch sets that will fire + /// `StatementLifecycleEvent::StorageDependenciesFinished` and + /// `StatementLifecycleEvent::ComputeDependenciesFinished` respectively + /// when the dependencies are ready at the given timestamp. + pub(crate) fn install_peek_watch_sets( + &mut self, + conn_id: ConnectionId, + watch_set: WatchSetCreation, + ) -> Result<(), CollectionLookupError> { + let WatchSetCreation { + logging_id, + timestamp, + storage_ids, + compute_ids, + } = watch_set; + + self.install_storage_watch_set( + conn_id.clone(), + storage_ids, + timestamp, + WatchSetResponse::StatementDependenciesReady( + logging_id, + StatementLifecycleEvent::StorageDependenciesFinished, + ), + )?; + self.install_compute_watch_set( + conn_id, + compute_ids, + timestamp, + WatchSetResponse::StatementDependenciesReady( + logging_id, + StatementLifecycleEvent::ComputeDependenciesFinished, + ), + )?; + Ok(()) + } } diff --git a/src/adapter/src/frontend_peek.rs b/src/adapter/src/frontend_peek.rs index 59fa675e9980d..4032c103d672d 100644 --- a/src/adapter/src/frontend_peek.rs +++ b/src/adapter/src/frontend_peek.rs @@ -23,7 +23,9 @@ use mz_ore::{soft_assert_eq_or_log, soft_assert_or_log, soft_panic_or_log}; use mz_repr::optimize::{OptimizerFeatures, OverrideFrom}; use mz_repr::role_id::RoleId; use mz_repr::{Datum, GlobalId, IntoRowIterator, Timestamp}; +use mz_sql::ast::Raw; use mz_sql::catalog::CatalogCluster; +use mz_sql::plan::Params; use mz_sql::plan::{self, Plan, QueryWhen}; use mz_sql::rbac; use mz_sql::session::metadata::SessionMetadata; @@ -35,18 +37,23 @@ use opentelemetry::trace::TraceContextExt; use tracing::{Span, debug}; use tracing_opentelemetry::OpenTelemetrySpanExt; -use crate::catalog::CatalogState; +use crate::catalog::{Catalog, CatalogState}; use crate::command::Command; -use crate::coord::peek::PeekPlan; +use crate::coord::peek::{FastPathPlan, PeekPlan}; use crate::coord::sequencer::{eval_copy_to_uri, statistics_oracle}; use crate::coord::timeline::timedomain_for; use 
crate::coord::timestamp_selection::TimestampDetermination; -use crate::coord::{Coordinator, CopyToContext, ExplainContext, ExplainPlanContext, TargetCluster}; +use crate::coord::{ + Coordinator, CopyToContext, ExecuteContextExtra, ExplainContext, ExplainPlanContext, + TargetCluster, +}; use crate::explain::insights::PlanInsightsContext; use crate::explain::optimizer_trace::OptimizerTrace; use crate::optimize::dataflows::{ComputeInstanceSnapshot, DataflowBuilder}; use crate::optimize::{Optimize, OptimizerError}; use crate::session::{Session, TransactionOps, TransactionStatus}; +use crate::statement_logging::WatchSetCreation; +use crate::statement_logging::{StatementEndedExecutionReason, StatementLifecycleEvent}; use crate::{ AdapterError, AdapterNotice, CollectionIdBundle, ExecuteResponse, PeekClient, ReadHolds, TimelineContext, TimestampContext, TimestampProvider, optimize, @@ -54,14 +61,22 @@ use crate::{ use crate::{coord, metrics}; impl PeekClient { - pub(crate) async fn try_frontend_peek_inner( + /// Attempt to sequence a peek from the session task. + /// + /// Returns `Ok(Some(response))` if we handled the peek, or `Ok(None)` to fall back to the + /// Coordinator's sequencing. If it returns an error, it should be returned to the user. + /// + /// `outer_ctx_extra` is Some when we are executing as part of an outer statement, e.g., a FETCH + /// triggering the execution of the underlying query. + pub(crate) async fn try_frontend_peek( &mut self, portal_name: &str, session: &mut Session, + outer_ctx_extra: &mut Option, ) -> Result, AdapterError> { if session.vars().emit_timestamp_notice() { // TODO(peek-seq): implement this. See end of peek_finish - debug!("Bailing out from try_frontend_peek_inner, because emit_timestamp_notice"); + debug!("Bailing out from try_frontend_peek, because emit_timestamp_notice"); return Ok(None); } @@ -86,18 +101,16 @@ impl PeekClient { // catalog revision has changed, which we could see with an atomic read. // But anyhow, this problem will just go away when we reach the point that we never fall // back to the old sequencing. - let catalog = self.catalog_snapshot("try_frontend_peek_inner").await; + let catalog = self.catalog_snapshot("try_frontend_peek").await; if let Err(_) = Coordinator::verify_portal(&*catalog, session, portal_name) { // TODO(peek-seq): Don't fall back to the coordinator's peek sequencing here, but retire already. - debug!( - "Bailing out from try_frontend_peek_inner, because verify_portal returned an error" - ); + debug!("Bailing out from try_frontend_peek, because verify_portal returned an error"); return Ok(None); } - // TODO(peek-seq): statement logging (and then enable it in various tests) - let (stmt, params) = { + // Extract things from the portal. + let (stmt, params, logging, lifecycle_timestamps) = { let portal = session .get_portal_unverified(portal_name) // The portal is a session-level thing, so it couldn't have concurrently disappeared @@ -105,82 +118,183 @@ impl PeekClient { .expect("called verify_portal above"); let params = portal.parameters.clone(); let stmt = portal.stmt.clone(); - (stmt, params) - }; - - let stmt = match stmt { - Some(stmt) => stmt, - None => { - debug!("try_frontend_peek_inner succeeded on an empty query"); - return Ok(Some(ExecuteResponse::EmptyQuery)); - } + let logging = Arc::clone(&portal.logging); + let lifecycle_timestamps = portal.lifecycle_timestamps.clone(); + (stmt, params, logging, lifecycle_timestamps) }; // Before planning, check if this is a statement type we can handle. 
- match &*stmt { - Statement::Select(_) - | Statement::ExplainAnalyzeObject(_) - | Statement::ExplainAnalyzeCluster(_) => { - // These are always fine, just continue. - // Note: EXPLAIN ANALYZE will `plan` to `Plan::Select`. - } - Statement::ExplainPlan(explain_stmt) => { - // Only handle ExplainPlan for SELECT statements. - // We don't want to handle e.g. EXPLAIN CREATE MATERIALIZED VIEW here, because that - // requires purification before planning, which the frontend peek sequencing doesn't - // do. - match &explain_stmt.explainee { - mz_sql_parser::ast::Explainee::Select(..) => { - // This is a SELECT, continue - } - _ => { - debug!( - "Bailing out from try_frontend_peek_inner, because EXPLAIN is not for a SELECT query" - ); - return Ok(None); - } + // This must happen BEFORE statement logging setup to avoid orphaned execution records. + if let Some(ref stmt) = stmt { + match &**stmt { + Statement::Select(_) + | Statement::ExplainAnalyzeObject(_) + | Statement::ExplainAnalyzeCluster(_) => { + // These are always fine, just continue. + // Note: EXPLAIN ANALYZE will `plan` to `Plan::Select`. } - } - Statement::ExplainPushdown(explain_stmt) => { - // Only handle EXPLAIN FILTER PUSHDOWN for non-BROKEN SELECT statements - match &explain_stmt.explainee { - mz_sql_parser::ast::Explainee::Select(_, false) => {} - _ => { - debug!( - "Bailing out from try_frontend_peek_inner, because EXPLAIN FILTER PUSHDOWN is not for a (non-BROKEN) SELECT query" - ); - return Ok(None); + Statement::ExplainPlan(explain_stmt) => { + // Only handle ExplainPlan for SELECT statements. + // We don't want to handle e.g. EXPLAIN CREATE MATERIALIZED VIEW here, because that + // requires purification before planning, which the frontend peek sequencing doesn't + // do. + match &explain_stmt.explainee { + mz_sql_parser::ast::Explainee::Select(..) => { + // This is a SELECT, continue + } + _ => { + debug!( + "Bailing out from try_frontend_peek, because EXPLAIN is not for a SELECT query" + ); + return Ok(None); + } } } - } - Statement::Copy(copy_stmt) => { - match ©_stmt.direction { - CopyDirection::To => { - // Check for SUBSCRIBE inside COPY TO - we don't handle Plan::Subscribe - if matches!(©_stmt.relation, CopyRelation::Subscribe(_)) { + Statement::ExplainPushdown(explain_stmt) => { + // Only handle EXPLAIN FILTER PUSHDOWN for non-BROKEN SELECT statements + match &explain_stmt.explainee { + mz_sql_parser::ast::Explainee::Select(_, false) => {} + _ => { debug!( - "Bailing out from try_frontend_peek_inner, because COPY (SUBSCRIBE ...) TO is not supported" + "Bailing out from try_frontend_peek, because EXPLAIN FILTER PUSHDOWN is not for a SELECT query or is for EXPLAIN BROKEN" ); return Ok(None); } - // This is COPY TO (SELECT), continue } - CopyDirection::From => { - debug!( - "Bailing out from try_frontend_peek_inner, because COPY FROM is not supported" - ); - return Ok(None); + } + Statement::Copy(copy_stmt) => { + match ©_stmt.direction { + CopyDirection::To => { + // Check for SUBSCRIBE inside COPY TO - we don't handle Plan::Subscribe + if matches!(©_stmt.relation, CopyRelation::Subscribe(_)) { + debug!( + "Bailing out from try_frontend_peek, because COPY (SUBSCRIBE ...) 
TO is not supported" + ); + return Ok(None); + } + // This is COPY TO (SELECT), continue + } + CopyDirection::From => { + debug!( + "Bailing out from try_frontend_peek, because COPY FROM is not supported" + ); + return Ok(None); + } } } + _ => { + debug!( + "Bailing out from try_frontend_peek, because statement type is not supported" + ); + return Ok(None); + } } - _ => { - debug!( - "Bailing out from try_frontend_peek_inner, because statement type is not supported" - ); - return Ok(None); + } + + // Set up statement logging, and log the beginning of execution. + // (But only if we're not executing in the context of another statement.) + let statement_logging_id = if outer_ctx_extra.is_none() { + // This is a new statement, so begin statement logging + let result = self.statement_logging_frontend.begin_statement_execution( + session, + ¶ms, + &logging, + catalog.system_config(), + lifecycle_timestamps, + ); + + if let Some((logging_id, began_execution, mseh_update, prepared_statement)) = result { + self.log_began_execution(began_execution, mseh_update, prepared_statement); + Some(logging_id) + } else { + None } + } else { + // We're executing in the context of another statement (e.g., FETCH), + // so extract the statement logging ID from the outer context if present. + // We take ownership and retire the outer context here. The end of execution will be + // logged in one of the following ways: + // - At the end of this function, if the execution is finished by then. + // - Later by the Coordinator, either due to RegisterFrontendPeek or ExecuteSlowPathPeek. + outer_ctx_extra.take().and_then(|extra| extra.retire()) + }; + + let result = self + .try_frontend_peek_inner(session, catalog, stmt, params, statement_logging_id) + .await; + + // Log the end of execution if we are logging this statement and execution has already + // ended. + if let Some(logging_id) = statement_logging_id { + let reason = match &result { + // Streaming results are handled asynchronously by the coordinator + Ok(Some(ExecuteResponse::SendingRowsStreaming { .. })) => { + // Don't log here - the peek is still executing. + // It will be logged when handle_peek_notification is called. + return result; + } + // COPY TO needs to check its inner response + Ok(Some(resp @ ExecuteResponse::CopyTo { resp: inner, .. })) => { + match inner.as_ref() { + ExecuteResponse::SendingRowsStreaming { .. } => { + // Don't log here - the peek is still executing. + // It will be logged when handle_peek_notification is called. + return result; + } + // For non-streaming COPY TO responses, use the outer CopyTo for conversion + _ => resp.into(), + } + } + // Bailout case, which should not happen + Ok(None) => { + soft_panic_or_log!( + "Bailed out from `try_frontend_peek_inner` after we already logged the beginning of statement execution." + ); + // This statement will be handled by the old peek sequencing, which will do its + // own statement logging from the beginning. So, let's close out this one. + self.log_ended_execution( + logging_id, + StatementEndedExecutionReason::Errored { + error: "Internal error: bailed out from `try_frontend_peek_inner`" + .to_string(), + }, + ); + return result; + } + // All other success responses - use the From implementation + // TODO(peek-seq): After we delete the old peek sequencing, we'll be able to adjust + // the From implementation to do exactly what we need in the frontend peek + // sequencing, so that the above special cases won't be needed. 
+ Ok(Some(resp)) => resp.into(), + Err(e) => StatementEndedExecutionReason::Errored { + error: e.to_string(), + }, + }; + + self.log_ended_execution(logging_id, reason); } + result + } + + /// This is encapsulated in an inner function so that the outer function can still do statement + /// logging after the `?` returns of the inner function. + async fn try_frontend_peek_inner( + &mut self, + session: &mut Session, + catalog: Arc, + stmt: Option>>, + params: Params, + statement_logging_id: Option, + ) -> Result, AdapterError> { + let stmt = match stmt { + Some(stmt) => stmt, + None => { + debug!("try_frontend_peek_inner succeeded on an empty query"); + return Ok(Some(ExecuteResponse::EmptyQuery)); + } + }; + let session_type = metrics::session_type_label_value(session.user()); let stmt_type = metrics::statement_type_label_value(&stmt); @@ -320,7 +434,10 @@ impl PeekClient { (cluster, cluster.id, &cluster.name) }; - // TODO(peek-seq): statement logging: set_statement_execution_cluster + // Log cluster selection + if let Some(logging_id) = &statement_logging_id { + self.log_set_cluster(*logging_id, target_cluster_id); + } coord::catalog_serving::check_cluster_restrictions( target_cluster_name.as_str(), @@ -914,6 +1031,11 @@ impl PeekClient { .await .map_err(|optimizer_error| AdapterError::Internal(format!("internal error in optimizer: {}", optimizer_error)))?; + // Log optimization finished + if let Some(logging_id) = &statement_logging_id { + self.log_lifecycle_event(*logging_id, StatementLifecycleEvent::OptimizationFinished); + } + // Handle the optimization result: either generate EXPLAIN output or continue with execution match optimization_result { Execution::ExplainPlan { @@ -971,14 +1093,8 @@ impl PeekClient { // Continue with normal execution // # From peek_finish - // TODO(peek-seq): statement logging - let (peek_plan, df_meta, typ) = global_lir_plan.unapply(); - // Warning: Do not bail out from the new peek sequencing after this point, because the - // following has side effects. TODO(peek-seq): remove this comment once we never - // bail out to the old sequencing. - coord::sequencer::emit_optimizer_notices( &*catalog, session, @@ -1014,12 +1130,41 @@ impl PeekClient { // # Now back to peek_finish - // TODO(peek-seq): statement logging + let watch_set = statement_logging_id.map(|logging_id| { + WatchSetCreation::new( + logging_id, + catalog.state(), + &input_id_bundle, + determination.timestamp_context.timestamp_or_default(), + ) + }); let max_result_size = catalog.system_config().max_result_size(); let response = match peek_plan { PeekPlan::FastPath(fast_path_plan) => { + if let Some(logging_id) = &statement_logging_id { + // TODO(peek-seq): Actually, we should log it also for + // FastPathPlan::Constant. The only reason we are not doing so at the + // moment is to match the old peek sequencing, so that statement logging + // tests pass with the frontend peek sequencing turned both on and off. + // + // When the old sequencing is removed, we should make a couple of + // changes in how we log timestamps: + // - Move this up to just after timestamp determination, so that it + // appears in the log as soon as possible. + // - Do it also for Constant peeks. + // - Currently, slow-path peeks' timestamp logging is done by + // `implement_peek_plan`. We could remove it from there, and just do + // it here. 
+ if !matches!(fast_path_plan, FastPathPlan::Constant(..)) { + self.log_set_timestamp( + *logging_id, + determination.timestamp_context.timestamp_or_default(), + ); + } + } + let row_set_finishing_seconds = session.metrics().row_set_finishing_seconds().clone(); @@ -1043,6 +1188,9 @@ impl PeekClient { read_holds, peek_stash_read_batch_size_bytes, peek_stash_read_memory_budget_bytes, + session.conn_id().clone(), + source_ids, + watch_set, ) .await? } @@ -1106,6 +1254,10 @@ impl PeekClient { } } + if let Some(logging_id) = &statement_logging_id { + self.log_set_transient_index_id(*logging_id, dataflow_plan.id); + } + self.call_coordinator(|tx| Command::ExecuteSlowPathPeek { dataflow_plan: Box::new(dataflow_plan), determination, @@ -1117,6 +1269,7 @@ impl PeekClient { conn_id: session.conn_id().clone(), max_result_size, max_query_result_size, + watch_set, tx, }) .await? @@ -1144,6 +1297,15 @@ impl PeekClient { &df_meta.optimizer_notices, ); + let watch_set = statement_logging_id.map(|logging_id| { + WatchSetCreation::new( + logging_id, + catalog.state(), + &input_id_bundle, + determination.timestamp_context.timestamp_or_default(), + ) + }); + let response = self .call_coordinator(|tx| Command::ExecuteCopyTo { df_desc: Box::new(df_desc), @@ -1151,6 +1313,7 @@ impl PeekClient { target_replica, source_ids, conn_id: session.conn_id().clone(), + watch_set, tx, }) .await?; diff --git a/src/adapter/src/metrics.rs b/src/adapter/src/metrics.rs index 42d3a5e667654..eee12abf88ffe 100644 --- a/src/adapter/src/metrics.rs +++ b/src/adapter/src/metrics.rs @@ -252,11 +252,14 @@ impl Metrics { .timestamp_difference_for_strict_serializable_ms .clone(), optimization_notices: self.optimization_notices.clone(), + statement_logging_records: self.statement_logging_records.clone(), + statement_logging_unsampled_bytes: self.statement_logging_unsampled_bytes.clone(), + statement_logging_actual_bytes: self.statement_logging_actual_bytes.clone(), } } } -/// Metrics associated with a [`crate::session::Session`]. +/// Metrics to be accessed from a [`crate::session::Session`]. 
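+/// (The frontend peek sequencing records its statement-logging metrics through these
+/// per-session handles; see `StatementLoggingFrontend::begin_statement_execution`.)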
#[derive(Debug, Clone)] pub struct SessionMetrics { row_set_finishing_seconds: Histogram, @@ -265,6 +268,9 @@ pub struct SessionMetrics { determine_timestamp: IntCounterVec, timestamp_difference_for_strict_serializable_ms: HistogramVec, optimization_notices: IntCounterVec, + statement_logging_records: IntCounterVec, + statement_logging_unsampled_bytes: IntCounter, + statement_logging_actual_bytes: IntCounter, } impl SessionMetrics { @@ -295,6 +301,22 @@ impl SessionMetrics { pub(crate) fn optimization_notices(&self, label_values: &[&str]) -> GenericCounter { self.optimization_notices.with_label_values(label_values) } + + pub(crate) fn statement_logging_records( + &self, + label_values: &[&str], + ) -> GenericCounter { + self.statement_logging_records + .with_label_values(label_values) + } + + pub(crate) fn statement_logging_unsampled_bytes(&self) -> &IntCounter { + &self.statement_logging_unsampled_bytes + } + + pub(crate) fn statement_logging_actual_bytes(&self) -> &IntCounter { + &self.statement_logging_actual_bytes + } } pub(crate) fn session_type_label_value(user: &User) -> &'static str { diff --git a/src/adapter/src/peek_client.rs b/src/adapter/src/peek_client.rs index 8ebb939d62a7c..e8525123c76ff 100644 --- a/src/adapter/src/peek_client.rs +++ b/src/adapter/src/peek_client.rs @@ -17,9 +17,9 @@ use mz_compute_types::ComputeInstanceId; use mz_expr::row::RowCollection; use mz_ore::cast::CastFrom; use mz_persist_client::PersistClient; -use mz_repr::RelationDesc; use mz_repr::Timestamp; use mz_repr::global_id::TransientIdGen; +use mz_repr::{RelationDesc, Row}; use mz_sql::optimizer_metrics::OptimizerMetrics; use mz_storage_types::sources::Timeline; use mz_timestamp_oracle::TimestampOracle; @@ -32,6 +32,11 @@ use crate::catalog::Catalog; use crate::command::{CatalogSnapshot, Command}; use crate::coord::Coordinator; use crate::coord::peek::FastPathPlan; +use crate::statement_logging::WatchSetCreation; +use crate::statement_logging::{ + FrontendStatementLoggingEvent, PreparedStatementEvent, StatementLoggingFrontend, + StatementLoggingId, +}; use crate::{AdapterError, Client, CollectionIdBundle, ReadHolds, statement_logging}; /// Storage collections trait alias we need to consult for since/frontiers. @@ -59,6 +64,8 @@ pub struct PeekClient { /// Per-timeline oracles from the coordinator. Lazily populated. oracles: BTreeMap + Send + Sync>>, persist_client: PersistClient, + /// Statement logging state for frontend peek sequencing. 
+ pub statement_logging_frontend: StatementLoggingFrontend, } impl PeekClient { @@ -69,6 +76,7 @@ impl PeekClient { transient_id_gen: Arc, optimizer_metrics: OptimizerMetrics, persist_client: PersistClient, + statement_logging_frontend: StatementLoggingFrontend, ) -> Self { Self { coordinator_client, @@ -76,6 +84,7 @@ impl PeekClient { storage_collections, transient_id_gen, optimizer_metrics, + statement_logging_frontend, oracles: Default::default(), // lazily populated persist_client, } @@ -84,8 +93,7 @@ impl PeekClient { pub async fn ensure_compute_instance_client( &mut self, compute_instance: ComputeInstanceId, - ) -> Result<&mut mz_compute_client::controller::instance::Client, InstanceMissing> - { + ) -> Result, InstanceMissing> { if !self.compute_instances.contains_key(&compute_instance) { let client = self .call_coordinator(|tx| Command::GetComputeInstanceClient { @@ -97,8 +105,9 @@ impl PeekClient { } Ok(self .compute_instances - .get_mut(&compute_instance) - .expect("ensured above")) + .get(&compute_instance) + .expect("ensured above") + .clone()) } pub async fn ensure_oracle( @@ -212,10 +221,6 @@ impl PeekClient { /// Note: `input_read_holds` has holds for all inputs. For fast-path peeks, this includes the /// peek target. For slow-path peeks (to be implemented later), we'll need to additionally call /// into the Controller to acquire a hold on the peek target after we create the dataflow. - /// - /// TODO(peek-seq): add statement logging - /// TODO(peek-seq): cancellation (see pending_peeks/client_pending_peeks wiring in the old - /// sequencing) pub async fn implement_fast_path_peek_plan( &mut self, fast_path: FastPathPlan, @@ -230,9 +235,25 @@ impl PeekClient { input_read_holds: ReadHolds, peek_stash_read_batch_size_bytes: usize, peek_stash_read_memory_budget_bytes: usize, + conn_id: mz_adapter_types::connection::ConnectionId, + depends_on: std::collections::BTreeSet, + watch_set: Option, ) -> Result { // If the dataflow optimizes to a constant expression, we can immediately return the result. if let FastPathPlan::Constant(rows_res, _) = fast_path { + // For constant queries with statement logging, immediately log that + // dependencies are "ready" (trivially, because there are none). + if let Some(ref ws) = watch_set { + self.log_lifecycle_event( + ws.logging_id, + statement_logging::StatementLifecycleEvent::StorageDependenciesFinished, + ); + self.log_lifecycle_event( + ws.logging_id, + statement_logging::StatementLifecycleEvent::ComputeDependenciesFinished, + ); + } + let mut rows = match rows_res { Ok(rows) => rows, Err(e) => return Err(e.into()), @@ -328,13 +349,28 @@ impl PeekClient { let cols = (0..intermediate_result_type.arity()).map(|i| format!("peek_{i}")); let result_desc = RelationDesc::new(intermediate_result_type.clone(), cols); - // Issue the peek to the instance let client = self .ensure_compute_instance_client(compute_instance) .await .map_err(AdapterError::concurrent_dependency_drop_from_instance_missing)?; + + // Register coordinator tracking of this peek. This has to complete before issuing the peek. + // + // Warning: If we fail to actually issue the peek after this point, then we need to + // unregister it to avoid an orphaned registration. 
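+        // Registration is keyed by `uuid` and `conn_id`, which lets the Coordinator track the
+        // pending peek (for example, for cancellation) and, when `watch_set` is present,
+        // install the statement-lifecycle watch sets for it.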
+ self.call_coordinator(|tx| Command::RegisterFrontendPeek { + uuid, + conn_id: conn_id.clone(), + cluster_id: compute_instance, + depends_on, + is_fast_path: true, + watch_set, + tx, + }) + .await?; + let finishing_for_instance = finishing.clone(); - client + let peek_result = client .peek( peek_target, literal_constraints, @@ -347,10 +383,18 @@ impl PeekClient { target_replica, rows_tx, ) - .await - .map_err(|err| { - AdapterError::concurrent_dependency_drop_from_peek_error(err, compute_instance) - })?; + .await; + + if let Err(err) = peek_result { + // Clean up the registered peek since the peek failed to issue. + // The frontend will handle statement logging for the error. + self.call_coordinator(|tx| Command::UnregisterFrontendPeek { uuid, tx }) + .await; + return Err(AdapterError::concurrent_dependency_drop_from_peek_error( + err, + compute_instance, + )); + } let peek_response_stream = Coordinator::create_peek_response_stream( rows_rx, @@ -362,10 +406,96 @@ impl PeekClient { peek_stash_read_batch_size_bytes, peek_stash_read_memory_budget_bytes, ); + Ok(crate::ExecuteResponse::SendingRowsStreaming { rows: Box::pin(peek_response_stream), instance_id: compute_instance, strategy, }) } + + // Statement logging helper methods + + /// Log the beginning of statement execution. + pub(crate) fn log_began_execution( + &self, + record: statement_logging::StatementBeganExecutionRecord, + mseh_update: Row, + prepared_statement: Option, + ) { + self.coordinator_client + .send(Command::FrontendStatementLogging( + FrontendStatementLoggingEvent::BeganExecution { + record, + mseh_update, + prepared_statement, + }, + )); + } + + /// Log cluster selection for a statement. + pub(crate) fn log_set_cluster( + &self, + id: StatementLoggingId, + cluster_id: mz_controller_types::ClusterId, + ) { + self.coordinator_client + .send(Command::FrontendStatementLogging( + FrontendStatementLoggingEvent::SetCluster { id, cluster_id }, + )); + } + + /// Log timestamp determination for a statement. + pub(crate) fn log_set_timestamp(&self, id: StatementLoggingId, timestamp: mz_repr::Timestamp) { + self.coordinator_client + .send(Command::FrontendStatementLogging( + FrontendStatementLoggingEvent::SetTimestamp { id, timestamp }, + )); + } + + /// Log transient index ID for a statement. + pub(crate) fn log_set_transient_index_id( + &self, + id: StatementLoggingId, + transient_index_id: mz_repr::GlobalId, + ) { + self.coordinator_client + .send(Command::FrontendStatementLogging( + FrontendStatementLoggingEvent::SetTransientIndex { + id, + transient_index_id, + }, + )); + } + + /// Log a statement lifecycle event. + pub(crate) fn log_lifecycle_event( + &self, + id: StatementLoggingId, + event: statement_logging::StatementLifecycleEvent, + ) { + let when = (self.statement_logging_frontend.now)(); + self.coordinator_client + .send(Command::FrontendStatementLogging( + FrontendStatementLoggingEvent::Lifecycle { id, event, when }, + )); + } + + /// Log the end of statement execution. 
+ pub(crate) fn log_ended_execution( + &self, + id: StatementLoggingId, + reason: statement_logging::StatementEndedExecutionReason, + ) { + let ended_at = (self.statement_logging_frontend.now)(); + let record = statement_logging::StatementEndedExecutionRecord { + id: id.0, + reason, + ended_at, + }; + self.coordinator_client + .send(Command::FrontendStatementLogging( + FrontendStatementLoggingEvent::EndedExecution(record), + )); + } } diff --git a/src/adapter/src/session.rs b/src/adapter/src/session.rs index 980d8b862dae0..50f2dcff168fe 100644 --- a/src/adapter/src/session.rs +++ b/src/adapter/src/session.rs @@ -56,11 +56,11 @@ use crate::client::RecordFirstRowStream; use crate::coord::appends::BuiltinTableAppendNotify; use crate::coord::in_memory_oracle::InMemoryTimestampOracle; use crate::coord::peek::PeekResponseUnary; -use crate::coord::statement_logging::PreparedStatementLoggingInfo; use crate::coord::timestamp_selection::{TimestampContext, TimestampDetermination}; use crate::coord::{Coordinator, ExplainContext}; use crate::error::AdapterError; use crate::metrics::{Metrics, SessionMetrics}; +use crate::statement_logging::PreparedStatementLoggingInfo; use crate::{AdapterNotice, ExecuteContext}; const DUMMY_CONNECTION_ID: ConnectionId = ConnectionId::Static(0); diff --git a/src/adapter/src/statement_logging.rs b/src/adapter/src/statement_logging.rs index 7931037eee15d..81438a1c0060e 100644 --- a/src/adapter/src/statement_logging.rs +++ b/src/adapter/src/statement_logging.rs @@ -7,15 +7,33 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. +use std::collections::BTreeSet; +use std::sync::atomic::Ordering; +use std::sync::{Arc, Mutex}; + +use bytes::BytesMut; +use mz_catalog::memory::objects::CatalogItem; use mz_controller_types::ClusterId; -use mz_ore::cast::CastFrom; -use mz_ore::now::EpochMillis; -use mz_repr::{GlobalId, RowIterator}; -use mz_sql_parser::ast::StatementKind; +use mz_ore::cast::{CastFrom, CastInto}; +use mz_ore::now::{EpochMillis, NowFn, epoch_to_uuid_v7, to_datetime}; +use mz_ore::soft_panic_or_log; +use mz_repr::adt::array::ArrayDimension; +use mz_repr::adt::timestamp::TimestampLike; +use mz_repr::{Datum, GlobalId, Row, RowIterator, RowPacker, Timestamp}; +use mz_sql::ast::display::AstDisplay; +use mz_sql::ast::{AstInfo, Statement}; +use mz_sql::plan::Params; +use mz_sql::session::metadata::SessionMetadata; +use mz_sql::session::vars::SystemVars; +use mz_sql_parser::ast::{StatementKind, statement_kind_label_value}; +use qcell::QCell; +use rand::distr::{Bernoulli, Distribution}; +use sha2::{Digest, Sha256}; use uuid::Uuid; -use crate::session::TransactionId; -use crate::{AdapterError, ExecuteResponse}; +use crate::catalog::CatalogState; +use crate::session::{LifecycleTimestamps, Session, TransactionId}; +use crate::{AdapterError, CollectionIdBundle, ExecuteResponse}; #[derive(Clone, Debug)] pub enum StatementLifecycleEvent { @@ -110,7 +128,7 @@ pub struct StatementEndedExecutionRecord { /// Contains all the information necessary to generate an entry in /// `mz_prepared_statement_history` #[derive(Clone, Debug)] -pub struct StatementPreparedRecord { +pub(crate) struct StatementPreparedRecord { pub id: Uuid, pub sql_hash: [u8; 32], pub name: String, @@ -120,15 +138,7 @@ pub struct StatementPreparedRecord { } #[derive(Clone, Debug)] -pub enum StatementLoggingEvent { - Prepared(StatementPreparedRecord), - BeganExecution(StatementBeganExecutionRecord), - EndedExecution(StatementEndedExecutionRecord), - 
BeganSession(SessionHistoryEvent), -} - -#[derive(Clone, Debug)] -pub struct SessionHistoryEvent { +pub(crate) struct SessionHistoryEvent { pub id: Uuid, pub connected_at: EpochMillis, pub application_name: String, @@ -254,3 +264,754 @@ impl From<&ExecuteResponse> for StatementEndedExecutionReason { } } } + +mod sealed { + /// A struct that is purposefully private so folks are forced to use the constructor of an + /// enum. + #[derive(Debug, Copy, Clone)] + pub struct Private; +} + +/// Metadata required for logging a prepared statement. +#[derive(Debug)] +pub enum PreparedStatementLoggingInfo { + /// The statement has already been logged; we don't need to log it + /// again if a future execution hits the sampling rate; we merely + /// need to reference the corresponding UUID. + AlreadyLogged { uuid: Uuid }, + /// The statement has not yet been logged; if a future execution + /// hits the sampling rate, we need to log it at that point. + StillToLog { + /// The SQL text of the statement. + sql: String, + /// The SQL text of the statement, redacted to follow our data management + /// policy + redacted_sql: String, + /// When the statement was prepared + prepared_at: EpochMillis, + /// The name with which the statement was prepared + name: String, + /// The ID of the session that prepared the statement + session_id: Uuid, + /// Whether we have already recorded this in the "would have logged" metric + accounted: bool, + /// The top-level kind of the statement (e.g., `Select`), or `None` for an empty statement + kind: Option, + + /// Private type that forces use of the [`PreparedStatementLoggingInfo::still_to_log`] + /// constructor. + _sealed: sealed::Private, + }, +} + +impl PreparedStatementLoggingInfo { + /// Constructor for the [`PreparedStatementLoggingInfo::StillToLog`] variant that ensures SQL + /// statements are properly redacted. + pub fn still_to_log( + raw_sql: String, + stmt: Option<&Statement>, + prepared_at: EpochMillis, + name: String, + session_id: Uuid, + accounted: bool, + ) -> Self { + let kind = stmt.map(StatementKind::from); + let sql = match kind { + // Always redact SQL statements that may contain sensitive information. + // CREATE SECRET and ALTER SECRET statements can contain secret values, so we redact them. + // INSERT, UPDATE, and EXECUTE statements can include large amounts of user data, so we redact them for both + // data privacy and to avoid logging excessive data. + Some( + StatementKind::CreateSecret + | StatementKind::AlterSecret + | StatementKind::Insert + | StatementKind::Update + | StatementKind::Execute, + ) => stmt.map(|s| s.to_ast_string_redacted()).unwrap_or_default(), + _ => raw_sql, + }; + + PreparedStatementLoggingInfo::StillToLog { + sql, + redacted_sql: stmt.map(|s| s.to_ast_string_redacted()).unwrap_or_default(), + prepared_at, + name, + session_id, + accounted, + kind, + _sealed: sealed::Private, + } + } +} + +#[derive(Copy, Clone, Debug, Ord, Eq, PartialOrd, PartialEq)] +pub struct StatementLoggingId(pub Uuid); + +/// Rows to be written to `mz_prepared_statement_history` and `mz_sql_text`, with the session id. +#[derive(Debug, Clone)] +pub struct PreparedStatementEvent { + pub prepared_statement: Row, + pub sql_text: Row, + pub session_id: Uuid, +} + +/// Throttling state for statement logging, shared across multiple frontend tasks (and currently +/// also shared with the old peek sequencing). 
+#[derive(Debug)] +pub struct ThrottlingState { + /// Inner state protected by a mutex for rate-limiting, because the two inner fields have to be + /// manipulated together atomically. + /// This mutex is locked once per unsampled query. (There is both sampling and throttling. + /// Sampling happens before throttling.) This should be ok for now: Our QPS will not be more + /// than 10000s for now, and a mutex should be able to do 100000s of lockings per second, even + /// with some contention. If this ever becomes an issue, then we could redesign throttling to be + /// per-session/per-tokio-worker-thread. + inner: Mutex, + /// The number of statements that have been throttled since the last successfully logged + /// statement. This is not needed for the throttling decision itself, so it can be a separate + /// atomic to allow reading/writing without acquiring the inner mutex. + throttled_count: std::sync::atomic::AtomicUsize, +} + +#[derive(Debug)] +struct ThrottlingStateInner { + /// The number of bytes that we are allowed to emit for statement logging without being throttled. + /// Increases at a rate of [`mz_sql::session::vars::STATEMENT_LOGGING_TARGET_DATA_RATE`] per second, + /// up to a max value of [`mz_sql::session::vars::STATEMENT_LOGGING_MAX_DATA_CREDIT`]. + tokens: u64, + /// The last time at which a statement was logged. + last_logged_ts_seconds: u64, +} + +impl ThrottlingState { + /// Create a new throttling state. + pub fn new(now: &NowFn) -> Self { + Self { + inner: Mutex::new(ThrottlingStateInner { + tokens: 0, + last_logged_ts_seconds: now() / 1000, + }), + throttled_count: std::sync::atomic::AtomicUsize::new(0), + } + } + + /// Check if we need to drop a statement due to throttling, and update the number of available + /// tokens appropriately. + /// + /// Returns `false` if we must throttle this statement, and `true` otherwise. + /// Note: `throttled_count` is NOT modified by this method - callers are responsible + /// for incrementing it on throttle failure and resetting it when appropriate. + pub fn throttling_check( + &self, + cost: u64, + target_data_rate: u64, + max_data_credit: Option, + now: &NowFn, + ) -> bool { + let ts = now() / 1000; + let mut inner = self.inner.lock().expect("throttling state lock poisoned"); + // We use saturating_sub here because system time isn't monotonic, causing cases + // when last_logged_ts_seconds is greater than ts. + let elapsed = ts.saturating_sub(inner.last_logged_ts_seconds); + inner.last_logged_ts_seconds = ts; + inner.tokens = inner + .tokens + .saturating_add(target_data_rate.saturating_mul(elapsed)); + if let Some(max_data_credit) = max_data_credit { + inner.tokens = inner.tokens.min(max_data_credit); + } + if let Some(remaining) = inner.tokens.checked_sub(cost) { + tracing::debug!("throttling check passed. tokens remaining: {remaining}; cost: {cost}"); + inner.tokens = remaining; + true + } else { + tracing::debug!( + "throttling check failed. tokens available: {}; cost: {cost}", + inner.tokens + ); + false + } + } + + pub fn get_throttled_count(&self) -> usize { + self.throttled_count.load(Ordering::Relaxed) + } + + pub fn increment_throttled_count(&self) { + self.throttled_count.fetch_add(1, Ordering::Relaxed); + } + + pub fn reset_throttled_count(&self) { + self.throttled_count.store(0, Ordering::Relaxed); + } +} + +/// Encapsulates statement logging state needed by the frontend peek sequencing. 
+/// +/// This struct bundles together all the statement logging-related state that +/// the frontend peek sequencing needs to perform statement logging independently +/// of the Coordinator's main task. +#[derive(Debug, Clone)] +pub struct StatementLoggingFrontend { + /// Shared throttling state for rate-limiting statement logging. + pub throttling_state: Arc, + /// Reproducible RNG for statement sampling (only used in tests). + pub reproducible_rng: Arc>, + /// Cached human version string from build info. + pub build_info_human_version: String, + /// Function to get current time for statement logging. + pub now: NowFn, +} + +impl StatementLoggingFrontend { + /// Get prepared statement info for frontend peek sequencing. + /// + /// This function processes prepared statement logging info and builds the event rows. + /// It does NOT do throttling - that is handled externally by the caller in `begin_statement_execution`. + /// It DOES mutate the logging info to mark the statement as already logged. + /// + /// # Arguments + /// * `session` - The session executing the statement + /// * `logging` - Prepared statement logging info + /// + /// # Returns + /// A tuple containing: + /// - `Option`: If the prepared statement has not yet been logged, + /// returns the packed rows for the prepared statement. + /// - `Uuid`: The UUID of the prepared statement. + fn get_prepared_statement_info( + &self, + session: &mut Session, + logging: &Arc>, + ) -> (Option, Uuid) { + let logging_ref = session.qcell_rw(&*logging); + let mut prepared_statement_event = None; + + let ps_uuid = match logging_ref { + PreparedStatementLoggingInfo::AlreadyLogged { uuid } => *uuid, + PreparedStatementLoggingInfo::StillToLog { + sql, + redacted_sql, + prepared_at, + name, + session_id, + accounted, + kind, + _sealed: _, + } => { + assert!( + *accounted, + "accounting for logging should be done in `begin_statement_execution`" + ); + let uuid = epoch_to_uuid_v7(prepared_at); + let sql = std::mem::take(sql); + let redacted_sql = std::mem::take(redacted_sql); + let sql_hash: [u8; 32] = Sha256::digest(sql.as_bytes()).into(); + + // Copy session_id before mutating logging_ref + let sid = *session_id; + + let record = StatementPreparedRecord { + id: uuid, + sql_hash, + name: std::mem::take(name), + session_id: sid, + prepared_at: *prepared_at, + kind: *kind, + }; + + // `mz_prepared_statement_history` + let mut mpsh_row = Row::default(); + let mut mpsh_packer = mpsh_row.packer(); + pack_statement_prepared_update(&record, &mut mpsh_packer); + + let sql_row = Row::pack([ + Datum::TimestampTz( + to_datetime(*prepared_at) + .truncate_day() + .try_into() + .expect("must fit"), + ), + Datum::Bytes(sql_hash.as_slice()), + Datum::String(sql.as_str()), + Datum::String(redacted_sql.as_str()), + ]); + + // Read throttled_count from shared state + let throttled_count = self.throttling_state.get_throttled_count(); + + mpsh_packer.push(Datum::UInt64(CastFrom::cast_from(throttled_count))); + + prepared_statement_event = Some(PreparedStatementEvent { + prepared_statement: mpsh_row, + sql_text: sql_row, + session_id: sid, + }); + + *logging_ref = PreparedStatementLoggingInfo::AlreadyLogged { uuid }; + uuid + } + }; + + (prepared_statement_event, ps_uuid) + } + + /// Begin statement execution logging from the frontend. (Corresponds to + /// `Coordinator::begin_statement_execution`, which is used by the old peek sequencing.) 
+ /// + /// This encapsulates all the statement logging setup: + /// - Retrieves system config values + /// - Performs sampling and throttling checks + /// - Creates statement logging records + /// - Attends to metrics. + /// + /// Returns None if the statement should not be logged (due to sampling or throttling), or the + /// info required to proceed with statement logging. + /// The `Row` is the pre-packed row for `mz_statement_execution_history`. + /// The `Option` is None when we have already logged the prepared + /// statement before, and this is just a subsequent execution. + pub fn begin_statement_execution( + &self, + session: &mut Session, + params: &Params, + logging: &Arc>, + system_config: &SystemVars, + lifecycle_timestamps: Option, + ) -> Option<( + StatementLoggingId, + StatementBeganExecutionRecord, + Row, + Option, + )> { + // Skip logging for internal users unless explicitly enabled + let enable_internal_statement_logging = system_config.enable_internal_statement_logging(); + if session.user().is_internal() && !enable_internal_statement_logging { + return None; + } + + let sample_rate = effective_sample_rate(session, system_config); + + let use_reproducible_rng = system_config.statement_logging_use_reproducible_rng(); + let target_data_rate: Option = system_config + .statement_logging_target_data_rate() + .map(|rate| rate.cast_into()); + let max_data_credit: Option = system_config + .statement_logging_max_data_credit() + .map(|credit| credit.cast_into()); + + // Only lock the RNG when we actually need reproducible sampling (tests only) + let sample = if use_reproducible_rng { + let mut rng = self.reproducible_rng.lock().expect("rng lock poisoned"); + should_sample_statement(sample_rate, Some(&mut *rng)) + } else { + should_sample_statement(sample_rate, None) + }; + + let sampled_label = sample.then_some("true").unwrap_or("false"); + session + .metrics() + .statement_logging_records(&[sampled_label]) + .inc_by(1); + + // Clone only the metrics needed below, before the mutable borrow of session. + let unsampled_bytes_metric = session + .metrics() + .statement_logging_unsampled_bytes() + .clone(); + let actual_bytes_metric = session.metrics().statement_logging_actual_bytes().clone(); + + // Handle the accounted flag and record byte metrics + let is_new_prepared_statement = if let Some((sql, accounted)) = + match session.qcell_rw(logging) { + PreparedStatementLoggingInfo::AlreadyLogged { .. } => None, + PreparedStatementLoggingInfo::StillToLog { sql, accounted, .. 
} => { + Some((sql, accounted)) + } + } { + if !*accounted { + unsampled_bytes_metric.inc_by(u64::cast_from(sql.len())); + if sample { + actual_bytes_metric.inc_by(u64::cast_from(sql.len())); + } + *accounted = true; + } + true + } else { + false + }; + + if !sample { + return None; + } + + // Get prepared statement info (this also marks it as logged) + let (prepared_statement_event, ps_uuid) = + self.get_prepared_statement_info(session, logging); + + let began_at = if let Some(lifecycle_timestamps) = lifecycle_timestamps { + lifecycle_timestamps.received + } else { + (self.now)() + }; + + let current_time = (self.now)(); + let execution_uuid = epoch_to_uuid_v7(¤t_time); + + // Create the execution record + let began_execution = create_began_execution_record( + execution_uuid, + ps_uuid, + sample_rate, + params, + session, + began_at, + self.build_info_human_version.clone(), + ); + + // Build rows to calculate cost for throttling + let mseh_update = pack_statement_began_execution_update(&began_execution); + let maybe_ps_prepared_statement = prepared_statement_event + .as_ref() + .map(|e| &e.prepared_statement); + let maybe_ps_sql_text = prepared_statement_event.as_ref().map(|e| &e.sql_text); + + // Calculate cost of all rows we intend to log + let cost: usize = [ + Some(&mseh_update), + maybe_ps_prepared_statement, + maybe_ps_sql_text, + ] + .into_iter() + .filter_map(|row_opt| row_opt.map(|row| row.byte_len())) + .fold(0_usize, |acc, x| acc.saturating_add(x)); + + // Do throttling check + let passed = if let Some(target_data_rate) = target_data_rate { + self.throttling_state.throttling_check( + cost.cast_into(), + target_data_rate, + max_data_credit, + &self.now, + ) + } else { + true // No throttling configured + }; + + if !passed { + // Increment throttled_count in shared state + self.throttling_state.increment_throttled_count(); + return None; + } + + // When we successfully log the first instance of a prepared statement + // (i.e., it is not throttled), reset the throttled count for future tracking. + if is_new_prepared_statement { + self.throttling_state.reset_throttled_count(); + } + + Some(( + StatementLoggingId(execution_uuid), + began_execution, + mseh_update, + prepared_statement_event, + )) + } +} + +/// The effective rate at which statement execution should be sampled. +/// This is the value of the session var `statement_logging_sample_rate`, +/// constrained by the system var `statement_logging_max_sample_rate`. +pub(crate) fn effective_sample_rate(session: &Session, system_vars: &SystemVars) -> f64 { + let system_max: f64 = system_vars + .statement_logging_max_sample_rate() + .try_into() + .expect("value constrained to be convertible to f64"); + let user_rate: f64 = session + .vars() + .get_statement_logging_sample_rate() + .try_into() + .expect("value constrained to be convertible to f64"); + f64::min(system_max, user_rate) +} + +/// Helper function to decide whether to sample a statement execution. +/// Returns `true` if the statement should be sampled based on the sample rate. +/// +/// If `reproducible_rng` is `Some`, uses the provided RNG for reproducible sampling (used in tests). +/// If `reproducible_rng` is `None`, uses the thread-local RNG. 
+pub(crate) fn should_sample_statement( + sample_rate: f64, + reproducible_rng: Option<&mut rand_chacha::ChaCha8Rng>, +) -> bool { + let distribution = Bernoulli::new(sample_rate).unwrap_or_else(|_| { + soft_panic_or_log!("statement_logging_sample_rate is out of range [0, 1]"); + Bernoulli::new(0.0).expect("0.0 is valid for Bernoulli") + }); + if let Some(rng) = reproducible_rng { + distribution.sample(rng) + } else { + distribution.sample(&mut rand::rng()) + } +} + +/// Helper function to serialize statement parameters for logging. +fn serialize_params(params: &Params) -> Vec> { + std::iter::zip(params.execute_types.iter(), params.datums.iter()) + .map(|(r#type, datum)| { + mz_pgrepr::Value::from_datum(datum, r#type).map(|val| { + let mut buf = BytesMut::new(); + val.encode_text(&mut buf); + String::from_utf8(Into::>::into(buf)) + .expect("Serialization shouldn't produce non-UTF-8 strings.") + }) + }) + .collect() +} + +/// Helper function to create a `StatementBeganExecutionRecord`. +pub(crate) fn create_began_execution_record( + execution_uuid: Uuid, + prepared_statement_uuid: Uuid, + sample_rate: f64, + params: &Params, + session: &Session, + began_at: EpochMillis, + build_info_version: String, +) -> StatementBeganExecutionRecord { + let params = serialize_params(params); + StatementBeganExecutionRecord { + id: execution_uuid, + prepared_statement_id: prepared_statement_uuid, + sample_rate, + params, + began_at, + application_name: session.application_name().to_string(), + transaction_isolation: session.vars().transaction_isolation().to_string(), + transaction_id: session + .transaction() + .inner() + .map(|t| t.id) + .unwrap_or_else(|| { + // This should never happen because every statement runs in an explicit or implicit + // transaction. + soft_panic_or_log!( + "Statement logging got a statement with no associated transaction" + ); + 9999999 + }), + mz_version: build_info_version, + // These are not known yet; we'll fill them in later. + cluster_id: None, + cluster_name: None, + execution_timestamp: None, + transient_index_id: None, + database_name: session.vars().database().into(), + search_path: session + .vars() + .search_path() + .iter() + .map(|s| s.as_str().to_string()) + .collect(), + } +} + +/// Represents a single statement logging event that can be sent from the frontend +/// peek sequencing to the Coordinator via an mpsc channel. 
+#[derive(Debug, Clone)] +pub enum FrontendStatementLoggingEvent { + /// Statement execution began, possibly with an associated prepared statement + /// if this is the first time the prepared statement is being logged + BeganExecution { + record: StatementBeganExecutionRecord, + /// `mz_statement_execution_history` + mseh_update: Row, + prepared_statement: Option, + }, + /// Statement execution ended + EndedExecution(StatementEndedExecutionRecord), + /// Set the cluster for a statement execution + SetCluster { + id: StatementLoggingId, + cluster_id: ClusterId, + }, + /// Set the execution timestamp for a statement + SetTimestamp { + id: StatementLoggingId, + timestamp: Timestamp, + }, + /// Set the transient index ID for a statement + SetTransientIndex { + id: StatementLoggingId, + transient_index_id: GlobalId, + }, + /// Record a statement lifecycle event + Lifecycle { + id: StatementLoggingId, + event: StatementLifecycleEvent, + when: EpochMillis, + }, +} + +pub(crate) fn pack_statement_execution_inner( + record: &StatementBeganExecutionRecord, + packer: &mut RowPacker, +) { + let StatementBeganExecutionRecord { + id, + prepared_statement_id, + sample_rate, + params, + began_at, + cluster_id, + cluster_name, + database_name, + search_path, + application_name, + transaction_isolation, + execution_timestamp, + transaction_id, + transient_index_id, + mz_version, + } = record; + + let cluster = cluster_id.map(|id| id.to_string()); + let transient_index_id = transient_index_id.map(|id| id.to_string()); + packer.extend([ + Datum::Uuid(*id), + Datum::Uuid(*prepared_statement_id), + Datum::Float64((*sample_rate).into()), + match &cluster { + None => Datum::Null, + Some(cluster_id) => Datum::String(cluster_id), + }, + Datum::String(&*application_name), + cluster_name.as_ref().map(String::as_str).into(), + Datum::String(database_name), + ]); + packer.push_list(search_path.iter().map(|s| Datum::String(s))); + packer.extend([ + Datum::String(&*transaction_isolation), + (*execution_timestamp).into(), + Datum::UInt64(*transaction_id), + match &transient_index_id { + None => Datum::Null, + Some(transient_index_id) => Datum::String(transient_index_id), + }, + ]); + packer + .try_push_array( + &[ArrayDimension { + lower_bound: 1, + length: params.len(), + }], + params + .iter() + .map(|p| Datum::from(p.as_ref().map(String::as_str))), + ) + .expect("correct array dimensions"); + packer.push(Datum::from(mz_version.as_str())); + packer.push(Datum::TimestampTz( + to_datetime(*began_at).try_into().expect("Sane system time"), + )); +} + +pub(crate) fn pack_statement_began_execution_update(record: &StatementBeganExecutionRecord) -> Row { + let mut row = Row::default(); + let mut packer = row.packer(); + pack_statement_execution_inner(record, &mut packer); + packer.extend([ + // finished_at + Datum::Null, + // finished_status + Datum::Null, + // error_message + Datum::Null, + // result_size + Datum::Null, + // rows_returned + Datum::Null, + // execution_status + Datum::Null, + ]); + row +} + +pub(crate) fn pack_statement_prepared_update( + record: &StatementPreparedRecord, + packer: &mut RowPacker, +) { + let StatementPreparedRecord { + id, + session_id, + name, + sql_hash, + prepared_at, + kind, + } = record; + packer.extend([ + Datum::Uuid(*id), + Datum::Uuid(*session_id), + Datum::String(name.as_str()), + Datum::Bytes(sql_hash.as_slice()), + Datum::TimestampTz(to_datetime(*prepared_at).try_into().expect("must fit")), + kind.map(statement_kind_label_value).into(), + ]); +} + +/// Bundles all information needed 
to install watch sets for statement lifecycle logging. +/// This includes the statement logging ID and the transitive dependencies to watch. +#[derive(Debug)] +pub struct WatchSetCreation { + /// The statement logging ID for this execution. + pub logging_id: StatementLoggingId, + /// The timestamp at which to watch for dependencies becoming ready. + pub timestamp: Timestamp, + /// Transitive storage dependencies (tables, sources) to watch. + pub storage_ids: BTreeSet, + /// Transitive compute dependencies (materialized views, indexes) to watch. + pub compute_ids: BTreeSet, +} + +impl WatchSetCreation { + /// Compute transitive dependencies for watch sets from an input ID bundle, categorized into + /// storage and compute IDs. + pub fn new( + logging_id: StatementLoggingId, + catalog_state: &CatalogState, + input_id_bundle: &CollectionIdBundle, + timestamp: Timestamp, + ) -> Self { + let mut storage_ids = BTreeSet::new(); + let mut compute_ids = BTreeSet::new(); + + for item_id in input_id_bundle + .iter() + .map(|gid| catalog_state.get_entry_by_global_id(&gid).id()) + .flat_map(|id| catalog_state.transitive_uses(id)) + { + let entry = catalog_state.get_entry(&item_id); + match entry.item() { + // TODO(alter_table): Adding all of the GlobalIds for an object is incorrect. + // For example, this peek may depend on just a single version of a table, but + // we would add dependencies on all versions of said table. Doing this is okay + // for now since we can't yet version tables, but should get fixed. + CatalogItem::Table(_) | CatalogItem::Source(_) => { + storage_ids.extend(entry.global_ids()); + } + // Each catalog item is computed by at most one compute collection at a time, + // which is also the most recent one. + CatalogItem::MaterializedView(_) | CatalogItem::Index(_) => { + compute_ids.insert(entry.latest_global_id()); + } + _ => {} + } + } + + Self { + logging_id, + timestamp, + storage_ids, + compute_ids, + } + } +} diff --git a/src/compute-client/src/controller/instance.rs b/src/compute-client/src/controller/instance.rs index 009eeedd22489..016f724c2dc56 100644 --- a/src/compute-client/src/controller/instance.rs +++ b/src/compute-client/src/controller/instance.rs @@ -279,6 +279,8 @@ where } /// Issue a peek by calling into the instance task. + /// + /// If this returns an error, then it didn't modify any `Instance` state. pub async fn peek( &self, peek_target: PeekTarget, @@ -1735,6 +1737,8 @@ where } /// Initiate a peek request for the contents of `id` at `timestamp`. + /// + /// If this returns an error, then it didn't modify any `Instance` state. 
#[mz_ore::instrument(level = "debug")] pub fn peek( &mut self, diff --git a/src/controller/src/lib.rs b/src/controller/src/lib.rs index 34dee8670231f..625f50097cd17 100644 --- a/src/controller/src/lib.rs +++ b/src/controller/src/lib.rs @@ -32,6 +32,7 @@ use futures::future::BoxFuture; use mz_build_info::BuildInfo; use mz_cluster_client::metrics::ControllerMetrics; use mz_cluster_client::{ReplicaId, WallclockLagFn}; +use mz_compute_client::controller::error::CollectionLookupError; use mz_compute_client::controller::{ ComputeController, ComputeControllerResponse, ComputeControllerTimestamp, PeekNotification, }; @@ -372,15 +373,20 @@ where &mut self, mut objects: BTreeSet, t: T, - ) -> WatchSetId { + ) -> Result { let ws_id = self.watch_set_id_gen.allocate_id(); + // Collect all frontiers first, returning any errors + let frontiers: BTreeMap = objects + .iter() + .map(|id| { + self.compute + .collection_frontiers(*id, None) + .map(|f| (*id, f.write_frontier)) + }) + .collect::>()?; objects.retain(|id| { - let frontier = self - .compute - .collection_frontiers(*id, None) - .map(|f| f.write_frontier) - .expect("missing compute dependency"); + let frontier = frontiers.get(id).expect("just collected"); frontier.less_equal(&t) }); if objects.is_empty() { @@ -395,7 +401,7 @@ where self.unfulfilled_watch_sets.insert(ws_id, (objects, t)); } - ws_id + Ok(ws_id) } /// Install a _watch set_ in the controller. @@ -410,13 +416,12 @@ where &mut self, mut objects: BTreeSet, t: T, - ) -> WatchSetId { + ) -> Result { let ws_id = self.watch_set_id_gen.allocate_id(); let uppers = self .storage - .collections_frontiers(objects.iter().cloned().collect()) - .expect("missing storage dependencies") + .collections_frontiers(objects.iter().cloned().collect())? .into_iter() .map(|(id, _since, upper)| (id, upper)) .collect::>(); @@ -436,7 +441,7 @@ where } self.unfulfilled_watch_sets.insert(ws_id, (objects, t)); } - ws_id + Ok(ws_id) } /// Uninstalls a previously installed WatchSetId. The method is a no-op if the watch set has diff --git a/src/environmentd/src/test_util.rs b/src/environmentd/src/test_util.rs index 2da086afa0c62..be40cc0c6e095 100644 --- a/src/environmentd/src/test_util.rs +++ b/src/environmentd/src/test_util.rs @@ -1232,6 +1232,11 @@ impl TestServerWithRuntime { pub fn internal_sql_local_addr(&self) -> SocketAddr { self.server.internal_sql_local_addr() } + + /// Returns the metrics registry for the test server. + pub fn metrics_registry(&self) -> &MetricsRegistry { + &self.server.metrics_registry + } } #[derive(Debug, Clone, Copy, Eq, PartialEq, Ord, PartialOrd)] diff --git a/src/environmentd/tests/server.rs b/src/environmentd/tests/server.rs index 63ba185f46c63..e5280ac7f2686 100644 --- a/src/environmentd/tests/server.rs +++ b/src/environmentd/tests/server.rs @@ -185,6 +185,45 @@ impl TestServerWithStatementLoggingChecks { { self.server.connect_internal(tls) } + + /// Returns the metrics registry for the test server. + pub fn metrics_registry(&self) -> &MetricsRegistry { + self.server.metrics_registry() + } +} + +/// Helper to get statement logging record counts from the metrics registry. +/// Returns (sampled_true_count, sampled_false_count). 
+fn get_statement_logging_record_counts(
+    server: &TestServerWithStatementLoggingChecks,
+) -> (u64, u64) {
+    let metrics = server.metrics_registry().gather();
+    let record_count_metric = metrics
+        .into_iter()
+        .find(|m| m.name() == "mz_statement_logging_record_count")
+        .expect("mz_statement_logging_record_count metric should exist");
+
+    let metric_entries = record_count_metric.get_metric();
+    let sampled_true = metric_entries
+        .iter()
+        .find(|m| {
+            m.get_label()
+                .iter()
+                .any(|l| l.name() == "sample" && l.value() == "true")
+        })
+        .map(|m| u64::cast_lossy(m.get_counter().get_value()))
+        .unwrap_or(0);
+    let sampled_false = metric_entries
+        .iter()
+        .find(|m| {
+            m.get_label()
+                .iter()
+                .any(|l| l.name() == "sample" && l.value() == "false")
+        })
+        .map(|m| u64::cast_lossy(m.get_counter().get_value()))
+        .unwrap_or(0);
+
+    (sampled_true, sampled_false)
 }
 
 impl Drop for TestServerWithStatementLoggingChecks {
@@ -588,10 +627,51 @@ ORDER BY mseh.began_at",
     );
     assert_none!(sl_results[3].result_size);
     assert_none!(sl_results[3].rows_returned);
+
+    // Verify metrics show all statements were sampled (100% sample rate means no unsampled).
+    let (sampled_true, sampled_false) = get_statement_logging_record_counts(&server);
+    assert!(
+        sampled_true > 0,
+        "some statements should be sampled with 100% rate"
+    );
+    assert_eq!(
+        sampled_false, 0,
+        "no statements should be unsampled with 100% rate"
+    );
+
+    // Verify statement_logging_actual_bytes metric is being tracked.
+    // With 100% sample rate, actual_bytes should equal unsampled_bytes.
+    let metrics = server.metrics_registry().gather();
+    let actual_bytes = metrics
+        .iter()
+        .find(|m| m.name() == "mz_statement_logging_actual_bytes")
+        .expect("mz_statement_logging_actual_bytes metric should exist")
+        .get_metric()[0]
+        .get_counter()
+        .get_value();
+    let unsampled_bytes = metrics
+        .iter()
+        .find(|m| m.name() == "mz_statement_logging_unsampled_bytes")
+        .expect("mz_statement_logging_unsampled_bytes metric should exist")
+        .get_metric()[0]
+        .get_counter()
+        .get_value();
+    assert!(
+        actual_bytes > 0.0,
+        "actual_bytes should be > 0 with 100% sample rate"
+    );
+    assert_eq!(
+        actual_bytes, unsampled_bytes,
+        "with 100% sample rate, actual_bytes should equal unsampled_bytes"
+    );
 }
 
 fn run_throttling_test(use_prepared_statement: bool) {
-    let (server, mut client) = setup_statement_logging(1.0, 1.0, "1000");
+    // The `target_data_rate` should be
+    // - low enough so that the `SELECT 1` queries get throttled (even with high CPU load due to
+    //   other tests running in parallel),
+    // - but high enough that the `SELECT 2` query after the sleep doesn't get throttled.
+    let (server, mut client) = setup_statement_logging(1.0, 1.0, "200");
 
     thread::sleep(Duration::from_secs(2));
     if use_prepared_statement {
@@ -800,6 +880,18 @@ fn test_statement_logging_sampling_inner(
     let sqls: Vec<String> = sl.into_iter().map(|r| r.get(0)).collect();
 
     assert_eq!(sqls, expected_sqls);
+
+    // Verify the statement_logging_record_count metric correctly tracks sampled vs unsampled.
+    // With 50% sampling and deterministic RNG, exactly 21 of 50 statements should be sampled.
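+    // (The exact 21/29 split is stable because `statement_logging_use_reproducible_rng` makes
+    // the Bernoulli sampling draw from a deterministically seeded ChaCha8 RNG instead of the
+    // thread-local RNG.)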
+    let (sampled_true, sampled_false) = get_statement_logging_record_counts(&server);
+    assert_eq!(
+        sampled_true, 21,
+        "expected 21 statements to be sampled with 50% rate and deterministic RNG"
+    );
+    assert_eq!(
+        sampled_false, 29,
+        "expected 29 statements to not be sampled with 50% rate and deterministic RNG"
+    );
 }
 
 #[mz_ore::test]
@@ -816,21 +908,15 @@ fn test_statement_logging_sampling_constrained() {
     test_statement_logging_sampling_inner(server, client);
 }
 
-#[mz_ore::test(tokio::test(flavor = "multi_thread", worker_threads = 1))]
-async fn test_statement_logging_unsampled_metrics() {
-    let server = test_util::TestHarness::default().start().await;
-    server
-        .disable_feature_flags(&["enable_frontend_peek_sequencing"])
-        .await;
-    let client = server.connect().await.unwrap();
+/// Test that the `mz_statement_logging_unsampled_bytes` metric tracks the total bytes
+/// of SQL text that would have been logged if statement logging were fully enabled.
+/// We set `sample_rate=0.0` so no statements are actually sampled/logged, but the
+/// unsampled_bytes metric still gets incremented for every executed statement.
+#[mz_ore::test]
+fn test_statement_logging_unsampled_metrics() {
+    // Use sample_rate=0.0 so statements are not sampled, but unsampled_bytes metric is still tracked.
+    let (server, mut client) = setup_statement_logging(1.0, 0.0, "");
 
-    // TODO[btv]
-    //
-    // The point of these metrics is to show how much SQL text we
-    // would have logged had statement logging been turned on.
-    // Since there is no way (yet) to turn statement logging off or on,
-    // this test is valid as-is currently. However, once we turn statement logging on,
-    // we should make sure to turn it _off_ in this test.
     let batch_queries = [
         "SELECT 'Hello, world!';SELECT 1;;",
         "SELECT 'Hello, world again!'",
@@ -859,29 +945,26 @@
         .count();
 
     for q in batch_queries {
-        client.batch_execute(q).await.unwrap();
+        client.batch_execute(q).unwrap();
     }
 
     for q in single_queries {
-        client.execute(q, &[]).await.unwrap();
+        client.execute(q, &[]).unwrap();
    }
 
     for q in prepared_queries {
-        let s = client.prepare(q).await.unwrap();
-        client.execute(&s, &[]).await.unwrap();
+        let s = client.prepare(q).unwrap();
+        client.execute(&s, &[]).unwrap();
     }
 
-    client.batch_execute(&named_prepared_outer).await.unwrap();
+    client.batch_execute(&named_prepared_outer).unwrap();
 
     // This should NOT be logged, since we never actually execute it.
-    client
-        .prepare("SELECT 'Hello, not counted!'")
-        .await
-        .unwrap();
+    client.prepare("SELECT 'Hello, not counted!'").unwrap();
 
     let expected_total = batch_total + single_total + prepared_total + named_prepared_outer_len;
     let metric_value = server
-        .metrics_registry
+        .metrics_registry()
         .gather()
         .into_iter()
         .find(|m| m.name() == "mz_statement_logging_unsampled_bytes")
         .get_metric()[0]
         .get_counter()
         .get_value();
     let metric_value = usize::cast_from(u64::try_cast_from(metric_value).unwrap());
     assert_eq!(expected_total, metric_value);
+
+    // Also verify that statement_logging_record_count shows all statements as not sampled
+    // (since we're using 0% sample rate).
+    let (sampled_true, _sampled_false) = get_statement_logging_record_counts(&server);
+    assert_eq!(
+        sampled_true, 0,
+        "no statements should be sampled with 0% sample rate"
+    );
 }
 
 #[mz_ore::test]
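The two byte metrics asserted on above are related by a simple invariant: `mz_statement_logging_unsampled_bytes` appears to grow with the SQL text of every executed statement, while `mz_statement_logging_actual_bytes` grows only for statements the sampler selected, so the two coincide at a 100% sample rate and only the former moves at 0%. A toy model of that assumed bookkeeping (not the adapter's real implementation):

```rust
/// Toy model of the two statement-logging byte counters exercised by the tests above.
#[derive(Default, Debug)]
struct LoggingBytes {
    actual_bytes: u64,
    unsampled_bytes: u64,
}

impl LoggingBytes {
    /// Record one executed statement; `sampled` says whether the sampler picked it.
    fn record(&mut self, sql: &str, sampled: bool) {
        let len = sql.len() as u64;
        self.unsampled_bytes += len;
        if sampled {
            self.actual_bytes += len;
        }
    }
}

fn main() {
    // At a 100% sample rate the two counters move in lockstep...
    let mut full = LoggingBytes::default();
    full.record("SELECT 1", true);
    assert_eq!(full.actual_bytes, full.unsampled_bytes);

    // ...while at a 0% rate only `unsampled_bytes` grows.
    let mut off = LoggingBytes::default();
    off.record("SELECT 1", false);
    assert_eq!(off.actual_bytes, 0);
    assert_eq!(off.unsampled_bytes, 8);
}
```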
@@ -2014,20 +2105,11 @@ fn test_ws_passes_options() {
 // doesn't cause a crash with subscribes over web sockets,
 // which was previously happening (in staging) due to us
 // dropping the `ExecuteContext` on the floor in that case.
-fn test_ws_subscribe_no_crash() {
-    let server = test_util::TestHarness::default()
-        .with_system_parameter_default(
-            "statement_logging_max_sample_rate".to_string(),
-            "1.0".to_string(),
-        )
-        .with_system_parameter_default(
-            "statement_logging_default_sample_rate".to_string(),
-            "1.0".to_string(),
-        )
-        .start_blocking();
+fn test_statement_logging_ws_subscribe_no_crash() {
+    let (server, _client) = setup_statement_logging(1.0, 1.0, "");
 
     // Create our WebSocket.
-    let ws_url = server.ws_addr();
+    let ws_url = server.server.ws_addr();
     let (mut ws, _resp) = tungstenite::connect(ws_url).unwrap();
     test_util::auth_with_ws(&mut ws, Default::default()).unwrap();
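The `target_data_rate` comment in `run_throttling_test` above is token-bucket reasoning: logging budget refills at the target rate, the initial burst of `SELECT 1`s must drain it, and the sleep must refill enough budget for the later `SELECT 2`. A minimal sketch of that model (the `ThrottleBucket` type and its constants are illustrative assumptions, not Materialize's actual throttling code):

```rust
use std::time::Duration;

/// Illustrative token bucket: `rate` bytes of logging budget accrue per second, capped
/// at `burst`; a statement is logged only if its SQL text fits in the remaining budget.
struct ThrottleBucket {
    rate: f64,
    burst: f64,
    credit: f64,
}

impl ThrottleBucket {
    fn new(rate: f64) -> Self {
        // Assume the burst allowance scales with the rate, as in the test's reasoning:
        // a lower rate both drains faster under a burst and refills more slowly.
        Self { rate, burst: rate, credit: rate }
    }

    fn tick(&mut self, elapsed: Duration) {
        self.credit = (self.credit + self.rate * elapsed.as_secs_f64()).min(self.burst);
    }

    /// Returns true if the statement would be logged (i.e., not throttled).
    fn try_log(&mut self, sql: &str) -> bool {
        let cost = sql.len() as f64;
        if cost <= self.credit {
            self.credit -= cost;
            true
        } else {
            false
        }
    }
}

fn main() {
    let mut bucket = ThrottleBucket::new(200.0);
    // A rapid burst of statements exhausts the budget, so later ones get throttled...
    let logged = (0..100).filter(|_| bucket.try_log("SELECT 1")).count();
    assert!(logged < 100);
    // ...but after a couple of seconds the budget refills enough for one more statement.
    bucket.tick(Duration::from_secs(2));
    assert!(bucket.try_log("SELECT 2"));
}
```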
diff --git a/src/pgwire/src/protocol.rs b/src/pgwire/src/protocol.rs
index c396cd614cecf..13008cd248c62 100644
--- a/src/pgwire/src/protocol.rs
+++ b/src/pgwire/src/protocol.rs
@@ -1340,6 +1340,8 @@ where
         Ok(State::Ready)
     }
 
+    /// `outer_ctx_extra` is Some when we are executing as part of an outer statement, e.g., a FETCH
+    /// triggering the execution of the underlying query.
     fn execute(
         &mut self,
         portal_name: String,
diff --git a/src/sql/src/session/vars/definitions.rs b/src/sql/src/session/vars/definitions.rs
index bbc257e8c0b27..bd5443eb7c3e1 100644
--- a/src/sql/src/session/vars/definitions.rs
+++ b/src/sql/src/session/vars/definitions.rs
@@ -1951,7 +1951,7 @@ feature_flags!(
         name: statement_logging_use_reproducible_rng,
         desc: "statement logging with reproducible RNG",
         default: false,
-        enable_for_item_parsing: true,
+        enable_for_item_parsing: false,
     },
     {
         name: enable_notices_for_index_already_exists,
diff --git a/test/cluster/statement-logging/statement-logging.td b/test/cluster/statement-logging/statement-logging.td
index 7a34d5487a48f..9f35f3c83db93 100644
--- a/test/cluster/statement-logging/statement-logging.td
+++ b/test/cluster/statement-logging/statement-logging.td
@@ -33,6 +33,10 @@ ALTER SYSTEM SET enable_statement_lifecycle_logging = true
 # so we can be sure that their `began_at` is different.
 $ sleep-is-probably-flaky-i-have-justified-my-need-with-a-comment duration="1s"
 
+# This should fail due to RBAC checks.
+! SELECT count(*) FROM mz_internal.mz_statement_execution_history;
+contains: permission denied
+
 # Make it so we can query the tables later
 $ postgres-execute connection=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr}
 ALTER SYSTEM SET enable_rbac_checks = false
@@ -47,6 +51,11 @@ ALTER SYSTEM SET enable_compute_peek_response_stash = false
 > SELECT 'beginning real test!'
 "beginning real test!"
 
+> SHOW OBJECTS;
+
+! SELECT 1/0;
+contains: division by zero
+
 > PREPARE p AS values ($1)
 
 > EXECUTE p('hello world')
@@ -130,6 +139,7 @@
 serializable my_app 1 {} success true "SET transaction_isolation TO serializable" "strict serializable" materialize {public}
 c my_app 1 {} success 18 1 standard true "SELECT count(*) FROM t" "strict serializable" true materialize {public}
 c my_app 1 {} success true "DROP CLUSTER c" "strict serializable" materialize {public}
+quickstart quickstart my_app 1 {} error "division by zero" true "SELECT 1/0" "strict serializable" materialize {public}
 quickstart quickstart my_app 1 {} error "Evaluation error: division by zero" true "SELECT f/0 FROM t" "strict serializable" true materialize {public}
 quickstart quickstart my_app 1 {} success 13 1 constant true "EXECUTE p ('')" "strict serializable" materialize {public}
 quickstart quickstart my_app 1 {} success 18 1 fast-path true "SELECT * FROM t" "strict serializable" materialize {public}
@@ -146,6 +156,7 @@ quickstart quickstart my_app 1 {} success true "FETC
 quickstart quickstart my_app 1 {} success true "INSERT INTO t VALUES ('')" "strict serializable" materialize {public}
 quickstart quickstart my_app 1 {} success true "PREPARE p AS values ($1)" "strict serializable" materialize {public}
 quickstart quickstart my_app 1 {} success true "SET cluster TO c" "strict serializable" materialize {public}
+mz_catalog_server mz_catalog_server my_app 1 {} success 0 0 fast-path true "SHOW OBJECTS" "strict serializable" materialize {public}
 mz_catalog_server mz_catalog_server my_app 1 {} success 22 1 constant true "SELECT 'beginning real test!'" "strict serializable" materialize {public}
 mz_catalog_server mz_catalog_server my_app 1 {} success 14 1 constant true "SELECT 'serializable'" serializable materialize {public}
 mz_catalog_server mz_catalog_server my_app 1 {} success 17 1 standard true "SELECT count(*) > 0 FROM mz_internal.mz_cluster_replica_metrics" "strict serializable" true materialize {public}
@@ -225,6 +236,16 @@
 COMMIT execution-finished
 "SET cluster TO c" execution-finished
 "SET transaction_isolation TO serializable" execution-began
 "SET transaction_isolation TO serializable" execution-finished
+"SELECT 1/0" compute-dependencies-finished
+"SELECT 1/0" execution-began
+"SELECT 1/0" execution-finished
+"SELECT 1/0" optimization-finished
+"SELECT 1/0" storage-dependencies-finished
+"SHOW OBJECTS" compute-dependencies-finished
+"SHOW OBJECTS" execution-began
+"SHOW OBJECTS" execution-finished
+"SHOW OBJECTS" optimization-finished
+"SHOW OBJECTS" storage-dependencies-finished
 
 # Test that everything in a transaction has the same transaction ID