@@ -11,12 +11,14 @@ use std::collections::BTreeMap;
1111use std:: collections:: BTreeSet ;
1212use std:: sync:: Arc ;
1313
14- use itertools:: Either ;
14+ use itertools:: { Either , Itertools } ;
1515use mz_adapter_types:: dyncfgs:: CONSTRAINT_BASED_TIMESTAMP_SELECTION ;
1616use mz_adapter_types:: timestamp_selection:: ConstraintBasedTimestampSelection ;
17+ use mz_compute_client:: controller:: error:: DataflowCreationError :: SinceViolation ;
1718use mz_compute_types:: ComputeInstanceId ;
1819use mz_expr:: { CollectionPlan , ResultSpec } ;
1920use mz_ore:: cast:: { CastFrom , CastLossy } ;
21+ use mz_ore:: collections:: CollectionExt ;
2022use mz_ore:: now:: EpochMillis ;
2123use mz_repr:: optimize:: { OptimizerFeatures , OverrideFrom } ;
2224use mz_repr:: role_id:: RoleId ;
@@ -30,6 +32,7 @@ use mz_sql_parser::ast::{CopyDirection, ExplainStage, Statement};
3032use mz_transform:: EmptyStatisticsOracle ;
3133use mz_transform:: dataflow:: DataflowMetainfo ;
3234use opentelemetry:: trace:: TraceContextExt ;
35+ use timely:: PartialOrder ;
3336use tracing:: { Span , debug} ;
3437use tracing_opentelemetry:: OpenTelemetrySpanExt ;
3538
@@ -578,6 +581,36 @@ impl PeekClient {
578581 }
579582 } ;
580583
584+ {
585+ // Assert that we have a read hold for all the collections in our `input_id_bundle`.
586+ for id in input_id_bundle. iter ( ) {
587+ let s = read_holds. storage_holds . contains_key ( & id) ;
588+ let c = read_holds
589+ . compute_ids ( )
590+ . map ( |( _instance, coll) | coll)
591+ . contains ( & id) ;
592+ assert ! ( s || c) ;
593+ }
594+
595+ // Assert that the each part of the `input_id_bundle` corresponds to the right part of
596+ // `read_holds`.
597+ for id in input_id_bundle. storage_ids . iter ( ) {
598+ assert ! ( read_holds. storage_holds. contains_key( id) ) ;
599+ }
600+ for id in input_id_bundle
601+ . compute_ids
602+ . iter ( )
603+ . flat_map ( |( _instance, colls) | colls)
604+ {
605+ assert ! (
606+ read_holds
607+ . compute_ids( )
608+ . map( |( _instance, coll) | coll)
609+ . contains( id)
610+ ) ;
611+ }
612+ }
613+
581614 // (TODO(peek-seq): The below TODO is copied from the old peek sequencing. We should resolve
582615 // this when we decide what to with `AS OF` in transactions.)
583616 // TODO: Checking for only `InTransaction` and not `Implied` (also `Started`?) seems
@@ -981,6 +1014,34 @@ impl PeekClient {
9811014 . await ?
9821015 }
9831016 PeekPlan :: SlowPath ( dataflow_plan) => {
1017+ {
1018+ // Assert that we have some read holds for all the imports of the dataflow.
1019+ for id in dataflow_plan. desc . source_imports . keys ( ) {
1020+ assert ! ( read_holds. storage_holds. contains_key( id) ) ;
1021+ }
1022+ for id in dataflow_plan. desc . index_imports . keys ( ) {
1023+ assert ! (
1024+ read_holds
1025+ . compute_ids( )
1026+ . map( |( _instance, coll) | coll)
1027+ . contains( id)
1028+ ) ;
1029+ }
1030+
1031+ // Also check the holds against the as_of. (plus other minor things)
1032+ for ( id, h) in read_holds. storage_holds . iter ( ) {
1033+ assert_eq ! ( * id, h. id) ;
1034+ let as_of = dataflow_plan. desc . as_of . unwrap ( ) . into_element ( ) ;
1035+ assert ! ( h. since. less_equal( & as_of) ) ;
1036+ }
1037+ for ( ( instance, id) , h) in read_holds. compute_holds . iter ( ) {
1038+ assert_eq ! ( * id, h. id) ;
1039+ assert_eq ! ( * instance, target_cluster_id) ;
1040+ let as_of = dataflow_plan. desc . as_of . unwrap ( ) . into_element ( ) ;
1041+ assert ! ( h. since. less_equal( & as_of) ) ;
1042+ }
1043+ }
1044+
9841045 self . call_coordinator ( |tx| Command :: ExecuteSlowPathPeek {
9851046 dataflow_plan : Box :: new ( dataflow_plan) ,
9861047 determination,
0 commit comments