diff --git a/src/adapter/src/coord.rs b/src/adapter/src/coord.rs index 84baf4fe35a29..9236f6f7dbb47 100644 --- a/src/adapter/src/coord.rs +++ b/src/adapter/src/coord.rs @@ -467,6 +467,7 @@ pub struct PeekStageLinearizeTimestamp { validity: PlanValidity, plan: mz_sql::plan::SelectPlan, max_query_result_size: Option, + max_query_heap_size: Option, source_ids: BTreeSet, target_replica: Option, timeline_context: TimelineContext, @@ -481,6 +482,7 @@ pub struct PeekStageRealTimeRecency { validity: PlanValidity, plan: mz_sql::plan::SelectPlan, max_query_result_size: Option, + max_query_heap_size: Option, source_ids: BTreeSet, target_replica: Option, timeline_context: TimelineContext, @@ -496,6 +498,7 @@ pub struct PeekStageTimestampReadHold { validity: PlanValidity, plan: mz_sql::plan::SelectPlan, max_query_result_size: Option, + max_query_heap_size: Option, source_ids: BTreeSet, target_replica: Option, timeline_context: TimelineContext, @@ -512,6 +515,7 @@ pub struct PeekStageOptimize { validity: PlanValidity, plan: mz_sql::plan::SelectPlan, max_query_result_size: Option, + max_query_heap_size: Option, source_ids: BTreeSet, id_bundle: CollectionIdBundle, target_replica: Option, @@ -527,6 +531,7 @@ pub struct PeekStageFinish { validity: PlanValidity, plan: mz_sql::plan::SelectPlan, max_query_result_size: Option, + max_query_heap_size: Option, id_bundle: CollectionIdBundle, target_replica: Option, source_ids: BTreeSet, diff --git a/src/adapter/src/coord/sequencer.rs b/src/adapter/src/coord/sequencer.rs index 61e7665e36cc3..93efff8c9eb12 100644 --- a/src/adapter/src/coord/sequencer.rs +++ b/src/adapter/src/coord/sequencer.rs @@ -379,8 +379,10 @@ impl Coordinator { self.sequence_end_transaction(ctx, action).await; } Plan::Select(plan) => { - let max = Some(ctx.session().vars().max_query_result_size()); - self.sequence_peek(ctx, plan, target_cluster, max).await; + let max_query = Some(ctx.session().vars().max_query_result_size()); + let max_heap = ctx.session().vars().max_query_heap_size(); + self.sequence_peek(ctx, plan, target_cluster, max_query, max_heap) + .await; } Plan::Subscribe(plan) => { self.sequence_subscribe(ctx, plan, target_cluster).await; @@ -392,9 +394,16 @@ impl Coordinator { ctx.retire(Ok(Self::send_immediate_rows(plan.row))); } Plan::ShowColumns(show_columns_plan) => { - let max = Some(ctx.session().vars().max_query_result_size()); - self.sequence_peek(ctx, show_columns_plan.select_plan, target_cluster, max) - .await; + let max_query = Some(ctx.session().vars().max_query_result_size()); + let max_heap = ctx.session().vars().max_query_heap_size(); + self.sequence_peek( + ctx, + show_columns_plan.select_plan, + target_cluster, + max_query, + max_heap, + ) + .await; } Plan::CopyFrom(plan) => match plan.source { CopyFromSource::Stdin => { diff --git a/src/adapter/src/coord/sequencer/inner.rs b/src/adapter/src/coord/sequencer/inner.rs index 6544a9b2c6b3e..f421aa423c46a 100644 --- a/src/adapter/src/coord/sequencer/inner.rs +++ b/src/adapter/src/coord/sequencer/inner.rs @@ -2580,6 +2580,7 @@ impl Coordinator { None, ExplainContext::Pushdown, Some(ctx.session().vars().max_query_result_size()), + ctx.session().vars().max_query_heap_size(), ), ctx ); @@ -3004,6 +3005,7 @@ impl Coordinator { }, TargetCluster::Active, None, + None, ) .await; diff --git a/src/adapter/src/coord/sequencer/inner/peek.rs b/src/adapter/src/coord/sequencer/inner/peek.rs index 7397a47e99d8d..7d0e19f6d93b4 100644 --- a/src/adapter/src/coord/sequencer/inner/peek.rs +++ b/src/adapter/src/coord/sequencer/inner/peek.rs @@ 
-128,6 +128,7 @@ impl Coordinator { plan: plan::SelectPlan, target_cluster: TargetCluster, max_query_result_size: Option, + max_query_heap_size: Option, ) { let explain_ctx = if ctx.session().vars().emit_plan_insights_notice() { let optimizer_trace = OptimizerTrace::new(ExplainStage::PlanInsights.paths()); @@ -143,7 +144,8 @@ impl Coordinator { target_cluster, None, explain_ctx, - max_query_result_size + max_query_result_size, + max_query_heap_size, ), ctx ); @@ -209,6 +211,7 @@ impl Coordinator { }), ExplainContext::None, Some(ctx.session().vars().max_query_result_size()), + ctx.session().vars().max_query_heap_size(), ), ctx ); @@ -258,6 +261,7 @@ impl Coordinator { optimizer_trace, }), Some(ctx.session().vars().max_query_result_size()), + ctx.session().vars().max_query_heap_size(), ), ctx ); @@ -274,6 +278,7 @@ impl Coordinator { copy_to_ctx: Option, explain_ctx: ExplainContext, max_query_result_size: Option, + max_query_heap_size: Option, ) -> Result { // Collect optimizer parameters. let catalog = self.owned_catalog(); @@ -391,6 +396,7 @@ impl Coordinator { validity, plan, max_query_result_size, + max_query_heap_size, source_ids, target_replica, timeline_context, @@ -409,6 +415,7 @@ impl Coordinator { source_ids, plan, max_query_result_size, + max_query_heap_size, target_replica, timeline_context, optimizer, @@ -424,6 +431,7 @@ impl Coordinator { validity, plan, max_query_result_size, + max_query_heap_size, source_ids, target_replica, timeline_context, @@ -468,6 +476,7 @@ impl Coordinator { mut validity, plan, max_query_result_size, + max_query_heap_size, source_ids, target_replica, timeline_context, @@ -508,6 +517,7 @@ impl Coordinator { validity, plan, max_query_result_size, + max_query_heap_size, source_ids, id_bundle, target_replica, @@ -526,6 +536,7 @@ impl Coordinator { validity, plan, max_query_result_size, + max_query_heap_size, source_ids, id_bundle, target_replica, @@ -640,6 +651,7 @@ impl Coordinator { validity, plan, max_query_result_size, + max_query_heap_size, id_bundle, target_replica, source_ids, @@ -656,6 +668,7 @@ impl Coordinator { validity, plan, max_query_result_size, + max_query_heap_size, id_bundle, target_replica, source_ids, @@ -745,6 +758,7 @@ impl Coordinator { validity, plan, max_query_result_size, + max_query_heap_size, source_ids, target_replica, timeline_context, @@ -772,6 +786,7 @@ impl Coordinator { validity, plan, max_query_result_size, + max_query_heap_size, target_replica, timeline_context, source_ids, @@ -790,6 +805,7 @@ impl Coordinator { validity, plan, max_query_result_size, + max_query_heap_size, target_replica, timeline_context, source_ids, @@ -810,6 +826,7 @@ impl Coordinator { validity: _, plan, max_query_result_size, + max_query_heap_size, id_bundle, target_replica, source_ids, @@ -833,9 +850,13 @@ impl Coordinator { let session = ctx.session_mut(); let conn_id = session.conn_id().clone(); - let (peek_plan, df_meta, typ) = global_lir_plan.unapply(); + let (mut peek_plan, df_meta, typ) = global_lir_plan.unapply(); let source_arity = typ.arity(); + if let peek::PeekPlan::SlowPath(PeekDataflowPlan { desc, .. 
}) = &mut peek_plan { + desc.heap_size_limit = max_query_heap_size; + } + self.emit_optimizer_notices(&*session, &df_meta.optimizer_notices); let target_cluster = self.catalog().get_cluster(cluster_id); diff --git a/src/adapter/src/coord/sequencer/inner/subscribe.rs b/src/adapter/src/coord/sequencer/inner/subscribe.rs index 26274f2c51c34..f2aeac41be2c1 100644 --- a/src/adapter/src/coord/sequencer/inner/subscribe.rs +++ b/src/adapter/src/coord/sequencer/inner/subscribe.rs @@ -355,7 +355,9 @@ impl Coordinator { }; active_subscribe.initialize(); - let (df_desc, df_meta) = global_lir_plan.unapply(); + let (mut df_desc, df_meta) = global_lir_plan.unapply(); + + df_desc.heap_size_limit = ctx.session().vars().max_query_heap_size(); // Emit notices. self.emit_optimizer_notices(ctx.session(), &df_meta.optimizer_notices); diff --git a/src/compute-client/src/as_of_selection.rs b/src/compute-client/src/as_of_selection.rs index 53d3bd2dec618..7de21f1cae0c2 100644 --- a/src/compute-client/src/as_of_selection.rs +++ b/src/compute-client/src/as_of_selection.rs @@ -1103,6 +1103,7 @@ mod tests { refresh_schedule: Default::default(), debug_name: Default::default(), time_dependence: None, + heap_size_limit: None, } } diff --git a/src/compute-client/src/controller/instance.rs b/src/compute-client/src/controller/instance.rs index 67e7f9b1a1708..e50570d4e3c16 100644 --- a/src/compute-client/src/controller/instance.rs +++ b/src/compute-client/src/controller/instance.rs @@ -65,8 +65,8 @@ use crate::protocol::command::{ }; use crate::protocol::history::ComputeCommandHistory; use crate::protocol::response::{ - ComputeResponse, CopyToResponse, FrontiersResponse, OperatorHydrationStatus, PeekResponse, - StatusResponse, SubscribeBatch, SubscribeResponse, + ComputeResponse, CopyToResponse, DataflowLimitStatus, FrontiersResponse, + OperatorHydrationStatus, PeekResponse, StatusResponse, SubscribeBatch, SubscribeResponse, }; use crate::service::{ComputeClient, ComputeGrpcClient}; @@ -1360,6 +1360,7 @@ where refresh_schedule: dataflow.refresh_schedule, debug_name: dataflow.debug_name, time_dependence: dataflow.time_dependence, + heap_size_limit: dataflow.heap_size_limit, }; if augmented_dataflow.is_transient() { @@ -2015,6 +2016,40 @@ where StatusResponse::OperatorHydration(status) => { self.update_operator_hydration_status(replica_id, status) } + StatusResponse::DataflowLimitExceeded(status) => { + self.handle_dataflow_limit_status(replica_id, status) + } + } + } + + /// Handle a dataflow limit exceeded response. 
+ fn handle_dataflow_limit_status(&mut self, replica_id: ReplicaId, status: DataflowLimitStatus) { + tracing::debug!(%replica_id, ?status, "dataflow limit exceeded"); + if let Some(subscribe) = self.subscribes.get(&status.collection_id) { + self.deliver_response(ComputeControllerResponse::SubscribeResponse( + status.collection_id, + SubscribeBatch { + lower: subscribe.frontier.clone(), + upper: subscribe.frontier.clone(), + updates: Err("Dataflow limit exceeded".to_string()), + }, + )) + // Look for a matching peek + } else if let Some((uuid, _)) = self + .peeks + .iter() + .find(|(_, peek)| peek.read_hold.id() == status.collection_id) + { + self.cancel_peek( + *uuid, + PeekResponse::Error("Dataflow limit exceeded".to_string()), + ); + } else { + tracing::warn!( + %replica_id, + collection_id = %status.collection_id, + "dataflow limit exceeded for unknown collection" + ); } } diff --git a/src/compute-client/src/protocol/response.proto b/src/compute-client/src/protocol/response.proto index cd73d62bc29b8..cf678574c26a8 100644 --- a/src/compute-client/src/protocol/response.proto +++ b/src/compute-client/src/protocol/response.proto @@ -107,6 +107,7 @@ message ProtoSubscribeBatch { message ProtoStatusResponse { oneof kind { ProtoOperatorHydrationStatus operator_hydration = 1; + ProtoDataflowLimitStatus dataflow_limit_status = 2; } } @@ -116,3 +117,7 @@ message ProtoOperatorHydrationStatus { uint64 worker_id = 3; bool hydrated = 4; } + +message ProtoDataflowLimitStatus { + mz_repr.global_id.ProtoGlobalId collection_id = 1; +} diff --git a/src/compute-client/src/protocol/response.rs b/src/compute-client/src/protocol/response.rs index b08ea181718db..15cac40797cdd 100644 --- a/src/compute-client/src/protocol/response.rs +++ b/src/compute-client/src/protocol/response.rs @@ -589,6 +589,8 @@ impl Arbitrary for SubscribeBatch { pub enum StatusResponse { /// Reports the hydration status of dataflow operators. OperatorHydration(OperatorHydrationStatus), + /// Reports limit violations for dataflows. + DataflowLimitExceeded(DataflowLimitStatus), } impl RustType for StatusResponse { @@ -597,6 +599,7 @@ impl RustType for StatusResponse { let kind = match self { Self::OperatorHydration(status) => Kind::OperatorHydration(status.into_proto()), + Self::DataflowLimitExceeded(status) => Kind::DataflowLimitStatus(status.into_proto()), }; ProtoStatusResponse { kind: Some(kind) } } @@ -608,6 +611,9 @@ impl RustType for StatusResponse { Some(Kind::OperatorHydration(status)) => { Ok(Self::OperatorHydration(status.into_rust()?)) } + Some(Kind::DataflowLimitStatus(status)) => { + Ok(Self::DataflowLimitExceeded(status.into_rust()?)) + } None => Err(TryFromProtoError::missing_field( "ProtoStatusResponse::kind", )), @@ -650,6 +656,29 @@ impl RustType for OperatorHydrationStatus { } } +/// A dataflow exceeded some limit. +#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, Arbitrary)] +pub struct DataflowLimitStatus { + /// The ID of the compute collection exported by the dataflow. 
+ pub collection_id: GlobalId, +} + +impl RustType for DataflowLimitStatus { + fn into_proto(&self) -> ProtoDataflowLimitStatus { + ProtoDataflowLimitStatus { + collection_id: Some(self.collection_id.into_proto()), + } + } + + fn from_proto(proto: ProtoDataflowLimitStatus) -> Result { + Ok(Self { + collection_id: proto + .collection_id + .into_rust_if_some("ProtoDataflowLimitStatus::collection_id")?, + }) + } +} + #[cfg(test)] mod tests { use mz_ore::assert_ok; diff --git a/src/compute-types/src/dataflows.proto b/src/compute-types/src/dataflows.proto index d5b94e850be08..8d5d0c88c9538 100644 --- a/src/compute-types/src/dataflows.proto +++ b/src/compute-types/src/dataflows.proto @@ -62,6 +62,8 @@ message ProtoDataflowDescription { optional mz_storage_types.time_dependence.ProtoTimeDependence time_dependence = 11; string debug_name = 8; + + optional uint64 heap_size_limit = 12; } message ProtoIndexDesc { diff --git a/src/compute-types/src/dataflows.rs b/src/compute-types/src/dataflows.rs index 0ad6ad8e924ee..769214e669799 100644 --- a/src/compute-types/src/dataflows.rs +++ b/src/compute-types/src/dataflows.rs @@ -75,6 +75,8 @@ pub struct DataflowDescription { pub debug_name: String, /// Description of how the dataflow's progress relates to wall-clock time. None for unknown. pub time_dependence: Option, + /// Optional heap size limit for this dataflow, in bytes. + pub heap_size_limit: Option, } impl DataflowDescription { @@ -147,6 +149,7 @@ impl DataflowDescription { refresh_schedule: None, debug_name: name, time_dependence: None, + heap_size_limit: None, } } @@ -568,6 +571,7 @@ where refresh_schedule: self.refresh_schedule.clone(), debug_name: self.debug_name.clone(), time_dependence: self.time_dependence.clone(), + heap_size_limit: self.heap_size_limit, } } } @@ -586,6 +590,7 @@ impl RustType for DataflowDescription for DataflowDescription(), proptest::string::string_regex(".*").unwrap(), ), - any::>(), + (any::>(), any::>()), ) .prop_map( |( @@ -814,7 +820,7 @@ where refresh_schedule, debug_name, ), - time_dependence, + (time_dependence, heap_size_limit), )| DataflowDescription { source_imports: BTreeMap::from_iter(source_imports), index_imports: BTreeMap::from_iter(index_imports), @@ -839,6 +845,7 @@ where }, debug_name, time_dependence, + heap_size_limit, }, ) } diff --git a/src/compute-types/src/dyncfgs.rs b/src/compute-types/src/dyncfgs.rs index 3edd3a09ca472..bd0dd050c48a3 100644 --- a/src/compute-types/src/dyncfgs.rs +++ b/src/compute-types/src/dyncfgs.rs @@ -222,33 +222,41 @@ pub const COMPUTE_LOGICAL_BACKPRESSURE_INFLIGHT_SLACK: Config = Config "Round observed timestamps to slack.", ); +/// Whether to render the logging watchdog dataflow fragment. +pub const ENABLE_COMPUTE_HEAP_SIZE_LIMIT: Config = Config::new( + "enable_compute_heap_size_limit", + false, + "Whether to enable the heap size limit feature.", +); + /// Adds the full set of all compute `Config`s. 
pub fn all_dyncfgs(configs: ConfigSet) -> ConfigSet { configs - .add(&ENABLE_MZ_JOIN_CORE) + .add(&COMPUTE_APPLY_COLUMN_DEMANDS) + .add(&COMPUTE_LOGICAL_BACKPRESSURE_INFLIGHT_SLACK) + .add(&COMPUTE_LOGICAL_BACKPRESSURE_MAX_RETAINED_CAPABILITIES) + .add(&COMPUTE_REPLICA_EXPIRATION_OFFSET) + .add(&COMPUTE_SERVER_MAINTENANCE_INTERVAL) + .add(&CONSOLIDATING_VEC_GROWTH_DAMPENER) + .add(©_TO_S3_ARROW_BUILDER_BUFFER_RATIO) + .add(©_TO_S3_MULTIPART_PART_SIZE_BYTES) + .add(©_TO_S3_PARQUET_ROW_GROUP_FILE_RATIO) + .add(&DATAFLOW_MAX_INFLIGHT_BYTES) + .add(&DATAFLOW_MAX_INFLIGHT_BYTES_CC) + .add(&ENABLE_COLUMNAR_LGALLOC) + .add(&ENABLE_COLUMNATION_LGALLOC) + .add(&ENABLE_COMPUTE_HEAP_SIZE_LIMIT) + .add(&ENABLE_COMPUTE_LOGICAL_BACKPRESSURE) + .add(&ENABLE_COMPUTE_RENDER_FUELED_AS_SPECIFIC_COLLECTION) + .add(&ENABLE_COMPUTE_REPLICA_EXPIRATION) .add(&ENABLE_CORRECTION_V2) - .add(&LINEAR_JOIN_YIELDING) .add(&ENABLE_LGALLOC) + .add(&ENABLE_LGALLOC_EAGER_RECLAMATION) + .add(&ENABLE_MZ_JOIN_CORE) + .add(&HYDRATION_CONCURRENCY) .add(&LGALLOC_BACKGROUND_INTERVAL) .add(&LGALLOC_FILE_GROWTH_DAMPENER) .add(&LGALLOC_LOCAL_BUFFER_BYTES) .add(&LGALLOC_SLOW_CLEAR_BYTES) - .add(&ENABLE_LGALLOC_EAGER_RECLAMATION) - .add(&ENABLE_COLUMNATION_LGALLOC) - .add(&ENABLE_COLUMNAR_LGALLOC) - .add(&COMPUTE_SERVER_MAINTENANCE_INTERVAL) - .add(&DATAFLOW_MAX_INFLIGHT_BYTES) - .add(&DATAFLOW_MAX_INFLIGHT_BYTES_CC) - .add(&HYDRATION_CONCURRENCY) - .add(©_TO_S3_PARQUET_ROW_GROUP_FILE_RATIO) - .add(©_TO_S3_ARROW_BUILDER_BUFFER_RATIO) - .add(©_TO_S3_MULTIPART_PART_SIZE_BYTES) - .add(&ENABLE_COMPUTE_REPLICA_EXPIRATION) - .add(&COMPUTE_REPLICA_EXPIRATION_OFFSET) - .add(&COMPUTE_APPLY_COLUMN_DEMANDS) - .add(&CONSOLIDATING_VEC_GROWTH_DAMPENER) - .add(&ENABLE_COMPUTE_RENDER_FUELED_AS_SPECIFIC_COLLECTION) - .add(&ENABLE_COMPUTE_LOGICAL_BACKPRESSURE) - .add(&COMPUTE_LOGICAL_BACKPRESSURE_MAX_RETAINED_CAPABILITIES) - .add(&COMPUTE_LOGICAL_BACKPRESSURE_INFLIGHT_SLACK) + .add(&LINEAR_JOIN_YIELDING) } diff --git a/src/compute-types/src/plan/lowering.rs b/src/compute-types/src/plan/lowering.rs index 7b9d2d1a693c3..ca223558e2f9e 100644 --- a/src/compute-types/src/plan/lowering.rs +++ b/src/compute-types/src/plan/lowering.rs @@ -120,6 +120,7 @@ impl Context { refresh_schedule: desc.refresh_schedule, debug_name: desc.debug_name, time_dependence: desc.time_dependence, + heap_size_limit: desc.heap_size_limit, }) } diff --git a/src/compute/src/command_channel.rs b/src/compute/src/command_channel.rs index 2bf082baeca41..18da2af8fd8f6 100644 --- a/src/compute/src/command_channel.rs +++ b/src/compute/src/command_channel.rs @@ -186,6 +186,7 @@ fn split_command( initial_storage_as_of: dataflow.initial_storage_as_of.clone(), refresh_schedule: dataflow.refresh_schedule.clone(), time_dependence: dataflow.time_dependence.clone(), + heap_size_limit: dataflow.heap_size_limit.clone(), }) .map(ComputeCommand::CreateDataflow); Either::Left(commands) diff --git a/src/compute/src/compute_state.rs b/src/compute/src/compute_state.rs index a1f1d4c7400c4..892289325a7fd 100644 --- a/src/compute/src/compute_state.rs +++ b/src/compute/src/compute_state.rs @@ -25,10 +25,11 @@ use mz_compute_client::protocol::command::{ }; use mz_compute_client::protocol::history::ComputeCommandHistory; use mz_compute_client::protocol::response::{ - ComputeResponse, CopyToResponse, FrontiersResponse, OperatorHydrationStatus, PeekResponse, - StatusResponse, SubscribeResponse, + ComputeResponse, CopyToResponse, DataflowLimitStatus, FrontiersResponse, + OperatorHydrationStatus, PeekResponse, 
StatusResponse, SubscribeResponse, }; use mz_compute_types::dataflows::DataflowDescription; +use mz_compute_types::dyncfgs::ENABLE_COMPUTE_HEAP_SIZE_LIMIT; use mz_compute_types::plan::render_plan::RenderPlan; use mz_compute_types::plan::LirId; use mz_dyncfg::ConfigSet; @@ -66,6 +67,7 @@ use crate::arrangement::manager::{TraceBundle, TraceManager}; use crate::logging; use crate::logging::compute::{CollectionLogging, ComputeEvent, PeekEvent}; use crate::logging::initialize::LoggingTraces; +use crate::logging::LoggingHandles; use crate::metrics::{CollectionMetrics, WorkerMetrics}; use crate::render::{LinearJoinSpec, StartSignal}; use crate::server::{ComputeInstanceContext, ResponseSender}; @@ -176,6 +178,14 @@ pub struct ComputeState { /// replica can drop diffs associated with timestamps beyond the replica expiration. /// The replica will panic if such dataflows are not dropped before the replica has expired. pub replica_expiration: Antichain, + + /// The interval for rounding logging timestamps. + logging_interval: Duration, + + /// The handles to inject updates for logging dataflows. Only available if logging is enabled. + pub logging_handles: LoggingHandles, + /// Shared set of collections exceeding their heap size limit. + pub dataflows_exceeding_heap_size_limit: Rc>>, } impl ComputeState { @@ -220,6 +230,9 @@ impl ComputeState { server_maintenance_interval: Duration::ZERO, init_system_time: mz_ore::now::SYSTEM_TIME(), replica_expiration: Antichain::default(), + logging_interval: Duration::ZERO, + logging_handles: LoggingHandles::default(), + dataflows_exceeding_heap_size_limit: Rc::default(), } } @@ -489,6 +502,7 @@ impl<'a, A: Allocate + 'static> ActiveComputeState<'a, A> { expiration_datetime = ?dataflow_expiration.as_option().map(|t| mz_ore::now::to_datetime(t.into())), plan_until = ?dataflow.until.elements(), until = ?until.elements(), + heap_size_limit = ?dataflow.heap_size_limit, "creating dataflow", ); } else { @@ -502,6 +516,7 @@ impl<'a, A: Allocate + 'static> ActiveComputeState<'a, A> { expiration_datetime = ?dataflow_expiration.as_option().map(|t| mz_ore::now::to_datetime(t.into())), plan_until = ?dataflow.until.elements(), until = ?until.elements(), + heap_size_limit = ?dataflow.heap_size_limit, "creating dataflow", ); }; @@ -511,11 +526,14 @@ impl<'a, A: Allocate + 'static> ActiveComputeState<'a, A> { .chain(dataflow.copy_to_ids()) .collect(); + let dataflow_id = self.timely_worker.next_dataflow_index(); + // Initialize compute and logging state for each object. 
for object_id in dataflow.export_ids() { let is_subscribe_or_copy = subscribe_copy_ids.contains(&object_id); let metrics = self.compute_state.metrics.for_collection(object_id); - let mut collection = CollectionState::new(is_subscribe_or_copy, as_of.clone(), metrics); + let mut collection = + CollectionState::new(is_subscribe_or_copy, as_of.clone(), metrics, dataflow_id); if let Some(logger) = self.compute_state.compute_logger.clone() { let logging = CollectionLogging::new( @@ -547,6 +565,14 @@ impl<'a, A: Allocate + 'static> ActiveComputeState<'a, A> { .insert(id, Rc::clone(&suspension_token)); } + if ENABLE_COMPUTE_HEAP_SIZE_LIMIT.get(&self.compute_state.worker_config) + && dataflow.is_transient() + { + for id in dataflow.export_ids() { + self.handle_set_heap_size_limit(id, dataflow.heap_size_limit); + } + } + crate::render::build_compute_dataflow( self.timely_worker, self.compute_state, @@ -578,6 +604,25 @@ impl<'a, A: Allocate + 'static> ActiveComputeState<'a, A> { } } + /// Handles an update to a heap memory limit for a collection. + fn handle_set_heap_size_limit(&mut self, id: GlobalId, limit: Option) { + let collection = self + .compute_state + .collections + .get_mut(&id) + .expect("collection must exist"); + + let handle = &mut self.compute_state.logging_handles.heap_size_limits_handle; + let time = *handle.time(); + if let Some(limit) = collection.heap_size_limit { + handle.send(((collection.dataflow_id, limit), time, -1)); + } + collection.heap_size_limit = limit; + if let Some(limit) = collection.heap_size_limit { + handle.send(((collection.dataflow_id, limit), time, 1)); + } + } + #[mz_ore::instrument(level = "debug")] fn handle_peek(&mut self, peek: Peek) { let pending = match &peek.target { @@ -646,6 +691,12 @@ impl<'a, A: Allocate + 'static> ActiveComputeState<'a, A> { is_subscribe_or_copy: collection.is_subscribe_or_copy, }; self.compute_state.dropped_collections.push((id, dropped)); + + if let Some(limit) = collection.heap_size_limit { + let handle = &mut self.compute_state.logging_handles.heap_size_limits_handle; + let time = *handle.time(); + handle.send(((collection.dataflow_id, limit), time, -1)); + } } /// Initializes timely dataflow logging and publishes as a view. @@ -658,7 +709,12 @@ impl<'a, A: Allocate + 'static> ActiveComputeState<'a, A> { traces, dataflow_index, compute_logger: logger, - } = logging::initialize(self.timely_worker, &config); + } = logging::initialize( + self.timely_worker, + &config, + Rc::clone(&self.compute_state.dataflows_exceeding_heap_size_limit), + &mut self.compute_state.logging_handles, + ); let mut log_index_ids = config.index_logs; for (log, trace) in traces { @@ -672,7 +728,8 @@ impl<'a, A: Allocate + 'static> ActiveComputeState<'a, A> { let is_subscribe_or_copy = false; let as_of = Antichain::from_elem(Timestamp::MIN); let metrics = self.compute_state.metrics.for_collection(id); - let mut collection = CollectionState::new(is_subscribe_or_copy, as_of, metrics); + let mut collection = + CollectionState::new(is_subscribe_or_copy, as_of, metrics, dataflow_index); let logging = CollectionLogging::new(id, logger.clone(), dataflow_index, std::iter::empty()); @@ -694,6 +751,7 @@ impl<'a, A: Allocate + 'static> ActiveComputeState<'a, A> { ); self.compute_state.compute_logger = Some(logger); + self.compute_state.logging_interval = config.interval; } /// Send progress information to the controller. @@ -952,6 +1010,42 @@ impl<'a, A: Allocate + 'static> ActiveComputeState<'a, A> { } } + /// Process reported limit violations. 
+ /// + /// Reports any dataflows that have exceeded their heap size limit to the controller. + pub fn process_limits(&self) { + let mut borrow = self + .compute_state + .dataflows_exceeding_heap_size_limit + .borrow_mut(); + for dataflow_id in std::mem::take(&mut *borrow) { + // We don't have a mapping of dataflow ID to global ID, so scan instead. + for (&collection_id, state) in self.compute_state.collections.iter() { + if state.dataflow_id == dataflow_id { + let status = DataflowLimitStatus { collection_id }; + self.send_compute_response(ComputeResponse::Status( + StatusResponse::DataflowLimitExceeded(status), + )); + } + } + } + } + + /// Advance handles to logging dataflows. + /// + /// Advances the handles based on the current system time. + pub fn advance_handles(&mut self) { + // We try to tick time rounded to the logging interval. + let interval_ms = std::cmp::max(1, self.compute_state.logging_interval.as_millis()); + let interval_ms = u64::try_from(interval_ms).expect("must fit"); + // TODO: Is it correct to use the system time? + let now = mz_ore::now::SYSTEM_TIME(); + let time_ms = ((now / interval_ms) + 1) * interval_ms; + let new_time_ms: Timestamp = time_ms.try_into().expect("must fit"); + + self.compute_state.logging_handles.advance_to(new_time_ms); + } + /// Send a response to the coordinator. fn send_compute_response(&self, response: ComputeResponse) { // Ignore send errors because the coordinator is free to ignore our @@ -1639,6 +1733,10 @@ pub struct CollectionState { logging: Option, /// Metrics tracked for this collection. metrics: CollectionMetrics, + /// The dataflow ID of the dataflow that hosts this collection. + pub dataflow_id: usize, + /// The heap size limit for this dataflow, in bytes. + pub heap_size_limit: Option, } impl CollectionState { @@ -1646,6 +1744,7 @@ impl CollectionState { is_subscribe_or_copy: bool, as_of: Antichain, metrics: CollectionMetrics, + dataflow_id: usize, ) -> Self { Self { reported_frontiers: ReportedFrontiers::new(), @@ -1657,6 +1756,8 @@ impl CollectionState { compute_probe: None, logging: None, metrics, + dataflow_id, + heap_size_limit: None, } } diff --git a/src/compute/src/logging.rs b/src/compute/src/logging.rs index f43998216ea5e..b639bffbe96eb 100644 --- a/src/compute/src/logging.rs +++ b/src/compute/src/logging.rs @@ -14,6 +14,7 @@ mod differential; pub(super) mod initialize; mod reachability; mod timely; +mod watchdog; use std::any::Any; use std::collections::BTreeMap; @@ -26,6 +27,7 @@ use ::timely::dataflow::channels::pact::Pipeline; use ::timely::dataflow::channels::pushers::buffer::Session; use ::timely::dataflow::channels::pushers::{Counter, Tee}; use ::timely::dataflow::operators::capture::{Event, EventLink, EventPusher}; +use ::timely::dataflow::operators::input::Handle; use ::timely::dataflow::operators::Operator; use ::timely::dataflow::StreamCore; use ::timely::progress::Timestamp as TimelyTimestamp; @@ -156,6 +158,20 @@ struct SharedLoggingState { arrangement_size_activators: BTreeMap, } +/// Handles to logging inputs. +#[derive(Default)] +pub(crate) struct LoggingHandles { + /// Handle to provide heap size limits per dataflow. + pub heap_size_limits_handle: Handle>, +} + +impl LoggingHandles { + /// Advance the time of all handles to `time`. + pub(crate) fn advance_to(&mut self, time: Timestamp) { + self.heap_size_limits_handle.advance_to(time); + } +} + /// Helper to pack collections of [`Datum`]s into key and value row. 
pub(crate) struct PermutedRowPacker { key: Vec, diff --git a/src/compute/src/logging/compute.rs b/src/compute/src/logging/compute.rs index 59aa439c03acb..f4fbb98f6fbc8 100644 --- a/src/compute/src/logging/compute.rs +++ b/src/compute/src/logging/compute.rs @@ -23,11 +23,13 @@ use mz_compute_types::plan::LirId; use mz_ore::cast::CastFrom; use mz_repr::{Datum, Diff, GlobalId, Timestamp}; use mz_timely_util::containers::{Column, ColumnBuilder, ProvidedBuilder}; +use mz_timely_util::operator::Bifurcate; use mz_timely_util::replay::MzReplay; +use timely::container::CapacityContainerBuilder; use timely::dataflow::channels::pact::Pipeline; use timely::dataflow::operators::core::Map; use timely::dataflow::operators::generic::builder_rc::OperatorBuilder; -use timely::dataflow::operators::{Concatenate, Enter, Operator}; +use timely::dataflow::operators::{Concatenate, Enter, Leave, Operator}; use timely::dataflow::{Scope, Stream, StreamCore}; use timely::scheduling::Scheduler; use timely::{Container, Data}; @@ -286,9 +288,11 @@ impl LirMetadata { } /// The return type of the [`construct`] function. -pub(super) struct Return { +pub(super) struct Return { /// Collections returned by [`construct`]. pub collections: BTreeMap, + /// The arrangement heap size stream for each operator. + pub arrangement_heap_size: Stream>, } /// Constructs the logging dataflow fragment for compute logs. @@ -307,7 +311,7 @@ pub(super) fn construct> event_queue: EventQueue>, compute_event_streams: impl IntoIterator>>, shared_state: Rc>, -) -> Return { +) -> Return { let logging_interval_ms = std::cmp::max(1, config.interval.as_millis()); scope.scoped("compute logging", move |scope| { @@ -478,6 +482,17 @@ pub(super) fn construct> ]) }; + let (arrangement_heap_size, arrangement_heap_size_stripped) = + arrangement_heap_size.bifurcate::>, _>( + "arrangement_heap_size", + |data, session| { + session.give_iterator( + IntoIterator::into_iter(&*data) + .map(|(data, time, diff)| ((data.operator_id, ()), *time, *diff)), + ); + }, + ); + let mut packer = PermutedRowPacker::new(ComputeLog::ArrangementHeapSize); let arrangement_heap_size = arrangement_heap_size .as_collection() @@ -586,7 +601,10 @@ pub(super) fn construct> } } - Return { collections } + Return { + collections, + arrangement_heap_size: arrangement_heap_size_stripped.leave(), + } }) } diff --git a/src/compute/src/logging/differential.rs b/src/compute/src/logging/differential.rs index e495791828383..c3b884cf6398a 100644 --- a/src/compute/src/logging/differential.rs +++ b/src/compute/src/logging/differential.rs @@ -23,22 +23,23 @@ use mz_repr::{Datum, Diff, Timestamp}; use mz_timely_util::containers::{ columnar_exchange, Col2ValBatcher, Column, ColumnBuilder, ProvidedBuilder, }; +use mz_timely_util::operator::{consolidate_pact, Bifurcate}; use mz_timely_util::replay::MzReplay; use timely::dataflow::channels::pact::{ExchangeCore, Pipeline}; use timely::dataflow::channels::pushers::buffer::Session; use timely::dataflow::channels::pushers::{Counter, Tee}; use timely::dataflow::operators::generic::builder_rc::OperatorBuilder; -use timely::dataflow::operators::Leave; +use timely::dataflow::operators::{Leave, Operator}; use timely::dataflow::{Scope, Stream, StreamCore}; use crate::extensions::arrange::MzArrangeCore; use crate::logging::compute::{ArrangementHeapSizeOperatorDrop, ComputeEvent}; use crate::logging::{ consolidate_and_pack, DifferentialLog, EventQueue, LogCollection, LogVariant, - OutputSessionColumnar, SharedLoggingState, + OutputSessionColumnar, PermutedRowPacker, 
SharedLoggingState, Update, }; use crate::row_spine::RowRowBuilder; -use crate::typedefs::{KeyBatcher, RowRowSpine}; +use crate::typedefs::{KeyBatcher, RowRowSpine, VecKeyBatcher}; /// The return type of [`construct`]. pub(super) struct Return { @@ -46,6 +47,8 @@ pub(super) struct Return { pub collections: BTreeMap, /// Stream of compute events generated by the differential logger. pub compute_events: StreamCore>, + /// The batcher heap size stream for each operator. + pub batcher_heap_size: Stream>, } /// Constructs the logging dataflow fragment for differential logs. @@ -154,17 +157,45 @@ pub(super) fn construct>( let arrangement_records = stream_to_collection(&records, ArrangementRecords); let sharing = stream_to_collection(&sharing, Sharing); let batcher_records = stream_to_collection(&batcher_records, BatcherRecords); - let batcher_size = stream_to_collection(&batcher_size, BatcherSize); let batcher_capacity = stream_to_collection(&batcher_capacity, BatcherCapacity); let batcher_allocations = stream_to_collection(&batcher_allocations, BatcherAllocations); + // Handle batcher_size differently because we need to fork it. Sorry for the wall of text. + let (batcher_size_rows, batcher_size) = { + let worker_id = scope.index(); + let log: LogVariant = BatcherSize.into(); + let c_name = &format!("Consolidate {log:?}"); + let u_name = &format!("ToRow {log:?}"); + let mut packer = PermutedRowPacker::new(log); + let consolidated = consolidate_pact::, _, _>(&batcher_size, Pipeline, c_name); + let (consolidated, batcher_size_rows) = consolidated.bifurcate::, _>(u_name, move |data, session| { + for data in data.iter().flatten() { + for ((op, ()), time, diff) in data.iter() { + let data = packer.pack_slice(&[ + Datum::UInt64(u64::cast_from(*op)), + Datum::UInt64(u64::cast_from(worker_id)), + ]); + session.give((data, *time, *diff)) + } + } + }); + let batcher_size = consolidated.unary(Pipeline, "Unpack consolidated batcher_size", |_, _| move |input, output| { + input.for_each(|time, data| { + for container in data.into_iter().flatten() { + output.session(&time).give_container(container); + } + }); + }); + (batcher_size_rows, batcher_size) + }; + use DifferentialLog::*; let logs = [ (ArrangementBatches, arrangement_batches), (ArrangementRecords, arrangement_records), (Sharing, sharing), (BatcherRecords, batcher_records), - (BatcherSize, batcher_size), + (BatcherSize, batcher_size_rows), (BatcherCapacity, batcher_capacity), (BatcherAllocations, batcher_allocations), ]; @@ -188,7 +219,7 @@ pub(super) fn construct>( } } - Return { collections, compute_events: compute_events.leave(), } + Return { collections, compute_events: compute_events.leave(), batcher_heap_size: batcher_size.leave() } }) } diff --git a/src/compute/src/logging/initialize.rs b/src/compute/src/logging/initialize.rs index e9ddc73f80208..8af0f058d154d 100644 --- a/src/compute/src/logging/initialize.rs +++ b/src/compute/src/logging/initialize.rs @@ -6,7 +6,7 @@ //! Initialization of logging dataflows. 
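// Illustrative sketch, not taken from the patch: a plain std-Rust model of the `batcher_size`
// handling in differential.rs above. `consolidate_pact` is modeled as summing diffs per
// (key, time); the `bifurcate` tap is modeled as producing packed (operator, worker) log rows
// on the side while the raw consolidated updates pass through unchanged toward the watchdog.
// All names and types here are simplified stand-ins for the timely/differential operators.
use std::collections::BTreeMap;

type SizeUpdate = ((usize, ()), u64, i64); // ((operator_id, ()), time_ms, size_diff)

fn consolidate(updates: Vec<SizeUpdate>) -> Vec<SizeUpdate> {
    // Sum diffs per (key, time) and drop entries that cancel to zero.
    let mut acc: BTreeMap<((usize, ()), u64), i64> = BTreeMap::new();
    for (key, time, diff) in updates {
        *acc.entry((key, time)).or_insert(0) += diff;
    }
    acc.into_iter()
        .filter(|(_, diff)| *diff != 0)
        .map(|((key, time), diff)| (key, time, diff))
        .collect()
}

fn split_batcher_size(
    updates: Vec<SizeUpdate>,
    worker_id: usize,
) -> (Vec<((u64, u64), u64, i64)>, Vec<SizeUpdate>) {
    let consolidated = consolidate(updates);
    // Side output: (operator, worker) rows destined for the BatcherSize log collection.
    let log_rows = consolidated
        .iter()
        .map(|&((op, ()), time, diff)| ((op as u64, worker_id as u64), time, diff))
        .collect();
    // Pass-through output: the consolidated updates, which feed the watchdog.
    (log_rows, consolidated)
}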
use std::cell::RefCell; -use std::collections::BTreeMap; +use std::collections::{BTreeMap, BTreeSet}; use std::rc::Rc; use std::time::{Duration, Instant}; @@ -30,16 +30,25 @@ use timely::progress::reachability::logging::{TrackerEvent, TrackerEventBuilder} use crate::arrangement::manager::TraceBundle; use crate::extensions::arrange::{KeyCollection, MzArrange}; use crate::logging::compute::{ComputeEvent, ComputeEventBuilder}; -use crate::logging::{BatchLogger, EventQueue, SharedLoggingState}; +use crate::logging::{BatchLogger, EventQueue, LoggingHandles, SharedLoggingState}; use crate::typedefs::{ErrBatcher, ErrBuilder}; /// Initialize logging dataflows. /// +/// The parameters control how we're rendering the logging dataflow. +/// * `worker` is the Timely worker. +/// * `config` is the logging config. +/// * `dataflows_exceeding_heap_size_limit` is a shared set of dataflow IDs that the watchdog +/// writes to. +/// * `logging_handles` contains handles to inject log data. +/// /// Returns a logger for compute events, and for each `LogVariant` a trace bundle usable for /// retrieving logged records as well as the index of the exporting dataflow. pub fn initialize( worker: &mut timely::worker::Worker, config: &LoggingConfig, + dataflows_exceeding_heap_size_limit: Rc>>, + logging_handles: &mut LoggingHandles, ) -> LoggingTraces { let interval_ms = std::cmp::max(1, config.interval.as_millis()); @@ -62,6 +71,7 @@ pub fn initialize( d_event_queue: EventQueue::new("d"), c_event_queue: EventQueue::new("c"), shared_state: Default::default(), + dataflows_exceeding_heap_size_limit, }; // Depending on whether we should log the creation of the logging dataflows, we register the @@ -69,9 +79,9 @@ let dataflow_index = context.worker.next_dataflow_index(); let traces = if config.log_logging { context.register_loggers(); - context.construct_dataflow() + context.construct_dataflow(logging_handles) } else { - let traces = context.construct_dataflow(); + let traces = context.construct_dataflow(logging_handles); context.register_loggers(); traces }; @@ -97,6 +107,7 @@ struct LoggingContext<'a, A: Allocate> { d_event_queue: EventQueue>, c_event_queue: EventQueue>, shared_state: Rc>, + dataflows_exceeding_heap_size_limit: Rc>>, } pub(crate) struct LoggingTraces { @@ -109,13 +120,17 @@ } impl LoggingContext<'_, A> { - fn construct_dataflow(&mut self) -> BTreeMap { + fn construct_dataflow( + &mut self, + logging_handles: &mut LoggingHandles, + ) -> BTreeMap { self.worker.dataflow_named("Dataflow: logging", |scope| { let mut collections = BTreeMap::new(); let super::timely::Return { collections: timely_collections, compute_events: compute_events_timely, + operator_to_dataflow, } = super::timely::construct(scope.clone(), self.config, self.t_event_queue.clone()); collections.extend(timely_collections); @@ -131,6 +146,7 @@ let super::differential::Return { collections: differential_collections, compute_events: compute_events_differential, + batcher_heap_size, } = super::differential::construct( scope.clone(), self.config, @@ -141,6 +157,7 @@ let super::compute::Return { collections: compute_collections, + arrangement_heap_size, } = super::compute::construct( scope.clone(), scope.parent.clone(), @@ -151,6 +168,19 @@ ); collections.extend(compute_collections); + let heap_size_limits = logging_handles.heap_size_limits_handle.to_stream(scope); + + let watchdog_streams = 
super::watchdog::Streams { + arrangement_heap_size, + batcher_heap_size, + operator_to_dataflow, + heap_size_limits, + dataflows_exceeding_heap_size_limit: Rc::clone( + &self.dataflows_exceeding_heap_size_limit, + ), + }; + super::watchdog::construct(scope.clone(), watchdog_streams); + let errs = scope.scoped("logging errors", |scope| { let collection: KeyCollection<_, DataflowError, Diff> = Collection::empty(scope).into(); diff --git a/src/compute/src/logging/timely.rs b/src/compute/src/logging/timely.rs index ee671a9f215d9..e0450eac8a12c 100644 --- a/src/compute/src/logging/timely.rs +++ b/src/compute/src/logging/timely.rs @@ -21,13 +21,15 @@ use mz_repr::{Datum, Diff, Timestamp}; use mz_timely_util::containers::{ columnar_exchange, Col2ValBatcher, Column, ColumnBuilder, ProvidedBuilder, }; +use mz_timely_util::operator::Bifurcate; use mz_timely_util::replay::MzReplay; +use timely::container::CapacityContainerBuilder; use timely::dataflow::channels::pact::{ExchangeCore, Pipeline}; use timely::dataflow::channels::pushers::buffer::Session; use timely::dataflow::channels::pushers::{Counter, Tee}; use timely::dataflow::operators::generic::builder_rc::OperatorBuilder; use timely::dataflow::operators::Leave; -use timely::dataflow::{Scope, StreamCore}; +use timely::dataflow::{Scope, Stream, StreamCore}; use timely::logging::{ ChannelsEvent, MessagesEvent, OperatesEvent, ParkEvent, ScheduleEvent, ShutdownEvent, TimelyEvent, @@ -37,7 +39,7 @@ use tracing::error; use crate::extensions::arrange::MzArrangeCore; use crate::logging::compute::{ComputeEvent, DataflowShutdown}; -use crate::logging::{consolidate_and_pack, LogCollection, OutputSessionColumnar}; +use crate::logging::{consolidate_and_pack, LogCollection, OutputSessionColumnar, Update}; use crate::logging::{EventQueue, LogVariant, TimelyLog}; use crate::row_spine::RowRowBuilder; use crate::typedefs::{KeyBatcher, KeyValBatcher, RowRowSpine}; @@ -48,6 +50,7 @@ pub(super) struct Return { pub collections: BTreeMap, /// Stream of compute events generated by Timely logging. pub compute_events: StreamCore>, + pub operator_to_dataflow: Stream>, } /// Constructs the logging dataflow fragment for timely logs. @@ -183,6 +186,12 @@ pub(super) fn construct>( }, ); + let (addresses, operator_to_dataflow) = addresses.bifurcate::>, _>("addresses", |data, output| { + output.give_iterator(IntoIterator::into_iter(data).map(|((op, addr), time, diff)| { + ((*op, addr[0]), *time, *diff) + })); + }); + let addresses = consolidate_and_pack::<_, KeyValBatcher<_, _, _, _>, ColumnBuilder<_>, _, _>( &addresses, TimelyLog::Addresses, @@ -328,7 +337,7 @@ pub(super) fn construct>( } } - Return { collections, compute_events: compute_events.leave(), } + Return { collections, compute_events: compute_events.leave(), operator_to_dataflow: operator_to_dataflow.leave() } }) } diff --git a/src/compute/src/logging/watchdog.rs b/src/compute/src/logging/watchdog.rs new file mode 100644 index 0000000000000..019cd99bf24b9 --- /dev/null +++ b/src/compute/src/logging/watchdog.rs @@ -0,0 +1,141 @@ +// Copyright Materialize, Inc. and contributors. All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +//! Logging dataflows for watchdog events generated by clusterd. 
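// Illustrative sketch, not taken from the patch: a plain std-Rust model of the join/reduce
// pipeline that `construct` below assembles (see the dataflow diagram in the code).
// Per-operator heap sizes are mapped to their owning dataflow, summed, and joined with the
// per-dataflow limit; any dataflow whose total meets or exceeds its limit is flagged. The
// differential version encodes the sizes in the diff field instead of a map value.
use std::collections::{BTreeMap, BTreeSet};

fn dataflows_over_limit(
    operator_heap_size: &BTreeMap<usize, i64>, // operator id -> bytes (arrangements + batchers)
    operator_to_dataflow: &BTreeMap<usize, usize>, // operator id -> hosting dataflow id
    heap_size_limits: &BTreeMap<usize, u64>,   // dataflow id -> limit in bytes
) -> BTreeSet<usize> {
    // First join + reduce: accumulate per-dataflow heap sizes.
    let mut dataflow_size: BTreeMap<usize, i64> = BTreeMap::new();
    for (op, bytes) in operator_heap_size {
        if let Some(dataflow) = operator_to_dataflow.get(op) {
            *dataflow_size.entry(*dataflow).or_insert(0) += bytes;
        }
    }
    // Second join against the limits, keeping only dataflows at or over their limit.
    dataflow_size
        .into_iter()
        .filter(|(dataflow, size)| {
            heap_size_limits
                .get(dataflow)
                .is_some_and(|limit| *size >= *limit as i64)
        })
        .map(|(dataflow, _)| dataflow)
        .collect()
}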
+ +use std::cell::RefCell; +use std::collections::BTreeSet; +use std::rc::Rc; + +use differential_dataflow::trace::{BatchReader, Cursor}; +use differential_dataflow::AsCollection; +use timely::dataflow::operators::Inspect; +use timely::dataflow::operators::{Concat, Enter}; +use timely::dataflow::{Scope, Stream}; + +use crate::extensions::arrange::MzArrange; +use crate::extensions::reduce::MzReduce; +use crate::logging::Update; +use crate::typedefs::spines::{ColKeyBuilder, ColValBatcher, ColValBuilder}; +use crate::typedefs::{KeyBatcher, KeySpine, KeyValSpine}; + +pub(super) struct Streams { + /// operator id, diff in bytes + pub(super) arrangement_heap_size: Stream>, + /// operator id, diff in bytes + pub(super) batcher_heap_size: Stream>, + /// Operator id to dataflow id + pub(super) operator_to_dataflow: Stream>, + /// Dataflow -> limit in bytes + pub(super) heap_size_limits: Stream>, + pub(super) dataflows_exceeding_heap_size_limit: Rc>>, +} + +pub(super) fn construct>( + mut scope: S, + streams: Streams, +) -> () { + let Streams { + arrangement_heap_size, + batcher_heap_size, + operator_to_dataflow, + heap_size_limits, + dataflows_exceeding_heap_size_limit, + } = streams; + + // The following dataflow computes: + // ``` + // arrangement_heap_size batcher_heap_size operator_to_dataflow heap_size_limits + // | | | | + // |----->concat<------ | arrange arrange + // | | | + // arrange: op_to_heap_size | | + // | | | + // |->join: dataflow_to_heap_size<-| | + // | | + // | | + // arrange | + // | | + // |------------>join<-----------------| + // | + // arrange + // | + // reduce size > limit + // | + // result + // ``` + // + // We encode the size of a dataflow in the diff field, and the limit as actual data. + + scope.scoped("watchdog", |inner| { + let arrangement_heap_size = arrangement_heap_size.enter(inner); + let batcher_heap_size = batcher_heap_size.enter(inner); + let operator_to_dataflow = operator_to_dataflow.enter(inner); + let heap_size_limits = heap_size_limits.enter(inner); + + let operator_to_heap_size = arrangement_heap_size.concat(&batcher_heap_size); + let operator_to_heap_size = operator_to_heap_size + .as_collection() + .mz_arrange::, ColKeyBuilder<_, _, _>, KeySpine<_, _, _>>( + "operator_to_heap_size", + ); + + let operator_to_dataflow = operator_to_dataflow + .as_collection() + .mz_arrange::, ColValBuilder<_, _, _, _>, KeyValSpine<_, _, _, _>>( + "operator_to_dataflow", + ); + + let dataflow_to_heap_size = operator_to_heap_size + .join_core(&operator_to_dataflow, |_op, (), dataflow| { + Some((*dataflow, ())) + }) + .mz_arrange::, ColKeyBuilder<_, _, _>, KeySpine<_, _, _>>( + "dataflow_to_heap_size", + ); + + let heap_size_limits = heap_size_limits + .as_collection() + .mz_arrange::, ColValBuilder<_, _, _, _>, KeyValSpine<_, _, _, _>>("heap_size_limits"); + + let dataflow_to_heap_size_limit = dataflow_to_heap_size + .join_core(&heap_size_limits, |dataflow, (), limit| { + std::iter::once((*dataflow, *limit)) + }) + .mz_arrange::, ColValBuilder<_, _, _, _>, KeyValSpine>( + "dataflow_to_heap_size_count", + ) + .mz_reduce_abelian::<_, usize, (), ColKeyBuilder<_, _, _>, KeySpine<_, _, _>>("Reduce: dataflow_to_heap_size_count", |_dataflow, s, t| { + t.extend(s.iter().flat_map(|(&limit, size)| { + if *size >= limit.try_into().expect("must fit") { + // We could output `size` here to report the actual size of the dataflow. 
+ Some(((), 1)) + } else { + None + } + })); + }); + + dataflow_to_heap_size_limit.stream.inspect(move |batch| { + let mut cursor = batch.cursor(); + while cursor.key_valid(batch) { + let dataflow_id = *cursor.key(batch); + cursor.map_times(batch, |_time, _diff| { + // TODO: This only ever adds, never removes (the main loop drains it). Depending + // on what signals we'd like to have, we could also consider _removing_ + // dataflows here. + dataflows_exceeding_heap_size_limit + .borrow_mut() + .insert(dataflow_id); + }); + cursor.step_key(batch); + } + }); + }); +} diff --git a/src/compute/src/server.rs b/src/compute/src/server.rs index c64fe713ce294..0185890c27ac3 100644 --- a/src/compute/src/server.rs +++ b/src/compute/src/server.rs @@ -382,6 +382,8 @@ impl<'w, A: Allocate + 'static> Worker<'w, A> { compute_state.process_peeks(); compute_state.process_subscribes(); compute_state.process_copy_tos(); + compute_state.process_limits(); + compute_state.advance_handles(); } } } diff --git a/src/compute/src/typedefs.rs b/src/compute/src/typedefs.rs index c824ea85a17d6..be88245237649 100644 --- a/src/compute/src/typedefs.rs +++ b/src/compute/src/typedefs.rs @@ -13,8 +13,10 @@ use differential_dataflow::operators::arrange::Arranged; use differential_dataflow::operators::arrange::TraceAgent; -use differential_dataflow::trace::implementations::chunker::ColumnationChunker; -use differential_dataflow::trace::implementations::merge_batcher::{ColMerger, MergeBatcher}; +use differential_dataflow::trace::implementations::chunker::{ColumnationChunker, VecChunker}; +use differential_dataflow::trace::implementations::merge_batcher::{ + ColMerger, MergeBatcher, VecMerger, +}; use differential_dataflow::trace::wrappers::enter::TraceEnter; use differential_dataflow::trace::wrappers::frontier::TraceFrontier; use mz_repr::Diff; @@ -121,3 +123,8 @@ pub type RowErrBuilder = RowValBuilder; pub type KeyBatcher = KeyValBatcher; pub type KeyValBatcher = MergeBatcher, ColumnationChunker<((K, V), T, D)>, ColMerger<(K, V), T, D>>; + +// Batchers for consolidation, based on vectors. Only use for data that cannot spill. +pub type VecKeyBatcher = VecKeyValBatcher; +pub type VecKeyValBatcher = + MergeBatcher, VecChunker<((K, V), T, D)>, VecMerger<(K, V), T, D>>; diff --git a/src/sql/src/session/vars.rs b/src/sql/src/session/vars.rs index ba98083f18eb1..3aefe430be9cc 100644 --- a/src/sql/src/session/vars.rs +++ b/src/sql/src/session/vars.rs @@ -841,6 +841,13 @@ impl SessionVars { .as_bytes() } + /// Returns the value of the `max_query_heap_size` configuration parameter. + pub fn max_query_heap_size(&self) -> Option { + self.expect_value::>(&MAX_QUERY_HEAP_SIZE) + .as_ref() + .map(ByteSize::as_bytes) + } + /// Sets the external metadata associated with the user. 
pub fn set_external_user_metadata(&mut self, metadata: ExternalUserMetadata) { self.user.external_metadata = Some(metadata); @@ -1095,6 +1102,7 @@ impl SystemVars { &TIMEZONE, &TRANSACTION_ISOLATION, &MAX_QUERY_RESULT_SIZE, + &MAX_QUERY_HEAP_SIZE, ] .into_iter() .map(|var| (UncasedStr::new(var.name()), var)) diff --git a/src/sql/src/session/vars/definitions.rs b/src/sql/src/session/vars/definitions.rs index 357730b336300..ae6a1d03f7cb0 100644 --- a/src/sql/src/session/vars/definitions.rs +++ b/src/sql/src/session/vars/definitions.rs @@ -577,6 +577,13 @@ pub static MAX_QUERY_RESULT_SIZE: VarDefinition = VarDefinition::new( true, ); +pub static MAX_QUERY_HEAP_SIZE: VarDefinition = VarDefinition::new( + "max_query_heap_size", + value!(Option; None), + "The maximum size in bytes for a single query's dataflow (Materialize).", + true, +); + pub static MAX_COPY_FROM_SIZE: VarDefinition = VarDefinition::new( "max_copy_from_size", // 1 GiB, this limit is noted in the docs, if you change it make sure to update our docs. diff --git a/src/timely-util/src/operator.rs b/src/timely-util/src/operator.rs index 3f1c9d6ab6244..0882589c546ed 100644 --- a/src/timely-util/src/operator.rs +++ b/src/timely-util/src/operator.rs @@ -928,3 +928,71 @@ where std::mem::take(chain) } } + +pub use bifurcate::Bifurcate; + +mod bifurcate { + use timely::container::ContainerBuilder; + use timely::dataflow::channels::pact::Pipeline; + use timely::dataflow::channels::pushers::buffer::Session; + use timely::dataflow::channels::pushers::{Counter, Tee}; + use timely::dataflow::operators::generic::builder_rc::OperatorBuilder; + use timely::dataflow::{Scope, StreamCore}; + use timely::Container; + + use crate::containers::ProvidedBuilder; + + /// Extension trait for [`StreamCore`]. + pub trait Bifurcate: Sized { + /// Bifurcates the stream into two streams. The logic provided will be applied to each + /// input container, and has the opportunity to push data into the provided session for + /// the second output while modifying the container if it chooses to. The container + /// will then be pushed into the first output stream. 
+ fn bifurcate(&self, name: &str, logic: F) -> (Self, StreamCore) + where + CB: ContainerBuilder, + F: FnMut( + &mut C, + &mut Session< + G::Timestamp, + CB, + Counter>, + >, + ) + 'static; + } + + impl Bifurcate for StreamCore { + fn bifurcate(&self, name: &str, mut logic: F) -> (Self, StreamCore) + where + CB: ContainerBuilder, + F: FnMut( + &mut C, + &mut Session< + G::Timestamp, + CB, + Counter>, + >, + ) + 'static, + { + let mut builder = OperatorBuilder::new(format!("Bifurcate({name})"), self.scope()); + let mut input = builder.new_input(self, Pipeline); + let (mut self_out, self_stream) = builder.new_output::>(); + let (mut other_out, other_stream) = builder.new_output::(); + + builder.build(move |_| { + move |_frontiers| { + let mut self_out = self_out.activate(); + let mut other_out = other_out.activate(); + input.for_each(|time, data| { + let mut session = self_out.session_with_builder(&time); + let mut other = other_out.session_with_builder(&time); + logic(data, &mut other); + session.give_container(data); + }); + } + }); + + (self_stream, other_stream) + } + } +} diff --git a/test/sqllogictest/distinct_arrangements.slt b/test/sqllogictest/distinct_arrangements.slt index c9e3c45b2af91..0346fd65b2b4a 100644 --- a/test/sqllogictest/distinct_arrangements.slt +++ b/test/sqllogictest/distinct_arrangements.slt @@ -1099,6 +1099,12 @@ Arrange Timely(MessagesSent) Arrange Timely(Operates) Arrange Timely(Parks) Arrange Timely(Reachability) +Reduce: dataflow_to_heap_size_count +dataflow_to_heap_size +dataflow_to_heap_size_count +heap_size_limits +operator_to_dataflow +operator_to_heap_size statement ok COMMIT diff --git a/test/testdrive/query_heap_size.td b/test/testdrive/query_heap_size.td new file mode 100644 index 0000000000000..492174cf013a7 --- /dev/null +++ b/test/testdrive/query_heap_size.td @@ -0,0 +1,46 @@ +# Copyright Materialize, Inc. and contributors. All rights reserved. +# +# Use of this software is governed by the Business Source License +# included in the LICENSE file at the root of this repository. +# +# As of the Change Date specified in that file, in accordance with +# the Business Source License, use of this software will be governed +# by the Apache License, Version 2.0. + +# Test the `max_query_heap_size` session variable. + +$ set-regex match=\d{13,20} replacement= + +> CREATE TABLE t1 (a int4, b text) + +> INSERT INTO t1 SELECT * FROM generate_series(1, 10000), repeat('a', 100); + +> SET max_query_heap_size = '1kB'; + +> BEGIN + +> DECLARE c CURSOR FOR SUBSCRIBE (SELECT max(a) FROM t1); + +# The first timestamp is computed without detecting that the limit has been +# exceeded. +> FETCH 1 c; + 1 10000 + +# No output should be produced. Instead an error .. notice? +! FETCH 1 c; +contains:Dataflow limit exceeded + +> ROLLBACK; + +> SET max_query_heap_size = '128MB'; + +> BEGIN + +> DECLARE c CURSOR FOR SUBSCRIBE (SELECT max(a) FROM t1); + +> FETCH 1 c; + 1 10000 + +> FETCH 1 FROM c WITH (TIMEOUT = '1s'); + +> ROLLBACK; diff --git a/test/testdrive/session.td b/test/testdrive/session.td index 860a3c8e9d377..c9688cacfb5f1 100644 --- a/test/testdrive/session.td +++ b/test/testdrive/session.td @@ -54,6 +54,7 @@ max_network_policies 25 "The maximum nu max_objects_per_schema 1000 "The maximum number of objects in a schema (Materialize)." max_postgres_connections 1000 "The maximum number of PostgreSQL connections in the region, across all schemas (Materialize)." max_query_result_size "1GB" "The maximum size in bytes for a single query's result (Materialize)." 
+max_query_heap_size "" "The maximum size in bytes for a single query's dataflow (Materialize)." max_replicas_per_cluster 5 "The maximum number of replicas of a single cluster (Materialize)." max_result_size "1GB" "The maximum size in bytes for an internal query result (Materialize)." max_roles 1000 "The maximum number of roles in the region (Materialize)."
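// Illustrative sketch, not taken from the patch: the `Bifurcate` extension trait added in
// src/timely-util/src/operator.rs forwards every input container unchanged on its first
// output, after letting a closure siphon derived records into a second output (and, if it
// wants, mutate the container). A minimal std-Rust model of that contract, with `Vec`
// standing in for timely containers and output sessions:
fn bifurcate<T, U>(
    mut containers: Vec<Vec<T>>,
    mut logic: impl FnMut(&mut Vec<T>, &mut Vec<U>),
) -> (Vec<Vec<T>>, Vec<U>) {
    let mut side_output = Vec::new();
    for container in &mut containers {
        // The closure sees each container first and may push derived records to the side
        // output before the container is forwarded as-is.
        logic(container, &mut side_output);
    }
    (containers, side_output)
}

// For example, passing raw updates through while extracting only their keys:
fn bifurcate_demo() {
    let input = vec![vec![(1usize, 10i64)], vec![(2, -10)]];
    let (passed_through, keys) =
        bifurcate(input, |data, side| side.extend(data.iter().map(|(k, _)| *k)));
    assert_eq!(passed_through, vec![vec![(1, 10)], vec![(2, -10)]]);
    assert_eq!(keys, vec![1, 2]);
}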
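// Illustrative sketch, not taken from the patch: how the compute worker keeps the
// `heap_size_limits_handle` input current (see compute_state.rs and server.rs above).
// `handle_set_heap_size_limit` retracts the previous (dataflow, limit) record with diff -1
// and asserts the new one with diff +1, and `advance_handles` ticks the handle's time
// forward, rounded up to the next logging-interval boundary. Names are simplified stand-ins.
fn limit_updates(
    dataflow_id: usize,
    old_limit: Option<u64>,
    new_limit: Option<u64>,
    time_ms: u64,
) -> Vec<((usize, u64), u64, i64)> {
    let mut updates = Vec::new();
    if let Some(limit) = old_limit {
        updates.push(((dataflow_id, limit), time_ms, -1)); // retract the old limit
    }
    if let Some(limit) = new_limit {
        updates.push(((dataflow_id, limit), time_ms, 1)); // assert the new limit
    }
    updates
}

// Round a millisecond timestamp strictly up to the next multiple of the logging interval,
// mirroring `advance_handles`: e.g. now_ms = 10_250 and interval_ms = 1_000 yield 11_000.
fn next_logging_tick(now_ms: u64, interval_ms: u64) -> u64 {
    ((now_ms / interval_ms) + 1) * interval_ms
}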