-
Notifications
You must be signed in to change notification settings - Fork 0
1498: feat: Expose Logical and Physical plan details in the REST API #5
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -108,3 +108,4 @@ logs/ | |
|
|
||
| # Claude Code guidance file (local only) | ||
| CLAUDE.md | ||
| .claude/ | ||
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
|
|
@@ -34,7 +34,7 @@ required-features = ["build-binary"] | |||||
|
|
||||||
| [features] | ||||||
| build-binary = ["clap", "tracing-subscriber", "tracing-appender", "tracing", "ballista-core/build-binary"] | ||||||
| default = ["build-binary", "substrait"] | ||||||
| default = ["build-binary", "rest-api"] | ||||||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This changes the scheduler crate default features from Severity: low 🤖 Was this useful? React with 👍 or 👎, or 🚀 if it prevented an incident/outage.
Owner
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. value:good-to-have; category:bug; feedback: The Augment AI reviewer is correct! Adding the rest-api feature as a default is related to the PR. But dropping the "substrait" one is not and it should be done in a separate PR, so that it appears in the changelog to let the users know about this change. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Keep This silently removes Substrait support from default scheduler builds, even though the gRPC query path still supports it behind Suggested change-default = ["build-binary", "rest-api"]
+default = ["build-binary", "rest-api", "substrait"]📝 Committable suggestion
Suggested change
🤖 Prompt for AI Agents
Owner
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. value:good-to-have; category:bug; feedback: The CodeRabbit AI reviewer is correct! Adding the rest-api feature as a default is related to the PR. But dropping the "substrait" one is not and it should be done in a separate PR, so that it appears in the changelog to let the users know about this change. |
||||||
| # job info can cache stage plans, in some cases where | ||||||
| # task plans can be re-computed, cache behavior may need to be disabled. | ||||||
| disable-stage-plan-cache = [] | ||||||
|
|
||||||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -56,6 +56,9 @@ pub struct JobResponse { | |
| pub num_stages: usize, | ||
| pub completed_stages: usize, | ||
| pub percent_complete: u8, | ||
| pub logical_plan: Option<String>, | ||
| pub physical_plan: Option<String>, | ||
| pub stage_plan: Option<String>, | ||
| } | ||
|
|
||
| #[derive(Debug, serde::Serialize)] | ||
|
|
@@ -172,13 +175,85 @@ pub async fn get_jobs< | |
| num_stages: job.num_stages, | ||
| completed_stages: job.completed_stages, | ||
| percent_complete, | ||
| logical_plan: None, | ||
| physical_plan: None, | ||
| stage_plan: None, | ||
| } | ||
| }) | ||
| .collect(); | ||
|
|
||
| Ok(Json(jobs)) | ||
| } | ||
|
|
||
| pub async fn get_job< | ||
| T: AsLogicalPlan + Clone + Send + Sync + 'static, | ||
| U: AsExecutionPlan + Send + Sync + 'static, | ||
| >( | ||
| State(data_server): State<Arc<SchedulerServer<T, U>>>, | ||
| Path(job_id): Path<String>, | ||
| ) -> Result<impl IntoResponse, StatusCode> { | ||
| let graph = data_server | ||
| .state | ||
| .task_manager | ||
| .get_job_execution_graph(&job_id) | ||
| .await | ||
| .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)? | ||
| .ok_or(StatusCode::NOT_FOUND)?; | ||
| let stage_plan = format!("{:?}", graph); | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Severity: medium 🤖 Was this useful? React with 👍 or 👎, or 🚀 if it prevented an incident/outage.
Owner
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. value:good-to-have; category:bug; feedback: The Augment AI reviewer is correct! A (JSON) structured plans would be easier to consume by software clients (e.g. UI/TUI). Using the Debug implementation of the plans at least makes it easier to change the format without concerns that it will break the structure. |
||
| let job = graph.as_ref(); | ||
| let (plain_status, job_status) = match &job.status().status { | ||
| Some(Status::Queued(_)) => ("Queued".to_string(), "Queued".to_string()), | ||
| Some(Status::Running(_)) => ("Running".to_string(), "Running".to_string()), | ||
| Some(Status::Failed(error)) => { | ||
| ("Failed".to_string(), format!("Failed: {}", error.error)) | ||
| } | ||
| Some(Status::Successful(completed)) => { | ||
| let num_rows = completed | ||
| .partition_location | ||
| .iter() | ||
| .map(|p| p.partition_stats.as_ref().map(|s| s.num_rows).unwrap_or(0)) | ||
| .sum::<i64>(); | ||
| let num_rows_term = if num_rows == 1 { "row" } else { "rows" }; | ||
| let num_partitions = completed.partition_location.len(); | ||
| let num_partitions_term = if num_partitions == 1 { | ||
| "partition" | ||
| } else { | ||
| "partitions" | ||
| }; | ||
| ( | ||
| "Completed".to_string(), | ||
| format!( | ||
| "Completed. Produced {} {} containing {} {}. Elapsed time: {} ms.", | ||
| num_partitions, | ||
| num_partitions_term, | ||
| num_rows, | ||
| num_rows_term, | ||
| job.end_time() - job.start_time() | ||
| ), | ||
| ) | ||
| } | ||
| _ => ("Invalid".to_string(), "Invalid State".to_string()), | ||
| }; | ||
|
|
||
| let num_stages = job.stage_count(); | ||
| let completed_stages = job.completed_stages(); | ||
| let percent_complete = | ||
| ((completed_stages as f32 / num_stages as f32) * 100_f32) as u8; | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Division by zero when
|
||
|
|
||
| Ok(Json(JobResponse { | ||
| job_id: job.job_id().to_string(), | ||
| job_name: job.job_name().to_string(), | ||
| job_status, | ||
| status: plain_status, | ||
| num_stages, | ||
| completed_stages, | ||
| percent_complete, | ||
| logical_plan: job.logical_plan().map(str::to_owned), | ||
| physical_plan: job.physical_plan().map(str::to_owned), | ||
| stage_plan: Some(stage_plan), | ||
|
Comment on lines
+195
to
+253
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Line 202 serializes the entire graph with 🤖 Prompt for AI Agents |
||
| })) | ||
| } | ||
|
Comment on lines
+188
to
+255
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The new |
||
|
|
||
| pub async fn cancel_job< | ||
| T: AsLogicalPlan + Clone + Send + Sync + 'static, | ||
| U: AsExecutionPlan + Send + Sync + 'static, | ||
|
|
||
| Original file line number | Diff line number | Diff line change | ||||||
|---|---|---|---|---|---|---|---|---|
|
|
@@ -31,6 +31,7 @@ pub fn get_routes< | |||||||
| .route("/api/executors", get(handlers::get_executors::<T, U>)) | ||||||||
| .route("/api/jobs", get(handlers::get_jobs::<T, U>)) | ||||||||
| .route("/api/job/{job_id}", patch(handlers::cancel_job::<T, U>)) | ||||||||
| .route("/api/job/{job_id}", get(handlers::get_job::<T, U>)) | ||||||||
|
Comment on lines
33
to
+34
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Adding a new route for the same path See axum docs on routing to multiple methods.
Suggested change
Comment on lines
33
to
+34
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Protect the per-job plan endpoint before enabling it by default. This route exposes raw logical, physical, and stage-plan data for any job, and I don't see any auth/authorization layer in this router. Those plans can contain table names, file paths, and literals, so adding this endpoint to the default REST surface creates a new disclosure path unless it is explicitly protected. 🤖 Prompt for AI Agents
Owner
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. value:good-to-have; category:bug; feedback: The CodeRabbit AI reviewer is correct! Since the plan may contain sensitive data the REST API should not be enabled by default and it should be documented that the Scheduler should not be reachable from unauthorized clients. |
||||||||
| .route( | ||||||||
| "/api/job/{job_id}/stages", | ||||||||
| get(handlers::get_query_stages::<T, U>), | ||||||||
|
|
||||||||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -443,11 +443,16 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerState<T, | |
| None | ||
| }; | ||
|
|
||
| let logical_plan_str = plan.display_indent().to_string(); | ||
|
|
||
| let plan = session_ctx.state().create_physical_plan(plan).await?; | ||
| debug!( | ||
| "Physical plan: {}", | ||
| DisplayableExecutionPlan::new(plan.as_ref()).indent(false) | ||
| ); | ||
| let physical_plan_str = DisplayableExecutionPlan::new(plan.as_ref()) | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Severity: medium 🤖 Was this useful? React with 👍 or 👎, or 🚀 if it prevented an incident/outage.
Owner
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. value:good-to-have; category:bug; feedback: The Augment AI reviewer is correct! The physical plan is stringyfied before the final transformation and thus it may show wrong/obsolete information. This should either be corrected or a comment should be added why it is done before the final transformation. |
||
| .indent(false) | ||
| .to_string(); | ||
|
Comment on lines
+446
to
+455
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Capturing and storing raw logical and physical plans as strings without any redaction or masking of sensitive literals leads to information disclosure when these plans are exposed via the REST API. SQL queries frequently contain sensitive data in literals which are preserved in these plan representations.
Comment on lines
448
to
+455
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Capture the physical-plan string after the final rewrite. Line 453 snapshots the physical plan before the Suggested change- let physical_plan_str = DisplayableExecutionPlan::new(plan.as_ref())
- .indent(false)
- .to_string();
-
let plan = plan.transform_down(&|node: Arc<dyn ExecutionPlan>| {
if node.output_partitioning().partition_count() == 0 {
let empty: Arc<dyn ExecutionPlan> =
Arc::new(EmptyExec::new(node.schema()));
Ok(Transformed::yes(empty))
@@
debug!(
"Transformed physical plan: {}",
DisplayableExecutionPlan::new(plan.data.as_ref()).indent(false)
);
+ let physical_plan_str = DisplayableExecutionPlan::new(plan.data.as_ref())
+ .indent(false)
+ .to_string();Also applies to: 489-499 🤖 Prompt for AI Agents
Owner
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. value:good-to-have; category:bug; feedback: The CodeRabbit AI reviewer is not correct! The physical plan is the plan produced by DataFusion. The transformed plan is shown as "staged_plan". |
||
|
|
||
| let plan = plan.transform_down(&|node: Arc<dyn ExecutionPlan>| { | ||
| if node.output_partitioning().partition_count() == 0 { | ||
|
|
@@ -490,6 +495,8 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerState<T, | |
| queued_at, | ||
| session_config, | ||
| subscriber, | ||
| Some(logical_plan_str), | ||
| Some(physical_plan_str), | ||
| ) | ||
| .await?; | ||
|
|
||
|
|
||


There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Default feature silently drops
substraitwhen addingrest-apiHigh Severity
The
defaultfeature list changed from["build-binary", "substrait"]to["build-binary", "rest-api"], which removessubstraitfrom the defaults. This appears accidental — the intent was to addrest-apito defaults, butsubstraitwas replaced rather than kept alongside it. Anyone relying on default features for Substrait support will silently lose that functionality.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
value:good-to-have; category:bug; feedback: The Bugbot AI reviewer is correct! Adding the rest-api feature as a default is related to the PR. But dropping the "substrait" one is not and it should be done in a separate PR, so that it appears in the changelog to let the users know about this change.