Skip to content

Conversation

@antiguru
Copy link
Member

@antiguru antiguru commented Jan 30, 2025

Observe a dataflow's arrangement sizes and notify controller once it's over a limit. Let the controller handle the next step.

Related design: #31277

Missing from this PR:

  • A feature flag to disable the behavior.
  • Documentation for users.

Motivation

Tips for reviewer

Checklist

  • This PR has adequate test coverage / QA involvement has been duly considered. (trigger-ci for additional test/nightly runs)
  • This PR has an associated up-to-date design doc, is a design doc (template), or is sufficiently small to not require a design.
  • If this PR evolves an existing $T ⇔ Proto$T mapping (possibly in a backwards-incompatible way), then it is tagged with a T-proto label.
  • If this PR will require changes to cloud orchestration or tests, there is a companion cloud PR to account for those changes that is tagged with the release-blocker label (example).
  • If this PR includes major user-facing behavior changes, I have pinged the relevant PM to schedule a changelog post.

@antiguru antiguru force-pushed the heap_limit branch 3 times, most recently from 97777b3 to a5ebc06 Compare February 3, 2025 17:06
@antiguru antiguru mentioned this pull request Feb 18, 2025
5 tasks
@antiguru antiguru force-pushed the heap_limit branch 3 times, most recently from 6729021 to 2a9f336 Compare February 26, 2025 15:12
antiguru added a commit that referenced this pull request Feb 28, 2025
Combine all logging dataflows into a single dataflow.

This allows easier sharing between the dataflows as events can be send
through streams, instead of injecting them into the log events for later
processing. The change looks substantial, but shouldn't alter behavior
other than that injected commands pass through actual streams instead of
injecting them into the logging dataflows.

This has some benefits: The injected commands are consistent, i.e., they
arrive at the expected time, and this change enables us to share
information across logging dataflows more easily, for example to do
analysis such as #31246.


### Checklist

