Skip to content

Commit 37e1c7b

Browse files
authored
Merge pull request #34262 from ggevay/frontend-peek-rbac-and-statistics
Frontend peek sequencing -- rbac and statistics
2 parents ba926e9 + bb9c6a2 commit 37e1c7b

File tree

5 files changed

+124
-106
lines changed

5 files changed

+124
-106
lines changed

src/adapter/src/coord/sequencer.rs

Lines changed: 93 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212

1313
//! Logic for executing a planned SQL query.
1414
15-
use std::collections::BTreeSet;
15+
use std::collections::{BTreeMap, BTreeSet};
1616
use std::str::FromStr;
1717
use std::sync::Arc;
1818

@@ -40,17 +40,21 @@ use mz_sql::plan::{
4040
};
4141
use mz_sql::rbac;
4242
use mz_sql::session::metadata::SessionMetadata;
43+
use mz_sql::session::vars;
4344
use mz_sql::session::vars::SessionVars;
4445
use mz_sql_parser::ast::{Raw, Statement};
4546
use mz_storage_client::client::TableData;
4647
use mz_storage_client::storage_collections::StorageCollections;
4748
use mz_storage_types::connections::inline::IntoInlineConnection;
49+
use mz_storage_types::controller::StorageError;
4850
use mz_storage_types::stats::RelationPartStats;
4951
use mz_transform::dataflow::DataflowMetainfo;
5052
use mz_transform::notice::{OptimizerNoticeApi, OptimizerNoticeKind, RawOptimizerNotice};
53+
use mz_transform::{EmptyStatisticsOracle, StatisticsOracle};
5154
use timely::progress::Antichain;
55+
use timely::progress::Timestamp as TimelyTimestamp;
5256
use tokio::sync::oneshot;
53-
use tracing::{Instrument, Level, Span, event};
57+
use tracing::{Instrument, Level, Span, event, warn};
5458

