@@ -24,24 +24,20 @@ use mz_ore::cast::CastFrom;
2424use mz_repr:: { Datum , Diff , GlobalId , Timestamp } ;
2525use mz_timely_util:: containers:: { Column , ColumnBuilder , ProvidedBuilder } ;
2626use mz_timely_util:: replay:: MzReplay ;
27- use timely:: communication:: Allocate ;
28- use timely:: container:: CapacityContainerBuilder ;
2927use timely:: dataflow:: channels:: pact:: Pipeline ;
30- use timely:: dataflow:: channels:: pushers:: buffer:: Session ;
31- use timely:: dataflow:: channels:: pushers:: { Counter , Tee } ;
3228use timely:: dataflow:: operators:: core:: Map ;
3329use timely:: dataflow:: operators:: generic:: builder_rc:: OperatorBuilder ;
34- use timely:: dataflow:: operators:: Operator ;
35- use timely:: dataflow:: { Scope , Stream } ;
30+ use timely:: dataflow:: operators:: { Concatenate , Enter , Operator } ;
31+ use timely:: dataflow:: { Scope , Stream , StreamCore } ;
3632use timely:: scheduling:: Scheduler ;
37- use timely:: worker:: Worker ;
3833use timely:: { Container , Data } ;
3934use tracing:: error;
4035use uuid:: Uuid ;
4136
4237use crate :: extensions:: arrange:: MzArrange ;
4338use crate :: logging:: {
44- ComputeLog , EventQueue , LogCollection , LogVariant , PermutedRowPacker , SharedLoggingState ,
39+ ComputeLog , EventQueue , LogCollection , LogVariant , OutputSessionColumnar , OutputSessionVec ,
40+ PermutedRowPacker , SharedLoggingState , Update ,
4541} ;
4642use crate :: row_spine:: { RowRowBatcher , RowRowBuilder } ;
4743use crate :: typedefs:: RowRowSpine ;
@@ -289,24 +285,32 @@ impl LirMetadata {
289285 }
290286}
291287
292- /// Constructs the logging dataflow for compute logs.
288+ /// The return type of the [`construct`] function.
289+ pub ( super ) struct Return {
290+ /// Collections returned by [`construct`].
291+ pub collections : BTreeMap < LogVariant , LogCollection > ,
292+ }
293+
294+ /// Constructs the logging dataflow fragment for compute logs.
293295///
294296/// Params
295- /// * `worker`: The Timely worker hosting the log analysis dataflow.
297+ /// * `scope`: The Timely scope hosting the log analysis dataflow.
298+ /// * `scheduler`: The timely scheduler used to obtain activators.
296299/// * `config`: Logging configuration.
297300/// * `event_queue`: The source to read compute log events from.
298- pub ( super ) fn construct < A : Allocate + ' static > (
299- worker : & mut timely:: worker:: Worker < A > ,
301+ /// * `compute_event_streams`: Additional compute event streams to absorb.
302+ /// * `shared_state`: Shared state between logging dataflow fragments.
303+ pub ( super ) fn construct < S : Scheduler + ' static , G : Scope < Timestamp = Timestamp > > (
304+ mut scope : G ,
305+ scheduler : S ,
300306 config : & mz_compute_client:: logging:: LoggingConfig ,
301307 event_queue : EventQueue < Column < ( Duration , ComputeEvent ) > > ,
308+ compute_event_streams : impl IntoIterator < Item = StreamCore < G , Column < ( Duration , ComputeEvent ) > > > ,
302309 shared_state : Rc < RefCell < SharedLoggingState > > ,
303- ) -> BTreeMap < LogVariant , LogCollection > {
310+ ) -> Return {
304311 let logging_interval_ms = std:: cmp:: max ( 1 , config. interval . as_millis ( ) ) ;
305- let worker_id = worker. index ( ) ;
306- let worker2 = worker. clone ( ) ;
307- let dataflow_index = worker. next_dataflow_index ( ) ;
308312
309- worker . dataflow_named ( "Dataflow: compute logging", move |scope| {
313+ scope . scoped ( " compute logging", move |scope| {
310314 let enable_logging = config. enable_logging ;
311315 let ( logs, token) = event_queue. links . mz_replay :: < _ , ProvidedBuilder < _ > , _ > (
312316 scope,
@@ -322,6 +326,12 @@ pub(super) fn construct<A: Allocate + 'static>(
322326 } ,
323327 ) ;
324328
329+ let logs = compute_event_streams
330+ . into_iter ( )
331+ . map ( |stream| stream. enter ( scope) )
332+ . chain ( std:: iter:: once ( logs) ) ;
333+ let logs = scope. concatenate ( logs) ;
334+
325335 // Build a demux operator that splits the replayed event stream up into the separate
326336 // logging streams.
327337 let mut demux = OperatorBuilder :: new ( "Compute Logging Demux" . to_string ( ) , scope. clone ( ) ) ;
@@ -341,7 +351,7 @@ pub(super) fn construct<A: Allocate + 'static>(
341351 let ( mut lir_mapping_out, lir_mapping) = demux. new_output ( ) ;
342352 let ( mut dataflow_global_ids_out, dataflow_global_ids) = demux. new_output ( ) ;
343353
344- let mut demux_state = DemuxState :: new ( worker2 ) ;
354+ let mut demux_state = DemuxState :: new ( scheduler ) ;
345355 demux. build ( move |_capability| {
346356 move |_frontiers| {
347357 let mut export = export_out. activate ( ) ;
@@ -390,6 +400,8 @@ pub(super) fn construct<A: Allocate + 'static>(
390400 }
391401 } ) ;
392402
403+ let worker_id = scope. index ( ) ;
404+
393405 // Encode the contents of each logging stream into its expected `Row` format.
394406 let mut packer = PermutedRowPacker :: new ( ComputeLog :: DataflowCurrent ) ;
395407 let dataflow_current = export. as_collection ( ) . map ( {
@@ -557,7 +569,7 @@ pub(super) fn construct<A: Allocate + 'static>(
557569 ] ;
558570
559571 // Build the output arrangements.
560- let mut result = BTreeMap :: new ( ) ;
572+ let mut collections = BTreeMap :: new ( ) ;
561573 for ( variant, collection) in logs {
562574 let variant = LogVariant :: Compute ( variant) ;
563575 if config. index_logs . contains_key ( & variant) {
@@ -569,13 +581,12 @@ pub(super) fn construct<A: Allocate + 'static>(
569581 let collection = LogCollection {
570582 trace,
571583 token : Rc :: clone ( & token) ,
572- dataflow_index,
573584 } ;
574- result . insert ( variant, collection) ;
585+ collections . insert ( variant, collection) ;
575586 }
576587 }
577588
578- result
589+ Return { collections }
579590 } )
580591}
581592
@@ -594,9 +605,9 @@ where
594605}
595606
596607/// State maintained by the demux operator.
597- struct DemuxState < A : Allocate > {
598- /// The worker hosting this operator .
599- worker : Worker < A > ,
608+ struct DemuxState < A > {
609+ /// The timely scheduler .
610+ scheduler : A ,
600611 /// State tracked per dataflow export.
601612 exports : BTreeMap < GlobalId , ExportState > ,
602613 /// Maps live dataflows to counts of their exports.
@@ -615,10 +626,10 @@ struct DemuxState<A: Allocate> {
615626 dataflow_global_ids : BTreeMap < usize , BTreeSet < GlobalId > > ,
616627}
617628
618- impl < A : Allocate > DemuxState < A > {
619- fn new ( worker : Worker < A > ) -> Self {
629+ impl < A : Scheduler > DemuxState < A > {
630+ fn new ( scheduler : A ) -> Self {
620631 Self {
621- worker ,
632+ scheduler ,
622633 exports : Default :: default ( ) ,
623634 dataflow_export_counts : Default :: default ( ) ,
624635 dataflow_drop_times : Default :: default ( ) ,
@@ -665,34 +676,21 @@ struct ArrangementSizeState {
665676 count : isize ,
666677}
667678
668- /// An update of value `D` at a time and with a diff.
669- type Update < D > = ( D , Timestamp , Diff ) ;
670- /// A pusher for updates of value `D` for vector-based containers.
671- type Pusher < D > = Counter < Timestamp , Vec < Update < D > > , Tee < Timestamp , Vec < Update < D > > > > ;
672- /// A pusher for updates of value `D` for columnar containers.
673- type PusherColumnar < D > = Counter < Timestamp , Column < Update < D > > , Tee < Timestamp , Column < Update < D > > > > ;
674- /// An output session for vector-based containers of updates `D`, using a capacity container builder.
675- type OutputSession < ' a , D > =
676- Session < ' a , Timestamp , CapacityContainerBuilder < Vec < Update < D > > > , Pusher < D > > ;
677- /// An output session for columnar containers of updates `D`, using a column builder.
678- type OutputSessionColumnar < ' a , D > =
679- Session < ' a , Timestamp , ColumnBuilder < Update < D > > , PusherColumnar < D > > ;
680-
681679/// Bundled output sessions used by the demux operator.
682680struct DemuxOutput < ' a > {
683- export : OutputSession < ' a , ExportDatum > ,
684- frontier : OutputSession < ' a , FrontierDatum > ,
685- import_frontier : OutputSession < ' a , ImportFrontierDatum > ,
686- peek : OutputSession < ' a , PeekDatum > ,
687- peek_duration : OutputSession < ' a , PeekDurationDatum > ,
688- shutdown_duration : OutputSession < ' a , u128 > ,
689- arrangement_heap_size : OutputSession < ' a , ArrangementHeapDatum > ,
690- arrangement_heap_capacity : OutputSession < ' a , ArrangementHeapDatum > ,
691- arrangement_heap_allocations : OutputSession < ' a , ArrangementHeapDatum > ,
692- hydration_time : OutputSession < ' a , HydrationTimeDatum > ,
693- error_count : OutputSession < ' a , ErrorCountDatum > ,
694- lir_mapping : OutputSessionColumnar < ' a , LirMappingDatum > ,
695- dataflow_global_ids : OutputSession < ' a , DataflowGlobalDatum > ,
681+ export : OutputSessionVec < ' a , Update < ExportDatum > > ,
682+ frontier : OutputSessionVec < ' a , Update < FrontierDatum > > ,
683+ import_frontier : OutputSessionVec < ' a , Update < ImportFrontierDatum > > ,
684+ peek : OutputSessionVec < ' a , Update < PeekDatum > > ,
685+ peek_duration : OutputSessionVec < ' a , Update < PeekDurationDatum > > ,
686+ shutdown_duration : OutputSessionVec < ' a , Update < u128 > > ,
687+ arrangement_heap_size : OutputSessionVec < ' a , Update < ArrangementHeapDatum > > ,
688+ arrangement_heap_capacity : OutputSessionVec < ' a , Update < ArrangementHeapDatum > > ,
689+ arrangement_heap_allocations : OutputSessionVec < ' a , Update < ArrangementHeapDatum > > ,
690+ hydration_time : OutputSessionVec < ' a , Update < HydrationTimeDatum > > ,
691+ error_count : OutputSessionVec < ' a , Update < ErrorCountDatum > > ,
692+ lir_mapping : OutputSessionColumnar < ' a , Update < LirMappingDatum > > ,
693+ dataflow_global_ids : OutputSessionVec < ' a , Update < DataflowGlobalDatum > > ,
696694}
697695
698696#[ derive( Clone ) ]
@@ -763,7 +761,7 @@ struct DataflowGlobalDatum {
763761}
764762
765763/// Event handler of the demux operator.
766- struct DemuxHandler < ' a , ' b , A : Allocate + ' static > {
764+ struct DemuxHandler < ' a , ' b , A : Scheduler > {
767765 /// State kept by the demux operator.
768766 state : & ' a mut DemuxState < A > ,
769767 /// State shared across log receivers.
@@ -776,7 +774,7 @@ struct DemuxHandler<'a, 'b, A: Allocate + 'static> {
776774 time : Duration ,
777775}
778776
779- impl < A : Allocate > DemuxHandler < ' _ , ' _ , A > {
777+ impl < A : Scheduler > DemuxHandler < ' _ , ' _ , A > {
780778 /// Return the timestamp associated with the current event, based on the event time and the
781779 /// logging interval.
782780 fn ts ( & self ) -> Timestamp {
@@ -1198,7 +1196,7 @@ impl<A: Allocate> DemuxHandler<'_, '_, A> {
11981196 ) {
11991197 let activator = self
12001198 . state
1201- . worker
1199+ . scheduler
12021200 . activator_for ( address. into_iter ( ) . collect ( ) ) ;
12031201 let existing = self
12041202 . state
0 commit comments