- [ ] This PR has adequate test coverage / QA involvement has been duly
considered. ([trigger-ci for additional test/nightly
runs](https://trigger-ci.dev.materialize.com/))
- [ ] This PR has an associated up-to-date [design
doc](https://github.com/MaterializeInc/materialize/blob/main/doc/developer/design/README.md),
is a design doc
([template](https://github.com/MaterializeInc/materialize/blob/main/doc/developer/design/00000000_template.md)),
or is sufficiently small to not require a design.
  <!-- Reference the design in the description. -->
- [ ] If this PR evolves [an existing `$T ⇔ Proto$T`
mapping](https://github.com/MaterializeInc/materialize/blob/main/doc/developer/command-and-response-binary-encoding.md)
(possibly in a backwards-incompatible way), then it is tagged with a
`T-proto` label.
- [ ] If this PR will require changes to cloud orchestration or tests,
there is a companion cloud PR to account for those changes that is
tagged with the release-blocker label
([example](MaterializeInc/cloud#5021)).
<!-- Ask in #team-cloud on Slack if you need help preparing the cloud
PR. -->
- [ ] If this PR includes major [user-facing behavior
changes](https://github.com/MaterializeInc/materialize/blob/main/doc/developer/guide-changes.md#what-changes-require-a-release-note),
I have pinged the relevant PM to schedule a changelog post.

Signed-off-by: Moritz Hoffmann <[email protected]>
@antiguru antiguru force-pushed the heap_limit branch 2 times, most recently from fcef92c to 7d8edc7 Compare March 5, 2025 16:13
@antiguru antiguru marked this pull request as ready for review March 5, 2025 16:17
@antiguru antiguru requested review from a team as code owners March 5, 2025 16:17
@antiguru antiguru requested review from ParkMyCar and teskje March 5, 2025 16:17
@antiguru antiguru force-pushed the heap_limit branch 2 times, most recently from be67077 to 1794cc4 Compare March 5, 2025 17:42
antiguru added 3 commits March 6, 2025 15:55
Implement a watchdog dataflow observing the reported heap size of a
dataflow, and allow the controller to make decisions if the heap size
exceeds some limit.

Signed-off-by: Moritz Hoffmann <[email protected]>

Signed-off-by: Moritz Hoffmann <[email protected]>

diff --git c/src/adapter/src/coord.rs i/src/adapter/src/coord.rs
index df80532445..137d82312a 100644
--- c/src/adapter/src/coord.rs
+++ i/src/adapter/src/coord.rs
@@ -467,6 +467,7 @@ pub struct PeekStageLinearizeTimestamp {
     validity: PlanValidity,
     plan: mz_sql::plan::SelectPlan,
     max_query_result_size: Option<u64>,
+    max_query_heap_size: Option<u64>,
     source_ids: BTreeSet<GlobalId>,
     target_replica: Option<ReplicaId>,
     timeline_context: TimelineContext,
@@ -481,6 +482,7 @@ pub struct PeekStageRealTimeRecency {
     validity: PlanValidity,
     plan: mz_sql::plan::SelectPlan,
     max_query_result_size: Option<u64>,
+    max_query_heap_size: Option<u64>,
     source_ids: BTreeSet<GlobalId>,
     target_replica: Option<ReplicaId>,
     timeline_context: TimelineContext,
@@ -496,6 +498,7 @@ pub struct PeekStageTimestampReadHold {
     validity: PlanValidity,
     plan: mz_sql::plan::SelectPlan,
     max_query_result_size: Option<u64>,
+    max_query_heap_size: Option<u64>,
     source_ids: BTreeSet<GlobalId>,
     target_replica: Option<ReplicaId>,
     timeline_context: TimelineContext,
@@ -512,6 +515,7 @@ pub struct PeekStageOptimize {
     validity: PlanValidity,
     plan: mz_sql::plan::SelectPlan,
     max_query_result_size: Option<u64>,
+    max_query_heap_size: Option<u64>,
     source_ids: BTreeSet<GlobalId>,
     id_bundle: CollectionIdBundle,
     target_replica: Option<ReplicaId>,
@@ -527,6 +531,7 @@ pub struct PeekStageFinish {
     validity: PlanValidity,
     plan: mz_sql::plan::SelectPlan,
     max_query_result_size: Option<u64>,
+    max_query_heap_size: Option<u64>,
     id_bundle: CollectionIdBundle,
     target_replica: Option<ReplicaId>,
     source_ids: BTreeSet<GlobalId>,
diff --git c/src/adapter/src/coord/sequencer.rs i/src/adapter/src/coord/sequencer.rs
index 61e7665e36..93efff8c9e 100644
--- c/src/adapter/src/coord/sequencer.rs
+++ i/src/adapter/src/coord/sequencer.rs
@@ -379,8 +379,10 @@ impl Coordinator {
                     self.sequence_end_transaction(ctx, action).await;
                 }
                 Plan::Select(plan) => {
-                    let max = Some(ctx.session().vars().max_query_result_size());
-                    self.sequence_peek(ctx, plan, target_cluster, max).await;
+                    let max_query = Some(ctx.session().vars().max_query_result_size());
+                    let max_heap = ctx.session().vars().max_query_heap_size();
+                    self.sequence_peek(ctx, plan, target_cluster, max_query, max_heap)
+                        .await;
                 }
                 Plan::Subscribe(plan) => {
                     self.sequence_subscribe(ctx, plan, target_cluster).await;
@@ -392,9 +394,16 @@ impl Coordinator {
                     ctx.retire(Ok(Self::send_immediate_rows(plan.row)));
                 }
                 Plan::ShowColumns(show_columns_plan) => {
-                    let max = Some(ctx.session().vars().max_query_result_size());
-                    self.sequence_peek(ctx, show_columns_plan.select_plan, target_cluster, max)
-                        .await;
+                    let max_query = Some(ctx.session().vars().max_query_result_size());
+                    let max_heap = ctx.session().vars().max_query_heap_size();
+                    self.sequence_peek(
+                        ctx,
+                        show_columns_plan.select_plan,
+                        target_cluster,
+                        max_query,
+                        max_heap,
+                    )
+                    .await;
                 }
                 Plan::CopyFrom(plan) => match plan.source {
                     CopyFromSource::Stdin => {
diff --git c/src/adapter/src/coord/sequencer/inner.rs i/src/adapter/src/coord/sequencer/inner.rs
index ab1906787f..6e1776c16c 100644
--- c/src/adapter/src/coord/sequencer/inner.rs
+++ i/src/adapter/src/coord/sequencer/inner.rs
@@ -2581,6 +2581,7 @@ impl Coordinator {
                         None,
                         ExplainContext::Pushdown,
                         Some(ctx.session().vars().max_query_result_size()),
+                        ctx.session().vars().max_query_heap_size(),
                     ),
                     ctx
                 );
@@ -3005,6 +3006,7 @@ impl Coordinator {
             },
             TargetCluster::Active,
             None,
+            None,
         )
         .await;

diff --git c/src/adapter/src/coord/sequencer/inner/peek.rs i/src/adapter/src/coord/sequencer/inner/peek.rs
index fe19ca5d34..d53635b744 100644
--- c/src/adapter/src/coord/sequencer/inner/peek.rs
+++ i/src/adapter/src/coord/sequencer/inner/peek.rs
@@ -128,6 +128,7 @@ impl Coordinator {
         plan: plan::SelectPlan,
         target_cluster: TargetCluster,
         max_query_result_size: Option<u64>,
+        max_query_heap_size: Option<u64>,
     ) {
         let explain_ctx = if ctx.session().vars().emit_plan_insights_notice() {
             let optimizer_trace = OptimizerTrace::new(ExplainStage::PlanInsights.paths());
@@ -143,7 +144,8 @@ impl Coordinator {
                 target_cluster,
                 None,
                 explain_ctx,
-                max_query_result_size
+                max_query_result_size,
+                max_query_heap_size,
             ),
             ctx
         );
@@ -209,6 +211,7 @@ impl Coordinator {
                 }),
                 ExplainContext::None,
                 Some(ctx.session().vars().max_query_result_size()),
+                ctx.session().vars().max_query_heap_size(),
             ),
             ctx
         );
@@ -258,6 +261,7 @@ impl Coordinator {
                     optimizer_trace,
                 }),
                 Some(ctx.session().vars().max_query_result_size()),
+                ctx.session().vars().max_query_heap_size(),
             ),
             ctx
         );
@@ -274,6 +278,7 @@ impl Coordinator {
         copy_to_ctx: Option<CopyToContext>,
         explain_ctx: ExplainContext,
         max_query_result_size: Option<u64>,
+        max_query_heap_size: Option<u64>,
     ) -> Result<PeekStage, AdapterError> {
         // Collect optimizer parameters.
         let catalog = self.owned_catalog();
@@ -391,6 +396,7 @@ impl Coordinator {
             validity,
             plan,
             max_query_result_size,
+            max_query_heap_size,
             source_ids,
             target_replica,
             timeline_context,
@@ -409,6 +415,7 @@ impl Coordinator {
             source_ids,
             plan,
             max_query_result_size,
+            max_query_heap_size,
             target_replica,
             timeline_context,
             optimizer,
@@ -424,6 +431,7 @@ impl Coordinator {
             validity,
             plan,
             max_query_result_size,
+            max_query_heap_size,
             source_ids,
             target_replica,
             timeline_context,
@@ -468,6 +476,7 @@ impl Coordinator {
             mut validity,
             plan,
             max_query_result_size,
+            max_query_heap_size,
             source_ids,
             target_replica,
             timeline_context,
@@ -508,6 +517,7 @@ impl Coordinator {
             validity,
             plan,
             max_query_result_size,
+            max_query_heap_size,
             source_ids,
             id_bundle,
             target_replica,
@@ -526,6 +536,7 @@ impl Coordinator {
             validity,
             plan,
             max_query_result_size,
+            max_query_heap_size,
             source_ids,
             id_bundle,
             target_replica,
@@ -640,6 +651,7 @@ impl Coordinator {
                                         validity,
                                         plan,
                                         max_query_result_size,
+                                        max_query_heap_size,
                                         id_bundle,
                                         target_replica,
                                         source_ids,
@@ -656,6 +668,7 @@ impl Coordinator {
                                     validity,
                                     plan,
                                     max_query_result_size,
+                                    max_query_heap_size,
                                     id_bundle,
                                     target_replica,
                                     source_ids,
@@ -745,6 +758,7 @@ impl Coordinator {
             validity,
             plan,
             max_query_result_size,
+            max_query_heap_size,
             source_ids,
             target_replica,
             timeline_context,
@@ -772,6 +786,7 @@ impl Coordinator {
                             validity,
                             plan,
                             max_query_result_size,
+                            max_query_heap_size,
                             target_replica,
                             timeline_context,
                             source_ids,
@@ -790,6 +805,7 @@ impl Coordinator {
                     validity,
                     plan,
                     max_query_result_size,
+                    max_query_heap_size,
                     target_replica,
                     timeline_context,
                     source_ids,
@@ -810,6 +826,7 @@ impl Coordinator {
             validity: _,
             plan,
             max_query_result_size,
+            max_query_heap_size,
             id_bundle,
             target_replica,
             source_ids,
@@ -833,9 +850,13 @@ impl Coordinator {
         let session = ctx.session_mut();
         let conn_id = session.conn_id().clone();

-        let (peek_plan, df_meta, typ) = global_lir_plan.unapply();
+        let (mut peek_plan, df_meta, typ) = global_lir_plan.unapply();
         let source_arity = typ.arity();

+        if let peek::PeekPlan::SlowPath(PeekDataflowPlan { desc, .. }) = &mut peek_plan {
+            desc.memory_limit = max_query_heap_size;
+        }
+
         self.emit_optimizer_notices(&*session, &df_meta.optimizer_notices);

         let target_cluster = self.catalog().get_cluster(cluster_id);
diff --git c/src/adapter/src/coord/sequencer/inner/subscribe.rs i/src/adapter/src/coord/sequencer/inner/subscribe.rs
index 26274f2c51..67f6a845ef 100644
--- c/src/adapter/src/coord/sequencer/inner/subscribe.rs
+++ i/src/adapter/src/coord/sequencer/inner/subscribe.rs
@@ -355,7 +355,9 @@ impl Coordinator {
         };
         active_subscribe.initialize();

-        let (df_desc, df_meta) = global_lir_plan.unapply();
+        let (mut df_desc, df_meta) = global_lir_plan.unapply();
+
+        df_desc.memory_limit = ctx.session().vars().max_query_heap_size();

         // Emit notices.
         self.emit_optimizer_notices(ctx.session(), &df_meta.optimizer_notices);
diff --git c/src/compute-client/src/as_of_selection.rs i/src/compute-client/src/as_of_selection.rs
index 8603d1b9fd..d12aabc7ae 100644
--- c/src/compute-client/src/as_of_selection.rs
+++ i/src/compute-client/src/as_of_selection.rs
@@ -1103,6 +1103,7 @@ mod tests {
             refresh_schedule: Default::default(),
             debug_name: Default::default(),
             time_dependence: None,
+            memory_limit: None,
         }
     }

diff --git c/src/compute-client/src/controller/instance.rs i/src/compute-client/src/controller/instance.rs
index d5a436cb56..67bb609456 100644
--- c/src/compute-client/src/controller/instance.rs
+++ i/src/compute-client/src/controller/instance.rs
@@ -65,8 +65,8 @@ use crate::protocol::command::{
 };
 use crate::protocol::history::ComputeCommandHistory;
 use crate::protocol::response::{
-    ComputeResponse, CopyToResponse, FrontiersResponse, OperatorHydrationStatus, PeekResponse,
-    StatusResponse, SubscribeBatch, SubscribeResponse,
+    ComputeResponse, CopyToResponse, DataflowLimitStatus, FrontiersResponse,
+    OperatorHydrationStatus, PeekResponse, StatusResponse, SubscribeBatch, SubscribeResponse,
 };
 use crate::service::{ComputeClient, ComputeGrpcClient};

@@ -1352,6 +1352,7 @@ where
             refresh_schedule: dataflow.refresh_schedule,
             debug_name: dataflow.debug_name,
             time_dependence: dataflow.time_dependence,
+            memory_limit: dataflow.memory_limit,
         };

         if augmented_dataflow.is_transient() {
@@ -2007,6 +2008,43 @@ where
             StatusResponse::OperatorHydration(status) => {
                 self.update_operator_hydration_status(replica_id, status)
             }
+            StatusResponse::DataflowLimitExceeded(status) => {
+                self.update_dataflow_limit_status(replica_id, status)
+            }
+        }
+    }
+
+    /// Update the tracked hydration status for an operator according to a received status update.
+    fn update_dataflow_limit_status(&mut self, replica_id: ReplicaId, status: DataflowLimitStatus) {
+        tracing::warn!(
+            "Dataflow limit exceeded on replica {}: {:?}",
+            replica_id,
+            status
+        );
+        if let Some(subscribe) = self.subscribes.get(&status.collection_id) {
+            self.deliver_response(ComputeControllerResponse::SubscribeResponse(
+                status.collection_id,
+                SubscribeBatch {
+                    lower: subscribe.frontier.clone(),
+                    upper: subscribe.frontier.clone(),
+                    updates: Err("Dataflow limit exceeded".to_string()),
+                },
+            ))
+        } else {
+            // Look for a matching peek
+            let mut peek_uuid = None;
+            for (uuid, peek) in self.peeks.iter() {
+                if peek.read_hold.id() == status.collection_id {
+                    peek_uuid = Some(*uuid);
+                    break;
+                }
+            }
+            if let Some(uuid) = peek_uuid {
+                self.cancel_peek(
+                    uuid,
+                    PeekResponse::Error("Dataflow limit exceeded".to_string()),
+                );
+            }
         }
     }

diff --git c/src/compute-client/src/protocol/response.proto i/src/compute-client/src/protocol/response.proto
index cd73d62bc2..cf678574c2 100644
--- c/src/compute-client/src/protocol/response.proto
+++ i/src/compute-client/src/protocol/response.proto
@@ -107,6 +107,7 @@ message ProtoSubscribeBatch {
 message ProtoStatusResponse {
   oneof kind {
     ProtoOperatorHydrationStatus operator_hydration = 1;
+    ProtoDataflowLimitStatus dataflow_limit_status = 2;
   }
 }

@@ -116,3 +117,7 @@ message ProtoOperatorHydrationStatus {
   uint64 worker_id = 3;
   bool hydrated = 4;
 }
+
+message ProtoDataflowLimitStatus {
+  mz_repr.global_id.ProtoGlobalId collection_id = 1;
+}
diff --git c/src/compute-client/src/protocol/response.rs i/src/compute-client/src/protocol/response.rs
index b08ea18171..15cac40797 100644
--- c/src/compute-client/src/protocol/response.rs
+++ i/src/compute-client/src/protocol/response.rs
@@ -589,6 +589,8 @@ impl Arbitrary for SubscribeBatch<mz_repr::Timestamp> {
 pub enum StatusResponse {
     /// Reports the hydration status of dataflow operators.
     OperatorHydration(OperatorHydrationStatus),
+    /// Reports limit violations for dataflows.
+    DataflowLimitExceeded(DataflowLimitStatus),
 }

 impl RustType<ProtoStatusResponse> for StatusResponse {
@@ -597,6 +599,7 @@ impl RustType<ProtoStatusResponse> for StatusResponse {

         let kind = match self {
             Self::OperatorHydration(status) => Kind::OperatorHydration(status.into_proto()),
+            Self::DataflowLimitExceeded(status) => Kind::DataflowLimitStatus(status.into_proto()),
         };
         ProtoStatusResponse { kind: Some(kind) }
     }
@@ -608,6 +611,9 @@ impl RustType<ProtoStatusResponse> for StatusResponse {
             Some(Kind::OperatorHydration(status)) => {
                 Ok(Self::OperatorHydration(status.into_rust()?))
             }
+            Some(Kind::DataflowLimitStatus(status)) => {
+                Ok(Self::DataflowLimitExceeded(status.into_rust()?))
+            }
             None => Err(TryFromProtoError::missing_field(
                 "ProtoStatusResponse::kind",
             )),
@@ -650,6 +656,29 @@ impl RustType<ProtoOperatorHydrationStatus> for OperatorHydrationStatus {
     }
 }

+/// A dataflow exceeded some limit.
+#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, Arbitrary)]
+pub struct DataflowLimitStatus {
+    /// The ID of the compute collection exported by the dataflow.
+    pub collection_id: GlobalId,
+}
+
+impl RustType<ProtoDataflowLimitStatus> for DataflowLimitStatus {
+    fn into_proto(&self) -> ProtoDataflowLimitStatus {
+        ProtoDataflowLimitStatus {
+            collection_id: Some(self.collection_id.into_proto()),
+        }
+    }
+
+    fn from_proto(proto: ProtoDataflowLimitStatus) -> Result<Self, TryFromProtoError> {
+        Ok(Self {
+            collection_id: proto
+                .collection_id
+                .into_rust_if_some("ProtoDataflowLimitStatus::collection_id")?,
+        })
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use mz_ore::assert_ok;
diff --git c/src/compute-types/src/dataflows.proto i/src/compute-types/src/dataflows.proto
index 08f49d16df..68f8d0b00b 100644
--- c/src/compute-types/src/dataflows.proto
+++ i/src/compute-types/src/dataflows.proto
@@ -61,6 +61,8 @@ message ProtoDataflowDescription {
   optional mz_storage_types.time_dependence.ProtoTimeDependence time_dependence = 11;

   string debug_name = 8;
+
+  optional uint64 memory_limit = 12;
 }

 message ProtoIndexDesc {
diff --git c/src/compute-types/src/dataflows.rs i/src/compute-types/src/dataflows.rs
index d77494a925..18ca55f26e 100644
--- c/src/compute-types/src/dataflows.rs
+++ i/src/compute-types/src/dataflows.rs
@@ -75,6 +75,8 @@ pub struct DataflowDescription<P, S: 'static = (), T = mz_repr::Timestamp> {
     pub debug_name: String,
     /// Description of how the dataflow's progress relates to wall-clock time. None for unknown.
     pub time_dependence: Option<TimeDependence>,
+    /// Optional heap size limit for this dataflow, in bytes.
+    pub memory_limit: Option<u64>,
 }

 impl<P, S> DataflowDescription<P, S, mz_repr::Timestamp> {
@@ -147,6 +149,7 @@ impl<T> DataflowDescription<OptimizedMirRelationExpr, (), T> {
             refresh_schedule: None,
             debug_name: name,
             time_dependence: None,
+            memory_limit: None,
         }
     }

@@ -562,6 +565,7 @@ where
             refresh_schedule: self.refresh_schedule.clone(),
             debug_name: self.debug_name.clone(),
             time_dependence: self.time_dependence.clone(),
+            memory_limit: self.memory_limit,
         }
     }
 }
@@ -580,6 +584,7 @@ impl RustType<ProtoDataflowDescription> for DataflowDescription<RenderPlan, Coll
             refresh_schedule: self.refresh_schedule.into_proto(),
             debug_name: self.debug_name.clone(),
             time_dependence: self.time_dependence.into_proto(),
+            memory_limit: self.memory_limit.into_proto(),
         }
     }

@@ -603,6 +608,7 @@ impl RustType<ProtoDataflowDescription> for DataflowDescription<RenderPlan, Coll
             refresh_schedule: proto.refresh_schedule.into_rust()?,
             debug_name: proto.debug_name,
             time_dependence: proto.time_dependence.into_rust()?,
+            memory_limit: proto.memory_limit,
         })
     }
 }
@@ -765,7 +771,7 @@ where
             any::<RefreshSchedule>(),
             proptest::string::string_regex(".*").unwrap(),
         ),
-        any::<Option<TimeDependence>>(),
+        (any::<Option<TimeDependence>>(), any::<Option<u64>>()),
     )
         .prop_map(
             |(
@@ -783,7 +789,7 @@ where
                     refresh_schedule,
                     debug_name,
                 ),
-                time_dependence,
+                (time_dependence, memory_limit),
             )| DataflowDescription {
                 source_imports: BTreeMap::from_iter(source_imports),
                 index_imports: BTreeMap::from_iter(index_imports),
@@ -808,6 +814,7 @@ where
                 },
                 debug_name,
                 time_dependence,
+                memory_limit,
             },
         )
 }
diff --git c/src/compute-types/src/plan/lowering.rs i/src/compute-types/src/plan/lowering.rs
index 7b9d2d1a69..fdaa363aea 100644
--- c/src/compute-types/src/plan/lowering.rs
+++ i/src/compute-types/src/plan/lowering.rs
@@ -120,6 +120,7 @@ impl Context {
             refresh_schedule: desc.refresh_schedule,
             debug_name: desc.debug_name,
             time_dependence: desc.time_dependence,
+            memory_limit: desc.memory_limit,
         })
     }

diff --git c/src/compute/src/compute_state.rs i/src/compute/src/compute_state.rs
index 8ac78545e6..6745e7e769 100644
--- c/src/compute/src/compute_state.rs
+++ i/src/compute/src/compute_state.rs
@@ -25,8 +25,8 @@ use mz_compute_client::protocol::command::{
 };
 use mz_compute_client::protocol::history::ComputeCommandHistory;
 use mz_compute_client::protocol::response::{
-    ComputeResponse, CopyToResponse, FrontiersResponse, OperatorHydrationStatus, PeekResponse,
-    StatusResponse, SubscribeResponse,
+    ComputeResponse, CopyToResponse, DataflowLimitStatus, FrontiersResponse,
+    OperatorHydrationStatus, PeekResponse, StatusResponse, SubscribeResponse,
 };
 use mz_compute_types::dataflows::DataflowDescription;
 use mz_compute_types::plan::render_plan::RenderPlan;
@@ -65,7 +65,7 @@ use uuid::Uuid;
 use crate::arrangement::manager::{TraceBundle, TraceManager};
 use crate::logging;
 use crate::logging::compute::{CollectionLogging, ComputeEvent, PeekEvent};
-use crate::logging::initialize::LoggingTraces;
+use crate::logging::{LoggingHandles, LoggingTraces};
 use crate::metrics::{CollectionMetrics, WorkerMetrics};
 use crate::render::{LinearJoinSpec, StartSignal};
 use crate::server::{ComputeInstanceContext, ResponseSender};
@@ -74,7 +74,7 @@ use crate::server::{ComputeInstanceContext, ResponseSender};
 ///
 /// This state is restricted to the COMPUTE state, the deterministic, idempotent work
 /// done between data ingress and egress.
-pub struct ComputeState {
+pub(crate) struct ComputeState {
     /// State kept for each installed compute collection.
     ///
     /// Each collection has exactly one frontier.
@@ -176,6 +176,14 @@ pub struct ComputeState {
     /// replica can drop diffs associated with timestamps beyond the replica expiration.
     /// The replica will panic if such dataflows are not dropped before the replica has expired.
     pub replica_expiration: Antichain<Timestamp>,
+
+    /// The interval for rounding logging timestamps.
+    logging_interval: Duration,
+
+    /// The handles to inject updates for logging dataflows.
+    pub logging_handles: Option<LoggingHandles>,
+    /// Shared set of collections exceeding their heap size limit.
+    pub dataflows_exceeding_heap_size_limit: Rc<RefCell<BTreeSet<usize>>>,
 }

 impl ComputeState {
@@ -220,21 +228,12 @@ impl ComputeState {
             server_maintenance_interval: Duration::ZERO,
             init_system_time: mz_ore::now::SYSTEM_TIME(),
             replica_expiration: Antichain::default(),
+            logging_interval: Duration::ZERO,
+            logging_handles: None,
+            dataflows_exceeding_heap_size_limit: Rc::default(),
         }
     }

-    /// Return whether a collection with the given ID exists.
-    pub fn collection_exists(&self, id: GlobalId) -> bool {
-        self.collections.contains_key(&id)
-    }
-
-    /// Return a reference to the identified collection.
-    ///
-    /// Panics if the collection doesn't exist.
-    pub fn expect_collection(&self, id: GlobalId) -> &CollectionState {
-        self.collections.get(&id).expect("collection must exist")
-    }
-
     /// Return a mutable reference to the identified collection.
     ///
     /// Panics if the collection doesn't exist.
@@ -487,6 +486,7 @@ impl<'a, A: Allocate + 'static> ActiveComputeState<'a, A> {
                 expiration_datetime = ?dataflow_expiration.as_option().map(|t| mz_ore::now::to_datetime(t.into())),
                 plan_until = ?dataflow.until.elements(),
                 until = ?until.elements(),
+                memory_limit = ?dataflow.memory_limit,
                 "creating dataflow",
             );
         } else {
@@ -500,6 +500,7 @@ impl<'a, A: Allocate + 'static> ActiveComputeState<'a, A> {
                 expiration_datetime = ?dataflow_expiration.as_option().map(|t| mz_ore::now::to_datetime(t.into())),
                 plan_until = ?dataflow.until.elements(),
                 until = ?until.elements(),
+                memory_limit = ?dataflow.memory_limit,
                 "creating dataflow",
             );
         };
@@ -509,11 +510,14 @@ impl<'a, A: Allocate + 'static> ActiveComputeState<'a, A> {
             .chain(dataflow.copy_to_ids())
             .collect();

+        let dataflow_id = self.timely_worker.next_dataflow_index();
+
         // Initialize compute and logging state for each object.
         for object_id in dataflow.export_ids() {
             let is_subscribe_or_copy = subscribe_copy_ids.contains(&object_id);
             let metrics = self.compute_state.metrics.for_collection(object_id);
-            let mut collection = CollectionState::new(is_subscribe_or_copy, as_of.clone(), metrics);
+            let mut collection =
+                CollectionState::new(is_subscribe_or_copy, as_of.clone(), metrics, dataflow_id);

             if let Some(logger) = self.compute_state.compute_logger.clone() {
                 let logging = CollectionLogging::new(
@@ -545,6 +549,13 @@ impl<'a, A: Allocate + 'static> ActiveComputeState<'a, A> {
                 .insert(id, Rc::clone(&suspension_token));
         }

+        if let (Some(limit), true) = (dataflow.memory_limit, dataflow.is_transient()) {
+            for id in dataflow.export_ids() {
+                // TODO: Make configurable.
+                self.handle_set_heap_size_limit(id, Some(limit));
+            }
+        }
+
         crate::render::build_compute_dataflow(
             self.timely_worker,
             self.compute_state,
@@ -576,6 +587,35 @@ impl<'a, A: Allocate + 'static> ActiveComputeState<'a, A> {
         }
     }

+    fn handle_set_heap_size_limit(&mut self, id: GlobalId, limit: Option<u64>) {
+        let collection = self
+            .compute_state
+            .collections
+            .get_mut(&id)
+            .expect("collection must exist");
+        let old_limit = collection.heap_size_limit;
+        collection.heap_size_limit = limit;
+
+        if let Some(handles) = self.compute_state.logging_handles.as_mut() {
+            let handle = &mut handles.heap_size_limits_handle;
+            let time = *handle.time();
+            if let Some(old_limit) = old_limit {
+                handle.send((
+                    (collection.dataflow_id, old_limit),
+                    time,
+                    -Diff::try_from(old_limit).expect("must fit"),
+                ));
+            }
+            if let Some(limit) = limit {
+                handle.send((
+                    (collection.dataflow_id, limit),
+                    time,
+                    Diff::try_from(limit).expect("must fit"),
+                ));
+            }
+        }
+    }
+
     #[mz_ore::instrument(level = "debug")]
     fn handle_peek(&mut self, peek: Peek) {
         let pending = match &peek.target {
@@ -644,6 +684,19 @@ impl<'a, A: Allocate + 'static> ActiveComputeState<'a, A> {
             is_subscribe_or_copy: collection.is_subscribe_or_copy,
         };
         self.compute_state.dropped_collections.push((id, dropped));
+
+        if let (Some(handle), Some(limit)) = (
+            self.compute_state.logging_handles.as_mut(),
+            collection.heap_size_limit,
+        ) {
+            let handle = &mut handle.heap_size_limits_handle;
+            let time = *handle.time();
+            handle.send((
+                (collection.dataflow_id, limit),
+                time,
+                -Diff::try_from(limit).expect("must fit"),
+            ));
+        }
     }

     /// Initializes timely dataflow logging and publishes as a view.
@@ -656,7 +709,12 @@ impl<'a, A: Allocate + 'static> ActiveComputeState<'a, A> {
             traces,
             dataflow_index,
             compute_logger: logger,
-        } = logging::initialize(self.timely_worker, &config);
+            logging_handles,
+        } = logging::initialize(
+            self.timely_worker,
+            &config,
+            Rc::clone(&self.compute_state.dataflows_exceeding_heap_size_limit),
+        );

         let mut log_index_ids = config.index_logs;
         for (log, trace) in traces {
@@ -670,7 +728,8 @@ impl<'a, A: Allocate + 'static> ActiveComputeState<'a, A> {
             let is_subscribe_or_copy = false;
             let as_of = Antichain::from_elem(Timestamp::MIN);
             let metrics = self.compute_state.metrics.for_collection(id);
-            let mut collection = CollectionState::new(is_subscribe_or_copy, as_of, metrics);
+            let mut collection =
+                CollectionState::new(is_subscribe_or_copy, as_of, metrics, dataflow_index);

             let logging =
                 CollectionLogging::new(id, logger.clone(), dataflow_index, std::iter::empty());
@@ -692,6 +751,8 @@ impl<'a, A: Allocate + 'static> ActiveComputeState<'a, A> {
         );

         self.compute_state.compute_logger = Some(logger);
+        self.compute_state.logging_handles = Some(logging_handles);
+        self.compute_state.logging_interval = config.interval;
     }

     /// Send progress information to the controller.
@@ -950,6 +1011,35 @@ impl<'a, A: Allocate + 'static> ActiveComputeState<'a, A> {
         }
     }

+    pub fn process_limits(&self) {
+        for dataflow_id in std::mem::take(
+            &mut *self
+                .compute_state
+                .dataflows_exceeding_heap_size_limit
+                .borrow_mut(),
+        ) {
+            for (&collection_id, state) in self.compute_state.collections.iter() {
+                if state.dataflow_id == dataflow_id {
+                    let status = DataflowLimitStatus { collection_id };
+                    self.send_compute_response(ComputeResponse::Status(
+                        StatusResponse::DataflowLimitExceeded(status),
+                    ));
+                }
+            }
+        }
+    }
+
+    pub fn advance_handles(&mut self) {
+        let interval_ms = std::cmp::max(1, self.compute_state.logging_interval.as_millis());
+        let interval_ms = u64::try_from(interval_ms).expect("must fit");
+        let now = mz_ore::now::SYSTEM_TIME();
+        let time_ms = ((now / interval_ms) + 1) * interval_ms;
+        let new_time_ms: Timestamp = time_ms.try_into().expect("must fit");
+        if let Some(handles) = self.compute_state.logging_handles.as_mut() {
+            handles.advance_to(new_time_ms);
+        }
+    }
+
     /// Send a response to the coordinator.
     fn send_compute_response(&self, response: ComputeResponse) {
         // Ignore send errors because the coordinator is free to ignore our
@@ -1637,6 +1727,10 @@ pub struct CollectionState {
     logging: Option<CollectionLogging>,
     /// Metrics tracked for this collection.
     metrics: CollectionMetrics,
+    /// The dataflow ID of the dataflow that hosts this collection.
+    pub dataflow_id: usize,
+    /// The heap size limit for this dataflow, in bytes.
+    pub heap_size_limit: Option<u64>,
 }

 impl CollectionState {
@@ -1644,6 +1738,7 @@ impl CollectionState {
         is_subscribe_or_copy: bool,
         as_of: Antichain<Timestamp>,
         metrics: CollectionMetrics,
+        dataflow_id: usize,
     ) -> Self {
         Self {
             reported_frontiers: ReportedFrontiers::new(),
@@ -1655,6 +1750,8 @@ impl CollectionState {
             compute_probe: None,
             logging: None,
             metrics,
+            dataflow_id,
+            heap_size_limit: None,
         }
     }

diff --git c/src/compute/src/logging.rs i/src/compute/src/logging.rs
index f43998216e..432c68069e 100644
--- c/src/compute/src/logging.rs
+++ i/src/compute/src/logging.rs
@@ -14,6 +14,7 @@ mod differential;
 pub(super) mod initialize;
 mod reachability;
 mod timely;
+mod watchdog;

 use std::any::Any;
 use std::collections::BTreeMap;
@@ -26,6 +27,7 @@ use ::timely::dataflow::channels::pact::Pipeline;
 use ::timely::dataflow::channels::pushers::buffer::Session;
 use ::timely::dataflow::channels::pushers::{Counter, Tee};
 use ::timely::dataflow::operators::capture::{Event, EventLink, EventPusher};
+use ::timely::dataflow::operators::input::Handle;
 use ::timely::dataflow::operators::Operator;
 use ::timely::dataflow::StreamCore;
 use ::timely::progress::Timestamp as TimelyTimestamp;
@@ -39,6 +41,7 @@ use mz_timely_util::activator::RcActivator;
 use mz_timely_util::containers::ColumnBuilder;
 use mz_timely_util::operator::consolidate_pact;

+use crate::arrangement::manager::TraceBundle;
 use crate::typedefs::RowRowAgent;

 pub use crate::logging::initialize::initialize;
@@ -156,6 +159,29 @@ struct SharedLoggingState {
     arrangement_size_activators: BTreeMap<usize, Activator>,
 }

+pub(crate) struct LoggingTraces {
+    /// Exported traces, by log variant.
+    pub traces: BTreeMap<LogVariant, TraceBundle>,
+    /// The index of the dataflow that exports the traces.
+    pub dataflow_index: usize,
+    /// The compute logger.
+    pub compute_logger: compute::Logger,
+    /// Handles to logging inputs.
+    pub logging_handles: LoggingHandles,
+}
+
+pub(crate) struct LoggingHandles {
+    /// Handle to provide heap size limits per dataflow.
+    pub heap_size_limits_handle: Handle<Timestamp, Update<(usize, u64)>>,
+}
+
+impl LoggingHandles {
+    /// Advance the time of all handles to `time`.
+    pub(crate) fn advance_to(&mut self, time: Timestamp) {
+        self.heap_size_limits_handle.advance_to(time);
+    }
+}
+
 /// Helper to pack collections of [`Datum`]s into key and value row.
 pub(crate) struct PermutedRowPacker {
     key: Vec<usize>,
diff --git c/src/compute/src/logging/compute.rs i/src/compute/src/logging/compute.rs
index 59aa439c03..d52f08de2e 100644
--- c/src/compute/src/logging/compute.rs
+++ i/src/compute/src/logging/compute.rs
@@ -23,11 +23,13 @@ use mz_compute_types::plan::LirId;
 use mz_ore::cast::CastFrom;
 use mz_repr::{Datum, Diff, GlobalId, Timestamp};
 use mz_timely_util::containers::{Column, ColumnBuilder, ProvidedBuilder};
+use mz_timely_util::operator::Bifurcate;
 use mz_timely_util::replay::MzReplay;
+use timely::container::CapacityContainerBuilder;
 use timely::dataflow::channels::pact::Pipeline;
 use timely::dataflow::operators::core::Map;
 use timely::dataflow::operators::generic::builder_rc::OperatorBuilder;
-use timely::dataflow::operators::{Concatenate, Enter, Operator};
+use timely::dataflow::operators::{Concatenate, Enter, Leave, Operator};
 use timely::dataflow::{Scope, Stream, StreamCore};
 use timely::scheduling::Scheduler;
 use timely::{Container, Data};
@@ -286,9 +288,11 @@ impl LirMetadata {
 }

 /// The return type of the [`construct`] function.
-pub(super) struct Return {
+pub(super) struct Return<S: Scope> {
     /// Collections returned by [`construct`].
     pub collections: BTreeMap<LogVariant, LogCollection>,
+    /// The arrangement heap size stream for each operator.
+    pub arrangement_heap_size: Stream<S, Update<(usize, ())>>,
 }

 /// Constructs the logging dataflow fragment for compute logs.
@@ -307,7 +311,7 @@ pub(super) fn construct<S: Scheduler + 'static, G: Scope<Timestamp = Timestamp>>
     event_queue: EventQueue<Column<(Duration, ComputeEvent)>>,
     compute_event_streams: impl IntoIterator<Item = StreamCore<G, Column<(Duration, ComputeEvent)>>>,
     shared_state: Rc<RefCell<SharedLoggingState>>,
-) -> Return {
+) -> Return<G> {
     let logging_interval_ms = std::cmp::max(1, config.interval.as_millis());

     scope.scoped("compute logging", move |scope| {
@@ -478,6 +482,17 @@ pub(super) fn construct<S: Scheduler + 'static, G: Scope<Timestamp = Timestamp>>
                 ])
             };

+        let (arrangement_heap_size, arrangement_heap_size_stripped) =
+            arrangement_heap_size.bifurcate::<CapacityContainerBuilder<Vec<_>>>(
+                "arrangement_heap_size",
+                |data, session| {
+                    session.give_iterator(
+                        IntoIterator::into_iter(&*data)
+                            .map(|(data, time, diff)| ((data.operator_id, ()), *time, *diff)),
+                    );
+                },
+            );
+
         let mut packer = PermutedRowPacker::new(ComputeLog::ArrangementHeapSize);
         let arrangement_heap_size = arrangement_heap_size
             .as_collection()
@@ -586,7 +601,10 @@ pub(super) fn construct<S: Scheduler + 'static, G: Scope<Timestamp = Timestamp>>
             }
         }

-        Return { collections }
+        Return {
+            collections,
+            arrangement_heap_size: arrangement_heap_size_stripped.leave(),
+        }
     })
 }

diff --git c/src/compute/src/logging/differential.rs i/src/compute/src/logging/differential.rs
index e495791828..b84701b100 100644
--- c/src/compute/src/logging/differential.rs
+++ i/src/compute/src/logging/differential.rs
@@ -35,7 +35,7 @@ use crate::extensions::arrange::MzArrangeCore;
 use crate::logging::compute::{ArrangementHeapSizeOperatorDrop, ComputeEvent};
 use crate::logging::{
     consolidate_and_pack, DifferentialLog, EventQueue, LogCollection, LogVariant,
-    OutputSessionColumnar, SharedLoggingState,
+    OutputSessionColumnar, SharedLoggingState, Update,
 };
 use crate::row_spine::RowRowBuilder;
 use crate::typedefs::{KeyBatcher, RowRowSpine};
@@ -46,6 +46,8 @@ pub(super) struct Return<S: Scope> {
     pub collections: BTreeMap<LogVariant, LogCollection>,
     /// Stream of compute events generated by the differential logger.
     pub compute_events: StreamCore<S, Column<(Duration, ComputeEvent)>>,
+    /// The batcher heap size stream for each operator.
+    pub batcher_heap_size: Stream<S, Update<(usize, ())>>,
 }

 /// Constructs the logging dataflow fragment for differential logs.
@@ -154,7 +156,7 @@ pub(super) fn construct<G: Scope<Timestamp = Timestamp>>(
         let arrangement_records = stream_to_collection(&records, ArrangementRecords);
         let sharing = stream_to_collection(&sharing, Sharing);
         let batcher_records = stream_to_collection(&batcher_records, BatcherRecords);
-        let batcher_size = stream_to_collection(&batcher_size, BatcherSize);
+        let batcher_size_rows = stream_to_collection(&batcher_size, BatcherSize);
         let batcher_capacity = stream_to_collection(&batcher_capacity, BatcherCapacity);
         let batcher_allocations = stream_to_collection(&batcher_allocations, BatcherAllocations);

@@ -164,7 +166,7 @@ pub(super) fn construct<G: Scope<Timestamp = Timestamp>>(
             (ArrangementRecords, arrangement_records),
             (Sharing, sharing),
             (BatcherRecords, batcher_records),
-            (BatcherSize, batcher_size),
+            (BatcherSize, batcher_size_rows),
             (BatcherCapacity, batcher_capacity),
             (BatcherAllocations, batcher_allocations),
         ];
@@ -188,7 +190,7 @@ pub(super) fn construct<G: Scope<Timestamp = Timestamp>>(
             }
         }

-        Return { collections, compute_events: compute_events.leave(), }
+        Return { collections, compute_events: compute_events.leave(), batcher_heap_size: batcher_size.leave() }
     })
 }

diff --git c/src/compute/src/logging/initialize.rs i/src/compute/src/logging/initialize.rs
index e9ddc73f80..bb071e8038 100644
--- c/src/compute/src/logging/initialize.rs
+++ i/src/compute/src/logging/initialize.rs
@@ -6,7 +6,7 @@
 //! Initialization of logging dataflows.

 use std::cell::RefCell;
-use std::collections::BTreeMap;
+use std::collections::{BTreeMap, BTreeSet};
 use std::rc::Rc;
 use std::time::{Duration, Instant};

@@ -21,6 +21,7 @@ use mz_timely_util::containers::{Column, ColumnBuilder};
 use mz_timely_util::operator::CollectionExt;
 use timely::communication::Allocate;
 use timely::container::{ContainerBuilder, PushInto};
+use timely::dataflow::operators::Input;
 use timely::dataflow::Scope;
 use timely::logging::{TimelyEvent, TimelyEventBuilder};
 use timely::logging_core::{Logger, Registry};
@@ -40,7 +41,8 @@ use crate::typedefs::{ErrBatcher, ErrBuilder};
 pub fn initialize<A: Allocate + 'static>(
     worker: &mut timely::worker::Worker<A>,
     config: &LoggingConfig,
-) -> LoggingTraces {
+    dataflows_exceeding_heap_size_limit: Rc<RefCell<BTreeSet<usize>>>,
+) -> super::LoggingTraces {
     let interval_ms = std::cmp::max(1, config.interval.as_millis());

     // Track time relative to the Unix epoch, rather than when the server
@@ -62,25 +64,27 @@ pub fn initialize<A: Allocate + 'static>(
         d_event_queue: EventQueue::new("d"),
         c_event_queue: EventQueue::new("c"),
         shared_state: Default::default(),
+        dataflows_exceeding_heap_size_limit,
     };

     // Depending on whether we should log the creation of the logging dataflows, we register the
     // loggers with timely either before or after creating them.
     let dataflow_index = context.worker.next_dataflow_index();
