Skip to content

Commit eda01f4

Browse files
authored
Merge pull request #34246 from ggevay/frontend-peek-slow-path-peeks
Frontend peek sequencing -- slow-path peeks
2 parents 6bc6435 + ba66eec commit eda01f4

File tree

6 files changed

+156
-43
lines changed

6 files changed

+156
-43
lines changed

src/adapter/src/client.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1015,7 +1015,8 @@ impl SessionClient {
10151015
| Command::Dump { .. }
10161016
| Command::GetComputeInstanceClient { .. }
10171017
| Command::GetOracle { .. }
1018-
| Command::DetermineRealTimeRecentTimestamp { .. } => {}
1018+
| Command::DetermineRealTimeRecentTimestamp { .. }
1019+
| Command::ExecuteSlowPathPeek { .. } => {}
10191020
};
10201021
cmd
10211022
});

src/adapter/src/command.rs

Lines changed: 23 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,15 +18,17 @@ use enum_kinds::EnumKind;
1818
use futures::Stream;
1919
use mz_adapter_types::connection::{ConnectionId, ConnectionIdType};
2020
use mz_auth::password::Password;
21+
use mz_cluster_client::ReplicaId;
2122
use mz_compute_types::ComputeInstanceId;
23+
use mz_expr::RowSetFinishing;
2224
use mz_ore::collections::CollectionExt;
2325
use mz_ore::soft_assert_no_log;
2426
use mz_ore::tracing::OpenTelemetryContext;
2527
use mz_persist_client::PersistClient;
2628
use mz_pgcopy::CopyFormatParams;
2729
use mz_repr::global_id::TransientIdGen;
2830
use mz_repr::role_id::RoleId;
29-
use mz_repr::{CatalogItemId, ColumnIndex, GlobalId, RowIterator};
31+
use mz_repr::{CatalogItemId, ColumnIndex, GlobalId, RowIterator, SqlRelationType};
3032
use mz_sql::ast::{FetchDirection, Raw, Statement};
3133
use mz_sql::catalog::ObjectType;
3234
use mz_sql::optimizer_metrics::OptimizerMetrics;
@@ -43,7 +45,8 @@ use crate::catalog::Catalog;
4345
use crate::coord::ExecuteContextExtra;
4446
use crate::coord::appends::BuiltinTableAppendNotify;
4547
use crate::coord::consistency::CoordinatorInconsistencies;
46-
use crate::coord::peek::PeekResponseUnary;
48+
use crate::coord::peek::{PeekDataflowPlan, PeekResponseUnary};
49+
use crate::coord::timestamp_selection::TimestampDetermination;
4750
use crate::error::AdapterError;
4851
use crate::session::{EndTransactionAction, RowBatchStream, Session};
4952
use crate::statement_logging::{StatementEndedExecutionReason, StatementExecutionStrategy};
@@ -182,6 +185,20 @@ pub enum Command {
182185
real_time_recency_timeout: Duration,
183186
tx: oneshot::Sender<Result<Option<mz_repr::Timestamp>, AdapterError>>,
184187
},
188+
189+
ExecuteSlowPathPeek {
190+
dataflow_plan: Box<PeekDataflowPlan<mz_repr::Timestamp>>,
191+
determination: TimestampDetermination<mz_repr::Timestamp>,
192+
finishing: RowSetFinishing,
193+
compute_instance: ComputeInstanceId,
194+
target_replica: Option<ReplicaId>,
195+
intermediate_result_type: SqlRelationType,
196+
source_ids: BTreeSet<GlobalId>,
197+
conn_id: ConnectionId,
198+
max_result_size: u64,
199+
max_query_result_size: Option<u64>,
200+
tx: oneshot::Sender<Result<ExecuteResponse, AdapterError>>,
201+
},
185202
}
186203

187204
impl Command {
@@ -204,7 +221,8 @@ impl Command {
204221
| Command::Dump { .. }
205222
| Command::GetComputeInstanceClient { .. }
206223
| Command::GetOracle { .. }
207-
| Command::DetermineRealTimeRecentTimestamp { .. } => None,
224+
| Command::DetermineRealTimeRecentTimestamp { .. }
225+
| Command::ExecuteSlowPathPeek { .. } => None,
208226
}
209227
}
210228

