@@ -11,12 +11,13 @@ use std::collections::BTreeMap;
 use std::collections::BTreeSet;
 use std::sync::Arc;
 
-use itertools::Either;
+use itertools::{Either, Itertools};
 use mz_adapter_types::dyncfgs::CONSTRAINT_BASED_TIMESTAMP_SELECTION;
 use mz_adapter_types::timestamp_selection::ConstraintBasedTimestampSelection;
 use mz_compute_types::ComputeInstanceId;
 use mz_expr::{CollectionPlan, ResultSpec};
 use mz_ore::cast::{CastFrom, CastLossy};
+use mz_ore::collections::CollectionExt;
 use mz_ore::now::EpochMillis;
 use mz_repr::optimize::{OptimizerFeatures, OverrideFrom};
 use mz_repr::role_id::RoleId;
@@ -578,6 +579,36 @@ impl PeekClient {
             }
         };
 
+        {
+            // Assert that we have a read hold for all the collections in our `input_id_bundle`.
+            for id in input_id_bundle.iter() {
+                let s = read_holds.storage_holds.contains_key(&id);
+                let c = read_holds
+                    .compute_ids()
+                    .map(|(_instance, coll)| coll)
+                    .contains(&id);
+                assert!(s || c);
+            }
+
+            // Assert that each part of the `input_id_bundle` corresponds to the right part of
+            // `read_holds`.
+            for id in input_id_bundle.storage_ids.iter() {
+                assert!(read_holds.storage_holds.contains_key(id));
+            }
+            for id in input_id_bundle
+                .compute_ids
+                .iter()
+                .flat_map(|(_instance, colls)| colls)
+            {
+                assert!(
+                    read_holds
+                        .compute_ids()
+                        .map(|(_instance, coll)| coll)
+                        .contains(id)
+                );
+            }
+        }
+
         // (TODO(peek-seq): The below TODO is copied from the old peek sequencing. We should resolve
         // this when we decide what to do with `AS OF` in transactions.)
         // TODO: Checking for only `InTransaction` and not `Implied` (also `Started`?) seems
@@ -981,6 +1012,36 @@ impl PeekClient {
                     .await?
             }
             PeekPlan::SlowPath(dataflow_plan) => {
+                {
+                    // Assert that we have some read holds for all the imports of the dataflow.
+                    for id in dataflow_plan.desc.source_imports.keys() {
+                        assert!(read_holds.storage_holds.contains_key(id));
+                    }
+                    for id in dataflow_plan.desc.index_imports.keys() {
+                        assert!(
+                            read_holds
+                                .compute_ids()
+                                .map(|(_instance, coll)| coll)
+                                .contains(id)
+                        );
+                    }
+
+                    // Also check that each hold is for the right collection and cluster, and
+                    // that its `since` does not exceed the dataflow's `as_of`.
+                    for (id, h) in read_holds.storage_holds.iter() {
+                        assert_eq!(*id, h.id);
+                        let as_of =
+                            dataflow_plan.desc.as_of.clone().unwrap().into_element();
+                        assert!(h.since.less_equal(&as_of));
+                    }
+                    for ((instance, id), h) in read_holds.compute_holds.iter() {
+                        assert_eq!(*id, h.id);
+                        assert_eq!(*instance, target_cluster_id);
+                        let as_of =
+                            dataflow_plan.desc.as_of.clone().unwrap().into_element();
+                        assert!(h.since.less_equal(&as_of));
+                    }
+                }
+
                 self.call_coordinator(|tx| Command::ExecuteSlowPathPeek {
                     dataflow_plan: Box::new(dataflow_plan),
                     determination,