Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,9 @@ class StatementLogging(Check):
def initialize(self) -> Testdrive:
return Testdrive(
dedent(
# TODO(peek-seq): enable_frontend_peek_sequencing when it supports statement logging.
"""
$ postgres-execute connection=postgres://mz_system@${testdrive.materialize-internal-sql-addr}
ALTER SYSTEM SET statement_logging_max_sample_rate TO 1.0

$[version>=2600200] postgres-execute connection=postgres://mz_system@${testdrive.materialize-internal-sql-addr}
ALTER SYSTEM SET enable_frontend_peek_sequencing = false;
"""
)
)
Expand Down
34 changes: 28 additions & 6 deletions src/adapter/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,7 @@ impl Client {
transient_id_gen,
optimizer_metrics,
persist_client,
statement_logging_frontend,
} = response;

let peek_client = PeekClient::new(
Expand All @@ -272,6 +273,7 @@ impl Client {
transient_id_gen,
optimizer_metrics,
persist_client,
statement_logging_frontend,
);

let mut client = SessionClient {
Expand Down Expand Up @@ -692,6 +694,9 @@ impl SessionClient {
/// Executes a previously-bound portal.
///
/// Note: the provided `cancel_future` must be cancel-safe as it's polled in a `select!` loop.
///
/// `outer_ctx_extra` is Some when we are executing as part of an outer statement, e.g., a FETCH
/// triggering the execution of the underlying query.
#[mz_ore::instrument(level = "debug")]
pub async fn execute(
&mut self,
Expand All @@ -704,11 +709,21 @@ impl SessionClient {
// Attempt peek sequencing in the session task.
// If unsupported, fall back to the Coordinator path.
// TODO(peek-seq): wire up cancel_future
if let Some(resp) = self.try_frontend_peek(&portal_name).await? {
let mut outer_ctx_extra = outer_ctx_extra;
if let Some(resp) = self
.try_frontend_peek(&portal_name, &mut outer_ctx_extra)
.await?
{
debug!("frontend peek succeeded");
// Frontend peek handled the execution and retired outer_ctx_extra if it existed.
// No additional work needed here.
return Ok((resp, execute_started));
} else {
debug!("frontend peek did not happen");
debug!("frontend peek did not happen, falling back to `Command::Execute`");
// If we bailed out, outer_ctx_extra is still present (if it was originally).
// `Command::Execute` will handle it.
// (This is not true if we bailed out _after_ the frontend peek sequencing has already
// begun its own statement logging. That case would be a bug.)
}

let response = self
Expand Down Expand Up @@ -1020,7 +1035,10 @@ impl SessionClient {
| Command::StoreTransactionReadHolds { .. }
| Command::ExecuteSlowPathPeek { .. }
| Command::ExecuteCopyTo { .. }
| Command::ExecuteSideEffectingFunc { .. } => {}
| Command::ExecuteSideEffectingFunc { .. }
| Command::RegisterFrontendPeek { .. }
| Command::UnregisterFrontendPeek { .. }
| Command::FrontendStatementLogging(..) => {}
};
cmd
});
Expand Down Expand Up @@ -1105,16 +1123,20 @@ impl SessionClient {

/// Attempt to sequence a peek from the session task.
///
/// Returns Some(response) if we handled the peek, or None to fall back to the Coordinator's
/// peek sequencing.
/// Returns `Ok(Some(response))` if we handled the peek, or `Ok(None)` to fall back to the
/// Coordinator's sequencing. If it returns an error, it should be returned to the user.
///
/// `outer_ctx_extra` is Some when we are executing as part of an outer statement, e.g., a FETCH
/// triggering the execution of the underlying query.
pub(crate) async fn try_frontend_peek(
&mut self,
portal_name: &str,
outer_ctx_extra: &mut Option<ExecuteContextExtra>,
) -> Result<Option<ExecuteResponse>, AdapterError> {
if self.enable_frontend_peek_sequencing {
let session = self.session.as_mut().expect("SessionClient invariant");
self.peek_client
.try_frontend_peek_inner(portal_name, session)
.try_frontend_peek(portal_name, session, outer_ctx_extra)
.await
} else {
Ok(None)
Expand Down
51 changes: 48 additions & 3 deletions src/adapter/src/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,11 @@ use crate::coord::peek::{PeekDataflowPlan, PeekResponseUnary};
use crate::coord::timestamp_selection::TimestampDetermination;
use crate::error::AdapterError;
use crate::session::{EndTransactionAction, RowBatchStream, Session};
use crate::statement_logging::{StatementEndedExecutionReason, StatementExecutionStrategy};
use crate::statement_logging::WatchSetCreation;
use crate::statement_logging::{
FrontendStatementLoggingEvent, StatementEndedExecutionReason, StatementExecutionStrategy,
StatementLoggingFrontend,
};
use crate::util::Transmittable;
use crate::webhook::AppendWebhookResponse;
use crate::{AdapterNotice, AppendWebhookError, ReadHolds};
Expand Down Expand Up @@ -210,6 +214,9 @@ pub enum Command {
conn_id: ConnectionId,
max_result_size: u64,
max_query_result_size: Option<u64>,
/// If statement logging is enabled, contains all info needed for installing watch sets
/// and logging the statement execution.
watch_set: Option<WatchSetCreation>,
tx: oneshot::Sender<Result<ExecuteResponse, AdapterError>>,
},

Expand All @@ -219,6 +226,9 @@ pub enum Command {
target_replica: Option<ReplicaId>,
source_ids: BTreeSet<GlobalId>,
conn_id: ConnectionId,
/// If statement logging is enabled, contains all info needed for installing watch sets
/// and logging the statement execution.
watch_set: Option<WatchSetCreation>,
tx: oneshot::Sender<Result<ExecuteResponse, AdapterError>>,
},

Expand All @@ -230,6 +240,34 @@ pub enum Command {
current_role: RoleId,
tx: oneshot::Sender<Result<ExecuteResponse, AdapterError>>,
},

/// Register a pending peek initiated by frontend sequencing. This is needed for:
/// - statement logging
/// - query cancellation
RegisterFrontendPeek {
uuid: Uuid,
conn_id: ConnectionId,
cluster_id: mz_controller_types::ClusterId,
depends_on: BTreeSet<GlobalId>,
is_fast_path: bool,
/// If statement logging is enabled, contains all info needed for installing watch sets
/// and logging the statement execution.
watch_set: Option<WatchSetCreation>,
tx: oneshot::Sender<Result<(), AdapterError>>,
},

/// Unregister a pending peek that was registered but failed to issue.
/// This is used for cleanup when `client.peek()` fails after `RegisterFrontendPeek` succeeds.
/// The `ExecuteContextExtra` is dropped without logging the statement retirement, because the
/// frontend will log the error.
UnregisterFrontendPeek {
uuid: Uuid,
tx: oneshot::Sender<()>,
},

/// Statement logging event from frontend peek sequencing.
/// No response channel needed - this is fire-and-forget.
FrontendStatementLogging(FrontendStatementLoggingEvent),
}

impl Command {
Expand Down Expand Up @@ -257,7 +295,10 @@ impl Command {
| Command::StoreTransactionReadHolds { .. }
| Command::ExecuteSlowPathPeek { .. }
| Command::ExecuteCopyTo { .. }
| Command::ExecuteSideEffectingFunc { .. } => None,
| Command::ExecuteSideEffectingFunc { .. }
| Command::RegisterFrontendPeek { .. }
| Command::UnregisterFrontendPeek { .. }
| Command::FrontendStatementLogging(..) => None,
}
}

Expand Down Expand Up @@ -285,7 +326,10 @@ impl Command {
| Command::StoreTransactionReadHolds { .. }
| Command::ExecuteSlowPathPeek { .. }
| Command::ExecuteCopyTo { .. }
| Command::ExecuteSideEffectingFunc { .. } => None,
| Command::ExecuteSideEffectingFunc { .. }
| Command::RegisterFrontendPeek { .. }
| Command::UnregisterFrontendPeek { .. }
| Command::FrontendStatementLogging(..) => None,
}
}
}
Expand Down Expand Up @@ -318,6 +362,7 @@ pub struct StartupResponse {
pub transient_id_gen: Arc<TransientIdGen>,
pub optimizer_metrics: OptimizerMetrics,
pub persist_client: PersistClient,
pub statement_logging_frontend: StatementLoggingFrontend,
}

/// The response to [`Client::authenticate`](crate::Client::authenticate).
Expand Down
36 changes: 22 additions & 14 deletions src/adapter/src/coord.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,9 @@ use mz_catalog::memory::objects::{
};
use mz_cloud_resources::{CloudResourceController, VpcEndpointConfig, VpcEndpointEvent};
use mz_compute_client::as_of_selection;
use mz_compute_client::controller::error::{DataflowCreationError, InstanceMissing};
use mz_compute_client::controller::error::{
CollectionLookupError, DataflowCreationError, InstanceMissing,
};
use mz_compute_types::ComputeInstanceId;
use mz_compute_types::dataflows::DataflowDescription;
use mz_compute_types::plan::Plan;
Expand Down Expand Up @@ -190,7 +192,7 @@ use crate::coord::cluster_scheduling::SchedulingDecision;
use crate::coord::id_bundle::CollectionIdBundle;
use crate::coord::introspection::IntrospectionSubscribe;
use crate::coord::peek::PendingPeek;
use crate::coord::statement_logging::{StatementLogging, StatementLoggingId};
use crate::coord::statement_logging::StatementLogging;
use crate::coord::timeline::{TimelineContext, TimelineState};
use crate::coord::timestamp_selection::{TimestampContext, TimestampDetermination};
use crate::coord::validity::PlanValidity;
Expand All @@ -203,7 +205,9 @@ use crate::optimize::dataflows::{
};
use crate::optimize::{self, Optimize, OptimizerConfig};
use crate::session::{EndTransactionAction, Session};
use crate::statement_logging::{StatementEndedExecutionReason, StatementLifecycleEvent};
use crate::statement_logging::{
StatementEndedExecutionReason, StatementLifecycleEvent, StatementLoggingId,
};
use crate::util::{ClientTransmitter, ResultExt};
use crate::webhook::{WebhookAppenderInvalidator, WebhookConcurrencyLimiter};
use crate::{AdapterNotice, ReadHolds, flags};
Expand Down Expand Up @@ -374,6 +378,9 @@ impl Message {
Command::ExecuteSlowPathPeek { .. } => "execute-slow-path-peek",
Command::ExecuteCopyTo { .. } => "execute-copy-to",
Command::ExecuteSideEffectingFunc { .. } => "execute-side-effecting-func",
Command::RegisterFrontendPeek { .. } => "register-frontend-peek",
Command::UnregisterFrontendPeek { .. } => "unregister-frontend-peek",
Command::FrontendStatementLogging(..) => "frontend-statement-logging",
},
Message::ControllerReady {
controller: ControllerReadiness::Compute,
Expand Down Expand Up @@ -1332,7 +1339,7 @@ impl ExecuteContextExtra {
/// called from code that knows what to do to finish up logging
/// based on the inner value.
#[must_use]
fn retire(mut self) -> Option<StatementLoggingId> {
pub(crate) fn retire(mut self) -> Option<StatementLoggingId> {
let Self { statement_uuid } = &mut self;
statement_uuid.take()
}
Expand All @@ -1356,14 +1363,13 @@ impl Drop for ExecuteContextExtra {
///
/// This struct collects a bundle of state that needs to be threaded
/// through various functions as part of statement execution.
/// Currently, it is only used to finalize execution, by calling one
/// of the methods `retire` or `retire_aysnc`. Finalizing execution
/// It is used to finalize execution, by calling `retire`. Finalizing execution
/// involves sending the session back to the pgwire layer so that it
/// may be used to process further commands. In the future, it will
/// also involve performing some work on the main coordinator thread
/// may be used to process further commands. It also involves
/// performing some work on the main coordinator thread
/// (e.g., recording the time at which the statement finished
/// executing) the state necessary to perform this work is bundled in
/// the `ExecuteContextExtra` object (today, it is simply empty).
/// executing). The state necessary to perform this work is bundled in
/// the `ExecuteContextExtra` object.
Copy link
Contributor Author

@ggevay ggevay Dec 16, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(The functionality that this comment covers is not actually being modified. The comment is being modified because it was extremely outdated.)

#[derive(Debug)]
pub struct ExecuteContext {
inner: Box<ExecuteContextInner>,
Expand Down Expand Up @@ -3762,13 +3768,14 @@ impl Coordinator {
objects: BTreeSet<GlobalId>,
t: Timestamp,
state: WatchSetResponse,
) {
let ws_id = self.controller.install_compute_watch_set(objects, t);
) -> Result<(), CollectionLookupError> {
let ws_id = self.controller.install_compute_watch_set(objects, t)?;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(Had to make these fallible for the same reason as a lot of things that the frontend peek sequencing calls have to be fallible: the query's dependencies might disappear during sequencing at any moment. This was not the case in the old peek sequencing, when the sequencing itself was occupying the main coordinator task.)

self.connection_watch_sets
.entry(conn_id.clone())
.or_default()
.insert(ws_id);
self.installed_watch_sets.insert(ws_id, (conn_id, state));
Ok(())
}

/// Install a _watch set_ in the controller that is automatically associated with the given
Expand All @@ -3780,13 +3787,14 @@ impl Coordinator {
objects: BTreeSet<GlobalId>,
t: Timestamp,
state: WatchSetResponse,
) {
let ws_id = self.controller.install_storage_watch_set(objects, t);
) -> Result<(), CollectionLookupError> {
let ws_id = self.controller.install_storage_watch_set(objects, t)?;
self.connection_watch_sets
.entry(conn_id.clone())
.or_default()
.insert(ws_id);
self.installed_watch_sets.insert(ws_id, (conn_id, state));
Ok(())
}

/// Cancels pending watchsets associated with the provided connection id.
Expand Down
3 changes: 1 addition & 2 deletions src/adapter/src/coord/catalog_implications.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,8 @@ use crate::coord::Coordinator;
use crate::coord::catalog_implications::parsed_state_updates::{
ParsedStateUpdate, ParsedStateUpdateKind,
};
use crate::coord::statement_logging::StatementLoggingId;
use crate::coord::timeline::TimelineState;
use crate::statement_logging::StatementEndedExecutionReason;
use crate::statement_logging::{StatementEndedExecutionReason, StatementLoggingId};
use crate::{AdapterError, CollectionIdBundle, ExecuteContext, ResultExt};

pub mod parsed_state_updates;
Expand Down
Loading