Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ def initialize(self) -> Testdrive:
ALTER SYSTEM SET statement_logging_max_sample_rate TO 1.0

$[version>=2600200] postgres-execute connection=postgres://mz_system@${testdrive.materialize-internal-sql-addr}
ALTER SYSTEM SET enable_frontend_peek_sequencing = false;
ALTER SYSTEM SET enable_frontend_peek_sequencing = true;
"""
)
)
Expand Down
2 changes: 1 addition & 1 deletion misc/python/materialize/mzcompose/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ def get_variable_system_parameters(
),
VariableSystemParameter(
"enable_frontend_peek_sequencing",
"false",
"true",
["true", "false"],
),
VariableSystemParameter(
Expand Down
33 changes: 27 additions & 6 deletions src/adapter/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,7 @@ impl Client {
transient_id_gen,
optimizer_metrics,
persist_client,
statement_logging_frontend,
} = response;

let peek_client = PeekClient::new(
Expand All @@ -272,6 +273,7 @@ impl Client {
transient_id_gen,
optimizer_metrics,
persist_client,
statement_logging_frontend,
);

let mut client = SessionClient {
Expand Down Expand Up @@ -692,6 +694,9 @@ impl SessionClient {
/// Executes a previously-bound portal.
///
/// Note: the provided `cancel_future` must be cancel-safe as it's polled in a `select!` loop.
///
/// `outer_ctx_extra` is Some when we are executing as part of an outer statement, e.g., a FETCH
/// triggering the execution of the underlying query.
#[mz_ore::instrument(level = "debug")]
pub async fn execute(
&mut self,
Expand All @@ -704,11 +709,19 @@ impl SessionClient {
// Attempt peek sequencing in the session task.
// If unsupported, fall back to the Coordinator path.
// TODO(peek-seq): wire up cancel_future
if let Some(resp) = self.try_frontend_peek(&portal_name).await? {
let mut outer_ctx_extra = outer_ctx_extra;
if let Some(resp) = self
.try_frontend_peek(&portal_name, &mut outer_ctx_extra)
.await?
{
debug!("frontend peek succeeded");
// Frontend peek handled the execution and retired outer_ctx_extra if it existed.
// No additional work needed here.
return Ok((resp, execute_started));
} else {
debug!("frontend peek did not happen");
debug!("frontend peek did not happen, falling back to `Command::Execute`");
// If we bailed out, outer_ctx_extra is still present (if it was originally).
// `Command::Execute` will handle it.
}

let response = self
Expand Down Expand Up @@ -1020,7 +1033,11 @@ impl SessionClient {
| Command::StoreTransactionReadHolds { .. }
| Command::ExecuteSlowPathPeek { .. }
| Command::ExecuteCopyTo { .. }
| Command::ExecuteSideEffectingFunc { .. } => {}
| Command::ExecuteSideEffectingFunc { .. }
| Command::RegisterFrontendPeek { .. }
| Command::UnregisterFrontendPeek { .. }
| Command::ExplainTimestamp { .. }
| Command::FrontendStatementLogging(..) => {}
};
cmd
});
Expand Down Expand Up @@ -1105,16 +1122,20 @@ impl SessionClient {

/// Attempt to sequence a peek from the session task.
///
/// Returns Some(response) if we handled the peek, or None to fall back to the Coordinator's
/// peek sequencing.
/// Returns `Ok(Some(response))` if we handled the peek, or `Ok(None)` to fall back to the
/// Coordinator's sequencing. If it returns an error, it should be returned to the user.
///
/// `outer_ctx_extra` is Some when we are executing as part of an outer statement, e.g., a FETCH
/// triggering the execution of the underlying query.
pub(crate) async fn try_frontend_peek(
&mut self,
portal_name: &str,
outer_ctx_extra: &mut Option<ExecuteContextExtra>,
) -> Result<Option<ExecuteResponse>, AdapterError> {
if self.enable_frontend_peek_sequencing {
let session = self.session.as_mut().expect("SessionClient invariant");
self.peek_client
.try_frontend_peek_inner(portal_name, session)
.try_frontend_peek(portal_name, session, outer_ctx_extra)
.await
} else {
Ok(None)
Expand Down
70 changes: 66 additions & 4 deletions src/adapter/src/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;

use chrono::{DateTime, Utc};
use derivative::Derivative;
use enum_kinds::EnumKind;
use futures::Stream;
Expand All @@ -21,6 +22,7 @@ use mz_auth::password::Password;
use mz_cluster_client::ReplicaId;
use mz_compute_types::ComputeInstanceId;
use mz_compute_types::dataflows::DataflowDescription;
use mz_controller_types::ClusterId;
use mz_expr::RowSetFinishing;
use mz_ore::collections::CollectionExt;
use mz_ore::soft_assert_no_log;
Expand Down Expand Up @@ -50,10 +52,16 @@ use crate::coord::peek::{PeekDataflowPlan, PeekResponseUnary};
use crate::coord::timestamp_selection::TimestampDetermination;
use crate::error::AdapterError;
use crate::session::{EndTransactionAction, RowBatchStream, Session};
use crate::statement_logging::{StatementEndedExecutionReason, StatementExecutionStrategy};
use crate::statement_logging::WatchSetCreation;
use crate::statement_logging::{
FrontendStatementLoggingEvent, StatementEndedExecutionReason, StatementExecutionStrategy,
StatementLoggingFrontend,
};
use crate::util::Transmittable;
use crate::webhook::AppendWebhookResponse;
use crate::{AdapterNotice, AppendWebhookError, ReadHolds};
use crate::{
AdapterNotice, AppendWebhookError, CollectionIdBundle, ReadHolds, TimestampExplanation,
};

#[derive(Debug)]
pub struct CatalogSnapshot {
Expand Down Expand Up @@ -210,6 +218,9 @@ pub enum Command {
conn_id: ConnectionId,
max_result_size: u64,
max_query_result_size: Option<u64>,
/// If statement logging is enabled, contains all info needed for installing watch sets
/// and logging the statement execution.
watch_set: Option<WatchSetCreation>,
tx: oneshot::Sender<Result<ExecuteResponse, AdapterError>>,
},

Expand All @@ -219,6 +230,9 @@ pub enum Command {
target_replica: Option<ReplicaId>,
source_ids: BTreeSet<GlobalId>,
conn_id: ConnectionId,
/// If statement logging is enabled, contains all info needed for installing watch sets
/// and logging the statement execution.
watch_set: Option<WatchSetCreation>,
tx: oneshot::Sender<Result<ExecuteResponse, AdapterError>>,
},

Expand All @@ -230,6 +244,45 @@ pub enum Command {
current_role: RoleId,
tx: oneshot::Sender<Result<ExecuteResponse, AdapterError>>,
},

/// Register a pending peek initiated by frontend sequencing. This is needed for:
/// - statement logging
/// - query cancellation
RegisterFrontendPeek {
uuid: Uuid,
conn_id: ConnectionId,
cluster_id: mz_controller_types::ClusterId,
depends_on: BTreeSet<GlobalId>,
is_fast_path: bool,
/// If statement logging is enabled, contains all info needed for installing watch sets
/// and logging the statement execution.
watch_set: Option<WatchSetCreation>,
tx: oneshot::Sender<Result<(), AdapterError>>,
},

/// Unregister a pending peek that was registered but failed to issue.
/// This is used for cleanup when `client.peek()` fails after `RegisterFrontendPeek` succeeds.
/// The `ExecuteContextExtra` is dropped without logging the statement retirement, because the
/// frontend will log the error.
UnregisterFrontendPeek {
uuid: Uuid,
tx: oneshot::Sender<()>,
},

/// Generate a timestamp explanation.
/// This is used when `emit_timestamp_notice` is enabled.
ExplainTimestamp {
conn_id: ConnectionId,
session_wall_time: DateTime<Utc>,
cluster_id: ClusterId,
id_bundle: CollectionIdBundle,
determination: TimestampDetermination<mz_repr::Timestamp>,
tx: oneshot::Sender<TimestampExplanation<mz_repr::Timestamp>>,
},

/// Statement logging event from frontend peek sequencing.
/// No response channel needed - this is fire-and-forget.
FrontendStatementLogging(FrontendStatementLoggingEvent),
}

impl Command {
Expand Down Expand Up @@ -257,7 +310,11 @@ impl Command {
| Command::StoreTransactionReadHolds { .. }
| Command::ExecuteSlowPathPeek { .. }
| Command::ExecuteCopyTo { .. }
| Command::ExecuteSideEffectingFunc { .. } => None,
| Command::ExecuteSideEffectingFunc { .. }
| Command::RegisterFrontendPeek { .. }
| Command::UnregisterFrontendPeek { .. }
| Command::ExplainTimestamp { .. }
| Command::FrontendStatementLogging(..) => None,
}
}

Expand Down Expand Up @@ -285,7 +342,11 @@ impl Command {
| Command::StoreTransactionReadHolds { .. }
| Command::ExecuteSlowPathPeek { .. }
| Command::ExecuteCopyTo { .. }
| Command::ExecuteSideEffectingFunc { .. } => None,
| Command::ExecuteSideEffectingFunc { .. }
| Command::RegisterFrontendPeek { .. }
| Command::UnregisterFrontendPeek { .. }
| Command::ExplainTimestamp { .. }
| Command::FrontendStatementLogging(..) => None,
}
}
}
Expand Down Expand Up @@ -318,6 +379,7 @@ pub struct StartupResponse {
pub transient_id_gen: Arc<TransientIdGen>,
pub optimizer_metrics: OptimizerMetrics,
pub persist_client: PersistClient,
pub statement_logging_frontend: StatementLoggingFrontend,
}

/// The response to [`Client::authenticate`](crate::Client::authenticate).
Expand Down
37 changes: 23 additions & 14 deletions src/adapter/src/coord.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,9 @@ use mz_catalog::memory::objects::{
};
use mz_cloud_resources::{CloudResourceController, VpcEndpointConfig, VpcEndpointEvent};
use mz_compute_client::as_of_selection;
use mz_compute_client::controller::error::{DataflowCreationError, InstanceMissing};
use mz_compute_client::controller::error::{
CollectionLookupError, DataflowCreationError, InstanceMissing,
};
use mz_compute_types::ComputeInstanceId;
use mz_compute_types::dataflows::DataflowDescription;
use mz_compute_types::plan::Plan;
Expand Down Expand Up @@ -190,7 +192,7 @@ use crate::coord::cluster_scheduling::SchedulingDecision;
use crate::coord::id_bundle::CollectionIdBundle;
use crate::coord::introspection::IntrospectionSubscribe;
use crate::coord::peek::PendingPeek;
use crate::coord::statement_logging::{StatementLogging, StatementLoggingId};
use crate::coord::statement_logging::StatementLogging;
use crate::coord::timeline::{TimelineContext, TimelineState};
use crate::coord::timestamp_selection::{TimestampContext, TimestampDetermination};
use crate::coord::validity::PlanValidity;
Expand All @@ -203,7 +205,9 @@ use crate::optimize::dataflows::{
};
use crate::optimize::{self, Optimize, OptimizerConfig};
use crate::session::{EndTransactionAction, Session};
use crate::statement_logging::{StatementEndedExecutionReason, StatementLifecycleEvent};
use crate::statement_logging::{
StatementEndedExecutionReason, StatementLifecycleEvent, StatementLoggingId,
};
use crate::util::{ClientTransmitter, ResultExt};
use crate::webhook::{WebhookAppenderInvalidator, WebhookConcurrencyLimiter};
use crate::{AdapterNotice, ReadHolds, flags};
Expand Down Expand Up @@ -374,6 +378,10 @@ impl Message {
Command::ExecuteSlowPathPeek { .. } => "execute-slow-path-peek",
Command::ExecuteCopyTo { .. } => "execute-copy-to",
Command::ExecuteSideEffectingFunc { .. } => "execute-side-effecting-func",
Command::RegisterFrontendPeek { .. } => "register-frontend-peek",
Command::UnregisterFrontendPeek { .. } => "unregister-frontend-peek",
Command::ExplainTimestamp { .. } => "explain-timestamp",
Command::FrontendStatementLogging(..) => "frontend-statement-logging",
},
Message::ControllerReady {
controller: ControllerReadiness::Compute,
Expand Down Expand Up @@ -1332,7 +1340,7 @@ impl ExecuteContextExtra {
/// called from code that knows what to do to finish up logging
/// based on the inner value.
#[must_use]
fn retire(mut self) -> Option<StatementLoggingId> {
pub(crate) fn retire(mut self) -> Option<StatementLoggingId> {
let Self { statement_uuid } = &mut self;
statement_uuid.take()
}
Expand All @@ -1356,14 +1364,13 @@ impl Drop for ExecuteContextExtra {
///
/// This struct collects a bundle of state that needs to be threaded
/// through various functions as part of statement execution.
/// Currently, it is only used to finalize execution, by calling one
/// of the methods `retire` or `retire_aysnc`. Finalizing execution
/// It is used to finalize execution, by calling `retire`. Finalizing execution
/// involves sending the session back to the pgwire layer so that it
/// may be used to process further commands. In the future, it will
/// also involve performing some work on the main coordinator thread
/// may be used to process further commands. It also involves
/// performing some work on the main coordinator thread
/// (e.g., recording the time at which the statement finished
/// executing) the state necessary to perform this work is bundled in
/// the `ExecuteContextExtra` object (today, it is simply empty).
/// executing). The state necessary to perform this work is bundled in
/// the `ExecuteContextExtra` object.
#[derive(Debug)]
pub struct ExecuteContext {
inner: Box<ExecuteContextInner>,
Expand Down Expand Up @@ -3762,13 +3769,14 @@ impl Coordinator {
objects: BTreeSet<GlobalId>,
t: Timestamp,
state: WatchSetResponse,
) {
let ws_id = self.controller.install_compute_watch_set(objects, t);
) -> Result<(), CollectionLookupError> {
let ws_id = self.controller.install_compute_watch_set(objects, t)?;
self.connection_watch_sets
.entry(conn_id.clone())
.or_default()
.insert(ws_id);
self.installed_watch_sets.insert(ws_id, (conn_id, state));
Ok(())
}

/// Install a _watch set_ in the controller that is automatically associated with the given
Expand All @@ -3780,13 +3788,14 @@ impl Coordinator {
objects: BTreeSet<GlobalId>,
t: Timestamp,
state: WatchSetResponse,
) {
let ws_id = self.controller.install_storage_watch_set(objects, t);
) -> Result<(), CollectionLookupError> {
let ws_id = self.controller.install_storage_watch_set(objects, t)?;
self.connection_watch_sets
.entry(conn_id.clone())
.or_default()
.insert(ws_id);
self.installed_watch_sets.insert(ws_id, (conn_id, state));
Ok(())
}

/// Cancels pending watchsets associated with the provided connection id.
Expand Down
3 changes: 1 addition & 2 deletions src/adapter/src/coord/catalog_implications.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,8 @@ use crate::coord::Coordinator;
use crate::coord::catalog_implications::parsed_state_updates::{
ParsedStateUpdate, ParsedStateUpdateKind,
};
use crate::coord::statement_logging::StatementLoggingId;
use crate::coord::timeline::TimelineState;
use crate::statement_logging::StatementEndedExecutionReason;
use crate::statement_logging::{StatementEndedExecutionReason, StatementLoggingId};
use crate::{AdapterError, CollectionIdBundle, ExecuteContext, ResultExt};

pub mod parsed_state_updates;
Expand Down
Loading