-    let traces = if config.log_logging {
+    let (traces, handles) = if config.log_logging {
         context.register_loggers();
         context.construct_dataflow()
     } else {
-        let traces = context.construct_dataflow();
+        let traces_and_handles = context.construct_dataflow();
         context.register_loggers();
-        traces
+        traces_and_handles
     };

     let compute_logger = worker.log_register().get("materialize/compute").unwrap();
-    LoggingTraces {
+    super::LoggingTraces {
         traces,
         dataflow_index,
         compute_logger,
+        logging_handles: handles,
     }
 }

@@ -97,25 +101,18 @@ struct LoggingContext<'a, A: Allocate> {
     d_event_queue: EventQueue<Vec<(Duration, DifferentialEvent)>>,
     c_event_queue: EventQueue<Column<(Duration, ComputeEvent)>>,
     shared_state: Rc<RefCell<SharedLoggingState>>,
-}
-
-pub(crate) struct LoggingTraces {
-    /// Exported traces, by log variant.
-    pub traces: BTreeMap<LogVariant, TraceBundle>,
-    /// The index of the dataflow that exports the traces.
-    pub dataflow_index: usize,
-    /// The compute logger.
-    pub compute_logger: super::compute::Logger,
+    dataflows_exceeding_heap_size_limit: Rc<RefCell<BTreeSet<usize>>>,
 }

 impl<A: Allocate + 'static> LoggingContext<'_, A> {
-    fn construct_dataflow(&mut self) -> BTreeMap<LogVariant, TraceBundle> {
+    fn construct_dataflow(&mut self) -> (BTreeMap<LogVariant, TraceBundle>, super::LoggingHandles) {
         self.worker.dataflow_named("Dataflow: logging", |scope| {
             let mut collections = BTreeMap::new();

             let super::timely::Return {
                 collections: timely_collections,
                 compute_events: compute_events_timely,
+                operator_to_dataflow,
             } = super::timely::construct(scope.clone(), self.config, self.t_event_queue.clone());
             collections.extend(timely_collections);

@@ -131,6 +128,7 @@ impl<A: Allocate + 'static> LoggingContext<'_, A> {
             let super::differential::Return {
                 collections: differential_collections,
                 compute_events: compute_events_differential,
+                batcher_heap_size,
             } = super::differential::construct(
                 scope.clone(),
                 self.config,
@@ -141,6 +139,7 @@ impl<A: Allocate + 'static> LoggingContext<'_, A> {

             let super::compute::Return {
                 collections: compute_collections,
+                arrangement_heap_size,
             } = super::compute::construct(
                 scope.clone(),
                 scope.parent.clone(),
@@ -151,6 +150,19 @@ impl<A: Allocate + 'static> LoggingContext<'_, A> {
             );
             collections.extend(compute_collections);

+            let (heap_size_limits_handle, heap_size_limits) = scope.new_input();
+
+            let watchdog_streams = super::watchdog::Streams {
+                arrangement_heap_size,
+                batcher_heap_size,
+                operator_to_dataflow,
+                heap_size_limits,
+                dataflows_exceeding_heap_size_limit: Rc::clone(
+                    &self.dataflows_exceeding_heap_size_limit,
+                ),
+            };
+            super::watchdog::construct(scope.clone(), watchdog_streams);
+
             let errs = scope.scoped("logging errors", |scope| {
                 let collection: KeyCollection<_, DataflowError, Diff> =
                     Collection::empty(scope).into();
@@ -167,7 +179,11 @@ impl<A: Allocate + 'static> LoggingContext<'_, A> {
                     (log, bundle)
                 })
                 .collect();
-            traces
+
+            let handles = super::LoggingHandles {
+                heap_size_limits_handle,
+            };
+            (traces, handles)
         })
     }

diff --git c/src/compute/src/logging/timely.rs i/src/compute/src/logging/timely.rs
index 671a54a52c..feea6e56ee 100644
--- c/src/compute/src/logging/timely.rs
+++ i/src/compute/src/logging/timely.rs
@@ -20,14 +20,16 @@ use mz_repr::{Datum, Diff, Timestamp};
 use mz_timely_util::containers::{
     columnar_exchange, Col2ValBatcher, Column, ColumnBuilder, ProvidedBuilder,
 };
+use mz_timely_util::operator::Bifurcate;
 use mz_timely_util::replay::MzReplay;
 use timely::container::columnation::{Columnation, CopyRegion};
+use timely::container::CapacityContainerBuilder;
 use timely::dataflow::channels::pact::{ExchangeCore, Pipeline};
 use timely::dataflow::channels::pushers::buffer::Session;
 use timely::dataflow::channels::pushers::{Counter, Tee};
 use timely::dataflow::operators::generic::builder_rc::OperatorBuilder;
 use timely::dataflow::operators::Leave;
-use timely::dataflow::{Scope, StreamCore};
+use timely::dataflow::{Scope, Stream, StreamCore};
 use timely::logging::{
     ChannelsEvent, MessagesEvent, OperatesEvent, ParkEvent, ScheduleEvent, ShutdownEvent,
     TimelyEvent,
@@ -37,7 +39,7 @@ use tracing::error;

 use crate::extensions::arrange::MzArrangeCore;
 use crate::logging::compute::{ComputeEvent, DataflowShutdown};
-use crate::logging::{consolidate_and_pack, LogCollection, OutputSessionColumnar};
+use crate::logging::{consolidate_and_pack, LogCollection, OutputSessionColumnar, Update};
 use crate::logging::{EventQueue, LogVariant, TimelyLog};
 use crate::row_spine::RowRowBuilder;
 use crate::typedefs::{KeyBatcher, KeyValBatcher, RowRowSpine};
@@ -48,6 +50,7 @@ pub(super) struct Return<S: Scope> {
     pub collections: BTreeMap<LogVariant, LogCollection>,
     /// Stream of compute events generated by Timely logging.
     pub compute_events: StreamCore<S, Column<(Duration, ComputeEvent)>>,
+    pub operator_to_dataflow: Stream<S, Update<(usize, usize)>>,
 }

 /// Constructs the logging dataflow fragment for timely logs.
@@ -183,6 +186,12 @@ pub(super) fn construct<G: Scope<Timestamp = Timestamp>>(
             },
         );

+        let (addresses, operator_to_dataflow) = addresses.bifurcate::<CapacityContainerBuilder<Vec<_>>>("addresses", |data, output| {
+            output.give_iterator(IntoIterator::into_iter(data).map(|((op, addr), time, diff)| {
+                ((*op, addr[0]), *time, *diff)
+            }));
+        });
+
         let addresses = consolidate_and_pack::<_, KeyValBatcher<_, _, _, _>, ColumnBuilder<_>, _, _>(
             &addresses,
             TimelyLog::Addresses,
@@ -328,7 +337,7 @@ pub(super) fn construct<G: Scope<Timestamp = Timestamp>>(
             }
         }

-        Return { collections, compute_events: compute_events.leave(), }
+        Return { collections, compute_events: compute_events.leave(), operator_to_dataflow: operator_to_dataflow.leave() }
     })
 }

diff --git c/src/compute/src/logging/watchdog.rs i/src/compute/src/logging/watchdog.rs
new file mode 100644
index 0000000000..283622b8ef
--- /dev/null
+++ i/src/compute/src/logging/watchdog.rs
@@ -0,0 +1,112 @@
+// Copyright Materialize, Inc. and contributors. All rights reserved.
+//
+// Use of this software is governed by the Business Source License
+// included in the LICENSE file.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0.
+
+//! Logging dataflows for watchdog events generated by clusterd.
+
+use crate::extensions::arrange::MzArrange;
+use crate::logging::Update;
+use crate::typedefs::spines::{ColKeyBuilder, ColValBatcher, ColValBuilder};
+use crate::typedefs::{KeyBatcher, KeySpine, KeyValSpine};
+use differential_dataflow::operators::CountTotal;
+use differential_dataflow::AsCollection;
+use std::cell::RefCell;
+use std::collections::BTreeSet;
+use std::rc::Rc;
+use timely::dataflow::operators::Concat;
+use timely::dataflow::{Scope, Stream};
+
+pub(super) struct Streams<S: Scope> {
+    /// operator id, diff in bytes
+    pub(super) arrangement_heap_size: Stream<S, Update<(usize, ())>>,
+    /// operator id, diff in bytes
+    pub(super) batcher_heap_size: Stream<S, Update<(usize, ())>>,
+    /// Operator id to dataflow id
+    pub(super) operator_to_dataflow: Stream<S, Update<(usize, usize)>>,
+    /// Dataflow -> limit in bytes
+    pub(super) heap_size_limits: Stream<S, Update<(usize, u64)>>,
+    pub(super) dataflows_exceeding_heap_size_limit: Rc<RefCell<BTreeSet<usize>>>,
+}
+
+pub(super) fn construct<S: Scope<Timestamp = mz_repr::Timestamp>>(
+    _scope: S,
+    streams: Streams<S>,
+) -> () {
+    let Streams {
+        arrangement_heap_size,
+        batcher_heap_size,
+        operator_to_dataflow,
+        heap_size_limits,
+        dataflows_exceeding_heap_size_limit,
+    } = streams;
+
+    // The following dataflow computes:
+    // ```
+    // arrangement_heap_size    batcher_heap_size  operator_to_dataflow   heap_size_limits
+    //           |                    |                    |                    |
+    //           |----->concat<------ |                 arrange              arrange
+    //                    |                                |                    |
+    //               arrange: op_to_heap_size              |                    |
+    //                    |                                |                    |
+    //                    |-->join: dataflow_to_heap_size<-|                    |
+    //                                    |                                     |
+    //                                arrange                                   |
+    //                                    |                                     |
+    //                               count_total_core                           |
+    //                                    |                                     |
+    //                                arrange                                   |
+    //                                    |                                     |
+    //                                    |------------>join<-------------------|
+    //                                                    |
+    //                                                result
+    // ```
+
+    let operator_to_heap_size = arrangement_heap_size.concat(&batcher_heap_size);
+    let operator_to_heap_size = operator_to_heap_size
+        .as_collection()
+        .mz_arrange::<KeyBatcher<_, _, _>, ColKeyBuilder<_, _, _>, KeySpine<_, _, _>>(
+            "operator_to_heap_size",
+        );
+
+    let operator_to_dataflow = operator_to_dataflow
+        .as_collection()
+        .mz_arrange::<ColValBatcher<_, _, _, _>, ColValBuilder<_, _, _, _>, KeyValSpine<_, _, _, _>>(
+            "operator_to_dataflow",
+        );
+
+    let dataflow_to_heap_size = operator_to_heap_size
+        .join_core(&operator_to_dataflow, |_op, (), dataflow| {
+            Some((*dataflow, ()))
+        });
+
+    let heap_size_limits = heap_size_limits
+        .as_collection()
+        .mz_arrange::<ColValBatcher<_, _, _, _>, ColValBuilder<_, _, _, _>, KeyValSpine<_, _, _, _>>("heap_size_limits");
+
+    let dataflow_to_heap_size_limit =  dataflow_to_heap_size
+        .mz_arrange::<KeyBatcher<_, _, _>, ColKeyBuilder<_, _, _>, KeySpine<_, _, _>>(
+            "dataflow_to_heap_size",
+        )
+        .count_total_core::<mz_repr::Diff>()
+        .mz_arrange::<ColValBatcher<_, _, _, _>, ColValBuilder<_, _, _, _>, KeyValSpine<_, _, _, _>>(
+            "dataflow_to_heap_size_count",
+        )
+        .join_core(&heap_size_limits, |dataflow, size, limit| {
+            if *size >= (*limit).try_into().expect("must fit") {
+                Some(*dataflow)
+            } else {
+                None
+            }
+        });
+
+    dataflow_to_heap_size_limit.inspect(move |(dataflow_id, _, _)| {
+        dataflows_exceeding_heap_size_limit
+            .borrow_mut()
+            .insert(*dataflow_id);
+    });
+}
diff --git c/src/compute/src/server.rs i/src/compute/src/server.rs
index 4de993dae0..fd6bc3d58e 100644
--- c/src/compute/src/server.rs
+++ i/src/compute/src/server.rs
@@ -333,6 +333,7 @@ impl<'w, A: Allocate + 'static> Worker<'w, A> {
                         initial_storage_as_of: dataflow.initial_storage_as_of.clone(),
                         refresh_schedule: dataflow.refresh_schedule.clone(),
                         time_dependence: dataflow.time_dependence.clone(),
+                        memory_limit: dataflow.memory_limit,
                     })
                     .map(ComputeCommand::CreateDataflow)
                     .collect()
@@ -514,6 +515,8 @@ impl<'w, A: Allocate + 'static> Worker<'w, A> {
                 compute_state.process_peeks();
                 compute_state.process_subscribes();
                 compute_state.process_copy_tos();
+                compute_state.process_limits();
+                compute_state.advance_handles();
             }
         }
     }