@@ -227,7 +245,8 @@ impl Command {
227245
| Command::Dump { .. }
228246
| Command::GetComputeInstanceClient { .. }
229247
| Command::GetOracle { .. }
230-
| Command::DetermineRealTimeRecentTimestamp { .. } => None,
248+
| Command::DetermineRealTimeRecentTimestamp { .. }
249+
| Command::ExecuteSlowPathPeek { .. } => None,
231250
}
232251
}
233252
}

src/adapter/src/coord.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -366,6 +366,7 @@ impl Message {
366366
Command::DetermineRealTimeRecentTimestamp { .. } => {
367367
"determine-real-time-recent-timestamp"
368368
}
369+
Command::ExecuteSlowPathPeek { .. } => "execute-slow-path-peek",
369370
},
370371
Message::ControllerReady {
371372
controller: ControllerReadiness::Compute,

src/adapter/src/coord/command_handler.rs

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -313,6 +313,37 @@ impl Coordinator {
313313
}
314314
}
315315
}
316+
317+
Command::ExecuteSlowPathPeek {
318+
dataflow_plan,
319+
determination,
320+
finishing,
321+
compute_instance,
322+
target_replica,
323+
intermediate_result_type,
324+
source_ids,
325+
conn_id,
326+
max_result_size,
327+
max_query_result_size,
328+
tx,
329+
} => {
330+
let result = self
331+
.implement_slow_path_peek(
332+
*dataflow_plan,
333+
determination,
334+
finishing,
335+
compute_instance,
336+
target_replica,
337+
intermediate_result_type,
338+
source_ids,
339+
conn_id,
340+
max_result_size,
341+
max_query_result_size,
342+
)
343+
.await;
344+
345+
let _ = tx.send(result);
346+
}
316347
}
317348
}
318349
.instrument(debug_span!("handle_command"))

src/adapter/src/coord/peek.rs

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1189,6 +1189,55 @@ impl crate::coord::Coordinator {
11891189
pending_peek
11901190
}
11911191

1192+
/// Implements a slow-path peek by creating a transient dataflow.
1193+
/// This is called from the command handler for ExecuteSlowPathPeek.
1194+
///
1195+
/// (For now, this method simply delegates to implement_peek_plan by constructing
1196+
/// the necessary PlannedPeek structure and a minimal ExecuteContext.)
1197+
pub(crate) async fn implement_slow_path_peek(
1198+
&mut self,
1199+
dataflow_plan: PeekDataflowPlan<mz_repr::Timestamp>,
1200+
determination: TimestampDetermination<mz_repr::Timestamp>,
1201+
finishing: RowSetFinishing,
1202+
compute_instance: ComputeInstanceId,
1203+
target_replica: Option<ReplicaId>,
1204+
intermediate_result_type: SqlRelationType,
1205+
source_ids: BTreeSet<GlobalId>,
1206+
conn_id: ConnectionId,
1207+
max_result_size: u64,
1208+
max_query_result_size: Option<u64>,
1209+
) -> Result<ExecuteResponse, AdapterError> {
1210+
let source_arity = intermediate_result_type.arity();
1211+
1212+
let planned_peek = PlannedPeek {
1213+
plan: PeekPlan::SlowPath(dataflow_plan),
1214+
determination,
1215+
conn_id,
1216+
intermediate_result_type,
1217+
source_arity,
1218+
source_ids,
1219+
};
1220+
1221+
// Create a minimal ExecuteContext
1222+
// TODO(peek-seq): Use the real context once we have statement logging.
1223+
let mut ctx_extra = ExecuteContextExtra::default();
1224+
1225+
// Call the old peek sequencing's implement_peek_plan for now.
1226+
// TODO(peek-seq): After the old peek sequencing is completely removed, we should merge the
1227+
// relevant parts of the old `implement_peek_plan` into this method, and remove the old
1228+
// `implement_peek_plan`.
1229+
self.implement_peek_plan(
1230+
&mut ctx_extra,
1231+
planned_peek,
1232+
finishing,
1233+
compute_instance,
1234+
target_replica,
1235+
max_result_size,
1236+
max_query_result_size,
1237+
)
1238+
.await
1239+
}
1240+
11921241
/// Constructs an [`ExecuteResponse`] that that will send some rows to the
11931242
/// client immediately, as opposed to asking the dataflow layer to send along
11941243
/// the rows after some computation.

