Skip to content

Commit fbc92be

Browse files
committed
Frontend peek: statement logging
1 parent 48e6cb0 commit fbc92be

File tree

21 files changed

+1847
-595
lines changed

21 files changed

+1847
-595
lines changed

src/adapter/src/client.rs

Lines changed: 28 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -264,6 +264,7 @@ impl Client {
264264
transient_id_gen,
265265
optimizer_metrics,
266266
persist_client,
267+
statement_logging_frontend,
267268
} = response;
268269

269270
let peek_client = PeekClient::new(
@@ -272,6 +273,7 @@ impl Client {
272273
transient_id_gen,
273274
optimizer_metrics,
274275
persist_client,
276+
statement_logging_frontend,
275277
);
276278

277279
let mut client = SessionClient {
@@ -692,6 +694,9 @@ impl SessionClient {
692694
/// Executes a previously-bound portal.
693695
///
694696
/// Note: the provided `cancel_future` must be cancel-safe as it's polled in a `select!` loop.
697+
///
698+
/// `outer_ctx_extra` is Some when we are executing as part of an outer statement, e.g., a FETCH
699+
/// triggering the execution of the underlying query.
695700
#[mz_ore::instrument(level = "debug")]
696701
pub async fn execute(
697702
&mut self,
@@ -704,11 +709,21 @@ impl SessionClient {
704709
// Attempt peek sequencing in the session task.
705710
// If unsupported, fall back to the Coordinator path.
706711
// TODO(peek-seq): wire up cancel_future
707-
if let Some(resp) = self.try_frontend_peek(&portal_name).await? {
712+
let mut outer_ctx_extra = outer_ctx_extra;
713+
if let Some(resp) = self
714+
.try_frontend_peek(&portal_name, &mut outer_ctx_extra)
715+
.await?
716+
{
708717
debug!("frontend peek succeeded");
718+
// Frontend peek handled the execution and retired outer_ctx_extra if it existed.
719+
// No additional work needed here.
709720
return Ok((resp, execute_started));
710721
} else {
711-
debug!("frontend peek did not happen");
722+
debug!("frontend peek did not happen, falling back to `Command::Execute`");
723+
// If we bailed out, outer_ctx_extra is still present (if it was originally).
724+
// `Command::Execute` will handle it.
725+
// (This is not true if we bailed out _after_ the frontend peek sequencing has already
726+
// begun its own statement logging. That case would be a bug.)
712727
}
713728

714729
let response = self
@@ -1020,7 +1035,10 @@ impl SessionClient {
10201035
| Command::StoreTransactionReadHolds { .. }
10211036
| Command::ExecuteSlowPathPeek { .. }
10221037
| Command::ExecuteCopyTo { .. }
1023-
| Command::ExecuteSideEffectingFunc { .. } => {}
1038+
| Command::ExecuteSideEffectingFunc { .. }
1039+
| Command::RegisterFrontendPeek { .. }
1040+
| Command::UnregisterFrontendPeek { .. }
1041+
| Command::FrontendStatementLogging(..) => {}
10241042
};
10251043
cmd
10261044
});
@@ -1105,16 +1123,20 @@ impl SessionClient {
11051123

11061124
/// Attempt to sequence a peek from the session task.
11071125
///
1108-
/// Returns Some(response) if we handled the peek, or None to fall back to the Coordinator's
1109-
/// peek sequencing.
1126+
/// Returns `Ok(Some(response))` if we handled the peek, or `Ok(None)` to fall back to the
1127+
/// Coordinator's sequencing. If it returns an error, it should be returned to the user.
1128+
///
1129+
/// `outer_ctx_extra` is Some when we are executing as part of an outer statement, e.g., a FETCH
1130+
/// triggering the execution of the underlying query.
11101131
pub(crate) async fn try_frontend_peek(
11111132
&mut self,
11121133
portal_name: &str,
1134+
outer_ctx_extra: &mut Option<ExecuteContextExtra>,
11131135
) -> Result<Option<ExecuteResponse>, AdapterError> {
11141136
if self.enable_frontend_peek_sequencing {
11151137
let session = self.session.as_mut().expect("SessionClient invariant");
11161138
self.peek_client
1117-
.try_frontend_peek_inner(portal_name, session)
1139+
.try_frontend_peek(portal_name, session, outer_ctx_extra)
11181140
.await
11191141
} else {
11201142
Ok(None)

src/adapter/src/command.rs

Lines changed: 48 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,11 @@ use crate::coord::peek::{PeekDataflowPlan, PeekResponseUnary};
5050
use crate::coord::timestamp_selection::TimestampDetermination;
5151
use crate::error::AdapterError;
5252
use crate::session::{EndTransactionAction, RowBatchStream, Session};
53-
use crate::statement_logging::{StatementEndedExecutionReason, StatementExecutionStrategy};
53+
use crate::statement_logging::WatchSetCreation;
54+
use crate::statement_logging::{
55+
FrontendStatementLoggingEvent, StatementEndedExecutionReason, StatementExecutionStrategy,
56+
StatementLoggingFrontend,
57+
};
5458
use crate::util::Transmittable;
5559
use crate::webhook::AppendWebhookResponse;
5660
use crate::{AdapterNotice, AppendWebhookError, ReadHolds};
@@ -210,6 +214,9 @@ pub enum Command {
210214
conn_id: ConnectionId,
211215
max_result_size: u64,
212216
max_query_result_size: Option<u64>,
217+
/// If statement logging is enabled, contains all info needed for installing watch sets
218+
/// and logging the statement execution.
219+
watch_set: Option<WatchSetCreation>,
213220
tx: oneshot::Sender<Result<ExecuteResponse, AdapterError>>,
214221
},
215222

@@ -219,6 +226,9 @@ pub enum Command {
219226
target_replica: Option<ReplicaId>,
220227
source_ids: BTreeSet<GlobalId>,
221228
conn_id: ConnectionId,
229+
/// If statement logging is enabled, contains all info needed for installing watch sets
230+
/// and logging the statement execution.
231+
watch_set: Option<WatchSetCreation>,
222232
tx: oneshot::Sender<Result<ExecuteResponse, AdapterError>>,
223233
},
224234

@@ -230,6 +240,34 @@ pub enum Command {
230240
current_role: RoleId,
231241
tx: oneshot::Sender<Result<ExecuteResponse, AdapterError>>,
232242
},
243+
244+
/// Register a pending peek initiated by frontend sequencing. This is needed for:
245+
/// - statement logging
246+
/// - query cancellation
247+
RegisterFrontendPeek {
248+
uuid: Uuid,
249+
conn_id: ConnectionId,
250+
cluster_id: mz_controller_types::ClusterId,
251+
depends_on: BTreeSet<GlobalId>,
252+
is_fast_path: bool,
253+
/// If statement logging is enabled, contains all info needed for installing watch sets
254+
/// and logging the statement execution.
255+
watch_set: Option<WatchSetCreation>,
256+
tx: oneshot::Sender<Result<(), AdapterError>>,
257+
},
258+
259+
/// Unregister a pending peek that was registered but failed to issue.
260+
/// This is used for cleanup when `client.peek()` fails after `RegisterFrontendPeek` succeeds.
261+
/// The `ExecuteContextExtra` is dropped without logging the statement retirement, because the
262+
/// frontend will log the error.
263+
UnregisterFrontendPeek {
264+
uuid: Uuid,
265+
tx: oneshot::Sender<()>,
266+
},
267+
268+
/// Statement logging event from frontend peek sequencing.
269+
/// No response channel needed - this is fire-and-forget.
270+
FrontendStatementLogging(FrontendStatementLoggingEvent),
233271
}
234272

235273
impl Command {
@@ -257,7 +295,10 @@ impl Command {
257295
| Command::StoreTransactionReadHolds { .. }
258296
| Command::ExecuteSlowPathPeek { .. }
259297
| Command::ExecuteCopyTo { .. }
260-
| Command::ExecuteSideEffectingFunc { .. } => None,
298+
| Command::ExecuteSideEffectingFunc { .. }
299+
| Command::RegisterFrontendPeek { .. }
300+
| Command::UnregisterFrontendPeek { .. }
301+
| Command::FrontendStatementLogging(..) => None,
261302
}
262303
}
263304

@@ -285,7 +326,10 @@ impl Command {
285326
| Command::StoreTransactionReadHolds { .. }
286327
| Command::ExecuteSlowPathPeek { .. }
287328
| Command::ExecuteCopyTo { .. }
288-
| Command::ExecuteSideEffectingFunc { .. } => None,
329+
| Command::ExecuteSideEffectingFunc { .. }
330+
| Command::RegisterFrontendPeek { .. }
331+
| Command::UnregisterFrontendPeek { .. }
332+
| Command::FrontendStatementLogging(..) => None,
289333
}
290334
}
291335
}
@@ -318,6 +362,7 @@ pub struct StartupResponse {
318362
pub transient_id_gen: Arc<TransientIdGen>,
319363
pub optimizer_metrics: OptimizerMetrics,
320364
pub persist_client: PersistClient,
365+
pub statement_logging_frontend: StatementLoggingFrontend,
321366
}
322367

323368
/// The response to [`Client::authenticate`](crate::Client::authenticate).

src/adapter/src/coord.rs

Lines changed: 22 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,9 @@ use mz_catalog::memory::objects::{
104104
};
105105
use mz_cloud_resources::{CloudResourceController, VpcEndpointConfig, VpcEndpointEvent};
106106
use mz_compute_client::as_of_selection;
107-
use mz_compute_client::controller::error::{DataflowCreationError, InstanceMissing};
107+
use mz_compute_client::controller::error::{
108+
CollectionLookupError, DataflowCreationError, InstanceMissing,
109+
};
108110
use mz_compute_types::ComputeInstanceId;
109111
use mz_compute_types::dataflows::DataflowDescription;
110112
use mz_compute_types::plan::Plan;
@@ -190,7 +192,7 @@ use crate::coord::cluster_scheduling::SchedulingDecision;
190192
use crate::coord::id_bundle::CollectionIdBundle;
191193
use crate::coord::introspection::IntrospectionSubscribe;
192194
use crate::coord::peek::PendingPeek;
193-
use crate::coord::statement_logging::{StatementLogging, StatementLoggingId};
195+
use crate::coord::statement_logging::StatementLogging;
194196
use crate::coord::timeline::{TimelineContext, TimelineState};
195197
use crate::coord::timestamp_selection::{TimestampContext, TimestampDetermination};
196198
use crate::coord::validity::PlanValidity;
@@ -203,7 +205,9 @@ use crate::optimize::dataflows::{
203205
};
204206
use crate::optimize::{self, Optimize, OptimizerConfig};
205207
use crate::session::{EndTransactionAction, Session};
206-
use crate::statement_logging::{StatementEndedExecutionReason, StatementLifecycleEvent};
208+
use crate::statement_logging::{
209+
StatementEndedExecutionReason, StatementLifecycleEvent, StatementLoggingId,
210+
};
207211
use crate::util::{ClientTransmitter, ResultExt};
208212
use crate::webhook::{WebhookAppenderInvalidator, WebhookConcurrencyLimiter};
209213
use crate::{AdapterNotice, ReadHolds, flags};
@@ -374,6 +378,9 @@ impl Message {
374378
Command::ExecuteSlowPathPeek { .. } => "execute-slow-path-peek",
375379
Command::ExecuteCopyTo { .. } => "execute-copy-to",
376380
Command::ExecuteSideEffectingFunc { .. } => "execute-side-effecting-func",
381+
Command::RegisterFrontendPeek { .. } => "register-frontend-peek",
382+
Command::UnregisterFrontendPeek { .. } => "unregister-frontend-peek",
383+
Command::FrontendStatementLogging(..) => "frontend-statement-logging",
377384
},
378385
Message::ControllerReady {
379386
controller: ControllerReadiness::Compute,
@@ -1332,7 +1339,7 @@ impl ExecuteContextExtra {
13321339
/// called from code that knows what to do to finish up logging
13331340
/// based on the inner value.
13341341
#[must_use]
1335-
fn retire(mut self) -> Option<StatementLoggingId> {
1342+
pub(crate) fn retire(mut self) -> Option<StatementLoggingId> {
13361343
let Self { statement_uuid } = &mut self;
13371344
statement_uuid.take()
13381345
}
@@ -1356,14 +1363,13 @@ impl Drop for ExecuteContextExtra {
13561363
///
13571364
/// This struct collects a bundle of state that needs to be threaded
13581365
/// through various functions as part of statement execution.
1359-
/// Currently, it is only used to finalize execution, by calling one
1360-
/// of the methods `retire` or `retire_aysnc`. Finalizing execution
1366+
/// It is used to finalize execution, by calling `retire`. Finalizing execution
13611367
/// involves sending the session back to the pgwire layer so that it
1362-
/// may be used to process further commands. In the future, it will
1363-
/// also involve performing some work on the main coordinator thread
1368+
/// may be used to process further commands. It also involves
1369+
/// performing some work on the main coordinator thread
13641370
/// (e.g., recording the time at which the statement finished
1365-
/// executing) the state necessary to perform this work is bundled in
1366-
/// the `ExecuteContextExtra` object (today, it is simply empty).
1371+
/// executing). The state necessary to perform this work is bundled in
1372+
/// the `ExecuteContextExtra` object.
13671373
#[derive(Debug)]
13681374
pub struct ExecuteContext {
13691375
inner: Box<ExecuteContextInner>,
@@ -3762,13 +3768,14 @@ impl Coordinator {
37623768
objects: BTreeSet<GlobalId>,
37633769
t: Timestamp,
37643770
state: WatchSetResponse,
3765-
) {
3766-
let ws_id = self.controller.install_compute_watch_set(objects, t);
3771+
) -> Result<(), CollectionLookupError> {
3772+
let ws_id = self.controller.install_compute_watch_set(objects, t)?;
37673773
self.connection_watch_sets
37683774
.entry(conn_id.clone())
37693775
.or_default()
37703776
.insert(ws_id);
37713777
self.installed_watch_sets.insert(ws_id, (conn_id, state));
3778+
Ok(())
37723779
}
37733780

37743781
/// Install a _watch set_ in the controller that is automatically associated with the given
@@ -3780,13 +3787,14 @@ impl Coordinator {
37803787
objects: BTreeSet<GlobalId>,
37813788
t: Timestamp,
37823789
state: WatchSetResponse,
3783-
) {
3784-
let ws_id = self.controller.install_storage_watch_set(objects, t);
3790+
) -> Result<(), CollectionLookupError> {
3791+
let ws_id = self.controller.install_storage_watch_set(objects, t)?;
37853792
self.connection_watch_sets
37863793
.entry(conn_id.clone())
37873794
.or_default()
37883795
.insert(ws_id);
37893796
self.installed_watch_sets.insert(ws_id, (conn_id, state));
3797+
Ok(())
37903798
}
37913799

37923800
/// Cancels pending watchsets associated with the provided connection id.

src/adapter/src/coord/catalog_implications.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -62,9 +62,8 @@ use crate::coord::Coordinator;
6262
use crate::coord::catalog_implications::parsed_state_updates::{
6363
ParsedStateUpdate, ParsedStateUpdateKind,
6464
};
65-
use crate::coord::statement_logging::StatementLoggingId;
6665
use crate::coord::timeline::TimelineState;
67-
use crate::statement_logging::StatementEndedExecutionReason;
66+
use crate::statement_logging::{StatementEndedExecutionReason, StatementLoggingId};
6867
use crate::{AdapterError, CollectionIdBundle, ExecuteContext, ResultExt};
6968

7069
pub mod parsed_state_updates;

0 commit comments

Comments
 (0)