Skip to content
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
112 changes: 86 additions & 26 deletions src/app/app_execution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,25 +18,42 @@
//! [`AppExecution`]: Handles executing queries for the TUI application.

use crate::app::state::tabs::sql::Query;
use crate::app::AppEvent;
use crate::app::{AppEvent, ExecutionError, ExecutionResultsBatch};
use crate::execution::ExecutionContext;
use color_eyre::eyre::Result;
use datafusion::execution::context::SessionContext;
use datafusion::execution::SendableRecordBatchStream;
use datafusion::physical_plan::execute_stream;
use futures::StreamExt;
use log::{error, info};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::mpsc::UnboundedSender;
use tokio::sync::Mutex;

/// Handles executing queries for the TUI application, formatting results
/// and sending them to the UI.
pub(crate) struct AppExecution {
pub struct AppExecution {
inner: Arc<ExecutionContext>,
result_stream: Arc<Mutex<Option<SendableRecordBatchStream>>>,
}

impl AppExecution {
/// Create a new instance of [`AppExecution`].
pub fn new(inner: Arc<ExecutionContext>) -> Self {
Self { inner }
Self {
inner,
result_stream: Arc::new(Mutex::new(None)),
}
}

pub fn session_ctx(&self) -> &SessionContext {
self.inner.session_ctx()
}

pub async fn set_result_stream(&self, stream: SendableRecordBatchStream) {
let mut s = self.result_stream.lock().await;
*s = Some(stream)
}

/// Run the sequence of SQL queries, sending the results as [`AppEvent::QueryResult`] via the sender.
Expand All @@ -60,33 +77,53 @@ impl AppExecution {
let start = std::time::Instant::now();
if i == statement_count - 1 {
info!("Executing last query and display results");
match self.inner.execute_sql(sql).await {
Ok(mut stream) => {
let mut batches = Vec::new();
while let Some(maybe_batch) = stream.next().await {
match maybe_batch {
Ok(batch) => {
batches.push(batch);
}
Err(e) => {
let elapsed = start.elapsed();
query.set_error(Some(e.to_string()));
query.set_execution_time(elapsed);
break;
sender.send(AppEvent::NewExecution)?;
match self.inner.create_physical_plan(sql).await {
Ok(plan) => match execute_stream(plan, self.inner.session_ctx().task_ctx()) {
Ok(stream) => {
self.set_result_stream(stream).await;
let mut stream = self.result_stream.lock().await;
if let Some(s) = stream.as_mut() {
if let Some(b) = s.next().await {
match b {
Ok(b) => {
let duration = start.elapsed();
let results = ExecutionResultsBatch {
query: sql.to_string(),
batch: b,
duration,
};
sender.send(AppEvent::ExecutionResultsNextPage(
results,
))?;
}
Err(e) => {
error!("Error getting RecordBatch: {:?}", e);
}
}
}
}
}
Err(stream_err) => {
error!("Error creating physical plan: {:?}", stream_err);
let elapsed = start.elapsed();
let e = ExecutionError {
query: sql.to_string(),
error: stream_err.to_string(),
duration: elapsed,
};
sender.send(AppEvent::ExecutionResultsError(e))?;
}
},
Err(plan_err) => {
error!("Error creating physical plan: {:?}", plan_err);
let elapsed = start.elapsed();
let rows: usize = batches.iter().map(|r| r.num_rows()).sum();
query.set_results(Some(batches));
query.set_num_rows(Some(rows));
query.set_execution_time(elapsed);
}
Err(e) => {
error!("Error creating dataframe: {:?}", e);
let elapsed = start.elapsed();
query.set_error(Some(e.to_string()));
query.set_execution_time(elapsed);
let e = ExecutionError {
query: sql.to_string(),
error: plan_err.to_string(),
duration: elapsed,
};
sender.send(AppEvent::ExecutionResultsError(e))?;
}
}
} else {
Expand All @@ -107,4 +144,27 @@ impl AppExecution {
}
Ok(())
}

pub async fn next_batch(&self, sql: String, sender: UnboundedSender<AppEvent>) {
let mut stream = self.result_stream.lock().await;
if let Some(s) = stream.as_mut() {
let start = std::time::Instant::now();
if let Some(b) = s.next().await {
match b {
Ok(b) => {
let duration = start.elapsed();
let results = ExecutionResultsBatch {
query: sql,
batch: b,
duration,
};
let _ = sender.send(AppEvent::ExecutionResultsNextPage(results));
}
Err(e) => {
error!("Error getting RecordBatch: {:?}", e);
}
}
}
}
}
}
136 changes: 68 additions & 68 deletions src/app/handlers/flightsql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,74 +66,74 @@ pub fn normal_mode_handler(app: &mut App, key: KeyEvent) {
}
}

KeyCode::Enter => {
info!("Run FS query");
let sql = app.state.flightsql_tab.editor().lines().join("");
info!("SQL: {}", sql);
let execution = Arc::clone(&app.execution);
let _event_tx = app.event_tx();
tokio::spawn(async move {
let client = execution.flightsql_client();
let mut query =
FlightSQLQuery::new(sql.clone(), None, None, None, Duration::default(), None);
let start = Instant::now();
if let Some(ref mut c) = *client.lock().await {
info!("Sending query");
match c.execute(sql, None).await {
Ok(flight_info) => {
for endpoint in flight_info.endpoint {
if let Some(ticket) = endpoint.ticket {
match c.do_get(ticket.into_request()).await {
Ok(mut stream) => {
let mut batches: Vec<RecordBatch> = Vec::new();
// temporarily only show the first batch to avoid
// buffering massive result sets. Eventually there should
// be some sort of paging logic
// see https://github.com/datafusion-contrib/datafusion-tui/pull/133#discussion_r1756680874
// while let Some(maybe_batch) = stream.next().await {
if let Some(maybe_batch) = stream.next().await {
match maybe_batch {
Ok(batch) => {
info!("Batch rows: {}", batch.num_rows());
batches.push(batch);
}
Err(e) => {
error!("Error getting batch: {:?}", e);
let elapsed = start.elapsed();
query.set_error(Some(e.to_string()));
query.set_execution_time(elapsed);
}
}
}
let elapsed = start.elapsed();
let rows: usize =
batches.iter().map(|r| r.num_rows()).sum();
query.set_results(Some(batches));
query.set_num_rows(Some(rows));
query.set_execution_time(elapsed);
}
Err(e) => {
error!("Error getting response: {:?}", e);
let elapsed = start.elapsed();
query.set_error(Some(e.to_string()));
query.set_execution_time(elapsed);
}
}
}
}
}
Err(e) => {
error!("Error getting response: {:?}", e);
let elapsed = start.elapsed();
query.set_error(Some(e.to_string()));
query.set_execution_time(elapsed);
}
}
}

let _ = _event_tx.send(AppEvent::FlightSQLQueryResult(query));
});
}
// KeyCode::Enter => {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Going to come back to this in a follow on PR for paginating FlightSQL

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if we can somehow hide it behind an interface (so the pagination code for FlightSQL and local are the same) 🤔

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree that would be nice. I want to finalize the functionality for flightsql first - then with both features complete i think it will be easier to reason about what the appropriate interface should be.

// info!("Run FS query");
// let sql = app.state.flightsql_tab.editor().lines().join("");
// info!("SQL: {}", sql);
// let execution = Arc::clone(&app.execution);
// let _event_tx = app.event_tx();
// tokio::spawn(async move {
// let client = execution.flightsql_client();
// let mut query =
// FlightSQLQuery::new(sql.clone(), None, None, None, Duration::default(), None);
// let start = Instant::now();
// if let Some(ref mut c) = *client.lock().await {
// info!("Sending query");
// match c.execute(sql, None).await {
// Ok(flight_info) => {
// for endpoint in flight_info.endpoint {
// if let Some(ticket) = endpoint.ticket {
// match c.do_get(ticket.into_request()).await {
// Ok(mut stream) => {
// let mut batches: Vec<RecordBatch> = Vec::new();
// // temporarily only show the first batch to avoid
// // buffering massive result sets. Eventually there should
// // be some sort of paging logic
// // see https://github.com/datafusion-contrib/datafusion-tui/pull/133#discussion_r1756680874
// // while let Some(maybe_batch) = stream.next().await {
// if let Some(maybe_batch) = stream.next().await {
// match maybe_batch {
// Ok(batch) => {
// info!("Batch rows: {}", batch.num_rows());
// batches.push(batch);
// }
// Err(e) => {
// error!("Error getting batch: {:?}", e);
// let elapsed = start.elapsed();
// query.set_error(Some(e.to_string()));
// query.set_execution_time(elapsed);
// }
// }
// }
// let elapsed = start.elapsed();
// let rows: usize =
// batches.iter().map(|r| r.num_rows()).sum();
// query.set_results(Some(batches));
// query.set_num_rows(Some(rows));
// query.set_execution_time(elapsed);
// }
// Err(e) => {
// error!("Error getting response: {:?}", e);
// let elapsed = start.elapsed();
// query.set_error(Some(e.to_string()));
// query.set_execution_time(elapsed);
// }
// }
// }
// }
// }
// Err(e) => {
// error!("Error getting response: {:?}", e);
// let elapsed = start.elapsed();
// query.set_error(Some(e.to_string()));
// query.set_execution_time(elapsed);
// }
// }
// }
//
// let _ = _event_tx.send(AppEvent::FlightSQLQueryResult(query));
// });
// }
_ => {}
}
}
Expand Down
Loading
Loading