src/adapter/src/frontend_peek.rs

Lines changed: 50 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -731,16 +731,6 @@ impl PeekClient {
731731
session.add_notice(AdapterNotice::PlanInsights(insights));
732732
}
733733

734-
let fast_path_plan = match peek_plan {
735-
PeekPlan::SlowPath(_) => {
736-
debug!(
737-
"Bailing out from try_frontend_peek_inner, because it's a slow-path peek"
738-
);
739-
return Ok(None);
740-
}
741-
PeekPlan::FastPath(p) => p,
742-
};
743-
744734
// Warning: Do not bail out from the new peek sequencing after this point, because the
745735
// following has side effects. TODO(peek-seq): remove this comment once we never
746736
// bail out to the old sequencing.
@@ -798,35 +788,57 @@ impl PeekClient {
798788

799789
let max_result_size = catalog.system_config().max_result_size();
800790

801-
let row_set_finishing_seconds =
802-
session.metrics().row_set_finishing_seconds().clone();
803-
804-
let peek_stash_read_batch_size_bytes =
805-
mz_compute_types::dyncfgs::PEEK_RESPONSE_STASH_READ_BATCH_SIZE_BYTES
806-
.get(catalog.system_config().dyncfgs());
807-
let peek_stash_read_memory_budget_bytes =
808-
mz_compute_types::dyncfgs::PEEK_RESPONSE_STASH_READ_MEMORY_BUDGET_BYTES
809-
.get(catalog.system_config().dyncfgs());
810-
811-
// Implement the peek, and capture the response.
812-
let resp = self
813-
.implement_fast_path_peek_plan(
814-
fast_path_plan,
815-
determination.timestamp_context.timestamp_or_default(),
816-
select_plan.finishing.clone(),
817-
target_cluster_id,
818-
target_replica,
819-
typ,
820-
max_result_size,
821-
max_query_result_size,
822-
row_set_finishing_seconds,
823-
read_holds,
824-
peek_stash_read_batch_size_bytes,
825-
peek_stash_read_memory_budget_bytes,
826-
)
827-
.await?;
791+
match peek_plan {
792+
PeekPlan::FastPath(fast_path_plan) => {
793+
let row_set_finishing_seconds =
794+
session.metrics().row_set_finishing_seconds().clone();
828795

829-
Ok(Some(resp))
796+
let peek_stash_read_batch_size_bytes =
797+
mz_compute_types::dyncfgs::PEEK_RESPONSE_STASH_READ_BATCH_SIZE_BYTES
798+
.get(catalog.system_config().dyncfgs());
799+
let peek_stash_read_memory_budget_bytes =
800+
mz_compute_types::dyncfgs::PEEK_RESPONSE_STASH_READ_MEMORY_BUDGET_BYTES
801+
.get(catalog.system_config().dyncfgs());
802+
803+
let resp = self
804+
.implement_fast_path_peek_plan(
805+
fast_path_plan,
806+
determination.timestamp_context.timestamp_or_default(),
807+
select_plan.finishing,
808+
target_cluster_id,
809+
target_replica,
810+
typ,
811+
max_result_size,
812+
max_query_result_size,
813+
row_set_finishing_seconds,
814+
read_holds,
815+
peek_stash_read_batch_size_bytes,
816+
peek_stash_read_memory_budget_bytes,
817+
)
818+
.await?;
819+
820+
Ok(Some(resp))
821+
}
822+
PeekPlan::SlowPath(dataflow_plan) => {
823+
let response = self
824+
.call_coordinator(|tx| Command::ExecuteSlowPathPeek {
825+
dataflow_plan: Box::new(dataflow_plan),
826+
determination,
827+
finishing: select_plan.finishing,
828+
compute_instance: target_cluster_id,
829+
target_replica,
830+
intermediate_result_type: typ,
831+
source_ids,
832+
conn_id: session.conn_id().clone(),
833+
max_result_size,
834+
max_query_result_size,
835+
tx,
836+
})
837+
.await?;
838+
839+
Ok(Some(response))
840+
}
841+
}
830842
}
831843
}
832844
}

0 commit comments

Comments
 (0)