@@ -338,8 +338,6 @@ pub(super) fn construct<S: Scheduler + 'static, G: Scope<Timestamp = Timestamp>>
         let mut peek_out = OutputBuilder::from(peek_out);
         let (peek_duration_out, peek_duration) = demux.new_output();
         let mut peek_duration_out = OutputBuilder::from(peek_duration_out);
-        let (shutdown_duration_out, shutdown_duration) = demux.new_output();
-        let mut shutdown_duration_out = OutputBuilder::from(shutdown_duration_out);
         let (arrangement_heap_size_out, arrangement_heap_size) = demux.new_output();
         let mut arrangement_heap_size_out = OutputBuilder::from(arrangement_heap_size_out);
         let (arrangement_heap_capacity_out, arrangement_heap_capacity) = demux.new_output();
@@ -366,7 +364,6 @@ pub(super) fn construct<S: Scheduler + 'static, G: Scope<Timestamp = Timestamp>>
                 let mut import_frontier = import_frontier_out.activate();
                 let mut peek = peek_out.activate();
                 let mut peek_duration = peek_duration_out.activate();
-                let mut shutdown_duration = shutdown_duration_out.activate();
                 let mut arrangement_heap_size = arrangement_heap_size_out.activate();
                 let mut arrangement_heap_capacity = arrangement_heap_capacity_out.activate();
                 let mut arrangement_heap_allocations = arrangement_heap_allocations_out.activate();
@@ -383,7 +380,6 @@ pub(super) fn construct<S: Scheduler + 'static, G: Scope<Timestamp = Timestamp>>
                     import_frontier: import_frontier.session_with_builder(&cap),
                     peek: peek.session_with_builder(&cap),
                     peek_duration: peek_duration.session_with_builder(&cap),
-                    shutdown_duration: shutdown_duration.session_with_builder(&cap),
                     arrangement_heap_allocations: arrangement_heap_allocations.session_with_builder(&cap),
                     arrangement_heap_capacity: arrangement_heap_capacity.session_with_builder(&cap),
                     arrangement_heap_size: arrangement_heap_size.session_with_builder(&cap),
@@ -424,7 +420,6 @@ pub(super) fn construct<S: Scheduler + 'static, G: Scope<Timestamp = Timestamp>>
             (OperatorHydrationStatus, operator_hydration_status),
             (PeekCurrent, peek),
             (PeekDuration, peek_duration),
-            (ShutdownDuration, shutdown_duration),
         ];
 
         // Build the output arrangements.
@@ -476,12 +471,6 @@ struct DemuxState<A> {
     scratch_string_b: String,
     /// State tracked per dataflow export.
     exports: BTreeMap<GlobalId, ExportState>,
-    /// Maps live dataflows to counts of their exports.
-    dataflow_export_counts: BTreeMap<usize, u32>,
-    /// Maps dropped dataflows to their drop time.
-    dataflow_drop_times: BTreeMap<usize, Duration>,
-    /// Contains dataflows that have shut down but not yet been dropped.
-    shutdown_dataflows: BTreeSet<usize>,
     /// Maps pending peeks to their installation time.
     peek_stash: BTreeMap<Uuid, Duration>,
     /// Arrangement size stash.
@@ -514,8 +503,6 @@ struct DemuxState<A> {
     peek_duration_packer: PermutedRowPacker,
     /// A row packer for the peek output.
     peek_packer: PermutedRowPacker,
-    /// A row packer for the shutdown duration output.
-    shutdown_duration_packer: PermutedRowPacker,
     /// A row packer for the hydration time output.
     hydration_time_packer: PermutedRowPacker,
 }
@@ -528,9 +515,6 @@ impl<A: Scheduler> DemuxState<A> {
             scratch_string_a: String::new(),
             scratch_string_b: String::new(),
             exports: Default::default(),
-            dataflow_export_counts: Default::default(),
-            dataflow_drop_times: Default::default(),
-            shutdown_dataflows: Default::default(),
             peek_stash: Default::default(),
             arrangement_size: Default::default(),
             lir_mapping: Default::default(),
@@ -554,7 +538,6 @@ impl<A: Scheduler> DemuxState<A> {
             ),
             peek_duration_packer: PermutedRowPacker::new(ComputeLog::PeekDuration),
             peek_packer: PermutedRowPacker::new(ComputeLog::PeekCurrent),
-            shutdown_duration_packer: PermutedRowPacker::new(ComputeLog::ShutdownDuration),
         }
     }
 
@@ -727,14 +710,6 @@ impl<A: Scheduler> DemuxState<A> {
             Datum::MzTimestamp(time),
         ])
     }
