Skip to content

Commit cacf32e

Browse files
committed
Move all logging dataflows into a shared dataflow
Pass compute events through a stream instead of injecting them into the compute event log. Signed-off-by: Moritz Hoffmann <mh@materialize.com>
1 parent aa9bae3 commit cacf32e

File tree

8 files changed

+244
-196
lines changed

8 files changed

+244
-196
lines changed

src/compute/src/compute_state.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ use uuid::Uuid;
6565
use crate::arrangement::manager::{TraceBundle, TraceManager};
6666
use crate::logging;
6767
use crate::logging::compute::{CollectionLogging, ComputeEvent, PeekEvent};
68+
use crate::logging::initialize::LoggingTraces;
6869
use crate::metrics::{CollectionMetrics, WorkerMetrics};
6970
use crate::render::{LinearJoinSpec, StartSignal};
7071
use crate::server::{ComputeInstanceContext, ResponseSender};
@@ -665,10 +666,14 @@ impl<'a, A: Allocate + 'static> ActiveComputeState<'a, A> {
665666
panic!("dataflow server has already initialized logging");
666667
}
667668

668-
let (logger, traces) = logging::initialize(self.timely_worker, &config);
669+
let LoggingTraces {
670+
traces,
671+
dataflow_index,
672+
compute_logger: logger,
673+
} = logging::initialize(self.timely_worker, &config);
669674

