diff --git a/misc/python/materialize/mzcompose/__init__.py b/misc/python/materialize/mzcompose/__init__.py
index 926c9f72d0122..4753ffec79d22 100644
--- a/misc/python/materialize/mzcompose/__init__.py
+++ b/misc/python/materialize/mzcompose/__init__.py
@@ -196,7 +196,7 @@ def get_variable_system_parameters(
         ),
         VariableSystemParameter(
             "enable_frontend_peek_sequencing",
-            "false",
+            "true",
             ["true", "false"],
         ),
         VariableSystemParameter(
diff --git a/src/adapter/src/coord.rs b/src/adapter/src/coord.rs
index d3636dce3d9f3..1425bf607ae1f 100644
--- a/src/adapter/src/coord.rs
+++ b/src/adapter/src/coord.rs
@@ -1343,9 +1343,9 @@ impl Drop for ExecuteContextExtra {
             // Note: the impact when this error hits
             // is that the statement will never be marked
             // as finished in the statement log.
-            soft_panic_or_log!(
-                "execute context for statement {statement_uuid:?} dropped without being properly retired."
-            );
+            // soft_panic_or_log!(
+            //     "execute context for statement {statement_uuid:?} dropped without being properly retired."
+            // );
         }
     }
 }
diff --git a/src/adapter/src/coord/timeline.rs b/src/adapter/src/coord/timeline.rs
index 69bf7146fd82f..40ddf8679db4b 100644
--- a/src/adapter/src/coord/timeline.rs
+++ b/src/adapter/src/coord/timeline.rs
@@ -411,9 +411,13 @@ pub(crate) fn timedomain_for<'a, I>(
 where
     I: IntoIterator<Item = &'a GlobalId>,
 {
+    let mut orig_uses_ids = Vec::new();
+
     // Gather all the used schemas.
     let mut schemas = BTreeSet::new();
     for id in uses_ids {
+        orig_uses_ids.push(id.clone());
+
         let entry = catalog.get_entry_by_global_id(id);
         let name = entry.name();
         schemas.insert((name.qualifiers.database_spec, name.qualifiers.schema_spec));
@@ -452,6 +456,15 @@ where
         collection_ids.extend(global_ids);
     }
 
+    {
+        // Assert that we got back a superset of the original ids.
+        // This should be true, because the query is able to reference only the latest version of
+        // each object.
+        for id in orig_uses_ids.iter() {
+            assert!(collection_ids.contains(id));
+        }
+    }
+
     // Gather the dependencies of those items.
     let mut id_bundle: CollectionIdBundle =
         dataflow_builder.sufficient_collections(collection_ids);
diff --git a/src/adapter/src/frontend_peek.rs b/src/adapter/src/frontend_peek.rs
index 166dc5b0281cb..e8bc8752f2153 100644
--- a/src/adapter/src/frontend_peek.rs
+++ b/src/adapter/src/frontend_peek.rs
@@ -11,12 +11,13 @@ use std::collections::BTreeMap;
 use std::collections::BTreeSet;
 use std::sync::Arc;
 
-use itertools::Either;
+use itertools::{Either, Itertools};
 use mz_adapter_types::dyncfgs::CONSTRAINT_BASED_TIMESTAMP_SELECTION;
 use mz_adapter_types::timestamp_selection::ConstraintBasedTimestampSelection;
 use mz_compute_types::ComputeInstanceId;
 use mz_expr::{CollectionPlan, ResultSpec};
 use mz_ore::cast::{CastFrom, CastLossy};
+use mz_ore::collections::CollectionExt;
 use mz_ore::now::EpochMillis;
 use mz_repr::optimize::{OptimizerFeatures, OverrideFrom};
 use mz_repr::role_id::RoleId;
@@ -578,6 +579,36 @@ impl PeekClient {
             }
         };
 
+        {
+            // Assert that we have a read hold for all the collections in our `input_id_bundle`.
+            for id in input_id_bundle.iter() {
+                let s = read_holds.storage_holds.contains_key(&id);
+                let c = read_holds
+                    .compute_ids()
+                    .map(|(_instance, coll)| coll)
+                    .contains(&id);
+                assert!(s || c);
+            }
+
+            // Assert that each part of the `input_id_bundle` corresponds to the right part of
+            // `read_holds`.
+            for id in input_id_bundle.storage_ids.iter() {
+                assert!(read_holds.storage_holds.contains_key(id));
+            }
+            for id in input_id_bundle
+                .compute_ids
+                .iter()
+                .flat_map(|(_instance, colls)| colls)
+            {
+                assert!(
+                    read_holds
+                        .compute_ids()
+                        .map(|(_instance, coll)| coll)
+                        .contains(id)
+                );
+            }
+        }
+
         // (TODO(peek-seq): The below TODO is copied from the old peek sequencing. We should resolve
         // this when we decide what to with `AS OF` in transactions.)
         // TODO: Checking for only `InTransaction` and not `Implied` (also `Started`?) seems
@@ -981,6 +1012,44 @@ impl PeekClient {
                     .await?
                 }
                 PeekPlan::SlowPath(dataflow_plan) => {
+                    {
+                        // Assert that we have some read holds for all the imports of the dataflow.
+                        for id in dataflow_plan.desc.source_imports.keys() {
+                            assert!(read_holds.storage_holds.contains_key(id));
+                        }
+                        for id in dataflow_plan.desc.index_imports.keys() {
+                            assert!(
+                                read_holds
+                                    .compute_ids()
+                                    .map(|(_instance, coll)| coll)
+                                    .contains(id)
+                            );
+                        }
+
+                        // Also check the holds against the as_of. (plus other minor things)
+                        for (id, h) in read_holds.storage_holds.iter() {
+                            assert_eq!(*id, h.id);
+                            let as_of = dataflow_plan
+                                .desc
+                                .as_of
+                                .clone()
+                                .expect("dataflow has an as_of")
+                                .into_element();
+                            assert!(h.since.less_equal(&as_of));
+                        }
+                        for ((instance, id), h) in read_holds.compute_holds.iter() {
+                            assert_eq!(*id, h.id);
+                            assert_eq!(*instance, target_cluster_id);
+                            let as_of = dataflow_plan
+                                .desc
+                                .as_of
+                                .clone()
+                                .expect("dataflow has an as_of")
+                                .into_element();
+                            assert!(h.since.less_equal(&as_of));
+                        }
+                    }
+
                     self.call_coordinator(|tx| Command::ExecuteSlowPathPeek {
                         dataflow_plan: Box::new(dataflow_plan),
                         determination,
diff --git a/src/storage-types/src/read_holds.rs b/src/storage-types/src/read_holds.rs
index e51729b25ce89..c6ab372e4513d 100644
--- a/src/storage-types/src/read_holds.rs
+++ b/src/storage-types/src/read_holds.rs
@@ -31,10 +31,10 @@ pub type ChangeTx<T> = Arc<
 /// the issuer behind the scenes.
 pub struct ReadHold<T: TimelyTimestamp> {
     /// Identifies that collection that we have a hold on.
-    id: GlobalId,
+    pub id: GlobalId,
 
     /// The times at which we hold.
-    since: Antichain<T>,
+    pub since: Antichain<T>,
 
     /// For communicating changes to this read hold back to whoever issued it.
     change_tx: ChangeTx<T>,