1498: feat: Expose Logical and Physical plan details in the REST API #5
martin-augment wants to merge 1 commit into main from
Conversation
Summary of Changes: Hello, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request significantly enhances the Ballista scheduler's REST API by providing a new endpoint to access detailed execution plan information for individual jobs. It introduces the capability to store and retrieve the logical, physical, and stage plans, offering greater transparency and debuggability into how queries are processed within the system. This change is crucial for users who need to understand the underlying execution strategy of their data processing tasks.
Highlights
Changelog
Activity
Walkthrough: This change adds support for capturing and exposing query execution plans through the REST API. The logical and physical plans are captured at job submission time and stored in execution graph structures. A new REST endpoint retrieves individual job details with computed stage plans and stored plan information. The default feature set is updated from substrait to rest-api, and plan storage fields are propagated through the execution stack from TaskManager through AdaptiveExecutionGraph and StaticExecutionGraph implementations.
Cursor Bugbot has reviewed your changes and found 3 potential issues.
```diff
 [features]
 build-binary = ["clap", "tracing-subscriber", "tracing-appender", "tracing", "ballista-core/build-binary"]
-default = ["build-binary", "substrait"]
+default = ["build-binary", "rest-api"]
```
Default feature silently drops substrait when adding rest-api
High Severity
The default feature list changed from ["build-binary", "substrait"] to ["build-binary", "rest-api"], which removes substrait from the defaults. This appears accidental — the intent was to add rest-api to defaults, but substrait was replaced rather than kept alongside it. Anyone relying on default features for Substrait support will silently lose that functionality.
value:good-to-have; category:bug; feedback: The Bugbot AI reviewer is correct! Adding the rest-api feature as a default is related to the PR. But dropping the "substrait" one is not and it should be done in a separate PR, so that it appears in the changelog to let the users know about this change.
```rust
let num_stages = job.stage_count();
let completed_stages = job.completed_stages();
let percent_complete =
    ((completed_stages as f32 / num_stages as f32) * 100_f32) as u8;
```
Division by zero when num_stages is zero
Low Severity
The new get_job handler computes percent_complete by dividing completed_stages by num_stages without guarding against num_stages being zero. If stage_count() ever returns zero, this produces NaN from the f32 division, which then gets cast to u8 — yielding a potentially confusing result rather than a meaningful value.
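The fix is a one-line guard. A minimal standalone sketch (using a hypothetical `percent_complete` helper, not the actual handler code) that also documents why the current code yields a confusing rather than crashing result:

```rust
/// Hypothetical helper: percentage of completed stages, guarding the
/// zero-stage case instead of relying on NaN-to-u8 cast behavior.
fn percent_complete(completed_stages: usize, num_stages: usize) -> u8 {
    if num_stages == 0 {
        return 0; // explicit: no stages means nothing is complete
    }
    ((completed_stages as f32 / num_stages as f32) * 100_f32) as u8
}
```

Without the guard, `0.0 / 0.0` produces NaN, and `NaN as u8` saturates to `0` in Rust (float-to-int `as` casts have been saturating since 1.45), so the endpoint would silently report 0% rather than panic; the guard simply makes that outcome explicit.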
```rust
let num_stages = job.stage_count();
let completed_stages = job.completed_stages();
let percent_complete =
    ((completed_stages as f32 / num_stages as f32) * 100_f32) as u8;
```
Duplicated status-mapping logic across two handlers
Low Severity
The new get_job handler contains a nearly identical copy of the status-matching and percent_complete computation logic from get_jobs. This duplicated block (mapping Status variants to strings, computing rows/partitions formatting, calculating percent) means future bug fixes or status changes need to be applied in two places, risking divergence.
Additional Locations (1)
Code Review
This pull request adds a new endpoint to the REST API to expose logical and physical plan details, providing detailed job information. However, this implementation poses a significant security risk as it exposes potentially sensitive information (e.g., credentials, PII) embedded in SQL queries to any user with network access to the scheduler, due to a lack of authentication and authorization on the REST API endpoints. It is highly recommended to implement robust access control and a mechanism to redact sensitive data from execution plans before they are stored or exposed. Additionally, there's a critical issue with API routing that would break existing functionality, and an opportunity to refactor duplicated code for better maintainability.
```rust
.route("/api/job/{job_id}", patch(handlers::cancel_job::<T, U>))
.route("/api/job/{job_id}", get(handlers::get_job::<T, U>))
```
Adding a new route for the same path /api/job/{job_id} will overwrite the existing PATCH route, effectively disabling the cancel_job functionality. You should chain the method handlers for the same path.
See axum docs on routing to multiple methods.
```diff
-.route("/api/job/{job_id}", patch(handlers::cancel_job::<T, U>))
-.route("/api/job/{job_id}", get(handlers::get_job::<T, U>))
+.route("/api/job/{job_id}", get(handlers::get_job::<T, U>).patch(handlers::cancel_job::<T, U>))
```
```rust
pub async fn get_job<
    T: AsLogicalPlan + Clone + Send + Sync + 'static,
    U: AsExecutionPlan + Send + Sync + 'static,
>(
    State(data_server): State<Arc<SchedulerServer<T, U>>>,
    Path(job_id): Path<String>,
) -> Result<impl IntoResponse, StatusCode> {
    let graph = data_server
        .state
        .task_manager
        .get_job_execution_graph(&job_id)
        .await
        .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?
        .ok_or(StatusCode::NOT_FOUND)?;
    let stage_plan = format!("{:?}", graph);
    let job = graph.as_ref();
    let (plain_status, job_status) = match &job.status().status {
        Some(Status::Queued(_)) => ("Queued".to_string(), "Queued".to_string()),
        Some(Status::Running(_)) => ("Running".to_string(), "Running".to_string()),
        Some(Status::Failed(error)) => {
            ("Failed".to_string(), format!("Failed: {}", error.error))
        }
        Some(Status::Successful(completed)) => {
            let num_rows = completed
                .partition_location
                .iter()
                .map(|p| p.partition_stats.as_ref().map(|s| s.num_rows).unwrap_or(0))
                .sum::<i64>();
            let num_rows_term = if num_rows == 1 { "row" } else { "rows" };
            let num_partitions = completed.partition_location.len();
            let num_partitions_term = if num_partitions == 1 {
                "partition"
            } else {
                "partitions"
            };
            (
                "Completed".to_string(),
                format!(
                    "Completed. Produced {} {} containing {} {}. Elapsed time: {} ms.",
                    num_partitions,
                    num_partitions_term,
                    num_rows,
                    num_rows_term,
                    job.end_time() - job.start_time()
                ),
            )
        }
        _ => ("Invalid".to_string(), "Invalid State".to_string()),
    };

    let num_stages = job.stage_count();
    let completed_stages = job.completed_stages();
    let percent_complete =
        ((completed_stages as f32 / num_stages as f32) * 100_f32) as u8;

    Ok(Json(JobResponse {
        job_id: job.job_id().to_string(),
        job_name: job.job_name().to_string(),
        job_status,
        status: plain_status,
        num_stages,
        completed_stages,
        percent_complete,
        logical_plan: job.logical_plan().map(str::to_owned),
        physical_plan: job.physical_plan().map(str::to_owned),
        stage_plan: Some(stage_plan),
    }))
}
```
The new get_job endpoint exposes logical and physical execution plans, as well as the full execution graph, via an unauthenticated REST API. These plans are derived from SQL queries and often contain sensitive information such as database credentials, API keys, or Personally Identifiable Information (PII) embedded as literals. Since the Ballista scheduler's REST API lacks authentication and authorization, any user with network access can retrieve this sensitive information by discovering job IDs (e.g., via the /api/jobs endpoint). Additionally, the code block for determining job status (lines 204-236) is duplicated from the get_jobs function. Consider extracting this logic into a private helper function to improve maintainability and reduce code duplication. This helper could take &JobStatus, start_time, and end_time as arguments and return the (String, String) tuple for plain_status and job_status.
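The helper extraction Gemini suggests can be sketched in isolation. In this minimal sketch the `JobStatus` enum and its fields are simplified stand-ins for the real Ballista protobuf types, so it illustrates the shape of the shared helper rather than a drop-in implementation:

```rust
/// Simplified stand-in for Ballista's protobuf job status.
enum JobStatus {
    Queued,
    Running,
    Failed { error: String },
    Successful { num_partitions: usize, num_rows: i64 },
}

/// Hypothetical shared helper: maps a status plus job timing to the
/// (plain_status, job_status) string pair used by both handlers.
fn describe_status(status: &JobStatus, start_time: u64, end_time: u64) -> (String, String) {
    match status {
        JobStatus::Queued => ("Queued".to_string(), "Queued".to_string()),
        JobStatus::Running => ("Running".to_string(), "Running".to_string()),
        JobStatus::Failed { error } => {
            ("Failed".to_string(), format!("Failed: {}", error))
        }
        JobStatus::Successful { num_partitions, num_rows } => (
            "Completed".to_string(),
            format!(
                "Completed. Produced {} {} containing {} {}. Elapsed time: {} ms.",
                num_partitions,
                if *num_partitions == 1 { "partition" } else { "partitions" },
                num_rows,
                if *num_rows == 1 { "row" } else { "rows" },
                end_time - start_time
            ),
        ),
    }
}
```

Both `get_jobs` and `get_job` would then call the one helper, so future wording or status changes happen in a single place.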
```rust
let logical_plan_str = plan.display_indent().to_string();

let plan = session_ctx.state().create_physical_plan(plan).await?;
debug!(
    "Physical plan: {}",
    DisplayableExecutionPlan::new(plan.as_ref()).indent(false)
);
let physical_plan_str = DisplayableExecutionPlan::new(plan.as_ref())
    .indent(false)
    .to_string();
```
Capturing and storing raw logical and physical plans as strings without any redaction or masking of sensitive literals leads to information disclosure when these plans are exposed via the REST API. SQL queries frequently contain sensitive data in literals which are preserved in these plan representations.
🤖 Augment PR Summary: Exposes per-job logical and physical plan details via the Ballista scheduler REST API. Changes:
🤖 Was this summary useful? React with 👍 or 👎
```rust
    .await
    .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?
    .ok_or(StatusCode::NOT_FOUND)?;
let stage_plan = format!("{:?}", graph);
```
stage_plan is currently built from format!("{:?}", graph), which can be very large and not a stable/intentional API representation (it may also expose a lot of internal details). Consider whether the REST payload should be more structured/stable or otherwise bounded in size.
Severity: medium
value:good-to-have; category:bug; feedback: The Augment AI reviewer is correct! JSON-structured plans would be easier to consume by software clients (e.g. a UI/TUI). Using the Debug implementation of the plans at least makes it easier to change the format without concerns that it will break the structure.
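One way to return something structured instead of a Debug dump is a per-stage JSON array. The sketch below is standalone and hand-builds the JSON; the `StageSummary` type and field names are hypothetical, and a real implementation would more likely derive `serde::Serialize` on a typed response:

```rust
/// Hypothetical per-stage summary, standing in for data pulled from the
/// execution graph's stages.
struct StageSummary {
    stage_id: usize,
    plan: String,
}

/// Build a JSON array of stage plans by hand, escaping backslashes and
/// quotes so the plan text survives as a JSON string value.
fn stage_plans_json(stages: &[StageSummary]) -> String {
    let items: Vec<String> = stages
        .iter()
        .map(|s| {
            format!(
                "{{\"stage_id\":{},\"plan\":\"{}\"}}",
                s.stage_id,
                s.plan.replace('\\', "\\\\").replace('"', "\\\"")
            )
        })
        .collect();
    format!("[{}]", items.join(","))
}
```

A stable, documented shape like this lets clients parse stages individually, whereas the `{:?}` output changes whenever the graph structs do.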
```rust
    "Physical plan: {}",
    DisplayableExecutionPlan::new(plan.as_ref()).indent(false)
);
let physical_plan_str = DisplayableExecutionPlan::new(plan.as_ref())
```
physical_plan_str is captured before the later transform_down that can replace nodes (e.g., EmptyExec / distributed explain), so the plan returned by the API may not match the actual executed plan (plan.data). Consider whether the captured physical plan should reflect the final transformed plan.
Severity: medium
value:good-to-have; category:bug; feedback: The Augment AI reviewer is correct! The physical plan is stringified before the final transformation and thus it may show wrong/obsolete information. This should either be corrected, or a comment should be added explaining why it is done before the final transformation.
```diff
 [features]
 build-binary = ["clap", "tracing-subscriber", "tracing-appender", "tracing", "ballista-core/build-binary"]
-default = ["build-binary", "substrait"]
+default = ["build-binary", "rest-api"]
```
This changes the scheduler crate default features from substrait to rest-api; if downstream users relied on substrait being enabled by default, this is a behavior change. Was the intent to drop substrait from defaults rather than add rest-api alongside it?
Severity: low
value:good-to-have; category:bug; feedback: The Augment AI reviewer is correct! Adding the rest-api feature as a default is related to the PR. But dropping the "substrait" one is not and it should be done in a separate PR, so that it appears in the changelog to let the users know about this change.
Actionable comments posted: 4
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
ballista/scheduler/src/state/task_manager.rs (1)
281-315: ⚠️ Potential issue | 🟠 Major: Don't let the new plan fields spill into the existing INFO log.
These strings now live on the execution graph, and Line 318 still logs `Submitting execution graph:\n\n{graph:?}`. With the new fields on the graph types, every submission will start emitting the full logical/physical plan at INFO, which is both noisy and an info-disclosure risk. Please either exclude these fields from `Debug` or stop logging the whole graph at INFO.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@ballista/scheduler/src/state/task_manager.rs` around lines 281 - 315, The current INFO log that prints the entire execution graph (the `Submitting execution graph:\n\n{graph:?}` call) will now include sensitive large strings stored on the graph types (logical_plan / physical_plan); to fix, stop printing the whole graph or omit those fields from Debug: either remove or replace the `{graph:?}` usage with a targeted log that prints only safe metadata (job_id, job_name, session_id, planner type) OR modify the Debug/derive on AdaptiveExecutionGraph and StaticExecutionGraph (and any ExecutionGraphBox wrapper) to skip or redact the logical_plan and physical_plan fields (e.g., custom Debug impl or attributes to prevent them from being formatted) so that submitting the graph no longer emits full plan strings.
🧹 Nitpick comments (2)
ballista/scheduler/src/state/execution_graph.rs (2)
301-347: Consider a builder pattern for future maintainability. The constructor now has 10 parameters, which is getting unwieldy. While the `#[allow(clippy::too_many_arguments)]` attribute handles the lint, a builder pattern would improve ergonomics and make it easier to add optional fields in the future without breaking existing call sites. That said, the current implementation is functional, and this refactor could be deferred if there are no immediate plans to add more parameters.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@ballista/scheduler/src/state/execution_graph.rs` around lines 301 - 347, The constructor ExecutionGraph::new has many parameters and should be refactored to a builder to improve ergonomics and future-proof optional fields: create an ExecutionGraphBuilder struct with setters for scheduler_id, job_id, job_name, session_id, plan, queued_at, session_config, planner (or planner inputs), logical_plan, and physical_plan, move the logic currently in ExecutionGraph::new (calling planner.plan_query_stages, using ExecutionStageBuilder.build, timestamp_millis, and assembling stages/JobStatus/fields) into a build() method that returns Result<ExecutionGraph>, and replace direct calls to ExecutionGraph::new with ExecutionGraphBuilder::new(...).build() so callers can opt into required fields via builder methods and optional fields without breaking call sites (keep helper names: ExecutionGraphBuilder::new, build(), and preserve ExecutionStageBuilder usage).
632-634: Note: Cloning now includes potentially large plan strings. The `cloned()` method performs a full clone of the execution graph, which now includes the `logical_plan` and `physical_plan` strings. For complex queries, these strings can be substantial. If `cloned()` is called frequently (e.g., for state persistence or replication), this could impact memory usage and performance.
If this becomes a concern, consider using `Arc<str>` instead of `String` for the plan fields to enable cheap cloning, or lazily computing the plan strings only when requested via the REST API.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@ballista/scheduler/src/state/execution_graph.rs` around lines 632 - 634, The cloned() method (ExecutionGraph::cloned / returns ExecutionGraphBox) performs a full clone including potentially large logical_plan and physical_plan String fields; change those fields to Arc<str> (or Arc<String>/Arc<str> as appropriate) so clones are cheap, update all constructors, builders, and any places that mutate or deserialize the plans to produce Arc values, and adjust Clone/Debug/serde implementations and any code that expects &str or String (call .as_ref() or .to_string() only where owning String is actually required); alternatively, if you prefer lazy generation, move plan generation out of the core struct and expose a method that computes the plan on demand for the REST API instead of storing large Strings that cloned() duplicates.
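The `Arc<str>` suggestion is easy to demonstrate in isolation. A minimal sketch (a simplified stand-in `Graph` struct, not Ballista's actual type) showing that cloning only bumps a reference count while the underlying plan string is shared:

```rust
use std::sync::Arc;

// Simplified stand-in for an execution graph that stores a plan string.
#[derive(Clone)]
struct Graph {
    // Cloning an Arc<str> copies a pointer and bumps a refcount; the
    // plan text itself is never duplicated.
    logical_plan: Arc<str>,
}

fn make_graph(plan: &str) -> Graph {
    Graph { logical_plan: Arc::from(plan) }
}
```

With `String` fields, every `cloned()` call copies the full plan text; with `Arc<str>`, both clones point at the same allocation, which matters if graphs are cloned for persistence or replication.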
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@ballista/scheduler/Cargo.toml`:
- Line 37: The default feature list was changed to remove substrait; restore it
by ensuring the default features array includes "substrait" along with the newly
desired "rest-api" (i.e., keep "substrait" and add "rest-api" instead of
replacing it) so the scheduler's default build still enables Substrait support;
update the default = ["build-binary", "rest-api"] entry to include "substrait"
as well.
In `@ballista/scheduler/src/api/handlers.rs`:
- Around line 195-253: The current code sets stage_plan = format!("{:?}",
graph), which leaks the Debug dump of the entire execution graph; instead
iterate the graph/job's per-stage objects and construct stage_plan from each
stage's explicit plan fields (e.g., logical/physical stage plan strings or a
structured stage summary) rather than using Debug. Locate the graph/job value
(symbols: graph, job) and replace the Debug-format call with code that collects
per-stage plan representations (e.g., map over job.stages() or graph.stages(),
extracting stage.logical_plan()/stage.physical_plan() or equivalent and
serializing them into a compact Vec or JSON string), then assign that structured
string/collection to JobResponse.stage_plan so the API returns only the explicit
stage plans.
In `@ballista/scheduler/src/api/mod.rs`:
- Around line 33-34: The GET route exposing per-job plans at "/api/job/{job_id}"
(handlers::get_job::<T, U>) must not be registered by default; gate it behind an
explicit protection mechanism (e.g., a configuration flag like
enable_job_plan_endpoint or an authentication/authorization
middleware/admin-only guard) and only register the route when that flag/guard is
present, or move it to a separate admin-only router; ensure the cancel_job route
(handlers::cancel_job::<T, U>) remains unaffected unless it also needs
protection.
In `@ballista/scheduler/src/state/mod.rs`:
- Around line 448-455: The current code builds physical_plan_str from the plan
before downstream rewrites, causing the REST API to show a different plan than
executed; update the logic to generate physical_plan_str after the final
transforms by using the rewritten plan data (e.g., call
DisplayableExecutionPlan::new(plan.data.as_ref()).indent(false).to_string() or
equivalent) after transform_down/ExplainExec replacement and zero-partition
rewrites complete (update both occurrences around the transform_down block and
the other similar block at 489-499) so the serialized physical plan matches the
actual executed plan.
---
Outside diff comments:
In `@ballista/scheduler/src/state/task_manager.rs`:
- Around line 281-315: The current INFO log that prints the entire execution
graph (the `Submitting execution graph:\n\n{graph:?}` call) will now include
sensitive large strings stored on the graph types (logical_plan /
physical_plan); to fix, stop printing the whole graph or omit those fields from
Debug: either remove or replace the `{graph:?}` usage with a targeted log that
prints only safe metadata (job_id, job_name, session_id, planner type) OR modify
the Debug/derive on AdaptiveExecutionGraph and StaticExecutionGraph (and any
ExecutionGraphBox wrapper) to skip or redact the logical_plan and physical_plan
fields (e.g., custom Debug impl or attributes to prevent them from being
formatted) so that submitting the graph no longer emits full plan strings.
---
Nitpick comments:
In `@ballista/scheduler/src/state/execution_graph.rs`:
- Around line 301-347: The constructor ExecutionGraph::new has many parameters
and should be refactored to a builder to improve ergonomics and future-proof
optional fields: create an ExecutionGraphBuilder struct with setters for
scheduler_id, job_id, job_name, session_id, plan, queued_at, session_config,
planner (or planner inputs), logical_plan, and physical_plan, move the logic
currently in ExecutionGraph::new (calling planner.plan_query_stages, using
ExecutionStageBuilder.build, timestamp_millis, and assembling
stages/JobStatus/fields) into a build() method that returns
Result<ExecutionGraph>, and replace direct calls to ExecutionGraph::new with
ExecutionGraphBuilder::new(...).build() so callers can opt into required fields
via builder methods and optional fields without breaking call sites (keep helper
names: ExecutionGraphBuilder::new, build(), and preserve ExecutionStageBuilder
usage).
- Around line 632-634: The cloned() method (ExecutionGraph::cloned / returns
ExecutionGraphBox) performs a full clone including potentially large
logical_plan and physical_plan String fields; change those fields to Arc<str>
(or Arc<String>/Arc<str> as appropriate) so clones are cheap, update all
constructors, builders, and any places that mutate or deserialize the plans to
produce Arc values, and adjust Clone/Debug/serde implementations and any code
that expects &str or String (call .as_ref() or .to_string() only where owning
String is actually required); alternatively, if you prefer lazy generation, move
plan generation out of the core struct and expose a method that computes the
plan on demand for the REST API instead of storing large Strings that cloned()
duplicates.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 8a74d200-f8d0-4e40-bdc5-b2f8333696b2
📒 Files selected for processing (10)
.gitignore
ballista/scheduler/Cargo.toml
ballista/scheduler/src/api/handlers.rs
ballista/scheduler/src/api/mod.rs
ballista/scheduler/src/state/aqe/mod.rs
ballista/scheduler/src/state/execution_graph.rs
ballista/scheduler/src/state/execution_graph_dot.rs
ballista/scheduler/src/state/mod.rs
ballista/scheduler/src/state/task_manager.rs
ballista/scheduler/src/test_utils.rs
```diff
 [features]
 build-binary = ["clap", "tracing-subscriber", "tracing-appender", "tracing", "ballista-core/build-binary"]
-default = ["build-binary", "substrait"]
+default = ["build-binary", "rest-api"]
```
Keep substrait in the default feature set.
This silently removes Substrait support from default scheduler builds, even though the gRPC query path still supports it behind feature = "substrait". That is a breaking behavior change unrelated to exposing REST plan details. If the goal is just to turn on the REST API by default, please add rest-api without dropping substrait.
Suggested change
```diff
-default = ["build-binary", "rest-api"]
+default = ["build-binary", "rest-api", "substrait"]
```
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@ballista/scheduler/Cargo.toml` at line 37, The default feature list was
changed to remove substrait; restore it by ensuring the default features array
includes "substrait" along with the newly desired "rest-api" (i.e., keep
"substrait" and add "rest-api" instead of replacing it) so the scheduler's
default build still enables Substrait support; update the default =
["build-binary", "rest-api"] entry to include "substrait" as well.
value:good-to-have; category:bug; feedback: The CodeRabbit AI reviewer is correct! Adding the rest-api feature as a default is related to the PR. But dropping the "substrait" one is not and it should be done in a separate PR, so that it appears in the changelog to let the users know about this change.
```rust
let graph = data_server
    .state
    .task_manager
    .get_job_execution_graph(&job_id)
    .await
    .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?
    .ok_or(StatusCode::NOT_FOUND)?;
let stage_plan = format!("{:?}", graph);
let job = graph.as_ref();
let (plain_status, job_status) = match &job.status().status {
    Some(Status::Queued(_)) => ("Queued".to_string(), "Queued".to_string()),
    Some(Status::Running(_)) => ("Running".to_string(), "Running".to_string()),
    Some(Status::Failed(error)) => {
        ("Failed".to_string(), format!("Failed: {}", error.error))
    }
    Some(Status::Successful(completed)) => {
        let num_rows = completed
            .partition_location
            .iter()
            .map(|p| p.partition_stats.as_ref().map(|s| s.num_rows).unwrap_or(0))
            .sum::<i64>();
        let num_rows_term = if num_rows == 1 { "row" } else { "rows" };
        let num_partitions = completed.partition_location.len();
        let num_partitions_term = if num_partitions == 1 {
            "partition"
        } else {
            "partitions"
        };
        (
            "Completed".to_string(),
            format!(
                "Completed. Produced {} {} containing {} {}. Elapsed time: {} ms.",
                num_partitions,
                num_partitions_term,
                num_rows,
                num_rows_term,
                job.end_time() - job.start_time()
            ),
        )
    }
    _ => ("Invalid".to_string(), "Invalid State".to_string()),
};

let num_stages = job.stage_count();
let completed_stages = job.completed_stages();
let percent_complete =
    ((completed_stages as f32 / num_stages as f32) * 100_f32) as u8;

Ok(Json(JobResponse {
    job_id: job.job_id().to_string(),
    job_name: job.job_name().to_string(),
    job_status,
    status: plain_status,
    num_stages,
    completed_stages,
    percent_complete,
    logical_plan: job.logical_plan().map(str::to_owned),
    physical_plan: job.physical_plan().map(str::to_owned),
    stage_plan: Some(stage_plan),
```
stage_plan should not be the execution graph's Debug dump.
Line 202 serializes the entire graph with format!("{:?}", graph), so stage_plan is now coupled to Rust Debug output rather than an explicit stage-plan format. That makes the API brittle, drags unrelated internals into the response, and will change whenever the graph structs change. Please build this field from the per-stage plans explicitly instead of exposing the whole graph debug string.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@ballista/scheduler/src/api/handlers.rs` around lines 195 - 253, The current
code sets stage_plan = format!("{:?}", graph), which leaks the Debug dump of the
entire execution graph; instead iterate the graph/job's per-stage objects and
construct stage_plan from each stage's explicit plan fields (e.g.,
logical/physical stage plan strings or a structured stage summary) rather than
using Debug. Locate the graph/job value (symbols: graph, job) and replace the
Debug-format call with code that collects per-stage plan representations (e.g.,
map over job.stages() or graph.stages(), extracting
stage.logical_plan()/stage.physical_plan() or equivalent and serializing them
into a compact Vec or JSON string), then assign that structured
string/collection to JobResponse.stage_plan so the API returns only the explicit
stage plans.
```rust
.route("/api/job/{job_id}", patch(handlers::cancel_job::<T, U>))
.route("/api/job/{job_id}", get(handlers::get_job::<T, U>))
```
Protect the per-job plan endpoint before enabling it by default.
This route exposes raw logical, physical, and stage-plan data for any job, and I don't see any auth/authorization layer in this router. Those plans can contain table names, file paths, and literals, so adding this endpoint to the default REST surface creates a new disclosure path unless it is explicitly protected.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@ballista/scheduler/src/api/mod.rs` around lines 33 - 34, The GET route
exposing per-job plans at "/api/job/{job_id}" (handlers::get_job::<T, U>) must
not be registered by default; gate it behind an explicit protection mechanism
(e.g., a configuration flag like enable_job_plan_endpoint or an
authentication/authorization middleware/admin-only guard) and only register the
route when that flag/guard is present, or move it to a separate admin-only
router; ensure the cancel_job route (handlers::cancel_job::<T, U>) remains
unaffected unless it also needs protection.
value:good-to-have; category:bug; feedback: The CodeRabbit AI reviewer is correct! Since the plan may contain sensitive data the REST API should not be enabled by default and it should be documented that the Scheduler should not be reachable from unauthorized clients.
```rust
let plan = session_ctx.state().create_physical_plan(plan).await?;
debug!(
    "Physical plan: {}",
    DisplayableExecutionPlan::new(plan.as_ref()).indent(false)
);
let physical_plan_str = DisplayableExecutionPlan::new(plan.as_ref())
    .indent(false)
    .to_string();
```
Capture the physical-plan string after the final rewrite.
Line 453 snapshots the physical plan before the transform_down rewrite on Lines 457-483, but Line 494 submits the transformed plan. For ExplainExec replacement and zero-partition rewrites, the REST API will report a different physical plan than the one actually executed. Please build physical_plan_str from plan.data after the transformation instead.
Suggested change

```diff
-    let physical_plan_str = DisplayableExecutionPlan::new(plan.as_ref())
-        .indent(false)
-        .to_string();
-
     let plan = plan.transform_down(&|node: Arc<dyn ExecutionPlan>| {
         if node.output_partitioning().partition_count() == 0 {
             let empty: Arc<dyn ExecutionPlan> =
                 Arc::new(EmptyExec::new(node.schema()));
             Ok(Transformed::yes(empty))
@@
     debug!(
         "Transformed physical plan: {}",
         DisplayableExecutionPlan::new(plan.data.as_ref()).indent(false)
     );
+    let physical_plan_str = DisplayableExecutionPlan::new(plan.data.as_ref())
+        .indent(false)
+        .to_string();
```

Also applies to: 489-499
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@ballista/scheduler/src/state/mod.rs` around lines 448 - 455, The current code
builds physical_plan_str from the plan before downstream rewrites, causing the
REST API to show a different plan than executed; update the logic to generate
physical_plan_str after the final transforms by using the rewritten plan data
(e.g., call
DisplayableExecutionPlan::new(plan.data.as_ref()).indent(false).to_string() or
equivalent) after transform_down/ExplainExec replacement and zero-partition
rewrites complete (update both occurrences around the transform_down block and
the other similar block at 489-499) so the serialized physical plan matches the
actual executed plan.
value:good-to-have; category:bug; feedback: The CodeRabbit AI reviewer is not correct! The physical plan is the plan produced by DataFusion. The transformed plan is shown as "staged_plan".
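For contrast, a std-only sketch of the ordering the author describes, where the pre-rewrite snapshot is reported as the physical plan and the post-rewrite plan feeds `staged_plan`. The `Plan` enum and `rewrite` function are stand-ins for DataFusion's `ExecutionPlan` and the `transform_down` pass, not real APIs:

```rust
// Stand-in for a DataFusion physical plan; real code uses
// Arc<dyn ExecutionPlan> and DisplayableExecutionPlan.
#[derive(Debug)]
enum Plan {
    Explain,
    Scan,
}

// Stand-in for the transform_down rewrite that replaces
// ExplainExec / zero-partition nodes before execution.
fn rewrite(plan: Plan) -> Plan {
    match plan {
        Plan::Explain => Plan::Scan, // e.g. ExplainExec -> replacement node
        other => other,
    }
}

fn main() {
    let plan = Plan::Explain;

    // Snapshot BEFORE the rewrite: the plan DataFusion produced.
    let physical_plan_str = format!("{plan:?}");

    // Apply the rewrite, then snapshot AFTER: the plan that actually runs.
    let rewritten = rewrite(plan);
    let staged_plan_str = format!("{rewritten:?}");

    assert_eq!(physical_plan_str, "Explain");
    assert_eq!(staged_plan_str, "Scan");
    println!("physical={physical_plan_str} staged={staged_plan_str}");
}
```

Under this split, both reviewer and author are self-consistent; the open question is only which snapshot each response field should carry.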
Code Review: Expose Logical and Physical Plan Details in REST API

This PR adds a new GET /api/job/{job_id} endpoint. Review content below (posting via workaround due to environment restrictions):

- Bug: physical_plan_str is captured BEFORE transform_down (ballista/scheduler/src/state/mod.rs ~L453). Use plan.data.as_ref() after the transformation instead.
- Breaking: substrait removed from default features in Cargo.toml without justification; a semver bump is warranted.
- Design: JobResponse is shared between the list and detail endpoints with inconsistent field population. Consider a JobDetailResponse or serde skip_serializing_if.
- Duplication: the status-matching logic is copied in get_jobs and get_job; extract a shared helper.
- API quality: stage_plan uses the Debug format, which is unstable and not human-friendly. Use ExecutionGraphDot::generate.
- Style: two .route() calls for the same path; chain them as .get().patch() per the Axum idiom.
- Observability: get_job silently drops errors from get_job_execution_graph, unlike cancel_job, which uses tracing::error!.
- Memory: plan strings are stored per job for the job's lifetime and could be several KB each at scale.
- Testing: no tests for GET /api/job/{job_id}; 404 and plan-population tests are needed.
- Minor: .gitignore is missing a newline at EOF after the .claude/ entry.
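The duplicated status match that the review flags could be pulled into one helper roughly like this; the `JobStatus` enum and `format_status` name are illustrative stand-ins for Ballista's protobuf-generated status types:

```rust
// Illustrative stand-in for Ballista's job status; the real type is
// generated from protobuf and carries more fields.
enum JobStatus {
    Queued,
    Running,
    Failed(String),
    Successful { rows: u64, partitions: u64 },
}

/// Shared helper: one place that maps a status to the (status, detail)
/// strings used by both the list and detail handlers, so the two
/// endpoints cannot drift apart.
fn format_status(status: &JobStatus) -> (&'static str, String) {
    match status {
        JobStatus::Queued => ("QUEUED", String::new()),
        JobStatus::Running => ("RUNNING", String::new()),
        JobStatus::Failed(err) => ("FAILED", err.clone()),
        JobStatus::Successful { rows, partitions } => (
            "SUCCESSFUL",
            format!("{rows} rows in {partitions} partitions"),
        ),
    }
}

fn main() {
    let (status, detail) = format_status(&JobStatus::Successful {
        rows: 10,
        partitions: 2,
    });
    assert_eq!(status, "SUCCESSFUL");
    assert_eq!(detail, "10 rows in 2 partitions");
    println!("{status}: {detail}");
}
```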
Code Review: PR #5 - Expose Logical and Physical Plan Details in REST API

BUG - Physical Plan Captured Before Transformation (ballista/scheduler/src/state/mod.rs ~line 453)

The physical_plan_str is captured BEFORE transform_down is applied. That transformation replaces ExplainExec nodes and substitutes EmptyExec for zero-partition nodes, so the stored plan string will not reflect the actual plan that gets executed. Consider capturing physical_plan_str after the transformation using plan.data.as_ref() instead of plan.as_ref().

BREAKING CHANGE - substrait Removed from Default Features (ballista/scheduler/Cargo.toml)

The default changed from [build-binary, substrait] to [build-binary, rest-api]. Removing substrait from the defaults is a breaking change for users who rely on it being available without explicitly opting in. This should be documented and may warrant a semver bump.

DESIGN - Inconsistent JobResponse Between Endpoints

JobResponse is shared between GET /api/jobs (list) and GET /api/job/{job_id} (single). The list endpoint always sets the plan fields to None while the single endpoint populates them. Consider a separate JobDetailResponse type, or add skip_serializing_if so null fields are omitted from list responses rather than appearing as explicit nulls.

MAINTENANCE - Duplicated Status Matching Logic

The match block for the Queued/Running/Failed/Successful statuses (including rows/partitions formatting) is duplicated between get_jobs and get_job. Extract a shared helper to prevent the two from diverging over time.

API QUALITY - Unstructured stage_plan via Debug Format (handlers.rs line 202)

Using the Debug format on the full execution graph produces a large, unstable, Rust-internal representation. Consider using ExecutionGraphDot::generate or directing callers to the existing /api/job/{job_id}/dot endpoint instead.

STYLE - Route Registration

Two separate .route() calls on /api/job/{job_id} work, but the idiomatic Axum pattern chains methods in one call: .route("/api/job/{job_id}", get(get_job).patch(cancel_job))

OBSERVABILITY - Missing Error Logging in get_job

The get_job handler silently discards errors with |_| when calling get_job_execution_graph. The cancel_job handler logs via tracing::error! first. Apply the same pattern in get_job.

MEMORY - Plan Strings Stored Per Job

Plan strings are stored on each execution graph for the job's lifetime. For complex queries these can be several KB each. Worth noting if many jobs are retained long-term.

TESTING - No Tests for New Endpoint

No tests were added for GET /api/job/{job_id}. Tests covering 404 for unknown jobs and correct plan string population would be valuable.

MINOR - .gitignore Missing Newline at EOF

The .claude/ entry is missing a trailing newline.

Summary: Bug (physical plan timing), Breaking (substrait removed from defaults), Design (shared response type), Maintenance (duplicated status logic), API (Debug stage_plan), Style (route chaining), Observability (silent error), Testing (no tests), Minor (newline). The core idea is solid. The most critical fixes are the physical plan capture timing bug and the unintended substrait breaking change.
value:good-to-have; category:bug; feedback: The Claude AI reviewer is correct! Adding the rest-api feature as a default is related to the PR, but dropping the "substrait" one is not; it should be done in a separate PR so that it appears in the changelog and users are informed of the change.