-
-    /// Pack a shutdown duration update key-value for the given time bucket.
-    fn pack_shutdown_duration_update(&mut self, bucket: u128) -> (&RowRef, &RowRef) {
-        self.shutdown_duration_packer.pack_slice(&[
-            Datum::UInt64(u64::cast_from(self.worker_id)),
-            Datum::UInt64(bucket.try_into().expect("bucket too big")),
-        ])
-    }
 }
 
 /// State tracked for each dataflow export.
@@ -781,7 +756,6 @@ struct DemuxOutput<'a, 'b> {
     import_frontier: OutputSessionColumnar<'a, 'b, Update<(Row, Row)>>,
     peek: OutputSessionColumnar<'a, 'b, Update<(Row, Row)>>,
     peek_duration: OutputSessionColumnar<'a, 'b, Update<(Row, Row)>>,
-    shutdown_duration: OutputSessionColumnar<'a, 'b, Update<(Row, Row)>>,
     arrangement_heap_allocations: OutputSessionColumnar<'a, 'b, Update<(Row, Row)>>,
     arrangement_heap_capacity: OutputSessionColumnar<'a, 'b, Update<(Row, Row)>>,
     arrangement_heap_size: OutputSessionColumnar<'a, 'b, Update<(Row, Row)>>,
@@ -862,12 +836,6 @@ impl<A: Scheduler> DemuxHandler<'_, '_, '_, A> {
             error!(%export_id, "export already registered");
         }
 
-        *self
-            .state
-            .dataflow_export_counts
-            .entry(dataflow_index)
-            .or_default() += 1;
-
         // Insert hydration time logging for this export.
         let datum = self.state.pack_hydration_time_update(export_id, None);
         self.output.hydration_time.give((datum, ts, Diff::ONE));
@@ -889,18 +857,6 @@ impl<A: Scheduler> DemuxHandler<'_, '_, '_, A> {
         let datum = self.state.pack_export_update(export_id, dataflow_index);
         self.output.export.give((datum, ts, Diff::MINUS_ONE));
 
-        match self.state.dataflow_export_counts.get_mut(&dataflow_index) {
-            entry @ Some(0) | entry @ None => {
-                error!(
-                    %export_id,
-                    %dataflow_index,
-                    "invalid dataflow_export_counts entry at time of export drop: {entry:?}",
-                );
-            }
-            Some(1) => self.handle_dataflow_dropped(dataflow_index),
-            Some(count) => *count -= 1,
-        }
-
         // Remove error count logging for this export.
         if export.error_count != Diff::ZERO {
             let datum = self
@@ -928,46 +884,12 @@ impl<A: Scheduler> DemuxHandler<'_, '_, '_, A> {
         }
     }
 
-    fn handle_dataflow_dropped(&mut self, dataflow_index: usize) {
-        let ts = self.ts();
-        self.state.dataflow_export_counts.remove(&dataflow_index);
-
-        if self.state.shutdown_dataflows.remove(&dataflow_index) {
-            // Dataflow has already shut down before it was dropped.
-            let datum = self.state.pack_shutdown_duration_update(0);
-            self.output.shutdown_duration.give((datum, ts, Diff::ONE));
-        } else {
-            // Dataflow has not yet shut down.
-            let existing = self
-                .state
-                .dataflow_drop_times
-                .insert(dataflow_index, self.time);
-            if existing.is_some() {
-                error!(%dataflow_index, "dataflow already dropped");
-            }
-        }
-    }
-
     fn handle_dataflow_shutdown(
         &mut self,
         DataflowShutdownReference { dataflow_index }: Ref<'_, DataflowShutdown>,
     ) {
         let ts = self.ts();
 
-        if let Some(start) = self.state.dataflow_drop_times.remove(&dataflow_index) {
-            // Dataflow has already been dropped.
-            let elapsed_ns = self.time.saturating_sub(start).as_nanos();
-            let elapsed_pow = elapsed_ns.next_power_of_two();
-            let datum = self.state.pack_shutdown_duration_update(elapsed_pow);
-            self.output.shutdown_duration.give((datum, ts, Diff::ONE));
-        } else {
-            // Dataflow has not yet been dropped.
-            let was_new = self.state.shutdown_dataflows.insert(dataflow_index);
-            if !was_new {
-                error!(%dataflow_index, "dataflow already shutdown");
-            }
-        }
-
         // We deal with any `GlobalId` based mappings in this event.
         if let Some(global_ids) = self.state.dataflow_global_ids.remove(&dataflow_index) {
             for global_id in global_ids {