diff --git c/src/ore/Cargo.toml i/src/ore/Cargo.toml
index 9ba9e0a552..391877d7b7 100644
--- c/src/ore/Cargo.toml
+++ i/src/ore/Cargo.toml
@@ -165,7 +165,7 @@ harness = false
 [[bench]]
 name = "bytes"
 harness = false
-required-features = ["bytes_", "region", "tracing_", "lgalloc", "derivative"]
+required-features = ["bytes_", "region", "tracing_", "derivative"]

 [package.metadata.cargo-udeps.ignore]
 normal = ["workspace-hack"]
diff --git c/src/sql/src/session/vars.rs i/src/sql/src/session/vars.rs
index a8b3f8b3fb..f828f088fb 100644
--- c/src/sql/src/session/vars.rs
+++ i/src/sql/src/session/vars.rs
@@ -841,6 +841,13 @@ impl SessionVars {
             .as_bytes()
     }

+    /// Returns the value of the `max_query_heap_size` configuration parameter.
+    pub fn max_query_heap_size(&self) -> Option<u64> {
+        self.expect_value::<Option<ByteSize>>(&MAX_QUERY_HEAP_SIZE)
+            .as_ref()
+            .map(ByteSize::as_bytes)
+    }
+
     /// Sets the external metadata associated with the user.
     pub fn set_external_user_metadata(&mut self, metadata: ExternalUserMetadata) {
         self.user.external_metadata = Some(metadata);
@@ -1095,6 +1102,7 @@ impl SystemVars {
                 &TIMEZONE,
                 &TRANSACTION_ISOLATION,
                 &MAX_QUERY_RESULT_SIZE,
+                &MAX_QUERY_HEAP_SIZE,
             ]
             .into_iter()
             .map(|var| (UncasedStr::new(var.name()), var))
diff --git c/src/sql/src/session/vars/definitions.rs i/src/sql/src/session/vars/definitions.rs
index 357730b336..ae6a1d03f7 100644
--- c/src/sql/src/session/vars/definitions.rs
+++ i/src/sql/src/session/vars/definitions.rs
@@ -577,6 +577,13 @@ pub static MAX_QUERY_RESULT_SIZE: VarDefinition = VarDefinition::new(
     true,
 );

+pub static MAX_QUERY_HEAP_SIZE: VarDefinition = VarDefinition::new(
+    "max_query_heap_size",
+    value!(Option<ByteSize>; None),
+    "The maximum size in bytes for a single query's dataflow (Materialize).",
+    true,
+);
+
 pub static MAX_COPY_FROM_SIZE: VarDefinition = VarDefinition::new(
     "max_copy_from_size",
     // 1 GiB, this limit is noted in the docs, if you change it make sure to update our docs.
diff --git c/src/timely-util/src/operator.rs i/src/timely-util/src/operator.rs
index 8763e95937..f82033be41 100644
--- c/src/timely-util/src/operator.rs
+++ i/src/timely-util/src/operator.rs
@@ -928,3 +928,67 @@ where
         std::mem::take(chain)
     }
 }
