5 changes: 5 additions & 0 deletions src/adapter/src/coord.rs
@@ -467,6 +467,7 @@ pub struct PeekStageLinearizeTimestamp {
validity: PlanValidity,
plan: mz_sql::plan::SelectPlan,
max_query_result_size: Option<u64>,
max_query_heap_size: Option<u64>,
Comment on lines 469 to +470
Contributor: Seems like a good idea to bundle these into a QueryResourceLimits or similar, to make passing them around less boilerplate-y.

Contributor: I think there's maybe a larger refactoring around here (the first ~900 lines of coord.rs are types without documentation, other than some field-level doc comments). Imo, maybe this PR isn't the place to do that, but rather a more holistic sweep?

Contributor: Having said that, the next several pages of diffs do seem to be more boilerplate, pairing up the new parameter with the prior one. I may be coming around!

Contributor: Should have called this a nit! Imo it's good to follow the boy scout rule when it's not too much of a hassle, because we might never get to that holistic refactor. But nothing I'd block the PR on.

Contributor: +1 to bundling into a QueryResourceLimits but also not blocking.
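
For illustration, a minimal sketch of what the suggested bundling could look like. The name QueryResourceLimits comes from the comments above; the derives, field docs, and the commented-out stage rewrite are assumptions for this sketch, not part of the PR:

```rust
/// Hypothetical bundle of per-query resource limits, as suggested above.
#[derive(Debug, Clone, Copy, Default)]
pub struct QueryResourceLimits {
    /// Maximum size, in bytes, of a result returned to the client.
    pub max_query_result_size: Option<u64>,
    /// Maximum heap size, in bytes, that the query's dataflow may use.
    pub max_query_heap_size: Option<u64>,
}

// Each peek stage could then carry one field instead of two, e.g.:
// pub struct PeekStageLinearizeTimestamp {
//     validity: PlanValidity,
//     plan: mz_sql::plan::SelectPlan,
//     limits: QueryResourceLimits,
//     ...
// }
```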

source_ids: BTreeSet<GlobalId>,
target_replica: Option<ReplicaId>,
timeline_context: TimelineContext,
@@ -481,6 +482,7 @@ pub struct PeekStageRealTimeRecency {
validity: PlanValidity,
plan: mz_sql::plan::SelectPlan,
max_query_result_size: Option<u64>,
max_query_heap_size: Option<u64>,
source_ids: BTreeSet<GlobalId>,
target_replica: Option<ReplicaId>,
timeline_context: TimelineContext,
@@ -496,6 +498,7 @@ pub struct PeekStageTimestampReadHold {
validity: PlanValidity,
plan: mz_sql::plan::SelectPlan,
max_query_result_size: Option<u64>,
max_query_heap_size: Option<u64>,
source_ids: BTreeSet<GlobalId>,
target_replica: Option<ReplicaId>,
timeline_context: TimelineContext,
@@ -512,6 +515,7 @@ pub struct PeekStageOptimize {
validity: PlanValidity,
plan: mz_sql::plan::SelectPlan,
max_query_result_size: Option<u64>,
max_query_heap_size: Option<u64>,
source_ids: BTreeSet<GlobalId>,
id_bundle: CollectionIdBundle,
target_replica: Option<ReplicaId>,
@@ -527,6 +531,7 @@ pub struct PeekStageFinish {
validity: PlanValidity,
plan: mz_sql::plan::SelectPlan,
max_query_result_size: Option<u64>,
max_query_heap_size: Option<u64>,
id_bundle: CollectionIdBundle,
target_replica: Option<ReplicaId>,
source_ids: BTreeSet<GlobalId>,
19 changes: 14 additions & 5 deletions src/adapter/src/coord/sequencer.rs
@@ -379,8 +379,10 @@ impl Coordinator {
self.sequence_end_transaction(ctx, action).await;
}
Plan::Select(plan) => {
let max = Some(ctx.session().vars().max_query_result_size());
self.sequence_peek(ctx, plan, target_cluster, max).await;
let max_query = Some(ctx.session().vars().max_query_result_size());
let max_heap = ctx.session().vars().max_query_heap_size();
self.sequence_peek(ctx, plan, target_cluster, max_query, max_heap)
.await;
}
Plan::Subscribe(plan) => {
self.sequence_subscribe(ctx, plan, target_cluster).await;
@@ -392,9 +394,16 @@
ctx.retire(Ok(Self::send_immediate_rows(plan.row)));
}
Plan::ShowColumns(show_columns_plan) => {
let max = Some(ctx.session().vars().max_query_result_size());
self.sequence_peek(ctx, show_columns_plan.select_plan, target_cluster, max)
.await;
let max_query = Some(ctx.session().vars().max_query_result_size());
let max_heap = ctx.session().vars().max_query_heap_size();
self.sequence_peek(
ctx,
show_columns_plan.select_plan,
target_cluster,
max_query,
max_heap,
)
.await;
}
Plan::CopyFrom(plan) => match plan.source {
CopyFromSource::Stdin => {
2 changes: 2 additions & 0 deletions src/adapter/src/coord/sequencer/inner.rs
@@ -2580,6 +2580,7 @@ impl Coordinator {
None,
ExplainContext::Pushdown,
Some(ctx.session().vars().max_query_result_size()),
ctx.session().vars().max_query_heap_size(),
),
ctx
);
@@ -3004,6 +3005,7 @@ impl Coordinator {
},
TargetCluster::Active,
None,
None,
Contributor: Why are we not enforcing limits here? Afaict this is the code path for INSERT/UPDATE ... SELECT and nothing prevents the SELECT from being expensive.

Contributor: +1

What's tricky here is the already-existing variable max_query_result_size != max_result_size. The former limits how large of a result we'll return to the user, while the latter limits how large of a result we'll allow clusterd to send to environmentd.

Some more context is here, but because the results of this peek are not generally user-facing (except in the case where the user specifies a RETURNING clause) we don't want to set a max_query_result_size.
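
For illustration, a minimal sketch of the distinction being described. This is not Materialize's actual enforcement code; every name below other than max_result_size and max_query_result_size is invented for the sketch:

```rust
/// Illustrative only: shows which transfer each limit conceptually guards.
fn check_result_limits(
    bytes_sent_to_environmentd: u64,    // result shipped from clusterd to environmentd
    bytes_returned_to_client: u64,      // result returned to the user
    max_result_size: u64,               // guards the clusterd -> environmentd transfer
    max_query_result_size: Option<u64>, // guards only user-facing results
) -> Result<(), String> {
    if bytes_sent_to_environmentd > max_result_size {
        return Err("result exceeds max_result_size".into());
    }
    // Skipped for non-user-facing peeks, e.g. the INSERT ... SELECT path above
    // (unless a RETURNING clause makes the result user-facing).
    if let Some(limit) = max_query_result_size {
        if bytes_returned_to_client > limit {
            return Err("result exceeds max_query_result_size".into());
        }
    }
    Ok(())
}
```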

)
.await;

25 changes: 23 additions & 2 deletions src/adapter/src/coord/sequencer/inner/peek.rs
@@ -128,6 +128,7 @@ impl Coordinator {
plan: plan::SelectPlan,
target_cluster: TargetCluster,
max_query_result_size: Option<u64>,
max_query_heap_size: Option<u64>,
) {
let explain_ctx = if ctx.session().vars().emit_plan_insights_notice() {
let optimizer_trace = OptimizerTrace::new(ExplainStage::PlanInsights.paths());
@@ -143,7 +144,8 @@
target_cluster,
None,
explain_ctx,
max_query_result_size
max_query_result_size,
max_query_heap_size,
),
ctx
);
@@ -209,6 +211,7 @@
}),
ExplainContext::None,
Some(ctx.session().vars().max_query_result_size()),
ctx.session().vars().max_query_heap_size(),
),
ctx
);
@@ -258,6 +261,7 @@
optimizer_trace,
}),
Some(ctx.session().vars().max_query_result_size()),
ctx.session().vars().max_query_heap_size(),
),
ctx
);
@@ -274,6 +278,7 @@
copy_to_ctx: Option<CopyToContext>,
explain_ctx: ExplainContext,
max_query_result_size: Option<u64>,
max_query_heap_size: Option<u64>,
) -> Result<PeekStage, AdapterError> {
// Collect optimizer parameters.
let catalog = self.owned_catalog();
@@ -391,6 +396,7 @@
validity,
plan,
max_query_result_size,
max_query_heap_size,
source_ids,
target_replica,
timeline_context,
@@ -409,6 +415,7 @@
source_ids,
plan,
max_query_result_size,
max_query_heap_size,
target_replica,
timeline_context,
optimizer,
@@ -424,6 +431,7 @@
validity,
plan,
max_query_result_size,
max_query_heap_size,
source_ids,
target_replica,
timeline_context,
@@ -468,6 +476,7 @@
mut validity,
plan,
max_query_result_size,
max_query_heap_size,
source_ids,
target_replica,
timeline_context,
@@ -508,6 +517,7 @@
validity,
plan,
max_query_result_size,
max_query_heap_size,
source_ids,
id_bundle,
target_replica,
@@ -526,6 +536,7 @@
validity,
plan,
max_query_result_size,
max_query_heap_size,
source_ids,
id_bundle,
target_replica,
@@ -640,6 +651,7 @@
validity,
plan,
max_query_result_size,
max_query_heap_size,
id_bundle,
target_replica,
source_ids,
@@ -656,6 +668,7 @@
validity,
plan,
max_query_result_size,
max_query_heap_size,
id_bundle,
target_replica,
source_ids,
@@ -745,6 +758,7 @@
validity,
plan,
max_query_result_size,
max_query_heap_size,
source_ids,
target_replica,
timeline_context,
@@ -772,6 +786,7 @@
validity,
plan,
max_query_result_size,
max_query_heap_size,
target_replica,
timeline_context,
source_ids,
@@ -790,6 +805,7 @@
validity,
plan,
max_query_result_size,
max_query_heap_size,
target_replica,
timeline_context,
source_ids,
@@ -810,6 +826,7 @@
validity: _,
plan,
max_query_result_size,
max_query_heap_size,
id_bundle,
target_replica,
source_ids,
@@ -833,9 +850,13 @@
let session = ctx.session_mut();
let conn_id = session.conn_id().clone();

let (peek_plan, df_meta, typ) = global_lir_plan.unapply();
let (mut peek_plan, df_meta, typ) = global_lir_plan.unapply();
let source_arity = typ.arity();

if let peek::PeekPlan::SlowPath(PeekDataflowPlan { desc, .. }) = &mut peek_plan {
desc.heap_size_limit = max_query_heap_size;
}

self.emit_optimizer_notices(&*session, &df_meta.optimizer_notices);

let target_cluster = self.catalog().get_cluster(cluster_id);
4 changes: 3 additions & 1 deletion src/adapter/src/coord/sequencer/inner/subscribe.rs
@@ -355,7 +355,9 @@ impl Coordinator {
};
active_subscribe.initialize();

let (df_desc, df_meta) = global_lir_plan.unapply();
let (mut df_desc, df_meta) = global_lir_plan.unapply();

df_desc.heap_size_limit = ctx.session().vars().max_query_heap_size();

// Emit notices.
self.emit_optimizer_notices(ctx.session(), &df_meta.optimizer_notices);
1 change: 1 addition & 0 deletions src/compute-client/src/as_of_selection.rs
@@ -1103,6 +1103,7 @@ mod tests {
refresh_schedule: Default::default(),
debug_name: Default::default(),
time_dependence: None,
heap_size_limit: None,
}
}

39 changes: 37 additions & 2 deletions src/compute-client/src/controller/instance.rs
@@ -65,8 +65,8 @@ use crate::protocol::command::{
};
use crate::protocol::history::ComputeCommandHistory;
use crate::protocol::response::{
ComputeResponse, CopyToResponse, FrontiersResponse, OperatorHydrationStatus, PeekResponse,
StatusResponse, SubscribeBatch, SubscribeResponse,
ComputeResponse, CopyToResponse, DataflowLimitStatus, FrontiersResponse,
OperatorHydrationStatus, PeekResponse, StatusResponse, SubscribeBatch, SubscribeResponse,
};
use crate::service::{ComputeClient, ComputeGrpcClient};

@@ -1360,6 +1360,7 @@
refresh_schedule: dataflow.refresh_schedule,
debug_name: dataflow.debug_name,
time_dependence: dataflow.time_dependence,
heap_size_limit: dataflow.heap_size_limit,
};

if augmented_dataflow.is_transient() {
@@ -2015,6 +2016,40 @@
StatusResponse::OperatorHydration(status) => {
self.update_operator_hydration_status(replica_id, status)
}
StatusResponse::DataflowLimitExceeded(status) => {
self.handle_dataflow_limit_status(replica_id, status)
}
}
}

/// Handle a dataflow limit exceeded response.
fn handle_dataflow_limit_status(&mut self, replica_id: ReplicaId, status: DataflowLimitStatus) {
tracing::debug!(%replica_id, ?status, "dataflow limit exceeded");
if let Some(subscribe) = self.subscribes.get(&status.collection_id) {
self.deliver_response(ComputeControllerResponse::SubscribeResponse(
status.collection_id,
SubscribeBatch {
lower: subscribe.frontier.clone(),
upper: subscribe.frontier.clone(),
updates: Err("Dataflow limit exceeded".to_string()),
},
))
// Look for a matching peek
} else if let Some((uuid, _)) = self
.peeks
.iter()
.find(|(_, peek)| peek.read_hold.id() == status.collection_id)
{
self.cancel_peek(
*uuid,
PeekResponse::Error("Dataflow limit exceeded".to_string()),
);
} else {
tracing::warn!(
%replica_id,
collection_id = %status.collection_id,
"dataflow limit exceeded for unknown collection"
);
}
}

5 changes: 5 additions & 0 deletions src/compute-client/src/protocol/response.proto
@@ -107,6 +107,7 @@ message ProtoSubscribeBatch {
message ProtoStatusResponse {
oneof kind {
ProtoOperatorHydrationStatus operator_hydration = 1;
ProtoDataflowLimitStatus dataflow_limit_status = 2;
Contributor: I think this field should be called dataflow_limit_exceeded, since that's what the StatusResponse variant is called too?

}
}

@@ -116,3 +117,7 @@ message ProtoOperatorHydrationStatus {
uint64 worker_id = 3;
bool hydrated = 4;
}

message ProtoDataflowLimitStatus {
mz_repr.global_id.ProtoGlobalId collection_id = 1;
}
29 changes: 29 additions & 0 deletions src/compute-client/src/protocol/response.rs
@@ -589,6 +589,8 @@ impl Arbitrary for SubscribeBatch<mz_repr::Timestamp> {
pub enum StatusResponse {
/// Reports the hydration status of dataflow operators.
OperatorHydration(OperatorHydrationStatus),
/// Reports limit violations for dataflows.
DataflowLimitExceeded(DataflowLimitStatus),
}

impl RustType<ProtoStatusResponse> for StatusResponse {
@@ -597,6 +599,7 @@ impl RustType<ProtoStatusResponse> for StatusResponse {

let kind = match self {
Self::OperatorHydration(status) => Kind::OperatorHydration(status.into_proto()),
Self::DataflowLimitExceeded(status) => Kind::DataflowLimitStatus(status.into_proto()),
};
ProtoStatusResponse { kind: Some(kind) }
}
@@ -608,6 +611,9 @@
Some(Kind::OperatorHydration(status)) => {
Ok(Self::OperatorHydration(status.into_rust()?))
}
Some(Kind::DataflowLimitStatus(status)) => {
Ok(Self::DataflowLimitExceeded(status.into_rust()?))
}
None => Err(TryFromProtoError::missing_field(
"ProtoStatusResponse::kind",
)),
@@ -650,6 +656,29 @@ impl RustType<ProtoOperatorHydrationStatus> for OperatorHydrationStatus {
}
}

/// A dataflow exceeded some limit.
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, Arbitrary)]
pub struct DataflowLimitStatus {
/// The ID of the compute collection exported by the dataflow.
pub collection_id: GlobalId,
}
Comment on lines +659 to +664
Contributor: Mentioned above, but we should also report which limit!
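
For illustration, one way that suggestion could be shaped. The enum, variant, and field names below are assumptions, not part of this PR; the proto message and RustType impl would need matching changes, and GlobalId is the same type used by the surrounding file:

```rust
/// Hypothetical: identifies which limit a dataflow exceeded.
#[derive(Debug, Clone, Eq, PartialEq)]
pub enum DataflowLimit {
    /// The dataflow exceeded its configured heap size limit, in bytes.
    HeapSize { limit_bytes: u64 },
}

// `DataflowLimitStatus` could then grow a field alongside `collection_id`, e.g.:
// pub struct DataflowLimitStatus {
//     pub collection_id: GlobalId,
//     pub limit: DataflowLimit,
// }
```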


impl RustType<ProtoDataflowLimitStatus> for DataflowLimitStatus {
fn into_proto(&self) -> ProtoDataflowLimitStatus {
ProtoDataflowLimitStatus {
collection_id: Some(self.collection_id.into_proto()),
}
}

fn from_proto(proto: ProtoDataflowLimitStatus) -> Result<Self, TryFromProtoError> {
Ok(Self {
collection_id: proto
.collection_id
.into_rust_if_some("ProtoDataflowLimitStatus::collection_id")?,
})
}
}

#[cfg(test)]
mod tests {
use mz_ore::assert_ok;