5559
use crate::ExecuteContext;
5660
use crate::catalog::{Catalog, CatalogState};
@@ -183,14 +187,14 @@ impl Coordinator {
183187

184188
if let Err(e) = rbac::check_plan(
185189
&session_catalog,
186-
|id| {
190+
Some(|id| {
187191
// We use linear search through active connections if needed, which is fine
188192
// because the RBAC check will call the closure at most once.
189193
self.active_conns()
190194
.into_iter()
191195
.find(|(conn_id, _)| conn_id.unhandled() == id)
192196
.map(|(_, conn_meta)| *conn_meta.authenticated_role_id())
193-
},
197+
}),
194198
ctx.session(),
195199
&plan,
196200
target_cluster_id,
@@ -1205,3 +1209,88 @@ pub(crate) async fn explain_plan_inner(
12051209

12061210
Ok(rows)
12071211
}
1212+
1213+
/// Creates a statistics oracle for query optimization.
1214+
///
1215+
/// This is a free-standing function that can be called from both the old peek sequencing
1216+
/// and the new frontend peek sequencing.
1217+
pub(crate) async fn statistics_oracle(
1218+
session: &Session,
1219+
source_ids: &BTreeSet<GlobalId>,
1220+
query_as_of: &Antichain<Timestamp>,
1221+
is_oneshot: bool,
1222+
system_config: &vars::SystemVars,
1223+
storage_collections: &dyn StorageCollections<Timestamp = Timestamp>,
1224+
) -> Result<Box<dyn StatisticsOracle>, AdapterError> {
1225+
if !session.vars().enable_session_cardinality_estimates() {
1226+
return Ok(Box::new(EmptyStatisticsOracle));
1227+
}
1228+
1229+
let timeout = if is_oneshot {
1230+
// TODO(mgree): ideally, we would shorten the timeout even more if we think the query could take the fast path
1231+
system_config.optimizer_oneshot_stats_timeout()
1232+
} else {
1233+
system_config.optimizer_stats_timeout()
1234+
};
1235+
1236+
let cached_stats = mz_ore::future::timeout(
1237+
timeout,
1238+
CachedStatisticsOracle::new(source_ids, query_as_of, storage_collections),
1239+
)
1240+
.await;
1241+
1242+
match cached_stats {
1243+
Ok(stats) => Ok(Box::new(stats)),
1244+
Err(mz_ore::future::TimeoutError::DeadlineElapsed) => {
1245+
warn!(
1246+
is_oneshot = is_oneshot,
1247+
"optimizer statistics collection timed out after {}ms",
1248+
timeout.as_millis()
1249+
);
1250+
1251+
Ok(Box::new(EmptyStatisticsOracle))
1252+
}
1253+
Err(mz_ore::future::TimeoutError::Inner(e)) => Err(AdapterError::Storage(e)),
1254+
}
1255+
}
1256+
1257+
#[derive(Debug)]
1258+
struct CachedStatisticsOracle {
1259+
cache: BTreeMap<GlobalId, usize>,
1260+
}
1261+
1262+
impl CachedStatisticsOracle {
1263+
pub async fn new<T: TimelyTimestamp>(
1264+
ids: &BTreeSet<GlobalId>,
1265+
as_of: &Antichain<T>,
1266+
storage_collections: &dyn mz_storage_client::storage_collections::StorageCollections<Timestamp = T>,
1267+
) -> Result<Self, StorageError<T>> {
1268+
let mut cache = BTreeMap::new();
1269+
1270+
for id in ids {
1271+
let stats = storage_collections.snapshot_stats(*id, as_of.clone()).await;
1272+
1273+
match stats {
1274+
Ok(stats) => {
1275+
cache.insert(*id, stats.num_updates);
1276+
}
1277+
Err(StorageError::IdentifierMissing(id)) => {
1278+
::tracing::debug!("no statistics for {id}")
1279+
}
1280+
Err(e) => return Err(e),
1281+
}
1282+
}
1283+
1284+
Ok(Self { cache })
1285+
}
1286+
}
1287+
1288+
impl StatisticsOracle for CachedStatisticsOracle {
1289+
fn cardinality_estimate(&self, id: GlobalId) -> Option<usize> {
1290+
self.cache.get(&id).map(|estimate| *estimate)
1291+
}
1292+
1293+
fn as_map(&self) -> BTreeMap<GlobalId, usize> {
1294+
self.cache.clone()
1295+
}
1296+
}

src/adapter/src/coord/sequencer/inner.rs

Lines changed: 8 additions & 80 deletions
Original file line numberDiff line numberDiff line change
@@ -86,11 +86,9 @@ use mz_storage_client::controller::{CollectionDescription, DataSource, ExportDes
8686
use mz_storage_types::AlterCompatible;
8787
use mz_storage_types::connections::inline::IntoInlineConnection;
8888
use mz_storage_types::controller::StorageError;
89-
use mz_transform::EmptyStatisticsOracle;
9089
use mz_transform::dataflow::DataflowMetainfo;
9190
use smallvec::SmallVec;
9291
use timely::progress::Antichain;
93-
use timely::progress::Timestamp as TimelyTimestamp;
9492
use tokio::sync::{oneshot, watch};
9593
use tracing::{Instrument, Span, info, warn};
9694

@@ -5124,93 +5122,23 @@ impl Coordinator {
51245122

51255123
Ok(ExecuteResponse::AlteredObject(ObjectType::Table))
51265124
}
5127-
}
5128-
5129-
#[derive(Debug)]
5130-
struct CachedStatisticsOracle {
5131-
cache: BTreeMap<GlobalId, usize>,
5132-
}
5133-
5134-
impl CachedStatisticsOracle {
5135-
pub async fn new<T: TimelyTimestamp>(
5136-
ids: &BTreeSet<GlobalId>,
5137-
as_of: &Antichain<T>,
5138-
storage_collections: &dyn mz_storage_client::storage_collections::StorageCollections<Timestamp = T>,
5139-
) -> Result<Self, StorageError<T>> {
5140-
let mut cache = BTreeMap::new();
5141-
5142-
for id in ids {
5143-
let stats = storage_collections.snapshot_stats(*id, as_of.clone()).await;
51445125

5145-
match stats {
5146-
Ok(stats) => {
5147-
cache.insert(*id, stats.num_updates);
5148-
}
5149-
Err(StorageError::IdentifierMissing(id)) => {
5150-
::tracing::debug!("no statistics for {id}")
5151-
}
5152-
Err(e) => return Err(e),
5153-
}
5154-
}
5155-
5156-
Ok(Self { cache })
5157-
}
5158-
}
5159-
5160-
impl mz_transform::StatisticsOracle for CachedStatisticsOracle {
5161-
fn cardinality_estimate(&self, id: GlobalId) -> Option<usize> {
5162-
self.cache.get(&id).map(|estimate| *estimate)
5163-
}
5164-
5165-
fn as_map(&self) -> BTreeMap<GlobalId, usize> {
5166-
self.cache.clone()
5167-
}
5168-
}
5169-
5170-
impl Coordinator {
51715126
pub(super) async fn statistics_oracle(
51725127
&self,
51735128
session: &Session,
51745129
source_ids: &BTreeSet<GlobalId>,
51755130
query_as_of: &Antichain<Timestamp>,
51765131
is_oneshot: bool,
51775132
) -> Result<Box<dyn mz_transform::StatisticsOracle>, AdapterError> {
5178-
if !session.vars().enable_session_cardinality_estimates() {
5179-
return Ok(Box::new(EmptyStatisticsOracle));
5180-
}
5181-
5182-
let timeout = if is_oneshot {
5183-
// TODO(mgree): ideally, we would shorten the timeout even more if we think the query could take the fast path
5184-
self.catalog()
5185-
.system_config()
5186-
.optimizer_oneshot_stats_timeout()
5187-
} else {
5188-
self.catalog().system_config().optimizer_stats_timeout()
5189-
};
5190-
5191-
let cached_stats = mz_ore::future::timeout(
5192-
timeout,
5193-
CachedStatisticsOracle::new(
5194-
source_ids,
5195-
query_as_of,
5196-
self.controller.storage_collections.as_ref(),
5197-
),
5133+
super::statistics_oracle(
5134+
session,
5135+
source_ids,
5136+
query_as_of,
5137+
is_oneshot,
5138+
self.catalog().system_config(),
5139+
self.controller.storage_collections.as_ref(),
51985140
)
5199-
.await;
5200-
5201-
match cached_stats {
5202-
Ok(stats) => Ok(Box::new(stats)),
5203-
Err(mz_ore::future::TimeoutError::DeadlineElapsed) => {
5204-
warn!(
5205-
is_oneshot = is_oneshot,
5206-
"optimizer statistics collection timed out after {}ms",
5207-
timeout.as_millis()
5208-
);
5209-
5210-
Ok(Box::new(EmptyStatisticsOracle))
5211-
}
5212-
Err(mz_ore::future::TimeoutError::Inner(e)) => Err(AdapterError::Storage(e)),
5213-
}
5141+
.await
52145142
}
52155143
}
52165144

src/adapter/src/frontend_peek.rs

Lines changed: 13 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ use mz_expr::{CollectionPlan, ResultSpec};
1919
use mz_ore::cast::{CastFrom, CastLossy};
2020
use mz_ore::now::EpochMillis;
2121
use mz_repr::optimize::{OptimizerFeatures, OverrideFrom};
22+
use mz_repr::role_id::RoleId;
2223
use mz_repr::{Datum, GlobalId, IntoRowIterator, Timestamp};
2324
use mz_sql::catalog::CatalogCluster;
2425
use mz_sql::plan::{self, Plan, QueryWhen};
@@ -35,7 +36,7 @@ use tracing_opentelemetry::OpenTelemetrySpanExt;
3536
use crate::catalog::CatalogState;
3637
use crate::command::Command;
3738
use crate::coord::peek::PeekPlan;
38-
use crate::coord::sequencer::eval_copy_to_uri;
39+
use crate::coord::sequencer::{eval_copy_to_uri, statistics_oracle};
3940
use crate::coord::timestamp_selection::TimestampDetermination;
4041
use crate::coord::{Coordinator, CopyToContext, ExplainContext, ExplainPlanContext, TargetCluster};
4142
use crate::explain::insights::PlanInsightsContext;
@@ -300,11 +301,7 @@ impl PeekClient {
300301

301302
rbac::check_plan(
302303
&conn_catalog,
303-
|_id| {
304-
// This is only used by `Plan::SideEffectingFunc`, so it is irrelevant for us here
305-
// TODO(peek-seq): refactor `check_plan` to make this nicer
306-
unreachable!()
307-
},
304+
None::<fn(u32) -> Option<RoleId>>,
308305
session,
309306
&plan,
310307
Some(target_cluster_id),
@@ -518,14 +515,16 @@ impl PeekClient {
518515

519516
// # From peek_optimize
520517

521-
if session.vars().enable_session_cardinality_estimates() {
522-
debug!(
523-
"Bailing out from try_frontend_peek_inner, because of enable_session_cardinality_estimates"
524-
);
525-
return Ok(None);
526-
}
527-
// TODO(peek-seq): wire up statistics
528-
let stats = Box::new(EmptyStatisticsOracle);
518+
let stats = statistics_oracle(
519+
session,
520+
&source_ids,
521+
&determination.timestamp_context.antichain(),
522+
true,
523+
catalog.system_config(),
524+
&*self.storage_collections,
525+
)
526+
.await
527+
.unwrap_or_else(|_| Box::new(EmptyStatisticsOracle));
529528

530529
// Generate data structures that can be moved to another task where we will perform possibly
531530
// expensive optimizations.

src/sql/src/rbac.rs

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -337,7 +337,8 @@ pub fn check_usage(
337337
pub fn check_plan(
338338
catalog: &impl SessionCatalog,
339339
// Function mapping a connection ID to an authenticated role. The roles may have been dropped concurrently.
340-
active_conns: impl FnOnce(u32) -> Option<RoleId>,
340+
// Only required for Plan::SideEffectingFunc; can be None for other plan types.
341+
active_conns: Option<impl FnOnce(u32) -> Option<RoleId>>,
341342
session: &dyn SessionMetadata,
342343
plan: &Plan,
343344
target_cluster_id: Option<ClusterId>,
@@ -377,7 +378,7 @@ pub fn is_rbac_enabled_for_session(
377378
fn generate_rbac_requirements(
378379
catalog: &impl SessionCatalog,
379380
plan: &Plan,
380-
active_conns: impl FnOnce(u32) -> Option<RoleId>,
381+
active_conns: Option<impl FnOnce(u32) -> Option<RoleId>>,
381382
target_cluster_id: Option<ClusterId>,
382383
role_id: RoleId,
383384
) -> RbacRequirements {
@@ -1464,11 +1465,12 @@ fn generate_rbac_requirements(
14641465
},
14651466
Plan::SideEffectingFunc(func) => {
14661467
let role_membership = match func {
1467-
SideEffectingFunc::PgCancelBackend { connection_id } => {
1468-
active_conns(*connection_id)
1469-
.map(|x| [x].into())
1470-
.unwrap_or_default()
1471-
}
1468+
SideEffectingFunc::PgCancelBackend { connection_id } => active_conns
1469+
.expect("active_conns is required for Plan::SideEffectingFunc")(
1470+
*connection_id
1471+
)
1472+
.map(|x| [x].into())
1473+
.unwrap_or_default(),
14721474
};
14731475
RbacRequirements {
14741476
role_membership,

src/storage-client/src/storage_collections.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ mod metrics;
8888
/// - Hands out [ReadHolds](ReadHold) that prevent a collection's since from
8989
/// advancing while it needs to be read at a specific time.
9090
#[async_trait]
91-
pub trait StorageCollections: Debug {
91+
pub trait StorageCollections: Debug + Sync {
9292
type Timestamp: TimelyTimestamp;
9393

9494
/// On boot, reconcile this [StorageCollections] with outside state. We get

0 commit comments

Comments
 (0)