9 changes: 7 additions & 2 deletions src/compute/src/compute_state.rs
@@ -65,6 +65,7 @@ use uuid::Uuid;
 use crate::arrangement::manager::{TraceBundle, TraceManager};
 use crate::logging;
 use crate::logging::compute::{CollectionLogging, ComputeEvent, PeekEvent};
+use crate::logging::initialize::LoggingTraces;
 use crate::metrics::{CollectionMetrics, WorkerMetrics};
 use crate::render::{LinearJoinSpec, StartSignal};
 use crate::server::{ComputeInstanceContext, ResponseSender};
@@ -665,10 +666,14 @@ impl<'a, A: Allocate + 'static> ActiveComputeState<'a, A> {
             panic!("dataflow server has already initialized logging");
         }
 
-        let (logger, traces) = logging::initialize(self.timely_worker, &config);
+        let LoggingTraces {
+            traces,
+            dataflow_index,
+            compute_logger: logger,
+        } = logging::initialize(self.timely_worker, &config);
 
         let mut log_index_ids = config.index_logs;
-        for (log, (trace, dataflow_index)) in traces {
+        for (log, trace) in traces {
             // Install trace as maintained index.
             let id = log_index_ids
                 .remove(&log)
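Note: `logging::initialize` now returns a `LoggingTraces` struct instead of a `(logger, traces)` tuple, and the dataflow index is reported once for the whole logging dataflow rather than per trace. The struct is defined in src/compute/src/logging/initialize.rs, which this view does not expand; the following is a minimal sketch of its shape, inferred purely from the destructuring above (field types are assumptions):

    // Sketch only; inferred from the call site above, not from initialize.rs.
    pub(crate) struct LoggingTraces {
        /// The exported traces, keyed by log variant (trace type assumed).
        pub traces: BTreeMap<LogVariant, TraceBundle>,
        /// Index of the single timely dataflow hosting all logging collections.
        pub dataflow_index: usize,
        /// The compute logger (type assumed).
        pub compute_logger: crate::logging::compute::Logger,
    }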
28 changes: 16 additions & 12 deletions src/compute/src/logging.rs
@@ -11,7 +11,7 @@
 
 pub mod compute;
 mod differential;
-mod initialize;
+pub(super) mod initialize;
 mod reachability;
 mod timely;
 
@@ -21,7 +21,7 @@ use std::marker::PhantomData;
 use std::rc::Rc;
 use std::time::Duration;
 
-use ::timely::container::ContainerBuilder;
+use ::timely::container::{CapacityContainerBuilder, ContainerBuilder};
 use ::timely::dataflow::channels::pact::Pipeline;
 use ::timely::dataflow::channels::pushers::buffer::Session;
 use ::timely::dataflow::channels::pushers::{Counter, Tee};
@@ -36,13 +36,25 @@ use mz_compute_client::logging::{ComputeLog, DifferentialLog, LogVariant, Timely
 use mz_expr::{permutation_for_arrangement, MirScalarExpr};
 use mz_repr::{Datum, Diff, Row, RowPacker, RowRef, Timestamp};
 use mz_timely_util::activator::RcActivator;
+use mz_timely_util::containers::ColumnBuilder;
 use mz_timely_util::operator::consolidate_pact;
 
 use crate::logging::compute::Logger as ComputeLogger;
 use crate::typedefs::RowRowAgent;
 
 pub use crate::logging::initialize::initialize;
 
+/// An update of value `D` at a time and with a diff.
+pub(super) type Update<D> = (D, Timestamp, Diff);
+/// A pusher for containers `C`.
+pub(super) type Pusher<C> = Counter<Timestamp, C, Tee<Timestamp, C>>;
+/// An output session for the specified container builder.
+pub(super) type OutputSession<'a, CB> =
+    Session<'a, Timestamp, CB, Pusher<<CB as ContainerBuilder>::Container>>;
+/// An output session for vector-based containers of updates `D`, using a capacity container builder.
+pub(super) type OutputSessionVec<'a, D> = OutputSession<'a, CapacityContainerBuilder<Vec<D>>>;
+/// An output session for columnar containers of updates `D`, using a column builder.
+pub(super) type OutputSessionColumnar<'a, D> = OutputSession<'a, ColumnBuilder<D>>;
 
 /// Logs events as a timely stream, with progress statements.
 struct BatchLogger<C, P>
 where
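For orientation (not part of the diff), here is how the new aliases expand for a concrete datum type `D`:

    // Illustrative expansion of the aliases above; not part of the diff.
    // OutputSessionVec<'a, Update<D>> is a Session that pushes batches of
    // Vec<(D, Timestamp, Diff)> into a Counter-wrapped Tee:
    type ExampleSession<'a, D> = OutputSessionVec<'a, Update<D>>;
    // which expands to:
    // Session<'a, Timestamp,
    //     CapacityContainerBuilder<Vec<(D, Timestamp, Diff)>>,
    //     Counter<Timestamp, Vec<(D, Timestamp, Diff)>,
    //         Tee<Timestamp, Vec<(D, Timestamp, Diff)>>>>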
@@ -137,13 +149,11 @@ impl<C, const N: usize> EventQueue<C, N> {
     }
 }
 
-/// State shared between different logging dataflows.
+/// State shared between different logging dataflow fragments.
 #[derive(Default)]
 struct SharedLoggingState {
     /// Activators for arrangement heap size operators.
     arrangement_size_activators: BTreeMap<usize, Activator>,
-    /// Shared compute logger.
-    compute_logger: Option<ComputeLogger>,
 }
 
 /// Helper to pack collections of [`Datum`]s into key and value row.
@@ -214,14 +224,8 @@ struct LogCollection {
     trace: RowRowAgent<Timestamp, Diff>,
     /// Token that should be dropped to drop this collection.
     token: Rc<dyn Any>,
-    /// Index of the dataflow exporting this collection.
-    dataflow_index: usize,
 }
 
-pub(super) type Pusher<C> = Counter<Timestamp, C, Tee<Timestamp, C>>;
-pub(super) type OutputSession<'a, CB> =
-    Session<'a, Timestamp, CB, Pusher<<CB as ContainerBuilder>::Container>>;
-
 /// A single-purpose function to consolidate and pack updates for log collection.
 ///
 /// The function first consolidates worker-local updates using the [`Pipeline`] pact, then converts
112 changes: 55 additions & 57 deletions src/compute/src/logging/compute.rs
@@ -24,24 +24,20 @@ use mz_ore::cast::CastFrom;
 use mz_repr::{Datum, Diff, GlobalId, Timestamp};
 use mz_timely_util::containers::{Column, ColumnBuilder, ProvidedBuilder};
 use mz_timely_util::replay::MzReplay;
-use timely::communication::Allocate;
-use timely::container::CapacityContainerBuilder;
 use timely::dataflow::channels::pact::Pipeline;
-use timely::dataflow::channels::pushers::buffer::Session;
-use timely::dataflow::channels::pushers::{Counter, Tee};
 use timely::dataflow::operators::core::Map;
 use timely::dataflow::operators::generic::builder_rc::OperatorBuilder;
-use timely::dataflow::operators::Operator;
-use timely::dataflow::{Scope, Stream};
+use timely::dataflow::operators::{Concatenate, Enter, Operator};
+use timely::dataflow::{Scope, Stream, StreamCore};
+use timely::scheduling::Scheduler;
-use timely::worker::Worker;
 use timely::{Container, Data};
 use tracing::error;
 use uuid::Uuid;
 
 use crate::extensions::arrange::MzArrange;
 use crate::logging::{
-    ComputeLog, EventQueue, LogCollection, LogVariant, PermutedRowPacker, SharedLoggingState,
+    ComputeLog, EventQueue, LogCollection, LogVariant, OutputSessionColumnar, OutputSessionVec,
+    PermutedRowPacker, SharedLoggingState, Update,
 };
 use crate::row_spine::{RowRowBatcher, RowRowBuilder};
 use crate::typedefs::RowRowSpine;
@@ -289,24 +285,32 @@ impl LirMetadata {
     }
 }
 
-/// Constructs the logging dataflow for compute logs.
+/// The return type of the [`construct`] function.
+pub(super) struct Return {
+    /// Collections returned by [`construct`].
+    pub collections: BTreeMap<LogVariant, LogCollection>,
+}
+
+/// Constructs the logging dataflow fragment for compute logs.
 ///
 /// Params
-/// * `worker`: The Timely worker hosting the log analysis dataflow.
+/// * `scope`: The Timely scope hosting the log analysis dataflow.
+/// * `scheduler`: The timely scheduler to obtain activators.
 /// * `config`: Logging configuration.
 /// * `event_queue`: The source to read compute log events from.
-pub(super) fn construct<A: Allocate + 'static>(
-    worker: &mut timely::worker::Worker<A>,
+/// * `compute_event_streams`: Additional compute event streams to absorb.
+/// * `shared_state`: Shared state between logging dataflow fragments.
+pub(super) fn construct<S: Scheduler + 'static, G: Scope<Timestamp = Timestamp>>(
+    mut scope: G,
+    scheduler: S,
     config: &mz_compute_client::logging::LoggingConfig,
     event_queue: EventQueue<Column<(Duration, ComputeEvent)>>,
+    compute_event_streams: impl IntoIterator<Item = StreamCore<G, Column<(Duration, ComputeEvent)>>>,
     shared_state: Rc<RefCell<SharedLoggingState>>,
-) -> BTreeMap<LogVariant, LogCollection> {
+) -> Return {
     let logging_interval_ms = std::cmp::max(1, config.interval.as_millis());
-    let worker_id = worker.index();
-    let worker2 = worker.clone();
-    let dataflow_index = worker.next_dataflow_index();
 
-    worker.dataflow_named("Dataflow: compute logging", move |scope| {
+    scope.scoped("compute logging", move |scope| {
         let enable_logging = config.enable_logging;
         let (logs, token) = event_queue.links.mz_replay::<_, ProvidedBuilder<_>, _>(
             scope,
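Since `construct` no longer builds its own dataflow, the caller provides a scope and a scheduler. A hypothetical call site (names assumed; the real one lives in initialize.rs, which is not expanded in this view) might look like:

    // Hypothetical call site; worker handles are cheaply cloneable (see the
    // removed `worker.clone()` above) and implement `Scheduler`, so one can
    // serve as the `S` parameter.
    let scheduler = worker.clone();
    worker.dataflow_named("Dataflow: logging", move |scope| {
        let Return { collections } = construct(
            scope.clone(),
            scheduler,
            &config,
            compute_event_queue,
            std::iter::empty(), // no additional compute event streams
            Rc::clone(&shared_state),
        );
        // ... install `collections` as maintained log indexes ...
    });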
@@ -322,6 +326,12 @@
             },
         );
 
+        let logs = compute_event_streams
+            .into_iter()
+            .map(|stream| stream.enter(scope))
+            .chain(std::iter::once(logs));
+        let logs = scope.concatenate(logs);
+
         // Build a demux operator that splits the replayed event stream up into the separate
         // logging streams.
         let mut demux = OperatorBuilder::new("Compute Logging Demux".to_string(), scope.clone());
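The enter-then-concatenate pattern in isolation, as a self-contained timely sketch (illustrative only; stream and region names invented):

    use timely::dataflow::operators::{Concatenate, Enter, Inspect, Leave, ToStream};

    timely::example(|scope| {
        let outer = (0..5u64).to_stream(scope);
        let merged = scope.scoped::<u64, _, _>("region", |inner| {
            // Bring the outer stream into the child region, then merge it
            // with a stream produced inside the region, as the diff does
            // with `compute_event_streams` and the replayed `logs`.
            let local = (5..10u64).to_stream(inner);
            let streams = vec![outer.enter(inner), local];
            inner.concatenate(streams).leave()
        });
        merged.inspect(|x| println!("saw {x}"));
    });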
@@ -341,7 +351,7 @@
         let (mut lir_mapping_out, lir_mapping) = demux.new_output();
         let (mut dataflow_global_ids_out, dataflow_global_ids) = demux.new_output();
 
-        let mut demux_state = DemuxState::new(worker2);
+        let mut demux_state = DemuxState::new(scheduler);
         demux.build(move |_capability| {
             move |_frontiers| {
                 let mut export = export_out.activate();
@@ -390,6 +400,8 @@
             }
         });
 
+        let worker_id = scope.index();
+
         // Encode the contents of each logging stream into its expected `Row` format.
         let mut packer = PermutedRowPacker::new(ComputeLog::DataflowCurrent);
         let dataflow_current = export.as_collection().map({
@@ -557,7 +569,7 @@ pub(super) fn construct<A: Allocate + 'static>(
         ];
 
         // Build the output arrangements.
-        let mut result = BTreeMap::new();
+        let mut collections = BTreeMap::new();
         for (variant, collection) in logs {
             let variant = LogVariant::Compute(variant);
             if config.index_logs.contains_key(&variant) {
@@ -569,13 +581,12 @@
                 let collection = LogCollection {
                     trace,
                     token: Rc::clone(&token),
-                    dataflow_index,
                 };
-                result.insert(variant, collection);
+                collections.insert(variant, collection);
             }
         }
 
-        result
+        Return { collections }
     })
 }
 
@@ -594,9 +605,9 @@
 }
 
 /// State maintained by the demux operator.
-struct DemuxState<A: Allocate> {
-    /// The worker hosting this operator.
-    worker: Worker<A>,
+struct DemuxState<A> {
+    /// The timely scheduler.
+    scheduler: A,
     /// State tracked per dataflow export.
     exports: BTreeMap<GlobalId, ExportState>,
     /// Maps live dataflows to counts of their exports.
@@ -615,10 +626,10 @@
     dataflow_global_ids: BTreeMap<usize, BTreeSet<GlobalId>>,
 }
 
-impl<A: Allocate> DemuxState<A> {
-    fn new(worker: Worker<A>) -> Self {
+impl<A: Scheduler> DemuxState<A> {
+    fn new(scheduler: A) -> Self {
         Self {
-            worker,
+            scheduler,
             exports: Default::default(),
             dataflow_export_counts: Default::default(),
             dataflow_drop_times: Default::default(),
@@ -665,34 +676,21 @@ struct ArrangementSizeState {
     count: isize,
 }
 
-/// An update of value `D` at a time and with a diff.
-type Update<D> = (D, Timestamp, Diff);
-/// A pusher for updates of value `D` for vector-based containers.
-type Pusher<D> = Counter<Timestamp, Vec<Update<D>>, Tee<Timestamp, Vec<Update<D>>>>;
-/// A pusher for updates of value `D` for columnar containers.
-type PusherColumnar<D> = Counter<Timestamp, Column<Update<D>>, Tee<Timestamp, Column<Update<D>>>>;
-/// An output session for vector-based containers of updates `D`, using a capacity container builder.
-type OutputSession<'a, D> =
-    Session<'a, Timestamp, CapacityContainerBuilder<Vec<Update<D>>>, Pusher<D>>;
-/// An output session for columnar containers of updates `D`, using a column builder.
-type OutputSessionColumnar<'a, D> =
-    Session<'a, Timestamp, ColumnBuilder<Update<D>>, PusherColumnar<D>>;
-
 /// Bundled output sessions used by the demux operator.
 struct DemuxOutput<'a> {
-    export: OutputSession<'a, ExportDatum>,
-    frontier: OutputSession<'a, FrontierDatum>,
-    import_frontier: OutputSession<'a, ImportFrontierDatum>,
-    peek: OutputSession<'a, PeekDatum>,
-    peek_duration: OutputSession<'a, PeekDurationDatum>,
-    shutdown_duration: OutputSession<'a, u128>,
-    arrangement_heap_size: OutputSession<'a, ArrangementHeapDatum>,
-    arrangement_heap_capacity: OutputSession<'a, ArrangementHeapDatum>,
-    arrangement_heap_allocations: OutputSession<'a, ArrangementHeapDatum>,
-    hydration_time: OutputSession<'a, HydrationTimeDatum>,
-    error_count: OutputSession<'a, ErrorCountDatum>,
-    lir_mapping: OutputSessionColumnar<'a, LirMappingDatum>,
-    dataflow_global_ids: OutputSession<'a, DataflowGlobalDatum>,
+    export: OutputSessionVec<'a, Update<ExportDatum>>,
+    frontier: OutputSessionVec<'a, Update<FrontierDatum>>,
+    import_frontier: OutputSessionVec<'a, Update<ImportFrontierDatum>>,
+    peek: OutputSessionVec<'a, Update<PeekDatum>>,
+    peek_duration: OutputSessionVec<'a, Update<PeekDurationDatum>>,
+    shutdown_duration: OutputSessionVec<'a, Update<u128>>,
+    arrangement_heap_size: OutputSessionVec<'a, Update<ArrangementHeapDatum>>,
+    arrangement_heap_capacity: OutputSessionVec<'a, Update<ArrangementHeapDatum>>,
+    arrangement_heap_allocations: OutputSessionVec<'a, Update<ArrangementHeapDatum>>,
+    hydration_time: OutputSessionVec<'a, Update<HydrationTimeDatum>>,
+    error_count: OutputSessionVec<'a, Update<ErrorCountDatum>>,
+    lir_mapping: OutputSessionColumnar<'a, Update<LirMappingDatum>>,
+    dataflow_global_ids: OutputSessionVec<'a, Update<DataflowGlobalDatum>>,
 }
 
 #[derive(Clone)]
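As a reading aid, a hypothetical helper pushing one update through one of these sessions (not part of the diff; assumes `Diff` is a plain signed-integer alias here):

    // Hypothetical illustration: push a single Update<PeekDatum>, i.e. a
    // (datum, timestamp, diff) triple, through the peek session.
    fn emit_peek_example(output: &mut DemuxOutput<'_>, datum: PeekDatum, ts: Timestamp) {
        output.peek.give((datum, ts, 1));
    }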
@@ -763,7 +761,7 @@ struct DataflowGlobalDatum {
 }
 
 /// Event handler of the demux operator.
-struct DemuxHandler<'a, 'b, A: Allocate + 'static> {
+struct DemuxHandler<'a, 'b, A: Scheduler> {
     /// State kept by the demux operator.
     state: &'a mut DemuxState<A>,
     /// State shared across log receivers.
@@ -776,7 +774,7 @@ struct DemuxHandler<'a, 'b, A: Allocate + 'static> {
     time: Duration,
 }
 
-impl<A: Allocate> DemuxHandler<'_, '_, A> {
+impl<A: Scheduler> DemuxHandler<'_, '_, A> {
     /// Return the timestamp associated with the current event, based on the event time and the
     /// logging interval.
     fn ts(&self) -> Timestamp {
@@ -1198,7 +1196,7 @@ impl<A: Allocate> DemuxHandler<'_, '_, A> {
     ) {
         let activator = self
             .state
-            .worker
+            .scheduler
             .activator_for(address.into_iter().collect());
         let existing = self
             .state
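Taken together, these changes mean the demux machinery depends only on timely's `Scheduler` trait rather than on a full `Worker<A: Allocate>` handle, so compute logging can be built as a fragment inside a larger logging dataflow. A minimal sketch of that narrowed dependency (illustrative, not part of the diff):

    // Anything implementing `Scheduler` (worker handles, scopes) can mint
    // an `Activator` for an operator address; that is all DemuxState needs
    // from its environment.
    use timely::scheduling::{Activator, Scheduler};

    fn activator_for<S: Scheduler>(scheduler: &S, address: Vec<usize>) -> Activator {
        scheduler.activator_for(address.into_iter().collect())
    }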