+
+pub use bifurcate::Bifurcate;
+
+mod bifurcate {
+    use timely::container::ContainerBuilder;
+    use timely::dataflow::channels::pact::Pipeline;
+    use timely::dataflow::channels::pushers::buffer::Session;
+    use timely::dataflow::channels::pushers::{Counter, Tee};
+    use timely::dataflow::operators::generic::builder_rc::OperatorBuilder;
+    use timely::dataflow::{Scope, StreamCore};
+    use timely::Container;
+
+    use crate::containers::ProvidedBuilder;
+
+    pub trait Bifurcate<G: Scope, C>: Sized {
+        fn bifurcate<CB: ContainerBuilder>(
+            &self,
+            name: &str,
+            logic: impl FnMut(
+                    &mut C,
+                    &mut Session<
+                        G::Timestamp,
+                        CB,
+                        Counter<G::Timestamp, CB::Container, Tee<G::Timestamp, CB::Container>>,
+                    >,
+                ) + 'static,
+        ) -> (Self, StreamCore<G, CB::Container>);
+    }
+
+    impl<G: Scope, C: Container + Clone + 'static> Bifurcate<G, C> for StreamCore<G, C> {
+        fn bifurcate<CB: ContainerBuilder>(
+            &self,
+            name: &str,
+            mut logic: impl FnMut(
+                    &mut C,
+                    &mut Session<
+                        G::Timestamp,
+                        CB,
+                        Counter<G::Timestamp, CB::Container, Tee<G::Timestamp, CB::Container>>,
+                    >,
+                ) + 'static,
+        ) -> (Self, StreamCore<G, CB::Container>) {
+            let mut builder = OperatorBuilder::new(format!("Bifurcate {name}"), self.scope());
+            let mut input = builder.new_input(self, Pipeline);
+            let (mut self_out, self_stream) = builder.new_output::<ProvidedBuilder<C>>();
+            let (mut other_out, other_stream) = builder.new_output::<CB>();
+
+            builder.build(move |_| {
+                move |_frontiers| {
+                    let mut self_out = self_out.activate();
+                    let mut other_out = other_out.activate();
+                    input.for_each(|time, data| {
+                        let mut session = self_out.session_with_builder(&time);
+                        let mut other = other_out.session_with_builder(&time);
+                        logic(data, &mut other);
+                        session.give_container(data);
+                    });
+                }
+            });
+
+            (self_stream, other_stream)
+        }
+    }
+}
diff --git c/test/sqllogictest/distinct_arrangements.slt i/test/sqllogictest/distinct_arrangements.slt
index c9e3c45b2a..a07b22b0e5 100644
--- c/test/sqllogictest/distinct_arrangements.slt
+++ i/test/sqllogictest/distinct_arrangements.slt
@@ -1099,6 +1099,11 @@ Arrange Timely(MessagesSent)
 Arrange Timely(Operates)
 Arrange Timely(Parks)
 Arrange Timely(Reachability)
