misc/python/materialize/mzcompose/__init__.py (1 addition, 1 deletion)
@@ -196,7 +196,7 @@ def get_variable_system_parameters(
         ),
         VariableSystemParameter(
             "enable_frontend_peek_sequencing",
-            "false",
+            "true",
             ["true", "false"],
         ),
         VariableSystemParameter(
src/adapter/src/coord.rs (3 additions, 3 deletions)
@@ -1343,9 +1343,9 @@ impl Drop for ExecuteContextExtra {
             // Note: the impact when this error hits
             // is that the statement will never be marked
             // as finished in the statement log.
-            soft_panic_or_log!(
-                "execute context for statement {statement_uuid:?} dropped without being properly retired."
-            );
+            // soft_panic_or_log!(
+            //     "execute context for statement {statement_uuid:?} dropped without being properly retired."
+            // );
         }
     }
 }
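The check being disabled here is a drop-guard: `ExecuteContextExtra` must be explicitly retired, and its `Drop` impl flags any context that falls out of scope still holding a statement UUID. A minimal self-contained sketch of the pattern, with simplified stand-in types (a `String` in place of the UUID, `eprintln!` in place of `soft_panic_or_log!`):

```rust
/// Minimal sketch of the drop-guard pattern behind `ExecuteContextExtra`.
/// Stand-in types only; the real context is retired through the
/// coordinator's statement-logging machinery.
struct ExecuteContextExtra {
    statement_uuid: Option<String>,
}

impl ExecuteContextExtra {
    /// Consume the context, taking responsibility for the statement's fate.
    /// After this, the `Drop` impl below has nothing to complain about.
    fn retire(mut self) -> Option<String> {
        self.statement_uuid.take()
    }
}

impl Drop for ExecuteContextExtra {
    fn drop(&mut self) {
        if let Some(statement_uuid) = &self.statement_uuid {
            // This is where the real code calls `soft_panic_or_log!`: the
            // statement will never be marked as finished in the statement log.
            eprintln!(
                "execute context for statement {statement_uuid:?} dropped \
                 without being properly retired."
            );
        }
    }
}

fn main() {
    let ctx = ExecuteContextExtra { statement_uuid: Some("uuid-1".into()) };
    let _ = ctx.retire(); // retired: dropping afterwards is silent
    let _leaked = ExecuteContextExtra { statement_uuid: Some("uuid-2".into()) };
    // `_leaked` goes out of scope un-retired, so `drop` logs the warning.
}
```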
src/adapter/src/coord/timeline.rs (13 additions, 0 deletions)
@@ -411,9 +411,13 @@ pub(crate) fn timedomain_for<'a, I>(
 where
     I: IntoIterator<Item = &'a GlobalId>,
 {
+    let mut orig_uses_ids = Vec::new();
+
     // Gather all the used schemas.
     let mut schemas = BTreeSet::new();
     for id in uses_ids {
+        orig_uses_ids.push(id.clone());
+
         let entry = catalog.get_entry_by_global_id(id);
         let name = entry.name();
         schemas.insert((name.qualifiers.database_spec, name.qualifiers.schema_spec));
@@ -452,6 +456,15 @@ where
         collection_ids.extend(global_ids);
     }
 
+    {
+        // Assert that we got back a superset of the original ids.
+        // This should be true, because the query is able to reference only the latest version of
+        // each object.
+        for id in orig_uses_ids.iter() {
+            assert!(collection_ids.contains(id));
+        }
+    }
+
     // Gather the dependencies of those items.
     let mut id_bundle: CollectionIdBundle = dataflow_builder.sufficient_collections(collection_ids);
 
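The new assertion pins down a timedomain invariant: expanding the ids a query uses to everything in the same schemas must return a superset of the inputs, because a query can only reference the latest version of each object, and that version is always part of its schema's timedomain. A self-contained sketch of the check, with `u64` standing in for `GlobalId`:

```rust
use std::collections::BTreeSet;

// Stand-in for `mz_repr::GlobalId`.
type GlobalId = u64;

/// Sketch: the timedomain expansion starts from the ids a query uses and
/// pulls in every collection in the same schemas, so the result must
/// contain every input id.
fn assert_superset(orig_uses_ids: &[GlobalId], collection_ids: &BTreeSet<GlobalId>) {
    for id in orig_uses_ids {
        assert!(collection_ids.contains(id), "timedomain lost id {id}");
    }
}

fn main() {
    let orig_uses_ids = vec![1, 3];
    // The expansion also picked up ids 2 and 4 from the same schemas.
    let collection_ids: BTreeSet<GlobalId> = [1, 2, 3, 4].into_iter().collect();
    assert_superset(&orig_uses_ids, &collection_ids);
}
```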
src/adapter/src/frontend_peek.rs (70 additions, 1 deletion)
@@ -11,12 +11,13 @@ use std::collections::BTreeMap;
 use std::collections::BTreeSet;
 use std::sync::Arc;
 
-use itertools::Either;
+use itertools::{Either, Itertools};
 use mz_adapter_types::dyncfgs::CONSTRAINT_BASED_TIMESTAMP_SELECTION;
 use mz_adapter_types::timestamp_selection::ConstraintBasedTimestampSelection;
 use mz_compute_types::ComputeInstanceId;
 use mz_expr::{CollectionPlan, ResultSpec};
 use mz_ore::cast::{CastFrom, CastLossy};
+use mz_ore::collections::CollectionExt;
 use mz_ore::now::EpochMillis;
 use mz_repr::optimize::{OptimizerFeatures, OverrideFrom};
 use mz_repr::role_id::RoleId;
@@ -578,6 +579,36 @@ impl PeekClient {
             }
         };
 
+        {
+            // Assert that we have a read hold for all the collections in our `input_id_bundle`.
+            for id in input_id_bundle.iter() {
+                let s = read_holds.storage_holds.contains_key(&id);
+                let c = read_holds
+                    .compute_ids()
+                    .map(|(_instance, coll)| coll)
+                    .contains(&id);
+                assert!(s || c);
+            }
+
+            // Assert that each part of the `input_id_bundle` corresponds to the right part of
+            // `read_holds`.
+            for id in input_id_bundle.storage_ids.iter() {
+                assert!(read_holds.storage_holds.contains_key(id));
+            }
+            for id in input_id_bundle
+                .compute_ids
+                .iter()
+                .flat_map(|(_instance, colls)| colls)
+            {
+                assert!(
+                    read_holds
+                        .compute_ids()
+                        .map(|(_instance, coll)| coll)
+                        .contains(id)
+                );
+            }
+        }
+
         // (TODO(peek-seq): The below TODO is copied from the old peek sequencing. We should resolve
         // this when we decide what to do with `AS OF` in transactions.)
         // TODO: Checking for only `InTransaction` and not `Implied` (also `Started`?) seems
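These checks spell out the contract between the `input_id_bundle` and `read_holds`: not only must every collection be covered by some hold, storage ids must be covered by storage holds and compute ids by compute holds. A simplified model of the second, stronger check, using stand-in types shaped like the real bundle and hold collections:

```rust
use std::collections::{BTreeMap, BTreeSet};

// Stand-ins for `GlobalId` and `ComputeInstanceId`.
type GlobalId = u64;
type ComputeInstanceId = u32;

/// Mirrors the shape of `CollectionIdBundle`: storage ids plus
/// per-instance compute ids.
struct CollectionIdBundle {
    storage_ids: BTreeSet<GlobalId>,
    compute_ids: BTreeMap<ComputeInstanceId, BTreeSet<GlobalId>>,
}

/// Mirrors the shape of the held collections: storage holds keyed by id,
/// compute holds keyed by (instance, id).
struct ReadHolds {
    storage_holds: BTreeMap<GlobalId, ()>,
    compute_holds: BTreeMap<(ComputeInstanceId, GlobalId), ()>,
}

/// Each part of the bundle must be covered by the matching kind of hold.
fn assert_holds_cover_bundle(bundle: &CollectionIdBundle, holds: &ReadHolds) {
    for id in &bundle.storage_ids {
        assert!(holds.storage_holds.contains_key(id));
    }
    for (instance, colls) in &bundle.compute_ids {
        for id in colls {
            assert!(holds.compute_holds.contains_key(&(*instance, *id)));
        }
    }
}

fn main() {
    let bundle = CollectionIdBundle {
        storage_ids: [7].into_iter().collect(),
        compute_ids: BTreeMap::from([(1, [9].into_iter().collect())]),
    };
    let holds = ReadHolds {
        storage_holds: BTreeMap::from([(7, ())]),
        compute_holds: BTreeMap::from([((1, 9), ())]),
    };
    assert_holds_cover_bundle(&bundle, &holds);
}
```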
@@ -981,6 +1012,44 @@ impl PeekClient {
                     .await?
             }
             PeekPlan::SlowPath(dataflow_plan) => {
+                {
+                    // Assert that we have some read holds for all the imports of the dataflow.
+                    for id in dataflow_plan.desc.source_imports.keys() {
+                        assert!(read_holds.storage_holds.contains_key(id));
+                    }
+                    for id in dataflow_plan.desc.index_imports.keys() {
+                        assert!(
+                            read_holds
+                                .compute_ids()
+                                .map(|(_instance, coll)| coll)
+                                .contains(id)
+                        );
+                    }
+
+                    // Also check the holds against the as_of. (plus other minor things)
+                    for (id, h) in read_holds.storage_holds.iter() {
+                        assert_eq!(*id, h.id);
+                        let as_of = dataflow_plan
+                            .desc
+                            .as_of
+                            .clone()
+                            .expect("dataflow has an as_of")
+                            .into_element();
+                        assert!(h.since.less_equal(&as_of));
+                    }
+                    for ((instance, id), h) in read_holds.compute_holds.iter() {
+                        assert_eq!(*id, h.id);
+                        assert_eq!(*instance, target_cluster_id);
+                        let as_of = dataflow_plan
+                            .desc
+                            .as_of
+                            .clone()
+                            .expect("dataflow has an as_of")
+                            .into_element();
+                        assert!(h.since.less_equal(&as_of));
+                    }
+                }
+
                 self.call_coordinator(|tx| Command::ExecuteSlowPathPeek {
                     dataflow_plan: Box::new(dataflow_plan),
                     determination,
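The slow-path block asserts the core timestamp-safety invariant: the dataflow's `as_of` must not lie beyond any input's hold, i.e. `since ≤ as_of` for every hold (the real code compares an `Antichain` frontier via `less_equal`). A sketch with plain `u64` timestamps standing in for frontiers:

```rust
// Stand-in for `GlobalId`.
type GlobalId = u64;

/// Simplified read hold: `since` is the earliest time still readable.
/// The real `since` is an `Antichain<T>` and the comparison is
/// `since.less_equal(&as_of)`; with a total order it degenerates to `<=`.
struct ReadHold {
    id: GlobalId,
    since: u64,
}

/// Installing a dataflow at `as_of` is only sound if no input collection
/// has been allowed to compact past that time.
fn assert_holds_cover_as_of(holds: &[ReadHold], as_of: u64) {
    for h in holds {
        assert!(
            h.since <= as_of,
            "hold on collection {} (since {}) does not cover as_of {}",
            h.id, h.since, as_of,
        );
    }
}

fn main() {
    let holds = [ReadHold { id: 1, since: 10 }, ReadHold { id: 2, since: 0 }];
    assert_holds_cover_as_of(&holds, 10); // ok: every since <= 10
}
```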
src/storage-types/src/read_holds.rs (2 additions, 2 deletions)
@@ -31,10 +31,10 @@ pub type ChangeTx<T> = Arc<
 /// the issuer behind the scenes.
 pub struct ReadHold<T: TimelyTimestamp> {
     /// Identifies the collection that we have a hold on.
-    id: GlobalId,
+    pub id: GlobalId,
 
     /// The times at which we hold.
-    since: Antichain<T>,
+    pub since: Antichain<T>,
 
     /// For communicating changes to this read hold back to whoever issued it.
     change_tx: ChangeTx<T>,
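Making `id` and `since` public gives callers, such as the new frontend assertions, read access to a hold's identity and frontier, while mutation still has to route through the issuer via the private `change_tx`. A condensed sketch of that split, with simplified stand-in types and a hypothetical `downgrade` helper (not the real API):

```rust
use std::sync::Arc;

// Stand-ins: the real hold is generic over a timely timestamp and sends
// changes back to the issuer through `change_tx`.
type GlobalId = u64;
type ChangeTx = Arc<dyn Fn(u64) + Send + Sync>;

pub struct ReadHold {
    /// Identifies the collection that we have a hold on. Public, so
    /// assertions elsewhere can inspect it.
    pub id: GlobalId,
    /// The time at which we hold. Public for the same reason.
    pub since: u64,
    /// Still private: all changes route back through the issuer.
    change_tx: ChangeTx,
}

impl ReadHold {
    /// Hypothetical helper: advancing the hold notifies the issuer rather
    /// than silently mutating local state, preserving the single-writer
    /// discipline even with the fields readable from outside.
    pub fn downgrade(&mut self, new_since: u64) {
        assert!(self.since <= new_since, "read holds only move forward");
        (self.change_tx)(new_since);
        self.since = new_since;
    }
}
```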