670675
let mut log_index_ids = config.index_logs;
671-
for (log, (trace, dataflow_index)) in traces {
676+
for (log, trace) in traces {
672677
// Install trace as maintained index.
673678
let id = log_index_ids
674679
.remove(&log)

src/compute/src/logging.rs

Lines changed: 16 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
1212
pub mod compute;
1313
mod differential;
14-
mod initialize;
14+
pub(super) mod initialize;
1515
mod reachability;
1616
mod timely;
1717

@@ -21,7 +21,7 @@ use std::marker::PhantomData;
2121
use std::rc::Rc;
2222
use std::time::Duration;
2323

24-
use ::timely::container::ContainerBuilder;
24+
use ::timely::container::{CapacityContainerBuilder, ContainerBuilder};
2525
use ::timely::dataflow::channels::pact::Pipeline;
2626
use ::timely::dataflow::channels::pushers::buffer::Session;
2727
use ::timely::dataflow::channels::pushers::{Counter, Tee};
@@ -36,13 +36,25 @@ use mz_compute_client::logging::{ComputeLog, DifferentialLog, LogVariant, Timely
3636
use mz_expr::{permutation_for_arrangement, MirScalarExpr};
3737
use mz_repr::{Datum, Diff, Row, RowPacker, RowRef, Timestamp};
3838
use mz_timely_util::activator::RcActivator;
39+
use mz_timely_util::containers::ColumnBuilder;
3940
use mz_timely_util::operator::consolidate_pact;
4041

41-
use crate::logging::compute::Logger as ComputeLogger;
4242
use crate::typedefs::RowRowAgent;
4343

4444
pub use crate::logging::initialize::initialize;
4545

46+
/// An update of value `D` at a time and with a diff.
47+
pub(super) type Update<D> = (D, Timestamp, Diff);
48+
/// A pusher for containers `C`.
49+
pub(super) type Pusher<C> = Counter<Timestamp, C, Tee<Timestamp, C>>;
50+
/// An output session for the specified container builder.
51+
pub(super) type OutputSession<'a, CB> =
52+
Session<'a, Timestamp, CB, Pusher<<CB as ContainerBuilder>::Container>>;
53+
/// An output session for vector-based containers of updates `D`, using a capacity container builder.
54+
pub(super) type OutputSessionVec<'a, D> = OutputSession<'a, CapacityContainerBuilder<Vec<D>>>;
55+
/// An output session for columnar containers of updates `D`, using a column builder.
56+
pub(super) type OutputSessionColumnar<'a, D> = OutputSession<'a, ColumnBuilder<D>>;
57+
4658
/// Logs events as a timely stream, with progress statements.
4759
struct BatchLogger<C, P>
4860
where
@@ -137,13 +149,11 @@ impl<C, const N: usize> EventQueue<C, N> {
137149
}
138150
}
139151

140-
/// State shared between different logging dataflows.
152+
/// State shared between different logging dataflow fragments.
141153
#[derive(Default)]
142154
struct SharedLoggingState {
143155
/// Activators for arrangement heap size operators.
144156
arrangement_size_activators: BTreeMap<usize, Activator>,
145-
/// Shared compute logger.
146-
compute_logger: Option<ComputeLogger>,
147157
}
148158

149159
/// Helper to pack collections of [`Datum`]s into key and value row.
@@ -214,14 +224,8 @@ struct LogCollection {
214224
trace: RowRowAgent<Timestamp, Diff>,
215225
/// Token that should be dropped to drop this collection.
216226
token: Rc<dyn Any>,
217-
/// Index of the dataflow exporting this collection.
218-
dataflow_index: usize,
219227
}
220228

221-
pub(super) type Pusher<C> = Counter<Timestamp, C, Tee<Timestamp, C>>;
222-
pub(super) type OutputSession<'a, CB> =
223-
Session<'a, Timestamp, CB, Pusher<<CB as ContainerBuilder>::Container>>;
224-
225229
/// A single-purpose function to consolidate and pack updates for log collection.
226230
///
227231
/// The function first consolidates worker-local updates using the [`Pipeline`] pact, then converts

src/compute/src/logging/compute.rs

Lines changed: 55 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -24,24 +24,20 @@ use mz_ore::cast::CastFrom;
2424
use mz_repr::{Datum, Diff, GlobalId, Timestamp};
2525
use mz_timely_util::containers::{Column, ColumnBuilder, ProvidedBuilder};
2626
use mz_timely_util::replay::MzReplay;
27-
use timely::communication::Allocate;
28-
use timely::container::CapacityContainerBuilder;
2927
use timely::dataflow::channels::pact::Pipeline;
30-
use timely::dataflow::channels::pushers::buffer::Session;
31-
use timely::dataflow::channels::pushers::{Counter, Tee};
3228
use timely::dataflow::operators::core::Map;
3329
use timely::dataflow::operators::generic::builder_rc::OperatorBuilder;
34-
use timely::dataflow::operators::Operator;
35-
use timely::dataflow::{Scope, Stream};
30+
use timely::dataflow::operators::{Concatenate, Enter, Operator};
31+
use timely::dataflow::{Scope, Stream, StreamCore};
3632
use timely::scheduling::Scheduler;
37-
use timely::worker::Worker;
3833
use timely::{Container, Data};
3934
use tracing::error;
4035
use uuid::Uuid;
4136

4237
use crate::extensions::arrange::MzArrange;
4338
use crate::logging::{
44-
ComputeLog, EventQueue, LogCollection, LogVariant, PermutedRowPacker, SharedLoggingState,
39+
ComputeLog, EventQueue, LogCollection, LogVariant, OutputSessionColumnar, OutputSessionVec,
40+
PermutedRowPacker, SharedLoggingState, Update,
4541
};
4642
use crate::row_spine::{RowRowBatcher, RowRowBuilder};
4743
use crate::typedefs::RowRowSpine;
@@ -289,24 +285,32 @@ impl LirMetadata {
289285
}
290286
}
291287

292-
/// Constructs the logging dataflow for compute logs.
288+
/// The return type of the [`construct`] function.
289+
pub(super) struct Return {
290+
/// Collections returned by [`construct`].
291+
pub collections: BTreeMap<LogVariant, LogCollection>,
292+
}
293+
294+
/// Constructs the logging dataflow fragment for compute logs.
293295
///
294296
/// Params
295-
/// * `worker`: The Timely worker hosting the log analysis dataflow.
297+
/// * `scope`: The Timely scope hosting the log analysis dataflow.
298+
/// * `scheduler`: The Timely scheduler used to obtain activators.
296299
/// * `config`: Logging configuration.
297300
/// * `event_queue`: The source to read compute log events from.
298-
pub(super) fn construct<A: Allocate + 'static>(
299-
worker: &mut timely::worker::Worker<A>,
301+
/// * `compute_event_streams`: Additional compute event streams to absorb.
302+
/// * `shared_state`: Shared state between logging dataflow fragments.
303+
pub(super) fn construct<S: Scheduler + 'static, G: Scope<Timestamp = Timestamp>>(
304+
mut scope: G,
305+
scheduler: S,
300306
config: &mz_compute_client::logging::LoggingConfig,
301307
event_queue: EventQueue<Column<(Duration, ComputeEvent)>>,
308+
compute_event_streams: impl IntoIterator<Item = StreamCore<G, Column<(Duration, ComputeEvent)>>>,
302309
shared_state: Rc<RefCell<SharedLoggingState>>,
303-
) -> BTreeMap<LogVariant, LogCollection> {
310+
) -> Return {
304311
let logging_interval_ms = std::cmp::max(1, config.interval.as_millis());
305-
let worker_id = worker.index();
306-
let worker2 = worker.clone();
307-
let dataflow_index = worker.next_dataflow_index();
308312

309-
worker.dataflow_named("Dataflow: compute logging", move |scope| {
313+
scope.scoped("compute logging", move |scope| {
310314
let enable_logging = config.enable_logging;
311315
let (logs, token) = event_queue.links.mz_replay::<_, ProvidedBuilder<_>, _>(
312316
scope,
@@ -322,6 +326,12 @@ pub(super) fn construct<A: Allocate + 'static>(
322326
},
323327
);
324328

329+
let logs = compute_event_streams
330+
.into_iter()
331+
.map(|stream| stream.enter(scope))
332+
.chain(std::iter::once(logs));
333+
let logs = scope.concatenate(logs);
334+
325335
// Build a demux operator that splits the replayed event stream up into the separate
326336
// logging streams.
327337
let mut demux = OperatorBuilder::new("Compute Logging Demux".to_string(), scope.clone());
@@ -341,7 +351,7 @@ pub(super) fn construct<A: Allocate + 'static>(
341351
let (mut lir_mapping_out, lir_mapping) = demux.new_output();
342352
let (mut dataflow_global_ids_out, dataflow_global_ids) = demux.new_output();
343353

344-
let mut demux_state = DemuxState::new(worker2);
354+
let mut demux_state = DemuxState::new(scheduler);
345355
demux.build(move |_capability| {
346356
move |_frontiers| {
347357
let mut export = export_out.activate();
@@ -390,6 +400,8 @@ pub(super) fn construct<A: Allocate + 'static>(
390400
}
391401
});
392402

403+
let worker_id = scope.index();
404+
393405
// Encode the contents of each logging stream into its expected `Row` format.
394406
let mut packer = PermutedRowPacker::new(ComputeLog::DataflowCurrent);
395407
let dataflow_current = export.as_collection().map({
@@ -557,7 +569,7 @@ pub(super) fn construct<A: Allocate + 'static>(
557569
];
558570

559571
// Build the output arrangements.
560-
let mut result = BTreeMap::new();
572+
let mut collections = BTreeMap::new();
561573
for (variant, collection) in logs {
562574
let variant = LogVariant::Compute(variant);
563575
if config.index_logs.contains_key(&variant) {
@@ -569,13 +581,12 @@ pub(super) fn construct<A: Allocate + 'static>(
569581
let collection = LogCollection {
570582
trace,
571583
token: Rc::clone(&token),
572-
dataflow_index,
573584
};
574-
result.insert(variant, collection);
585+
collections.insert(variant, collection);
575586
}
576587
}
577588

578-
result
589+
Return { collections }
579590
})
580591
}
581592

@@ -594,9 +605,9 @@ where
594605
}
595606

596607
/// State maintained by the demux operator.
597-
struct DemuxState<A: Allocate> {
598-
/// The worker hosting this operator.
599-
worker: Worker<A>,
608+
struct DemuxState<A> {
609+
/// The timely scheduler.
610+
scheduler: A,
600611
/// State tracked per dataflow export.
601612
exports: BTreeMap<GlobalId, ExportState>,
602613
/// Maps live dataflows to counts of their exports.
@@ -615,10 +626,10 @@ struct DemuxState<A: Allocate> {
615626
dataflow_global_ids: BTreeMap<usize, BTreeSet<GlobalId>>,
616627
}
617628

618-
impl<A: Allocate> DemuxState<A> {
619-
fn new(worker: Worker<A>) -> Self {
629+
impl<A: Scheduler> DemuxState<A> {
630+
fn new(scheduler: A) -> Self {
620631
Self {
621-
worker,
632+
scheduler,
622633
exports: Default::default(),
623634
dataflow_export_counts: Default::default(),
624635
dataflow_drop_times: Default::default(),
@@ -665,34 +676,21 @@ struct ArrangementSizeState {
665676
count: isize,
666677
}
667678

668-
/// An update of value `D` at a time and with a diff.
669-
type Update<D> = (D, Timestamp, Diff);
670-
/// A pusher for updates of value `D` for vector-based containers.
671-
type Pusher<D> = Counter<Timestamp, Vec<Update<D>>, Tee<Timestamp, Vec<Update<D>>>>;
672-
/// A pusher for updates of value `D` for columnar containers.
673-
type PusherColumnar<D> = Counter<Timestamp, Column<Update<D>>, Tee<Timestamp, Column<Update<D>>>>;
674-
/// An output session for vector-based containers of updates `D`, using a capacity container builder.
675-
type OutputSession<'a, D> =
676-
Session<'a, Timestamp, CapacityContainerBuilder<Vec<Update<D>>>, Pusher<D>>;
677-
/// An output session for columnar containers of updates `D`, using a column builder.
678-
type OutputSessionColumnar<'a, D> =
679-
Session<'a, Timestamp, ColumnBuilder<Update<D>>, PusherColumnar<D>>;
680-
681679
/// Bundled output sessions used by the demux operator.
682680
struct DemuxOutput<'a> {
683-
export: OutputSession<'a, ExportDatum>,
684-
frontier: OutputSession<'a, FrontierDatum>,
685-
import_frontier: OutputSession<'a, ImportFrontierDatum>,
686-
peek: OutputSession<'a, PeekDatum>,
687-
peek_duration: OutputSession<'a, PeekDurationDatum>,
688-
shutdown_duration: OutputSession<'a, u128>,
689-
arrangement_heap_size: OutputSession<'a, ArrangementHeapDatum>,
690-
arrangement_heap_capacity: OutputSession<'a, ArrangementHeapDatum>,
691-
arrangement_heap_allocations: OutputSession<'a, ArrangementHeapDatum>,
692-
hydration_time: OutputSession<'a, HydrationTimeDatum>,
693-
error_count: OutputSession<'a, ErrorCountDatum>,
694-
lir_mapping: OutputSessionColumnar<'a, LirMappingDatum>,
695-
dataflow_global_ids: OutputSession<'a, DataflowGlobalDatum>,
681+
export: OutputSessionVec<'a, Update<ExportDatum>>,
682+
frontier: OutputSessionVec<'a, Update<FrontierDatum>>,
683+
import_frontier: OutputSessionVec<'a, Update<ImportFrontierDatum>>,
684+
peek: OutputSessionVec<'a, Update<PeekDatum>>,
685+
peek_duration: OutputSessionVec<'a, Update<PeekDurationDatum>>,
686+
shutdown_duration: OutputSessionVec<'a, Update<u128>>,
687+
arrangement_heap_size: OutputSessionVec<'a, Update<ArrangementHeapDatum>>,
688+
arrangement_heap_capacity: OutputSessionVec<'a, Update<ArrangementHeapDatum>>,
689+
arrangement_heap_allocations: OutputSessionVec<'a, Update<ArrangementHeapDatum>>,
690+
hydration_time: OutputSessionVec<'a, Update<HydrationTimeDatum>>,
691+
error_count: OutputSessionVec<'a, Update<ErrorCountDatum>>,
692+
lir_mapping: OutputSessionColumnar<'a, Update<LirMappingDatum>>,
693+
dataflow_global_ids: OutputSessionVec<'a, Update<DataflowGlobalDatum>>,
696694
}
697695

698696
#[derive(Clone)]
@@ -763,7 +761,7 @@ struct DataflowGlobalDatum {
763761
}
764762

765763
/// Event handler of the demux operator.
766-
struct DemuxHandler<'a, 'b, A: Allocate + 'static> {
764+
struct DemuxHandler<'a, 'b, A: Scheduler> {
767765
/// State kept by the demux operator.
768766
state: &'a mut DemuxState<A>,
769767
/// State shared across log receivers.
@@ -776,7 +774,7 @@ struct DemuxHandler<'a, 'b, A: Allocate + 'static> {
776774
time: Duration,
777775
}
778776

779-
impl<A: Allocate> DemuxHandler<'_, '_, A> {
777+
impl<A: Scheduler> DemuxHandler<'_, '_, A> {
780778
/// Return the timestamp associated with the current event, based on the event time and the
781779
/// logging interval.
782780
fn ts(&self) -> Timestamp {
@@ -1198,7 +1196,7 @@ impl<A: Allocate> DemuxHandler<'_, '_, A> {
11981196
) {
11991197
let activator = self
12001198
.state
1201-
.worker
1199+
.scheduler
12021200
.activator_for(address.into_iter().collect());
12031201
let existing = self
12041202
.state

0 commit comments

Comments
 (0)