From 79b43b711dd646354048977877d5de5b580da447 Mon Sep 17 00:00:00 2001 From: Gabor Gevay Date: Tue, 25 Nov 2025 14:12:52 +0100 Subject: [PATCH 1/3] tests: Temporarily turn on enable_frontend_peek_sequencing in CI --- misc/python/materialize/mzcompose/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/misc/python/materialize/mzcompose/__init__.py b/misc/python/materialize/mzcompose/__init__.py index 926c9f72d0122..4753ffec79d22 100644 --- a/misc/python/materialize/mzcompose/__init__.py +++ b/misc/python/materialize/mzcompose/__init__.py @@ -196,7 +196,7 @@ def get_variable_system_parameters( ), VariableSystemParameter( "enable_frontend_peek_sequencing", - "false", + "true", ["true", "false"], ), VariableSystemParameter( From 63b4948de5f0a7910d3d45c11d541357d1983e87 Mon Sep 17 00:00:00 2001 From: Gabor Gevay Date: Mon, 1 Dec 2025 20:44:59 +0100 Subject: [PATCH 2/3] DNM Debugging: Assert various read hold stuff --- src/adapter/src/coord/timeline.rs | 13 ++++++ src/adapter/src/frontend_peek.rs | 71 ++++++++++++++++++++++++++++- src/storage-types/src/read_holds.rs | 4 +- 3 files changed, 85 insertions(+), 3 deletions(-) diff --git a/src/adapter/src/coord/timeline.rs b/src/adapter/src/coord/timeline.rs index 69bf7146fd82f..40ddf8679db4b 100644 --- a/src/adapter/src/coord/timeline.rs +++ b/src/adapter/src/coord/timeline.rs @@ -411,9 +411,13 @@ pub(crate) fn timedomain_for<'a, I>( where I: IntoIterator, { + let mut orig_uses_ids = Vec::new(); + // Gather all the used schemas. let mut schemas = BTreeSet::new(); for id in uses_ids { + orig_uses_ids.push(id.clone()); + let entry = catalog.get_entry_by_global_id(id); let name = entry.name(); schemas.insert((name.qualifiers.database_spec, name.qualifiers.schema_spec)); @@ -452,6 +456,15 @@ where collection_ids.extend(global_ids); } + { + // Assert that we got back a superset of the original ids. 
+ // This should be true, because the query is able to reference only the latest version of + // each object. + for id in orig_uses_ids.iter() { + assert!(collection_ids.contains(id)); + } + } + // Gather the dependencies of those items. let mut id_bundle: CollectionIdBundle = dataflow_builder.sufficient_collections(collection_ids); diff --git a/src/adapter/src/frontend_peek.rs b/src/adapter/src/frontend_peek.rs index 166dc5b0281cb..e8bc8752f2153 100644 --- a/src/adapter/src/frontend_peek.rs +++ b/src/adapter/src/frontend_peek.rs @@ -11,12 +11,13 @@ use std::collections::BTreeMap; use std::collections::BTreeSet; use std::sync::Arc; -use itertools::Either; +use itertools::{Either, Itertools}; use mz_adapter_types::dyncfgs::CONSTRAINT_BASED_TIMESTAMP_SELECTION; use mz_adapter_types::timestamp_selection::ConstraintBasedTimestampSelection; use mz_compute_types::ComputeInstanceId; use mz_expr::{CollectionPlan, ResultSpec}; use mz_ore::cast::{CastFrom, CastLossy}; +use mz_ore::collections::CollectionExt; use mz_ore::now::EpochMillis; use mz_repr::optimize::{OptimizerFeatures, OverrideFrom}; use mz_repr::role_id::RoleId; @@ -578,6 +579,36 @@ impl PeekClient { } }; + { + // Assert that we have a read hold for all the collections in our `input_id_bundle`. + for id in input_id_bundle.iter() { + let s = read_holds.storage_holds.contains_key(&id); + let c = read_holds + .compute_ids() + .map(|(_instance, coll)| coll) + .contains(&id); + assert!(s || c); + } + + // Assert that each part of the `input_id_bundle` corresponds to the right part of + // `read_holds`. + for id in input_id_bundle.storage_ids.iter() { + assert!(read_holds.storage_holds.contains_key(id)); + } + for id in input_id_bundle + .compute_ids + .iter() + .flat_map(|(_instance, colls)| colls) + { + assert!( + read_holds + .compute_ids() + .map(|(_instance, coll)| coll) + .contains(id) + ); + } + } + // (TODO(peek-seq): The below TODO is copied from the old peek sequencing. 
We should resolve // this when we decide what to with `AS OF` in transactions.) // TODO: Checking for only `InTransaction` and not `Implied` (also `Started`?) seems @@ -981,6 +1012,44 @@ impl PeekClient { .await? } PeekPlan::SlowPath(dataflow_plan) => { + { + // Assert that we have some read holds for all the imports of the dataflow. + for id in dataflow_plan.desc.source_imports.keys() { + assert!(read_holds.storage_holds.contains_key(id)); + } + for id in dataflow_plan.desc.index_imports.keys() { + assert!( + read_holds + .compute_ids() + .map(|(_instance, coll)| coll) + .contains(id) + ); + } + + // Also check the holds against the as_of. (plus other minor things) + for (id, h) in read_holds.storage_holds.iter() { + assert_eq!(*id, h.id); + let as_of = dataflow_plan + .desc + .as_of + .clone() + .expect("dataflow has an as_of") + .into_element(); + assert!(h.since.less_equal(&as_of)); + } + for ((instance, id), h) in read_holds.compute_holds.iter() { + assert_eq!(*id, h.id); + assert_eq!(*instance, target_cluster_id); + let as_of = dataflow_plan + .desc + .as_of + .clone() + .expect("dataflow has an as_of") + .into_element(); + assert!(h.since.less_equal(&as_of)); + } + } + self.call_coordinator(|tx| Command::ExecuteSlowPathPeek { dataflow_plan: Box::new(dataflow_plan), determination, diff --git a/src/storage-types/src/read_holds.rs b/src/storage-types/src/read_holds.rs index e51729b25ce89..c6ab372e4513d 100644 --- a/src/storage-types/src/read_holds.rs +++ b/src/storage-types/src/read_holds.rs @@ -31,10 +31,10 @@ pub type ChangeTx = Arc< /// the issuer behind the scenes. pub struct ReadHold { /// Identifies that collection that we have a hold on. - id: GlobalId, + pub id: GlobalId, /// The times at which we hold. - since: Antichain, + pub since: Antichain, /// For communicating changes to this read hold back to whoever issued it. 
change_tx: ChangeTx, From f3e2885df4d1f26f753c640a6025122592cc9581 Mon Sep 17 00:00:00 2001 From: Gabor Gevay Date: Tue, 2 Dec 2025 04:57:53 +0100 Subject: [PATCH 3/3] Comment out the ExecuteContextExtra drop assert --- src/adapter/src/coord.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/adapter/src/coord.rs b/src/adapter/src/coord.rs index d3636dce3d9f3..1425bf607ae1f 100644 --- a/src/adapter/src/coord.rs +++ b/src/adapter/src/coord.rs @@ -1343,9 +1343,9 @@ impl Drop for ExecuteContextExtra { // Note: the impact when this error hits // is that the statement will never be marked // as finished in the statement log. - soft_panic_or_log!( - "execute context for statement {statement_uuid:?} dropped without being properly retired." - ); + // soft_panic_or_log!( + // "execute context for statement {statement_uuid:?} dropped without being properly retired." + // ); } } }