+dataflow_to_heap_size
+dataflow_to_heap_size_count
+heap_size_limits
+operator_to_dataflow
+operator_to_heap_size

 statement ok
 COMMIT
diff --git c/test/testdrive/query_heap_size.td i/test/testdrive/query_heap_size.td
new file mode 100644
index 0000000000..aefe95db3c
--- /dev/null
+++ i/test/testdrive/query_heap_size.td
@@ -0,0 +1,55 @@
+# Copyright Materialize, Inc. and contributors. All rights reserved.
+#
+# Use of this software is governed by the Business Source License
+# included in the LICENSE file at the root of this repository.
+#
+# As of the Change Date specified in that file, in accordance with
+# the Business Source License, use of this software will be governed
+# by the Apache License, Version 2.0.
+
+# Test the `max_query_heap_size` session variable.
+
+$ set-arg-default single-replica-cluster=quickstart
+
+> CREATE SOURCE tpch
+  IN CLUSTER ${arg.single-replica-cluster}
+  FROM LOAD GENERATOR TPCH (SCALE FACTOR 1, UP TO 1000)
+  FOR ALL TABLES;
+
+> CREATE VIEW vq01 AS
+  SELECT
+    l_returnflag,
+    l_linestatus,
+    sum(l_quantity) AS sum_qty,
+    sum(l_extendedprice) AS sum_base_price,
+    sum(l_extendedprice * (1 - l_discount)) AS sum_disc_price,
+    sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) AS sum_charge,
+    avg(l_quantity) AS avg_qty,
+    avg(l_extendedprice) AS avg_price,
+    avg(l_discount) AS avg_disc,
+    count(*) AS count_order
+  FROM
+    lineitem
+  WHERE
+    l_shipdate <= DATE '1998-12-01' - INTERVAL '60' day
+  GROUP BY
+    l_returnflag,
+    l_linestatus
+  ORDER BY
+    l_returnflag,
+    l_linestatus;
+
+> SET max_query_heap_size = '1kB';
+
+! SELECT * FROM vq01;
+contains:Dataflow limit exceeded
+
+> SET max_query_heap_size = '128MB';
+
+> SELECT * FROM vq01;
+4 values hashing to 87d2cbecfc8cdcd9c9614dfb7e6d236f
+
+> SET max_query_heap_size = '';
+
+> SELECT * FROM vq01;
+4 values hashing to 87d2cbecfc8cdcd9c9614dfb7e6d236f
diff --git c/test/testdrive/session.td i/test/testdrive/session.td
index 860a3c8e9d..c9688cacfb 100644
--- c/test/testdrive/session.td
+++ i/test/testdrive/session.td
@@ -54,6 +54,7 @@ max_network_policies                     25                      "The maximum nu
 max_objects_per_schema                   1000                    "The maximum number of objects in a schema (Materialize)."
 max_postgres_connections                 1000                    "The maximum number of PostgreSQL connections in the region, across all schemas (Materialize)."
 max_query_result_size                    "1GB"                   "The maximum size in bytes for a single query's result (Materialize)."
+max_query_heap_size                      ""                      "The maximum size in bytes for a single query's dataflow (Materialize)."
 max_replicas_per_cluster                 5                       "The maximum number of replicas of a single cluster (Materialize)."
 max_result_size                          "1GB"                   "The maximum size in bytes for an internal query result (Materialize)."
 max_roles                                1000                    "The maximum number of roles in the region (Materialize)."

diff --git c/src/adapter/src/coord.rs i/src/adapter/src/coord.rs
index 84baf4fe35..9236f6f7db 100644
--- c/src/adapter/src/coord.rs
+++ i/src/adapter/src/coord.rs
@@ -467,6 +467,7 @@ pub struct PeekStageLinearizeTimestamp {
     validity: PlanValidity,
     plan: mz_sql::plan::SelectPlan,
     max_query_result_size: Option<u64>,
+    max_query_heap_size: Option<u64>,
     source_ids: BTreeSet<GlobalId>,
     target_replica: Option<ReplicaId>,
     timeline_context: TimelineContext,
@@ -481,6 +482,7 @@ pub struct PeekStageRealTimeRecency {
     validity: PlanValidity,
     plan: mz_sql::plan::SelectPlan,
     max_query_result_size: Option<u64>,
+    max_query_heap_size: Option<u64>,
     source_ids: BTreeSet<GlobalId>,
     target_replica: Option<ReplicaId>,
     timeline_context: TimelineContext,
@@ -496,6 +498,7 @@ pub struct PeekStageTimestampReadHold {
     validity: PlanValidity,
     plan: mz_sql::plan::SelectPlan,
     max_query_result_size: Option<u64>,
+    max_query_heap_size: Option<u64>,
     source_ids: BTreeSet<GlobalId>,
     target_replica: Option<ReplicaId>,
     timeline_context: TimelineContext,
@@ -512,6 +515,7 @@ pub struct PeekStageOptimize {
     validity: PlanValidity,
     plan: mz_sql::plan::SelectPlan,
     max_query_result_size: Option<u64>,
+    max_query_heap_size: Option<u64>,
     source_ids: BTreeSet<GlobalId>,
     id_bundle: CollectionIdBundle,
     target_replica: Option<ReplicaId>,
@@ -527,6 +531,7 @@ pub struct PeekStageFinish {
     validity: PlanValidity,
     plan: mz_sql::plan::SelectPlan,
     max_query_result_size: Option<u64>,
+    max_query_heap_size: Option<u64>,
     id_bundle: CollectionIdBundle,
     target_replica: Option<ReplicaId>,
     source_ids: BTreeSet<GlobalId>,
diff --git c/src/adapter/src/coord/sequencer.rs i/src/adapter/src/coord/sequencer.rs
index 61e7665e36..93efff8c9e 100644
--- c/src/adapter/src/coord/sequencer.rs
+++ i/src/adapter/src/coord/sequencer.rs
@@ -379,8 +379,10 @@ impl Coordinator {
                     self.sequence_end_transaction(ctx, action).await;
                 }
                 Plan::Select(plan) => {
-                    let max = Some(ctx.session().vars().max_query_result_size());
-                    self.sequence_peek(ctx, pl…
Signed-off-by: Moritz Hoffmann <[email protected]>

# Conflicts:
#	src/compute/src/server.rs

diff --git c/src/adapter/src/coord/sequencer/inner/peek.rs i/src/adapter/src/coord/sequencer/inner/peek.rs
index d53635b..9db9f1f 100644
--- c/src/adapter/src/coord/sequencer/inner/peek.rs
+++ i/src/adapter/src/coord/sequencer/inner/peek.rs
@@ -854,7 +854,7 @@ impl Coordinator {
         let source_arity = typ.arity();

         if let peek::PeekPlan::SlowPath(PeekDataflowPlan { desc, .. }) = &mut peek_plan {
-            desc.memory_limit = max_query_heap_size;
+            desc.heap_size_limit = max_query_heap_size;
         }

         self.emit_optimizer_notices(&*session, &df_meta.optimizer_notices);
diff --git c/src/adapter/src/coord/sequencer/inner/subscribe.rs i/src/adapter/src/coord/sequencer/inner/subscribe.rs
index 67f6a84..f2aeac4 100644
--- c/src/adapter/src/coord/sequencer/inner/subscribe.rs
+++ i/src/adapter/src/coord/sequencer/inner/subscribe.rs
@@ -357,7 +357,7 @@ impl Coordinator {

         let (mut df_desc, df_meta) = global_lir_plan.unapply();

-        df_desc.memory_limit = ctx.session().vars().max_query_heap_size();
+        df_desc.heap_size_limit = ctx.session().vars().max_query_heap_size();

         // Emit notices.
         self.emit_optimizer_notices(ctx.session(), &df_meta.optimizer_notices);
diff --git c/src/compute-client/src/controller/instance.rs i/src/compute-client/src/controller/instance.rs
index 67bb609..85cb917 100644
--- c/src/compute-client/src/controller/instance.rs
+++ i/src/compute-client/src/controller/instance.rs
@@ -1352,7 +1352,7 @@ where
             refresh_schedule: dataflow.refresh_schedule,
             debug_name: dataflow.debug_name,
             time_dependence: dataflow.time_dependence,
-            memory_limit: dataflow.memory_limit,
+            heap_size_limit: dataflow.heap_size_limit,
         };

         if augmented_dataflow.is_transient() {
@@ -2009,18 +2009,14 @@ where
                 self.update_operator_hydration_status(replica_id, status)
             }
             StatusResponse::DataflowLimitExceeded(status) => {
-                self.update_dataflow_limit_status(replica_id, status)
+                self.handle_dataflow_limit_status(replica_id, status)
             }
         }
     }

-    /// Update the tracked hydration status for an operator according to a received status update.
-    fn update_dataflow_limit_status(&mut self, replica_id: ReplicaId, status: DataflowLimitStatus) {
-        tracing::warn!(
-            "Dataflow limit exceeded on replica {}: {:?}",
-            replica_id,
-            status
-        );
+    /// Handle a dataflow limit exceeded response.
+    fn handle_dataflow_limit_status(&mut self, replica_id: ReplicaId, status: DataflowLimitStatus) {
+        tracing::debug!(%replica_id, ?status, "dataflow limit exceeded");
         if let Some(subscribe) = self.subscribes.get(&status.collection_id) {
             self.deliver_response(ComputeControllerResponse::SubscribeResponse(
                 status.collection_id,
diff --git c/src/compute-types/src/dataflows.proto i/src/compute-types/src/dataflows.proto
index 68f8d0b..72910c0 100644
--- c/src/compute-types/src/dataflows.proto
+++ i/src/compute-types/src/dataflows.proto
@@ -62,7 +62,7 @@ message ProtoDataflowDescription {

   string debug_name = 8;

-  optional uint64 memory_limit = 12;
+  optional uint64 heap_size_limit = 12;
 }

 message ProtoIndexDesc {
diff --git c/src/compute-types/src/dataflows.rs i/src/compute-types/src/dataflows.rs
index 18ca55f..e36060e 100644
--- c/src/compute-types/src/dataflows.rs
+++ i/src/compute-types/src/dataflows.rs
@@ -76,7 +76,7 @@ pub struct DataflowDescription<P, S: 'static = (), T = mz_repr::Timestamp> {
     /// Description of how the dataflow's progress relates to wall-clock time. None for unknown.
     pub time_dependence: Option<TimeDependence>,
     /// Optional heap size limit for this dataflow, in bytes.
-    pub memory_limit: Option<u64>,
+    pub heap_size_limit: Option<u64>,
 }

 impl<P, S> DataflowDescription<P, S, mz_repr::Timestamp> {
@@ -149,7 +149,7 @@ impl<T> DataflowDescription<OptimizedMirRelationExpr, (), T> {
             refresh_schedule: None,
             debug_name: name,
             time_dependence: None,
-            memory_limit: None,
+            heap_size_limit: None,
         }
     }

@@ -565,7 +565,7 @@ where
             refresh_schedule: self.refresh_schedule.clone(),
             debug_name: self.debug_name.clone(),
             time_dependence: self.time_dependence.clone(),
-            memory_limit: self.memory_limit,
+            heap_size_limit: self.heap_size_limit,
         }
     }
 }
@@ -584,7 +584,7 @@ impl RustType<ProtoDataflowDescription> for DataflowDescription<RenderPlan, Coll
             refresh_schedule: self.refresh_schedule.into_proto(),
             debug_name: self.debug_name.clone(),
             time_dependence: self.time_dependence.into_proto(),
-            memory_limit: self.memory_limit.into_proto(),
+            heap_size_limit: self.heap_size_limit.into_proto(),
         }
     }

@@ -608,7 +608,7 @@ impl RustType<ProtoDataflowDescription> for DataflowDescription<RenderPlan, Coll
             refresh_schedule: proto.refresh_schedule.into_rust()?,
             debug_name: proto.debug_name,
             time_dependence: proto.time_dependence.into_rust()?,
-            memory_limit: proto.memory_limit,
+            heap_size_limit: proto.heap_size_limit,
         })
     }
 }
@@ -789,7 +789,7 @@ where
                     refresh_schedule,
                     debug_name,
                 ),
-                (time_dependence, memory_limit),
+                (time_dependence, heap_size_limit),
             )| DataflowDescription {
                 source_imports: BTreeMap::from_iter(source_imports),
                 index_imports: BTreeMap::from_iter(index_imports),
@@ -814,7 +814,7 @@ where
                 },
                 debug_name,
                 time_dependence,
-                memory_limit,
+                heap_size_limit,
             },
         )
 }
