Skip to content

Commit bb49ead

Browse files
committed
Frontend peek: statement logging
1 parent f286d77 commit bb49ead

File tree

20 files changed

+1764
-576
lines changed

20 files changed

+1764
-576
lines changed

src/adapter/src/client.rs

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -264,6 +264,7 @@ impl Client {
264264
transient_id_gen,
265265
optimizer_metrics,
266266
persist_client,
267+
statement_logging_frontend,
267268
} = response;
268269

269270
let peek_client = PeekClient::new(
@@ -272,6 +273,7 @@ impl Client {
272273
transient_id_gen,
273274
optimizer_metrics,
274275
persist_client,
276+
statement_logging_frontend,
275277
);
276278

277279
let mut client = SessionClient {
@@ -704,11 +706,19 @@ impl SessionClient {
704706
// Attempt peek sequencing in the session task.
705707
// If unsupported, fall back to the Coordinator path.
706708
// TODO(peek-seq): wire up cancel_future
707-
if let Some(resp) = self.try_frontend_peek(&portal_name).await? {
709+
let mut outer_ctx_extra = outer_ctx_extra;
710+
if let Some(resp) = self
711+
.try_frontend_peek(&portal_name, &mut outer_ctx_extra)
712+
.await?
713+
{
708714
debug!("frontend peek succeeded");
715+
// Frontend peek handled the execution and retired outer_ctx_extra if it existed.
716+
// No additional work needed here.
709717
return Ok((resp, execute_started));
710718
} else {
711-
debug!("frontend peek did not happen");
719+
debug!("frontend peek did not happen, falling back to `Command::Execute`");
720+
// If we bailed out, outer_ctx_extra is still present (if it was originally).
721+
// `Command::Execute` will handle it.
712722
}
713723

714724
let response = self
@@ -1020,7 +1030,9 @@ impl SessionClient {
10201030
| Command::StoreTransactionReadHolds { .. }
10211031
| Command::ExecuteSlowPathPeek { .. }
10221032
| Command::ExecuteCopyTo { .. }
1023-
| Command::ExecuteSideEffectingFunc { .. } => {}
1033+
| Command::ExecuteSideEffectingFunc { .. }
1034+
| Command::RegisterFrontendPeek { .. }
1035+
| Command::FrontendStatementLogging(..) => {}
10241036
};
10251037
cmd
10261038
});
@@ -1110,11 +1122,12 @@ impl SessionClient {
11101122
pub(crate) async fn try_frontend_peek(
11111123
&mut self,
11121124
portal_name: &str,
1125+
outer_ctx_extra: &mut Option<ExecuteContextExtra>,
11131126
) -> Result<Option<ExecuteResponse>, AdapterError> {
11141127
if self.enable_frontend_peek_sequencing {
11151128
let session = self.session.as_mut().expect("SessionClient invariant");
11161129
self.peek_client
1117-
.try_frontend_peek_inner(portal_name, session)
1130+
.try_frontend_peek(portal_name, session, outer_ctx_extra)
11181131
.await
11191132
} else {
11201133
Ok(None)

src/adapter/src/command.rs

Lines changed: 35 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -47,10 +47,13 @@ use crate::coord::ExecuteContextExtra;
4747
use crate::coord::appends::BuiltinTableAppendNotify;
4848
use crate::coord::consistency::CoordinatorInconsistencies;
4949
use crate::coord::peek::{PeekDataflowPlan, PeekResponseUnary};
50+
use crate::coord::statement_logging::WatchSetCreation;
5051
use crate::coord::timestamp_selection::TimestampDetermination;
5152
use crate::error::AdapterError;
5253
use crate::session::{EndTransactionAction, RowBatchStream, Session};
53-
use crate::statement_logging::{StatementEndedExecutionReason, StatementExecutionStrategy};
54+
use crate::statement_logging::{
55+
StatementEndedExecutionReason, StatementExecutionStrategy, StatementLoggingFrontend,
56+
};
5457
use crate::util::Transmittable;
5558
use crate::webhook::AppendWebhookResponse;
5659
use crate::{AdapterNotice, AppendWebhookError, ReadHolds};
@@ -210,6 +213,9 @@ pub enum Command {
210213
conn_id: ConnectionId,
211214
max_result_size: u64,
212215
max_query_result_size: Option<u64>,
216+
/// If statement logging is enabled, contains all info needed for installing watch sets
217+
/// and logging the statement execution.
218+
watch_set: Option<WatchSetCreation>,
213219
tx: oneshot::Sender<Result<ExecuteResponse, AdapterError>>,
214220
},
215221

@@ -219,6 +225,9 @@ pub enum Command {
219225
target_replica: Option<ReplicaId>,
220226
source_ids: BTreeSet<GlobalId>,
221227
conn_id: ConnectionId,
228+
/// If statement logging is enabled, contains all info needed for installing watch sets
229+
/// and logging the statement execution.
230+
watch_set: Option<WatchSetCreation>,
222231
tx: oneshot::Sender<Result<ExecuteResponse, AdapterError>>,
223232
},
224233

@@ -230,6 +239,24 @@ pub enum Command {
230239
current_role: RoleId,
231240
tx: oneshot::Sender<Result<ExecuteResponse, AdapterError>>,
232241
},
242+
243+
/// Register a pending peek initiated by frontend sequencing.
244+
/// The Coordinator will track this peek and log its final result when it completes.
245+
RegisterFrontendPeek {
246+
uuid: Uuid,
247+
conn_id: ConnectionId,
248+
cluster_id: mz_controller_types::ClusterId,
249+
depends_on: BTreeSet<GlobalId>,
250+
is_fast_path: bool,
251+
/// If statement logging is enabled, contains all info needed for installing watch sets
252+
/// and logging the statement execution.
253+
watch_set: Option<WatchSetCreation>,
254+
tx: oneshot::Sender<Result<(), AdapterError>>,
255+
},
256+
257+
/// Statement logging event from frontend peek sequencing.
258+
/// No response channel needed - this is fire-and-forget.
259+
FrontendStatementLogging(crate::statement_logging::FrontendStatementLoggingEvent),
233260
}
234261

235262
impl Command {
@@ -257,7 +284,9 @@ impl Command {
257284
| Command::StoreTransactionReadHolds { .. }
258285
| Command::ExecuteSlowPathPeek { .. }
259286
| Command::ExecuteCopyTo { .. }
260-
| Command::ExecuteSideEffectingFunc { .. } => None,
287+
| Command::ExecuteSideEffectingFunc { .. }
288+
| Command::RegisterFrontendPeek { .. }
289+
| Command::FrontendStatementLogging(..) => None,
261290
}
262291
}
263292

@@ -285,7 +314,9 @@ impl Command {
285314
| Command::StoreTransactionReadHolds { .. }
286315
| Command::ExecuteSlowPathPeek { .. }
287316
| Command::ExecuteCopyTo { .. }
288-
| Command::ExecuteSideEffectingFunc { .. } => None,
317+
| Command::ExecuteSideEffectingFunc { .. }
318+
| Command::RegisterFrontendPeek { .. }
319+
| Command::FrontendStatementLogging(..) => None,
289320
}
290321
}
291322
}
@@ -318,6 +349,7 @@ pub struct StartupResponse {
318349
pub transient_id_gen: Arc<TransientIdGen>,
319350
pub optimizer_metrics: OptimizerMetrics,
320351
pub persist_client: PersistClient,
352+
pub statement_logging_frontend: StatementLoggingFrontend,
321353
}
322354

323355
/// The response to [`Client::authenticate`](crate::Client::authenticate).

src/adapter/src/coord.rs

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,9 @@ use mz_catalog::memory::objects::{
104104
};
105105
use mz_cloud_resources::{CloudResourceController, VpcEndpointConfig, VpcEndpointEvent};
106106
use mz_compute_client::as_of_selection;
107-
use mz_compute_client::controller::error::{DataflowCreationError, InstanceMissing};
107+
use mz_compute_client::controller::error::{
108+
CollectionLookupError, DataflowCreationError, InstanceMissing,
109+
};
108110
use mz_compute_types::ComputeInstanceId;
109111
use mz_compute_types::dataflows::DataflowDescription;
110112
use mz_compute_types::plan::Plan;
@@ -190,7 +192,7 @@ use crate::coord::cluster_scheduling::SchedulingDecision;
190192
use crate::coord::id_bundle::CollectionIdBundle;
191193
use crate::coord::introspection::IntrospectionSubscribe;
192194
use crate::coord::peek::PendingPeek;
193-
use crate::coord::statement_logging::{StatementLogging, StatementLoggingId};
195+
use crate::coord::statement_logging::StatementLogging;
194196
use crate::coord::timeline::{TimelineContext, TimelineState};
195197
use crate::coord::timestamp_selection::{TimestampContext, TimestampDetermination};
196198
use crate::coord::validity::PlanValidity;
@@ -203,7 +205,9 @@ use crate::optimize::dataflows::{
203205
};
204206
use crate::optimize::{self, Optimize, OptimizerConfig};
205207
use crate::session::{EndTransactionAction, Session};
206-
use crate::statement_logging::{StatementEndedExecutionReason, StatementLifecycleEvent};
208+
use crate::statement_logging::{
209+
StatementEndedExecutionReason, StatementLifecycleEvent, StatementLoggingId,
210+
};
207211
use crate::util::{ClientTransmitter, ResultExt};
208212
use crate::webhook::{WebhookAppenderInvalidator, WebhookConcurrencyLimiter};
209213
use crate::{AdapterNotice, ReadHolds, flags};
@@ -374,6 +378,8 @@ impl Message {
374378
Command::ExecuteSlowPathPeek { .. } => "execute-slow-path-peek",
375379
Command::ExecuteCopyTo { .. } => "execute-copy-to",
376380
Command::ExecuteSideEffectingFunc { .. } => "execute-side-effecting-func",
381+
Command::RegisterFrontendPeek { .. } => "register-frontend-peek",
382+
Command::FrontendStatementLogging(..) => "frontend-statement-logging",
377383
},
378384
Message::ControllerReady {
379385
controller: ControllerReadiness::Compute,
@@ -1332,7 +1338,7 @@ impl ExecuteContextExtra {
13321338
/// called from code that knows what to do to finish up logging
13331339
/// based on the inner value.
13341340
#[must_use]
1335-
fn retire(mut self) -> Option<StatementLoggingId> {
1341+
pub(crate) fn retire(mut self) -> Option<StatementLoggingId> {
13361342
let Self { statement_uuid } = &mut self;
13371343
statement_uuid.take()
13381344
}
@@ -3763,13 +3769,14 @@ impl Coordinator {
37633769
objects: BTreeSet<GlobalId>,
37643770
t: Timestamp,
37653771
state: WatchSetResponse,
3766-
) {
3767-
let ws_id = self.controller.install_compute_watch_set(objects, t);
3772+
) -> Result<(), CollectionLookupError> {
3773+
let ws_id = self.controller.install_compute_watch_set(objects, t)?;
37683774
self.connection_watch_sets
37693775
.entry(conn_id.clone())
37703776
.or_default()
37713777
.insert(ws_id);
37723778
self.installed_watch_sets.insert(ws_id, (conn_id, state));
3779+
Ok(())
37733780
}
37743781

37753782
/// Install a _watch set_ in the controller that is automatically associated with the given
@@ -3781,13 +3788,14 @@ impl Coordinator {
37813788
objects: BTreeSet<GlobalId>,
37823789
t: Timestamp,
37833790
state: WatchSetResponse,
3784-
) {
3785-
let ws_id = self.controller.install_storage_watch_set(objects, t);
3791+
) -> Result<(), CollectionLookupError> {
3792+
let ws_id = self.controller.install_storage_watch_set(objects, t)?;
37863793
self.connection_watch_sets
37873794
.entry(conn_id.clone())
37883795
.or_default()
37893796
.insert(ws_id);
37903797
self.installed_watch_sets.insert(ws_id, (conn_id, state));
3798+
Ok(())
37913799
}
37923800

37933801
/// Cancels pending watchsets associated with the provided connection id.

src/adapter/src/coord/catalog_implications.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,9 +60,8 @@ use crate::coord::Coordinator;
6060
use crate::coord::catalog_implications::parsed_state_updates::{
6161
ParsedStateUpdate, ParsedStateUpdateKind,
6262
};
63-
use crate::coord::statement_logging::StatementLoggingId;
6463
use crate::coord::timeline::TimelineState;
65-
use crate::statement_logging::StatementEndedExecutionReason;
64+
use crate::statement_logging::{StatementEndedExecutionReason, StatementLoggingId};
6665
use crate::{AdapterError, CollectionIdBundle, ExecuteContext, ResultExt};
6766

6867
pub mod parsed_state_updates;

0 commit comments

Comments
 (0)