diff --git a/src/compute/src/compute_state.rs b/src/compute/src/compute_state.rs index 30d272724aa0f..e3b5ef3fb0896 100644 --- a/src/compute/src/compute_state.rs +++ b/src/compute/src/compute_state.rs @@ -65,6 +65,7 @@ use uuid::Uuid; use crate::arrangement::manager::{TraceBundle, TraceManager}; use crate::logging; use crate::logging::compute::{CollectionLogging, ComputeEvent, PeekEvent}; +use crate::logging::initialize::LoggingTraces; use crate::metrics::{CollectionMetrics, WorkerMetrics}; use crate::render::{LinearJoinSpec, StartSignal}; use crate::server::{ComputeInstanceContext, ResponseSender}; @@ -665,10 +666,14 @@ impl<'a, A: Allocate + 'static> ActiveComputeState<'a, A> { panic!("dataflow server has already initialized logging"); } - let (logger, traces) = logging::initialize(self.timely_worker, &config); + let LoggingTraces { + traces, + dataflow_index, + compute_logger: logger, + } = logging::initialize(self.timely_worker, &config); let mut log_index_ids = config.index_logs; - for (log, (trace, dataflow_index)) in traces { + for (log, trace) in traces { // Install trace as maintained index. let id = log_index_ids .remove(&log) diff --git a/src/compute/src/logging.rs b/src/compute/src/logging.rs index ede369d705c49..f43998216ea5e 100644 --- a/src/compute/src/logging.rs +++ b/src/compute/src/logging.rs @@ -11,7 +11,7 @@ pub mod compute; mod differential; -mod initialize; +pub(super) mod initialize; mod reachability; mod timely; @@ -21,7 +21,7 @@ use std::marker::PhantomData; use std::rc::Rc; use std::time::Duration; -use ::timely::container::ContainerBuilder; +use ::timely::container::{CapacityContainerBuilder, ContainerBuilder}; use ::timely::dataflow::channels::pact::Pipeline; use ::timely::dataflow::channels::pushers::buffer::Session; use ::timely::dataflow::channels::pushers::{Counter, Tee}; @@ -36,13 +36,25 @@ use mz_compute_client::logging::{ComputeLog, DifferentialLog, LogVariant, Timely use mz_expr::{permutation_for_arrangement, MirScalarExpr}; use mz_repr::{Datum, Diff, Row, RowPacker, RowRef, Timestamp}; use mz_timely_util::activator::RcActivator; +use mz_timely_util::containers::ColumnBuilder; use mz_timely_util::operator::consolidate_pact; -use crate::logging::compute::Logger as ComputeLogger; use crate::typedefs::RowRowAgent; pub use crate::logging::initialize::initialize; +/// An update of value `D` at a time and with a diff. +pub(super) type Update = (D, Timestamp, Diff); +/// A pusher for containers `C`. +pub(super) type Pusher = Counter>; +/// An output session for the specified container builder. +pub(super) type OutputSession<'a, CB> = + Session<'a, Timestamp, CB, Pusher<::Container>>; +/// An output session for vector-based containers of updates `D`, using a capacity container builder. +pub(super) type OutputSessionVec<'a, D> = OutputSession<'a, CapacityContainerBuilder>>; +/// An output session for columnar containers of updates `D`, using a column builder. +pub(super) type OutputSessionColumnar<'a, D> = OutputSession<'a, ColumnBuilder>; + /// Logs events as a timely stream, with progress statements. struct BatchLogger where @@ -137,13 +149,11 @@ impl EventQueue { } } -/// State shared between different logging dataflows. +/// State shared between different logging dataflow fragments. #[derive(Default)] struct SharedLoggingState { /// Activators for arrangement heap size operators. arrangement_size_activators: BTreeMap, - /// Shared compute logger. - compute_logger: Option, } /// Helper to pack collections of [`Datum`]s into key and value row. @@ -214,14 +224,8 @@ struct LogCollection { trace: RowRowAgent, /// Token that should be dropped to drop this collection. token: Rc, - /// Index of the dataflow exporting this collection. - dataflow_index: usize, } -pub(super) type Pusher = Counter>; -pub(super) type OutputSession<'a, CB> = - Session<'a, Timestamp, CB, Pusher<::Container>>; - /// A single-purpose function to consolidate and pack updates for log collection. /// /// The function first consolidates worker-local updates using the [`Pipeline`] pact, then converts diff --git a/src/compute/src/logging/compute.rs b/src/compute/src/logging/compute.rs index 8c0610d3b055a..59aa439c03acb 100644 --- a/src/compute/src/logging/compute.rs +++ b/src/compute/src/logging/compute.rs @@ -24,24 +24,20 @@ use mz_ore::cast::CastFrom; use mz_repr::{Datum, Diff, GlobalId, Timestamp}; use mz_timely_util::containers::{Column, ColumnBuilder, ProvidedBuilder}; use mz_timely_util::replay::MzReplay; -use timely::communication::Allocate; -use timely::container::CapacityContainerBuilder; use timely::dataflow::channels::pact::Pipeline; -use timely::dataflow::channels::pushers::buffer::Session; -use timely::dataflow::channels::pushers::{Counter, Tee}; use timely::dataflow::operators::core::Map; use timely::dataflow::operators::generic::builder_rc::OperatorBuilder; -use timely::dataflow::operators::Operator; -use timely::dataflow::{Scope, Stream}; +use timely::dataflow::operators::{Concatenate, Enter, Operator}; +use timely::dataflow::{Scope, Stream, StreamCore}; use timely::scheduling::Scheduler; -use timely::worker::Worker; use timely::{Container, Data}; use tracing::error; use uuid::Uuid; use crate::extensions::arrange::MzArrange; use crate::logging::{ - ComputeLog, EventQueue, LogCollection, LogVariant, PermutedRowPacker, SharedLoggingState, + ComputeLog, EventQueue, LogCollection, LogVariant, OutputSessionColumnar, OutputSessionVec, + PermutedRowPacker, SharedLoggingState, Update, }; use crate::row_spine::{RowRowBatcher, RowRowBuilder}; use crate::typedefs::RowRowSpine; @@ -289,24 +285,32 @@ impl LirMetadata { } } -/// Constructs the logging dataflow for compute logs. +/// The return type of the [`construct`] function. +pub(super) struct Return { + /// Collections returned by [`construct`]. + pub collections: BTreeMap, +} + +/// Constructs the logging dataflow fragment for compute logs. /// /// Params -/// * `worker`: The Timely worker hosting the log analysis dataflow. +/// * `scope`: The Timely scope hosting the log analysis dataflow. +/// * `scheduler`: The timely scheduler to obtainer activators. /// * `config`: Logging configuration. /// * `event_queue`: The source to read compute log events from. -pub(super) fn construct( - worker: &mut timely::worker::Worker, +/// * `compute_event_streams`: Additional compute event streams to absorb. +/// * `shared_state`: Shared state between logging dataflow fragments. +pub(super) fn construct>( + mut scope: G, + scheduler: S, config: &mz_compute_client::logging::LoggingConfig, event_queue: EventQueue>, + compute_event_streams: impl IntoIterator>>, shared_state: Rc>, -) -> BTreeMap { +) -> Return { let logging_interval_ms = std::cmp::max(1, config.interval.as_millis()); - let worker_id = worker.index(); - let worker2 = worker.clone(); - let dataflow_index = worker.next_dataflow_index(); - worker.dataflow_named("Dataflow: compute logging", move |scope| { + scope.scoped("compute logging", move |scope| { let enable_logging = config.enable_logging; let (logs, token) = event_queue.links.mz_replay::<_, ProvidedBuilder<_>, _>( scope, @@ -322,6 +326,12 @@ pub(super) fn construct( }, ); + let logs = compute_event_streams + .into_iter() + .map(|stream| stream.enter(scope)) + .chain(std::iter::once(logs)); + let logs = scope.concatenate(logs); + // Build a demux operator that splits the replayed event stream up into the separate // logging streams. let mut demux = OperatorBuilder::new("Compute Logging Demux".to_string(), scope.clone()); @@ -341,7 +351,7 @@ pub(super) fn construct( let (mut lir_mapping_out, lir_mapping) = demux.new_output(); let (mut dataflow_global_ids_out, dataflow_global_ids) = demux.new_output(); - let mut demux_state = DemuxState::new(worker2); + let mut demux_state = DemuxState::new(scheduler); demux.build(move |_capability| { move |_frontiers| { let mut export = export_out.activate(); @@ -390,6 +400,8 @@ pub(super) fn construct( } }); + let worker_id = scope.index(); + // Encode the contents of each logging stream into its expected `Row` format. let mut packer = PermutedRowPacker::new(ComputeLog::DataflowCurrent); let dataflow_current = export.as_collection().map({ @@ -557,7 +569,7 @@ pub(super) fn construct( ]; // Build the output arrangements. - let mut result = BTreeMap::new(); + let mut collections = BTreeMap::new(); for (variant, collection) in logs { let variant = LogVariant::Compute(variant); if config.index_logs.contains_key(&variant) { @@ -569,13 +581,12 @@ pub(super) fn construct( let collection = LogCollection { trace, token: Rc::clone(&token), - dataflow_index, }; - result.insert(variant, collection); + collections.insert(variant, collection); } } - result + Return { collections } }) } @@ -594,9 +605,9 @@ where } /// State maintained by the demux operator. -struct DemuxState { - /// The worker hosting this operator. - worker: Worker, +struct DemuxState { + /// The timely scheduler. + scheduler: A, /// State tracked per dataflow export. exports: BTreeMap, /// Maps live dataflows to counts of their exports. @@ -615,10 +626,10 @@ struct DemuxState { dataflow_global_ids: BTreeMap>, } -impl DemuxState { - fn new(worker: Worker) -> Self { +impl DemuxState { + fn new(scheduler: A) -> Self { Self { - worker, + scheduler, exports: Default::default(), dataflow_export_counts: Default::default(), dataflow_drop_times: Default::default(), @@ -665,34 +676,21 @@ struct ArrangementSizeState { count: isize, } -/// An update of value `D` at a time and with a diff. -type Update = (D, Timestamp, Diff); -/// A pusher for updates of value `D` for vector-based containers. -type Pusher = Counter>, Tee>>>; -/// A pusher for updates of value `D` for columnar containers. -type PusherColumnar = Counter>, Tee>>>; -/// An output session for vector-based containers of updates `D`, using a capacity container builder. -type OutputSession<'a, D> = - Session<'a, Timestamp, CapacityContainerBuilder>>, Pusher>; -/// An output session for columnar containers of updates `D`, using a column builder. -type OutputSessionColumnar<'a, D> = - Session<'a, Timestamp, ColumnBuilder>, PusherColumnar>; - /// Bundled output sessions used by the demux operator. struct DemuxOutput<'a> { - export: OutputSession<'a, ExportDatum>, - frontier: OutputSession<'a, FrontierDatum>, - import_frontier: OutputSession<'a, ImportFrontierDatum>, - peek: OutputSession<'a, PeekDatum>, - peek_duration: OutputSession<'a, PeekDurationDatum>, - shutdown_duration: OutputSession<'a, u128>, - arrangement_heap_size: OutputSession<'a, ArrangementHeapDatum>, - arrangement_heap_capacity: OutputSession<'a, ArrangementHeapDatum>, - arrangement_heap_allocations: OutputSession<'a, ArrangementHeapDatum>, - hydration_time: OutputSession<'a, HydrationTimeDatum>, - error_count: OutputSession<'a, ErrorCountDatum>, - lir_mapping: OutputSessionColumnar<'a, LirMappingDatum>, - dataflow_global_ids: OutputSession<'a, DataflowGlobalDatum>, + export: OutputSessionVec<'a, Update>, + frontier: OutputSessionVec<'a, Update>, + import_frontier: OutputSessionVec<'a, Update>, + peek: OutputSessionVec<'a, Update>, + peek_duration: OutputSessionVec<'a, Update>, + shutdown_duration: OutputSessionVec<'a, Update>, + arrangement_heap_size: OutputSessionVec<'a, Update>, + arrangement_heap_capacity: OutputSessionVec<'a, Update>, + arrangement_heap_allocations: OutputSessionVec<'a, Update>, + hydration_time: OutputSessionVec<'a, Update>, + error_count: OutputSessionVec<'a, Update>, + lir_mapping: OutputSessionColumnar<'a, Update>, + dataflow_global_ids: OutputSessionVec<'a, Update>, } #[derive(Clone)] @@ -763,7 +761,7 @@ struct DataflowGlobalDatum { } /// Event handler of the demux operator. -struct DemuxHandler<'a, 'b, A: Allocate + 'static> { +struct DemuxHandler<'a, 'b, A: Scheduler> { /// State kept by the demux operator. state: &'a mut DemuxState, /// State shared across log receivers. @@ -776,7 +774,7 @@ struct DemuxHandler<'a, 'b, A: Allocate + 'static> { time: Duration, } -impl DemuxHandler<'_, '_, A> { +impl DemuxHandler<'_, '_, A> { /// Return the timestamp associated with the current event, based on the event time and the /// logging interval. fn ts(&self) -> Timestamp { @@ -1198,7 +1196,7 @@ impl DemuxHandler<'_, '_, A> { ) { let activator = self .state - .worker + .scheduler .activator_for(address.into_iter().collect()); let existing = self .state diff --git a/src/compute/src/logging/differential.rs b/src/compute/src/logging/differential.rs index 7e3affdc5be04..e495791828383 100644 --- a/src/compute/src/logging/differential.rs +++ b/src/compute/src/logging/differential.rs @@ -21,42 +21,49 @@ use differential_dataflow::logging::{ use mz_ore::cast::CastFrom; use mz_repr::{Datum, Diff, Timestamp}; use mz_timely_util::containers::{ - columnar_exchange, Col2ValBatcher, ColumnBuilder, ProvidedBuilder, + columnar_exchange, Col2ValBatcher, Column, ColumnBuilder, ProvidedBuilder, }; use mz_timely_util::replay::MzReplay; -use timely::communication::Allocate; use timely::dataflow::channels::pact::{ExchangeCore, Pipeline}; use timely::dataflow::channels::pushers::buffer::Session; use timely::dataflow::channels::pushers::{Counter, Tee}; use timely::dataflow::operators::generic::builder_rc::OperatorBuilder; -use timely::dataflow::Stream; +use timely::dataflow::operators::Leave; +use timely::dataflow::{Scope, Stream, StreamCore}; use crate::extensions::arrange::MzArrangeCore; use crate::logging::compute::{ArrangementHeapSizeOperatorDrop, ComputeEvent}; use crate::logging::{ consolidate_and_pack, DifferentialLog, EventQueue, LogCollection, LogVariant, - SharedLoggingState, + OutputSessionColumnar, SharedLoggingState, }; use crate::row_spine::RowRowBuilder; use crate::typedefs::{KeyBatcher, RowRowSpine}; -/// Constructs the logging dataflow for differential logs. +/// The return type of [`construct`]. +pub(super) struct Return { + /// Collections to export. + pub collections: BTreeMap, + /// Stream of compute events generated by the differential logger. + pub compute_events: StreamCore>, +} + +/// Constructs the logging dataflow fragment for differential logs. /// /// Params -/// * `worker`: The Timely worker hosting the log analysis dataflow. +/// * `scope`: The Timely scope hosting the log analysis dataflow. /// * `config`: Logging configuration /// * `event_queue`: The source to read log events from. -pub(super) fn construct( - worker: &mut timely::worker::Worker, +/// * `shared_state`: Shared state across all logging dataflow fragments. +pub(super) fn construct>( + mut scope: G, config: &mz_compute_client::logging::LoggingConfig, event_queue: EventQueue>, shared_state: Rc>, -) -> BTreeMap { +) -> Return { let logging_interval_ms = std::cmp::max(1, config.interval.as_millis()); - let worker_id = worker.index(); - let dataflow_index = worker.next_dataflow_index(); - worker.dataflow_named("Dataflow: differential logging", move |scope| { + scope.scoped("differential logging", move |scope| { let enable_logging = config.enable_logging; let (logs, token) = event_queue.links .mz_replay::<_, ProvidedBuilder<_>, _>( @@ -85,6 +92,7 @@ pub(super) fn construct( let (mut batcher_size_out, batcher_size) = demux.new_output(); let (mut batcher_capacity_out, batcher_capacity) = demux.new_output(); let (mut batcher_allocations_out, batcher_allocations) = demux.new_output(); + let (mut compute_events_out, compute_events) = demux.new_output(); let mut demux_state = Default::default(); demux.build(move |_capability| { @@ -96,6 +104,7 @@ pub(super) fn construct( let mut batcher_size = batcher_size_out.activate(); let mut batcher_capacity = batcher_capacity_out.activate(); let mut batcher_allocations = batcher_allocations_out.activate(); + let mut compute_events_out = compute_events_out.activate(); input.for_each(|cap, data| { let mut output_buffers = DemuxOutput { @@ -106,6 +115,7 @@ pub(super) fn construct( batcher_size: batcher_size.session_with_builder(&cap), batcher_capacity: batcher_capacity.session_with_builder(&cap), batcher_allocations: batcher_allocations.session_with_builder(&cap), + compute_events: compute_events_out.session_with_builder(&cap), }; for (time, event) in data.drain(..) { @@ -125,6 +135,7 @@ pub(super) fn construct( // We're lucky and the differential logs all have the same stream format, so just implement // the call once. let stream_to_collection = |input: &Stream<_, ((usize, ()), Timestamp, Diff)>, log| { + let worker_id = scope.index(); consolidate_and_pack::<_, KeyBatcher<_, _, _>, ColumnBuilder<_>, _, _>( input, log, @@ -159,7 +170,7 @@ pub(super) fn construct( ]; // Build the output arrangements. - let mut result = BTreeMap::new(); + let mut collections = BTreeMap::new(); for (variant, collection) in logs { let variant = LogVariant::Differential(variant); if config.index_logs.contains_key(&variant) { @@ -172,13 +183,12 @@ pub(super) fn construct( let collection = LogCollection { trace, token: Rc::clone(&token), - dataflow_index, }; - result.insert(variant, collection); + collections.insert(variant, collection); } } - result + Return { collections, compute_events: compute_events.leave(), } }) } @@ -196,6 +206,7 @@ struct DemuxOutput<'a> { batcher_size: OutputSession<'a, (usize, ())>, batcher_capacity: OutputSession<'a, (usize, ())>, batcher_allocations: OutputSession<'a, (usize, ())>, + compute_events: OutputSessionColumnar<'a, (Duration, ComputeEvent)>, } /// State maintained by the demux operator. @@ -287,17 +298,18 @@ impl DemuxHandler<'_, '_> { debug_assert_ne!(diff, 0); self.output.sharing.give(((operator_id, ()), ts, diff)); - if let Some(logger) = &mut self.shared_state.compute_logger { - let sharing = self.state.sharing.entry(operator_id).or_default(); - *sharing = (i64::try_from(*sharing).expect("must fit") + diff) - .try_into() - .expect("under/overflow"); - if *sharing == 0 { - self.state.sharing.remove(&operator_id); - logger.log(&ComputeEvent::ArrangementHeapSizeOperatorDrop( - ArrangementHeapSizeOperatorDrop { operator_id }, - )); - } + let sharing = self.state.sharing.entry(operator_id).or_default(); + *sharing = (i64::try_from(*sharing).expect("must fit") + diff) + .try_into() + .expect("under/overflow"); + if *sharing == 0 { + self.state.sharing.remove(&operator_id); + self.output.compute_events.give(&( + self.time, + ComputeEvent::ArrangementHeapSizeOperatorDrop(ArrangementHeapSizeOperatorDrop { + operator_id, + }), + )); } } diff --git a/src/compute/src/logging/initialize.rs b/src/compute/src/logging/initialize.rs index 3b9d7f65b9899..e9ddc73f80208 100644 --- a/src/compute/src/logging/initialize.rs +++ b/src/compute/src/logging/initialize.rs @@ -21,6 +21,7 @@ use mz_timely_util::containers::{Column, ColumnBuilder}; use mz_timely_util::operator::CollectionExt; use timely::communication::Allocate; use timely::container::{ContainerBuilder, PushInto}; +use timely::dataflow::Scope; use timely::logging::{TimelyEvent, TimelyEventBuilder}; use timely::logging_core::{Logger, Registry}; use timely::order::Product; @@ -39,10 +40,7 @@ use crate::typedefs::{ErrBatcher, ErrBuilder}; pub fn initialize( worker: &mut timely::worker::Worker, config: &LoggingConfig, -) -> ( - super::compute::Logger, - BTreeMap, -) { +) -> LoggingTraces { let interval_ms = std::cmp::max(1, config.interval.as_millis()); // Track time relative to the Unix epoch, rather than when the server @@ -68,17 +66,22 @@ pub fn initialize( // Depending on whether we should log the creation of the logging dataflows, we register the // loggers with timely either before or after creating them. + let dataflow_index = context.worker.next_dataflow_index(); let traces = if config.log_logging { context.register_loggers(); - context.construct_dataflows() + context.construct_dataflow() } else { - let traces = context.construct_dataflows(); + let traces = context.construct_dataflow(); context.register_loggers(); traces }; - let logger = worker.log_register().get("materialize/compute").unwrap(); - (logger, traces) + let compute_logger = worker.log_register().get("materialize/compute").unwrap(); + LoggingTraces { + traces, + dataflow_index, + compute_logger, + } } pub(super) type ReachabilityEvent = (usize, Vec<(usize, usize, bool, Timestamp, Diff)>); @@ -96,36 +99,59 @@ struct LoggingContext<'a, A: Allocate> { shared_state: Rc>, } +pub(crate) struct LoggingTraces { + /// Exported traces, by log variant. + pub traces: BTreeMap, + /// The index of the dataflow that exports the traces. + pub dataflow_index: usize, + /// The compute logger. + pub compute_logger: super::compute::Logger, +} + impl LoggingContext<'_, A> { - fn construct_dataflows(&mut self) -> BTreeMap { - let mut collections = BTreeMap::new(); - collections.extend(super::timely::construct( - self.worker, - self.config, - self.t_event_queue.clone(), - Rc::clone(&self.shared_state), - )); - collections.extend(super::reachability::construct( - self.worker, - self.config, - self.r_event_queue.clone(), - )); - collections.extend(super::differential::construct( - self.worker, - self.config, - self.d_event_queue.clone(), - Rc::clone(&self.shared_state), - )); - collections.extend(super::compute::construct( - self.worker, - self.config, - self.c_event_queue.clone(), - Rc::clone(&self.shared_state), - )); - - let errs = self - .worker - .dataflow_named("Dataflow: logging errors", |scope| { + fn construct_dataflow(&mut self) -> BTreeMap { + self.worker.dataflow_named("Dataflow: logging", |scope| { + let mut collections = BTreeMap::new(); + + let super::timely::Return { + collections: timely_collections, + compute_events: compute_events_timely, + } = super::timely::construct(scope.clone(), self.config, self.t_event_queue.clone()); + collections.extend(timely_collections); + + let super::reachability::Return { + collections: reachability_collections, + } = super::reachability::construct( + scope.clone(), + self.config, + self.r_event_queue.clone(), + ); + collections.extend(reachability_collections); + + let super::differential::Return { + collections: differential_collections, + compute_events: compute_events_differential, + } = super::differential::construct( + scope.clone(), + self.config, + self.d_event_queue.clone(), + Rc::clone(&self.shared_state), + ); + collections.extend(differential_collections); + + let super::compute::Return { + collections: compute_collections, + } = super::compute::construct( + scope.clone(), + scope.parent.clone(), + self.config, + self.c_event_queue.clone(), + [compute_events_timely, compute_events_differential], + Rc::clone(&self.shared_state), + ); + collections.extend(compute_collections); + + let errs = scope.scoped("logging errors", |scope| { let collection: KeyCollection<_, DataflowError, Diff> = Collection::empty(scope).into(); collection @@ -133,17 +159,16 @@ impl LoggingContext<'_, A> { .trace }); - // TODO(vmarcos): If we introduce introspection sources that would match - // type specialization for keys, we'd need to ensure that type specialized - // variants reach the map below (issue database-issues#6763). - collections - .into_iter() - .map(|(log, collection)| { - let bundle = - TraceBundle::new(collection.trace, errs.clone()).with_drop(collection.token); - (log, (bundle, collection.dataflow_index)) - }) - .collect() + let traces = collections + .into_iter() + .map(|(log, collection)| { + let bundle = TraceBundle::new(collection.trace, errs.clone()) + .with_drop(collection.token); + (log, bundle) + }) + .collect(); + traces + }) } /// Construct a new reachability logger for timestamp type `T`. @@ -177,8 +202,6 @@ impl LoggingContext<'_, A> { self.register_reachability_logger::<(Timestamp, Subtime)>(&mut register, 2); register.insert_logger("differential/arrange", d_logger); register.insert_logger("materialize/compute", c_logger.clone()); - - self.shared_state.borrow_mut().compute_logger = Some(c_logger); } fn simple_logger( diff --git a/src/compute/src/logging/reachability.rs b/src/compute/src/logging/reachability.rs index 463a30f21430b..0ea18135ee4bb 100644 --- a/src/compute/src/logging/reachability.rs +++ b/src/compute/src/logging/reachability.rs @@ -20,8 +20,8 @@ use mz_ore::cast::CastFrom; use mz_repr::{Datum, Diff, Row, Timestamp}; use mz_timely_util::containers::{columnar_exchange, Col2ValBatcher, Column, ColumnBuilder}; use mz_timely_util::replay::MzReplay; -use timely::communication::Allocate; use timely::dataflow::channels::pact::ExchangeCore; +use timely::dataflow::Scope; use timely::Container; use crate::extensions::arrange::MzArrangeCore; @@ -30,24 +30,26 @@ use crate::logging::{consolidate_and_pack, EventQueue, LogCollection, LogVariant use crate::row_spine::RowRowBuilder; use crate::typedefs::RowRowSpine; -/// Constructs the logging dataflow for reachability logs. +/// The return type of [`construct`]. +pub(super) struct Return { + /// Collections to export. + pub collections: BTreeMap, +} + +/// Constructs the logging dataflow fragment for reachability logs. /// /// Params -/// * `worker`: The Timely worker hosting the log analysis dataflow. +/// * `scope`: The Timely scope hosting the log analysis dataflow. /// * `config`: Logging configuration /// * `event_queue`: The source to read log events from. -pub(super) fn construct( - worker: &mut timely::worker::Worker, +pub(super) fn construct>( + mut scope: G, config: &LoggingConfig, event_queue: EventQueue, 3>, -) -> BTreeMap { - let interval_ms = std::cmp::max(1, config.interval.as_millis()); - let worker_index = worker.index(); - let dataflow_index = worker.next_dataflow_index(); - - // A dataflow for multiple log-derived arrangements. - let traces = worker.dataflow_named("Dataflow: timely reachability logging", move |scope| { +) -> Return { + let collections = scope.scoped("timely reachability logging", move |scope| { let enable_logging = config.enable_logging; + let interval_ms = std::cmp::max(1, config.interval.as_millis()); type UpdatesKey = (bool, usize, usize, usize, Timestamp); type CB = ColumnBuilder<((UpdatesKey, ()), Timestamp, Diff)>; @@ -75,6 +77,7 @@ pub(super) fn construct( // Restrict results by those logs that are meant to be active. let logs_active = [LogVariant::Timely(TimelyLog::Reachability)]; + let worker_id = scope.index(); let updates = consolidate_and_pack::<_, Col2ValBatcher, ColumnBuilder<_>, _, _>( &updates, @@ -84,7 +87,7 @@ pub(super) fn construct( let update_type = if *update_type { "source" } else { "target" }; let data = packer.pack_slice(&[ Datum::UInt64(u64::cast_from(*operator_id)), - Datum::UInt64(u64::cast_from(worker_index)), + Datum::UInt64(u64::cast_from(worker_id)), Datum::UInt64(u64::cast_from(*source)), Datum::UInt64(u64::cast_from(*port)), Datum::String(update_type), @@ -106,7 +109,6 @@ pub(super) fn construct( let collection = LogCollection { trace, token: Rc::clone(&token), - dataflow_index, }; result.insert(variant, collection); } @@ -114,5 +116,5 @@ pub(super) fn construct( result }); - traces + Return { collections } } diff --git a/src/compute/src/logging/timely.rs b/src/compute/src/logging/timely.rs index d8e0776e85a43..671a54a52cd42 100644 --- a/src/compute/src/logging/timely.rs +++ b/src/compute/src/logging/timely.rs @@ -9,7 +9,6 @@ //! Logging dataflows for events generated by timely dataflow. -use std::cell::RefCell; use std::collections::BTreeMap; use std::rc::Rc; use std::time::Duration; @@ -19,15 +18,16 @@ use mz_compute_client::logging::LoggingConfig; use mz_ore::cast::CastFrom; use mz_repr::{Datum, Diff, Timestamp}; use mz_timely_util::containers::{ - columnar_exchange, Col2ValBatcher, ColumnBuilder, ProvidedBuilder, + columnar_exchange, Col2ValBatcher, Column, ColumnBuilder, ProvidedBuilder, }; use mz_timely_util::replay::MzReplay; -use timely::communication::Allocate; use timely::container::columnation::{Columnation, CopyRegion}; use timely::dataflow::channels::pact::{ExchangeCore, Pipeline}; use timely::dataflow::channels::pushers::buffer::Session; use timely::dataflow::channels::pushers::{Counter, Tee}; use timely::dataflow::operators::generic::builder_rc::OperatorBuilder; +use timely::dataflow::operators::Leave; +use timely::dataflow::{Scope, StreamCore}; use timely::logging::{ ChannelsEvent, MessagesEvent, OperatesEvent, ParkEvent, ScheduleEvent, ShutdownEvent, TimelyEvent, @@ -37,29 +37,31 @@ use tracing::error; use crate::extensions::arrange::MzArrangeCore; use crate::logging::compute::{ComputeEvent, DataflowShutdown}; -use crate::logging::{consolidate_and_pack, LogCollection}; -use crate::logging::{EventQueue, LogVariant, SharedLoggingState, TimelyLog}; +use crate::logging::{consolidate_and_pack, LogCollection, OutputSessionColumnar}; +use crate::logging::{EventQueue, LogVariant, TimelyLog}; use crate::row_spine::RowRowBuilder; use crate::typedefs::{KeyBatcher, KeyValBatcher, RowRowSpine}; -/// Constructs the logging dataflow for timely logs. +/// The return type of [`construct`]. +pub(super) struct Return { + /// Collections to export. + pub collections: BTreeMap, + /// Stream of compute events generated by Timely logging. + pub compute_events: StreamCore>, +} + +/// Constructs the logging dataflow fragment for timely logs. /// /// Params -/// * `worker`: The Timely worker hosting the log analysis dataflow. +/// * `scope`: The Timely scope hosting the log analysis dataflow. /// * `config`: Logging configuration /// * `event_queue`: The source to read log events from. -pub(super) fn construct( - worker: &mut timely::worker::Worker, +pub(super) fn construct>( + mut scope: G, config: &LoggingConfig, event_queue: EventQueue>, - shared_state: Rc>, -) -> BTreeMap { - let logging_interval_ms = std::cmp::max(1, config.interval.as_millis()); - let worker_id = worker.index(); - let peers = worker.peers(); - let dataflow_index = worker.next_dataflow_index(); - - worker.dataflow_named("Dataflow: timely logging", move |scope| { +) -> Return { + scope.scoped("timely logging", move |scope| { let enable_logging = config.enable_logging; let (logs, token) = event_queue.links.mz_replay::<_, ProvidedBuilder<_>, _>( @@ -90,9 +92,13 @@ pub(super) fn construct( let (mut schedules_histogram_out, schedules_histogram) = demux.new_output(); let (mut batches_sent_out, batches_sent) = demux.new_output(); let (mut batches_received_out, batches_received) = demux.new_output(); + let (mut compute_events_out, compute_events) = demux.new_output(); + let worker_id = scope.index(); let mut demux_state = DemuxState::default(); - demux.build(move |_capability| { + demux.build(|_capability| { + let peers = scope.peers(); + let logging_interval_ms = std::cmp::max(1, config.interval.as_millis()); move |_frontiers| { let mut operates = operates_out.activate(); let mut channels = channels_out.activate(); @@ -104,6 +110,7 @@ pub(super) fn construct( let mut batches_received = batches_received_out.activate(); let mut schedules_duration = schedules_duration_out.activate(); let mut schedules_histogram = schedules_histogram_out.activate(); + let mut compute_events_out = compute_events_out.activate(); input.for_each(|cap, data| { let mut output_buffers = DemuxOutput { @@ -117,6 +124,7 @@ pub(super) fn construct( schedules_histogram: schedules_histogram.session_with_builder(&cap), batches_sent: batches_sent.session_with_builder(&cap), batches_received: batches_received.session_with_builder(&cap), + compute_events: compute_events_out.session_with_builder(&cap), }; for (time, event) in data.drain(..) { @@ -129,7 +137,6 @@ pub(super) fn construct( DemuxHandler { state: &mut demux_state, - shared_state: &mut shared_state.borrow_mut(), output: &mut output_buffers, logging_interval_ms, peers, @@ -303,7 +310,7 @@ pub(super) fn construct( }; // Build the output arrangements. - let mut result = BTreeMap::new(); + let mut collections = BTreeMap::new(); for (variant, collection) in logs { let variant = LogVariant::Timely(variant); if config.index_logs.contains_key(&variant) { @@ -316,13 +323,12 @@ pub(super) fn construct( let collection = LogCollection { trace, token: Rc::clone(&token), - dataflow_index, }; - result.insert(variant, collection); + collections.insert(variant, collection); } } - result + Return { collections, compute_events: compute_events.leave(), } }) } @@ -383,6 +389,7 @@ struct DemuxOutput<'a> { messages_received: OutputSession<'a, (MessageDatum, ())>, schedules_duration: OutputSession<'a, (usize, ())>, schedules_histogram: OutputSession<'a, (ScheduleHistogramDatum, ())>, + compute_events: OutputSessionColumnar<'a, (Duration, ComputeEvent)>, } #[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)] @@ -430,8 +437,6 @@ impl Columnation for ScheduleHistogramDatum { struct DemuxHandler<'a, 'b> { /// State kept by the demux operator. state: &'a mut DemuxState, - /// State shared across log receivers. - shared_state: &'a mut SharedLoggingState, /// Demux output buffers. output: &'a mut DemuxOutput<'b>, /// The logging interval specifying the time granularity for the updates. @@ -546,11 +551,10 @@ impl DemuxHandler<'_, '_> { fn handle_dataflow_shutdown(&mut self, dataflow_index: usize) { // Notify compute logging about the shutdown. - if let Some(logger) = &self.shared_state.compute_logger { - logger.log(&ComputeEvent::DataflowShutdown(DataflowShutdown { - dataflow_index, - })); - } + self.output.compute_events.give(&( + self.time, + ComputeEvent::DataflowShutdown(DataflowShutdown { dataflow_index }), + )); // When a dataflow shuts down, we need to retract all its channels. let Some(channels) = self.state.dataflow_channels.remove(&dataflow_index) else { diff --git a/test/sqllogictest/introspection/attribution_sources.slt b/test/sqllogictest/introspection/attribution_sources.slt index cfc585a069977..2d83627352f55 100644 --- a/test/sqllogictest/introspection/attribution_sources.slt +++ b/test/sqllogictest/introspection/attribution_sources.slt @@ -32,8 +32,8 @@ SELECT mz_unsafe.mz_sleep(8) query IT SELECT id, global_id FROM mz_internal.mz_dataflow_global_ids ORDER BY id, global_id; ---- -8 u2 -8 u3 +4 u2 +4 u3 query TI SELECT global_id, lir_id FROM mz_internal.mz_lir_mapping ORDER BY global_id, lir_id DESC; @@ -117,7 +117,7 @@ SELECT mz_unsafe.mz_sleep(8) query IT SELECT id, global_id FROM mz_internal.mz_dataflow_global_ids ORDER BY id, global_id; ---- -13 t59 +9 t59 query TI