diff --git c/src/compute-types/src/plan/lowering.rs i/src/compute-types/src/plan/lowering.rs
index fdaa363..ca22355 100644
--- c/src/compute-types/src/plan/lowering.rs
+++ i/src/compute-types/src/plan/lowering.rs
@@ -120,7 +120,7 @@ impl Context {
             refresh_schedule: desc.refresh_schedule,
             debug_name: desc.debug_name,
             time_dependence: desc.time_dependence,
-            memory_limit: desc.memory_limit,
+            heap_size_limit: desc.heap_size_limit,
         })
     }

diff --git c/src/compute/src/compute_state.rs i/src/compute/src/compute_state.rs
index 2780f99..1e78b1e 100644
--- c/src/compute/src/compute_state.rs
+++ i/src/compute/src/compute_state.rs
@@ -180,7 +180,7 @@ pub(crate) struct ComputeState {
     /// The interval for rounding logging timestamps.
     logging_interval: Duration,

-    /// The handles to inject updates for logging dataflows.
+    /// The handles to inject updates for logging dataflows. Only available if logging is enabled.
     pub logging_handles: Option<LoggingHandles>,
     /// Shared set of collections exceeding their heap size limit.
     pub dataflows_exceeding_heap_size_limit: Rc<RefCell<BTreeSet<usize>>>,
@@ -500,7 +500,7 @@ impl<'a, A: Allocate + 'static> ActiveComputeState<'a, A> {
                 expiration_datetime = ?dataflow_expiration.as_option().map(|t| mz_ore::now::to_datetime(t.into())),
                 plan_until = ?dataflow.until.elements(),
                 until = ?until.elements(),
-                memory_limit = ?dataflow.memory_limit,
+                heap_size_limit = ?dataflow.heap_size_limit,
                 "creating dataflow",
             );
         } else {
@@ -514,7 +514,7 @@ impl<'a, A: Allocate + 'static> ActiveComputeState<'a, A> {
                 expiration_datetime = ?dataflow_expiration.as_option().map(|t| mz_ore::now::to_datetime(t.into())),
                 plan_until = ?dataflow.until.elements(),
                 until = ?until.elements(),
-                memory_limit = ?dataflow.memory_limit,
+                heap_size_limit = ?dataflow.heap_size_limit,
                 "creating dataflow",
             );
         };
@@ -563,9 +563,8 @@ impl<'a, A: Allocate + 'static> ActiveComputeState<'a, A> {
                 .insert(id, Rc::clone(&suspension_token));
         }

-        if let (Some(limit), true) = (dataflow.memory_limit, dataflow.is_transient()) {
+        if let (Some(limit), true) = (dataflow.heap_size_limit, dataflow.is_transient()) {
             for id in dataflow.export_ids() {
-                // TODO: Make configurable.
                 self.handle_set_heap_size_limit(id, Some(limit));
             }
         }
@@ -601,6 +600,7 @@ impl<'a, A: Allocate + 'static> ActiveComputeState<'a, A> {
         }
     }

+    /// Handles an update to a heap memory limit for a collection.
     fn handle_set_heap_size_limit(&mut self, id: GlobalId, limit: Option<u64>) {
         let collection = self
             .compute_state
@@ -614,18 +614,10 @@ impl<'a, A: Allocate + 'static> ActiveComputeState<'a, A> {
             let handle = &mut handles.heap_size_limits_handle;
             let time = *handle.time();
             if let Some(old_limit) = old_limit {
-                handle.send((
-                    (collection.dataflow_id, old_limit),
-                    time,
-                    -Diff::try_from(old_limit).expect("must fit"),
-                ));
+                handle.send(((collection.dataflow_id, old_limit), time, -1));
             }
             if let Some(limit) = limit {
-                handle.send((
-                    (collection.dataflow_id, limit),
-                    time,
-                    Diff::try_from(limit).expect("must fit"),
-                ));
+                handle.send(((collection.dataflow_id, limit), time, 1));
             }
         }
     }
@@ -705,11 +697,7 @@ impl<'a, A: Allocate + 'static> ActiveComputeState<'a, A> {
         ) {
             let handle = &mut handle.heap_size_limits_handle;
             let time = *handle.time();
-            handle.send((
-                (collection.dataflow_id, limit),
-                time,
-                -Diff::try_from(limit).expect("must fit"),
-            ));
+            handle.send(((collection.dataflow_id, limit), time, -1));
         }
     }

@@ -1025,13 +1013,16 @@ impl<'a, A: Allocate + 'static> ActiveComputeState<'a, A> {
         }
     }

+    /// Process reported limit violations.
+    ///
+    /// Reports any dataflows that have exceeded their heap size limit to the controller.
     pub fn process_limits(&self) {
-        for dataflow_id in std::mem::take(
-            &mut *self
-                .compute_state
-                .dataflows_exceeding_heap_size_limit
-                .borrow_mut(),
-        ) {
+        let mut borrow = self
+            .compute_state
+            .dataflows_exceeding_heap_size_limit
+            .borrow_mut();
+        for dataflow_id in std::mem::take(&mut *borrow) {
+            // We don't have a mapping of dataflow ID to global ID, so scan instead.
             for (&collection_id, state) in self.compute_state.collections.iter() {
                 if state.dataflow_id == dataflow_id {
                     let status = DataflowLimitStatus { collection_id };
@@ -1043,13 +1034,19 @@ impl<'a, A: Allocate + 'static> ActiveComputeState<'a, A> {
         }
     }

+    /// Advance handles to logging dataflows.
+    ///
+    /// Advances the handles based on the current system time.
     pub fn advance_handles(&mut self) {
-        let interval_ms = std::cmp::max(1, self.compute_state.logging_interval.as_millis());
-        let interval_ms = u64::try_from(interval_ms).expect("must fit");
-        let now = mz_ore::now::SYSTEM_TIME();
-        let time_ms = ((now / interval_ms) + 1) * interval_ms;
-        let new_time_ms: Timestamp = time_ms.try_into().expect("must fit");
         if let Some(handles) = self.compute_state.logging_handles.as_mut() {
+            // We try to tick time rounded to the logging interval.
+            let interval_ms = std::cmp::max(1, self.compute_state.logging_interval.as_millis());
+            let interval_ms = u64::try_from(interval_ms).expect("must fit");
+            // TODO: Is it correct to use the system time?
+            let now = mz_ore::now::SYSTEM_TIME();
+            let time_ms = ((now / interval_ms) + 1) * interval_ms;
+            let new_time_ms: Timestamp = time_ms.try_into().expect("must fit");
+
             handles.advance_to(new_time_ms);
         }
     }
diff --git c/src/compute/src/logging.rs i/src/compute/src/logging.rs
index 432c680..81bfaee 100644
--- c/src/compute/src/logging.rs
+++ i/src/compute/src/logging.rs
@@ -159,6 +159,7 @@ struct SharedLoggingState {
     arrangement_size_activators: BTreeMap<usize, Activator>,
 }

+/// Data returned by the logging dataflow initialization.
 pub(crate) struct LoggingTraces {
     /// Exported traces, by log variant.
     pub traces: BTreeMap<LogVariant, TraceBundle>,
@@ -170,6 +171,7 @@ pub(crate) struct LoggingTraces {
     pub logging_handles: LoggingHandles,
 }

+/// Handles to logging inputs.
 pub(crate) struct LoggingHandles {
     /// Handle to provide heap size limits per dataflow.
     pub heap_size_limits_handle: Handle<Timestamp, Update<(usize, u64)>>,
diff --git c/src/compute/src/logging/compute.rs i/src/compute/src/logging/compute.rs
index d52f08d..f4fbb98 100644
--- c/src/compute/src/logging/compute.rs
+++ i/src/compute/src/logging/compute.rs
@@ -483,7 +483,7 @@ pub(super) fn construct<S: Scheduler + 'static, G: Scope<Timestamp = Timestamp>>
             };

         let (arrangement_heap_size, arrangement_heap_size_stripped) =
-            arrangement_heap_size.bifurcate::<CapacityContainerBuilder<Vec<_>>>(
+            arrangement_heap_size.bifurcate::<CapacityContainerBuilder<Vec<_>>, _>(
                 "arrangement_heap_size",
                 |data, session| {
                     session.give_iterator(
diff --git c/src/compute/src/logging/timely.rs i/src/compute/src/logging/timely.rs
index 9506d04..e0450ea 100644
--- c/src/compute/src/logging/timely.rs
+++ i/src/compute/src/logging/timely.rs
@@ -186,7 +186,7 @@ pub(super) fn construct<G: Scope<Timestamp = Timestamp>>(
             },
         );

-        let (addresses, operator_to_dataflow) = addresses.bifurcate::<CapacityContainerBuilder<Vec<_>>>("addresses", |data, output| {
+        let (addresses, operator_to_dataflow) = addresses.bifurcate::<CapacityContainerBuilder<Vec<_>>, _>("addresses", |data, output| {
             output.give_iterator(IntoIterator::into_iter(data).map(|((op, addr), time, diff)| {
                 ((*op, addr[0]), *time, *diff)
             }));
diff --git c/src/compute/src/logging/watchdog.rs i/src/compute/src/logging/watchdog.rs
index 283622b..f849805 100644
--- c/src/compute/src/logging/watchdog.rs
+++ i/src/compute/src/logging/watchdog.rs
@@ -9,17 +9,20 @@

 //! Logging dataflows for watchdog events generated by clusterd.

+use std::cell::RefCell;
+use std::collections::BTreeSet;
+use std::rc::Rc;
+
+use differential_dataflow::trace::{BatchReader, Cursor};
+use differential_dataflow::AsCollection;
+use timely::dataflow::operators::Concat;
+use timely::dataflow::operators::Inspect;
+use timely::dataflow::{Scope, Stream};
+
 use crate::extensions::arrange::MzArrange;
 use crate::logging::Update;
 use crate::typedefs::spines::{ColKeyBuilder, ColValBatcher, ColValBuilder};
 use crate::typedefs::{KeyBatcher, KeySpine, KeyValSpine};
-use differential_dataflow::operators::CountTotal;
-use differential_dataflow::AsCollection;
-use std::cell::RefCell;
-use std::collections::BTreeSet;
-use std::rc::Rc;
-use timely::dataflow::operators::Concat;
-use timely::dataflow::{Scope, Stream};

 pub(super) struct Streams<S: Scope> {
     /// operator id, diff in bytes
@@ -47,24 +50,27 @@ pub(super) fn construct<S: Scope<Timestamp = mz_repr::Timestamp>>(

     // The following dataflow computes:
     // ```
-    // arrangement_heap_size    batcher_heap_size  operator_to_dataflow   heap_size_limits
-    //           |                    |                    |                    |
-    //           |----->concat<------ |                 arrange              arrange
-    //                    |                                |                    |
-    //               arrange: op_to_heap_size              |                    |
-    //                    |                                |                    |
-    //                    |-->join: dataflow_to_heap_size<-|                    |
-    //                                    |                                     |
-    //                                arrange                                   |
-    //                                    |                                     |
-    //                               count_total_core                           |
-    //                                    |                                     |
-    //                                arrange                                   |
-    //                                    |                                     |
-    //                                    |------------>join<-------------------|
-    //                                                    |
+    // arrangement_heap_size  batcher_heap_size  operator_to_dataflow  heap_size_limits
+    //           |                    |                   |                  |
+    //           |----->concat<------ |                arrange            arrange
+    //                    |                               |                  |
+    //               arrange: op_to_heap_size             |                  |
+    //                    |                               |                  |
+    //                    |->join: dataflow_to_heap_size<-|                  |
+    //                                   |                                   |
+    //                                   |                                   |
+    //                               arrange                                 |
+    //                                   |                                   |
+    //                                   |------------>join<-----------------|
+    //                                                   |
+    //                                                arrange
+    //                                                   |
+    //                                                reduce size > limit
+    //                                                   |
     //                                                result
     // ```
+    //
+    // We encode the size of a dataflow in the diff field, and the limit as actual data.

     let operator_to_heap_size = arrangement_heap_size.concat(&batcher_heap_size);
     let operator_to_heap_size = operator_to_heap_size
@@ -82,31 +88,45 @@ pub(super) fn construct<S: Scope<Timestamp = mz_repr::Timestamp>>(
     let dataflow_to_heap_size = operator_to_heap_size
         .join_core(&operator_to_dataflow, |_op, (), dataflow| {
             Some((*dataflow, ()))
-        });
+        })
+        .mz_arrange::<KeyBatcher<_, _, _>, ColKeyBuilder<_, _, _>, KeySpine<_, _, _>>(
+            "dataflow_to_heap_size",
+        );

     let heap_size_limits = heap_size_limits
         .as_collection()
         .mz_arrange::<ColValBatcher<_, _, _, _>, ColValBuilder<_, _, _, _>, KeyValSpine<_, _, _, _>>("heap_size_limits");

     let dataflow_to_heap_size_limit =  dataflow_to_heap_size
-        .mz_arrange::<KeyBatcher<_, _, _>, ColKeyBuilder<_, _, _>, KeySpine<_, _, _>>(
-            "dataflow_to_heap_size",
-        )
-        .count_total_core::<mz_repr::Diff>()
-        .mz_arrange::<ColValBatcher<_, _, _, _>, ColValBuilder<_, _, _, _>, KeyValSpine<_, _, _, _>>(
+        .join_core(&heap_size_limits, |dataflow, (), limit| {
+            std::iter::once((*dataflow, *limit))
+        })
+        .mz_arrange::<ColValBatcher<usize, u64, _, _>, ColValBuilder<_, _, _, _>, KeyValSpine<usize, u64, _, mz_repr::Diff>>(
             "dataflow_to_heap_size_count",
         )
-        .join_core(&heap_size_limits, |dataflow, size, limit| {
-            if *size >= (*limit).try_into().expect("must fit") {
-                Some(*dataflow)
-            } else {
-                None
-            }
+        .reduce_abelian::<_, usize, (), ColKeyBuilder<_, _, _>, KeySpine<_, _, _>>("Reduce: dataflow_to_heap_size_count", |_dataflow, s, t| {
+            t.extend(s.iter().flat_map(|(&limit, size)| {
+                if *size >= limit.try_into().expect("must fit") {
+                    // We could output `size` here to report the actual size of the dataflow.
+                    Some(((), 1))
+                } else {
+                    None
+                }
+            }));
         });

-    dataflow_to_heap_size_limit.inspect(move |(dataflow_id, _, _)| {
-        dataflows_exceeding_heap_size_limit
-            .borrow_mut()
-            .insert(*dataflow_id);
+    dataflow_to_heap_size_limit.stream.inspect(move |batch| {
+        let mut cursor = batch.cursor();
+        while cursor.key_valid(&batch) {
+            let dataflow_id = *cursor.key(&batch);
+            cursor.map_times(&batch, |_time, _diff| {
+                // TODO: This only ever adds, never removes. Depending on what signals
+                //   we'd like to have, we could also consider _removing_ dataflows here.
+                dataflows_exceeding_heap_size_limit
+                    .borrow_mut()
+                    .insert(dataflow_id);
+            });
+            cursor.step_key(&batch);
+        }
     });
 }
diff --git c/src/timely-util/src/operator.rs i/src/timely-util/src/operator.rs
index 8cb6b18..0882589 100644
--- c/src/timely-util/src/operator.rs
+++ i/src/timely-util/src/operator.rs
@@ -942,26 +942,30 @@ mod bifurcate {

     use crate::containers::ProvidedBuilder;

+    /// Extension trait for [`StreamCore`].
     pub trait Bifurcate<G: Scope, C>: Sized {
-        fn bifurcate<CB: ContainerBuilder>(
-            &self,
-            name: &str,
-            logic: impl FnMut(
+        /// Bifurcates the stream into two streams. The logic provided will be applied to each
+        /// input container, and has the opportunity to push data into the provided session for
+        /// the second output while modifying the container if it chooses to. The container
+        /// will then be pushed into the first output stream.
+        fn bifurcate<CB, F>(&self, name: &str, logic: F) -> (Self, StreamCore<G, CB::Container>)
+        where
+            CB: ContainerBuilder,
+            F: FnMut(
                     &mut C,
                     &mut Session<
                         G::Timestamp,
                         CB,
                         Counter<G::Timestamp, CB::Container, Tee<G::Timestamp, CB::Container>>,
                     >,
-                ) + 'static,
-        ) -> (Self, StreamCore<G, CB::Container>);
+                ) + 'static;
     }

     impl<G: Scope, C: Container + Clone + 'static> Bifurcate<G, C> for StreamCore<G, C> {
-        fn bifurcate<CB: ContainerBuilder>(
-            &self,
-            name: &str,
-            mut logic: impl FnMut(
+        fn bifurcate<CB, F>(&self, name: &str, mut logic: F) -> (Self, StreamCore<G, CB::Container>)
+        where
+            CB: ContainerBuilder,
+            F: FnMut(
                     &mut C,
                     &mut Session<
                         G::Timestamp,
@@ -969,8 +973,8 @@ mod bifurcate {
                         Counter<G::Timestamp, CB::Container, Tee<G::Timestamp, CB::Container>>,
                     >,
                 ) + 'static,
-        ) -> (Self, StreamCore<G, CB::Container>) {
-            let mut builder = OperatorBuilder::new(format!("Bifurcate {name}"), self.scope());
+        {
+            let mut builder = OperatorBuilder::new(format!("Bifurcate({name})"), self.scope());
             let mut input = builder.new_input(self, Pipeline);
             let (mut self_out, self_stream) = builder.new_output::<ProvidedBuilder<C>>();
             let (mut other_out, other_stream) = builder.new_output::<CB>();
diff --git c/test/sqllogictest/distinct_arrangements.slt i/test/sqllogictest/distinct_arrangements.slt
index a07b22b..0346fd6 100644
--- c/test/sqllogictest/distinct_arrangements.slt
+++ i/test/sqllogictest/distinct_arrangements.slt
@@ -1099,6 +1099,7 @@ Arrange Timely(MessagesSent)
 Arrange Timely(Operates)
 Arrange Timely(Parks)
 Arrange Timely(Reachability)
+Reduce: dataflow_to_heap_size_count
 dataflow_to_heap_size
 dataflow_to_heap_size_count
 heap_size_limits
diff --git c/test/testdrive/query_heap_size.td i/test/testdrive/query_heap_size.td
index aefe95d..492174c 100644
--- c/test/testdrive/query_heap_size.td
+++ i/test/testdrive/query_heap_size.td
@@ -9,47 +9,38 @@

 # Test the `max_query_heap_size` session variable.

-$ set-arg-default single-replica-cluster=quickstart
+$ set-regex match=\d{13,20} replacement=<TIMESTAMP>

-> CREATE SOURCE tpch
-  IN CLUSTER ${arg.single-replica-cluster}
-  FROM LOAD GENERATOR TPCH (SCALE FACTOR 1, UP TO 1000)
-  FOR ALL TABLES;
+> CREATE TABLE t1 (a int4, b text)

-> CREATE VIEW vq01 AS
-  SELECT
-    l_returnflag,
-    l_linestatus,
-    sum(l_quantity) AS sum_qty,
-    sum(l_extendedprice) AS sum_base_price,
-    sum(l_extendedprice * (1 - l_discount)) AS sum_disc_price,
-    sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) AS sum_charge,
-    avg(l_quantity) AS avg_qty,
-    avg(l_extendedprice) AS avg_price,
-    avg(l_discount) AS avg_disc,
-    count(*) AS count_order
-  FROM
-    lineitem
-  WHERE
-    l_shipdate <= DATE '1998-12-01' - INTERVAL '60' day
-  GROUP BY
-    l_returnflag,
-    l_linestatus
-  ORDER BY
-    l_returnflag,
-    l_linestatus;
+> INSERT INTO t1 SELECT * FROM generate_series(1, 10000), repeat('a', 100);

 > SET max_query_heap_size = '1kB';

-! SELECT * FROM vq01;
+> BEGIN
+
+> DECLARE c CURSOR FOR SUBSCRIBE (SELECT max(a) FROM t1);
+
+# The first timestamp is computed without detecting that the limit has been
+# exceeded.
+> FETCH 1 c;
+<TIMESTAMP> 1 10000
+
+# No output should be produced. Instead an error .. notice?
+! FETCH 1 c;
 contains:Dataflow limit exceeded

+> ROLLBACK;
+
 > SET max_query_heap_size = '128MB';

-> SELECT * FROM vq01;
-4 values hashing to 87d2cbecfc8cdcd9c9614dfb7e6d236f
+> BEGIN

-> SET max_query_heap_size = '';
+> DECLARE c CURSOR FOR SUBSCRIBE (SELECT max(a) FROM t1);

-> SELECT * FROM vq01;
-4 values hashing to 87d2cbecfc8cdcd9c9614dfb7e6d236f
+> FETCH 1 c;
+<TIMESTAMP> 1 10000
+
+> FETCH 1 FROM c WITH (TIMEOUT = '1s');
+
+> ROLLBACK;
Signed-off-by: Moritz Hoffmann <[email protected]>
Copy link
Contributor

@teskje teskje left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Releasing some comments already, but I only made it through adapter and the compute controller. Looks good so far though!

Comment on lines 469 to +470
max_query_result_size: Option<u64>,
max_query_heap_size: Option<u64>,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems like a good idea to bundle these into a QueryResourceLimits or similar, to making passing them around less boilerplate-y.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think there's maybe a larger refactoring around here (the first ~900 lines of coord.rs are types without documentation, other than some field-level doccomments). Imo, maybe this PR isn't the place to do that, but rather a more holistic sweep?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Having said that, the next several pages of diffs do seem to be more boilerplate, pairing up the new parameter with the prior one. I may be coming around!

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should have called this a nit! Imo it's good to follow the boy scout rule when it's not too much of a hassle, because we might never get to that holistic refactor. But nothing I'd block the PR on.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 to bundling into a QueryResourceLimits but also not blocking

},
TargetCluster::Active,
None,
None,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are we not enforcing limits here? Afaict this is the code path for INSERT/UPDATE ... SELECT and nothing prevents the SELECT from being expensive.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

What's tricky here is the already existing variable max_query_result_size != max_result_size. The former limits how large of a result we'll return to the user, while the later limits how large of a result we'll allow clusterd to send to environmentd.

Some more context is here but because the results of this peek are not generally user facing (except in the case where the user specifies a RETURNING clause) we don't want to set a max_query_result_size.

Comment on lines 2026 to 2051
fn handle_dataflow_limit_status(&mut self, replica_id: ReplicaId, status: DataflowLimitStatus) {
tracing::debug!(%replica_id, ?status, "dataflow limit exceeded");
if let Some(subscribe) = self.subscribes.get(&status.collection_id) {
self.deliver_response(ComputeControllerResponse::SubscribeResponse(
status.collection_id,
SubscribeBatch {
lower: subscribe.frontier.clone(),
upper: subscribe.frontier.clone(),
updates: Err("Dataflow limit exceeded".to_string()),
},
))
} else {
// Look for a matching peek
let mut peek_uuid = None;
for (uuid, peek) in self.peeks.iter() {
if peek.read_hold.id() == status.collection_id {
peek_uuid = Some(*uuid);
break;
}
}
if let Some(uuid) = peek_uuid {
self.cancel_peek(
uuid,
PeekResponse::Error("Dataflow limit exceeded".to_string()),
);
}
Copy link
Contributor

@teskje teskje Mar 6, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some thoughts:

  • It's awkward that we can't immediately cancel the offending dataflow here because controller clients don't expect dataflows to get dropped silently. If we had a way to mark dataflows as "failed" we could use that here to drop them aggressively. Since we don't have that yet, I think the current way to report an error and then rely on the client to cancel the dataflow in a timely fashion makes sense.
  • Controller clients can specify a heap size limit for dataflows that are neither the target of peeks nor subscribes. We don't support this currently, so I think we should log an error here, or soft panic, if we don't find a subscribe/peek.
  • It seems important to report, which limit was exceeded, because users will definitely ask that once they get an error like this. The current "dataflow limit exceeded" response doesn't really provide that level of detail, so maybe we should just rename it to "dataflow heap limit exceeded"? Or add the limit type to the status, if we think that we might have more limits like this in the future.

} else {
// Look for a matching peek
let mut peek_uuid = None;
for (uuid, peek) in self.peeks.iter() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: .find(...).map(...) might end up a bit more readable... or not :D

message ProtoStatusResponse {
oneof kind {
ProtoOperatorHydrationStatus operator_hydration = 1;
ProtoDataflowLimitStatus dataflow_limit_status = 2;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this field should be called dataflow_limit_exceeded, since that's what the StatusResponse variant is called too?

Comment on lines +659 to +664
/// A dataflow exceeded some limit.
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, Arbitrary)]
pub struct DataflowLimitStatus {
/// The ID of the compute collection exported by the dataflow.
pub collection_id: GlobalId,
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mentioned above, but we should also report, which limit!

@teskje teskje self-requested a review March 6, 2025 14:57
Copy link
Contributor

@ParkMyCar ParkMyCar left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Adapter changes LGTM

},
TargetCluster::Active,
None,
None,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

What's tricky here is the already existing variable max_query_result_size != max_result_size. The former limits how large of a result we'll return to the user, while the later limits how large of a result we'll allow clusterd to send to environmentd.

Some more context is here but because the results of this peek are not generally user facing (except in the case where the user specifies a RETURNING clause) we don't want to set a max_query_result_size.


> ROLLBACK;

> SET max_query_heap_size = '128MB';
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you also test the behavior of RESET max_query_heap_size and SET max_query_heap_size = 0? Both should reset the size to None

Comment on lines 469 to +470
max_query_result_size: Option<u64>,
max_query_heap_size: Option<u64>,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 to bundling into a QueryResourceLimits but also not blocking

Signed-off-by: Moritz Hoffmann <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants