Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions crates/sparrow-runtime/src/execute/operation/scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,9 +190,11 @@ impl ScanOperation {
})
// TODO: Share this code / unify it with other scans.
.take_until(async move {
println!("FRAZ - Take until");
let mut stop_signal_rx =
stop_signal_rx.expect("stop signal for use with materialization");
while !*stop_signal_rx.borrow() {
println!("Stop signal: {:?}", *stop_signal_rx.borrow());
match stop_signal_rx.changed().await {
Ok(_) => (),
Err(e) => {
Expand Down
1 change: 1 addition & 0 deletions crates/sparrow-session/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ derive_more.workspace = true
error-stack.workspace = true
futures.workspace = true
itertools.workspace = true
parking_lot.workspace = true
smallvec.workspace = true
sparrow-api = { path = "../sparrow-api" }
sparrow-backend = { path = "../sparrow-backend" }
Expand Down
2 changes: 2 additions & 0 deletions crates/sparrow-session/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ pub enum Error {
Execute,
#[display(fmt = "execution failed")]
ExecutionFailed,
#[display(fmt = "execution results already consumed")]
AlreadyConsumed,
}

impl error_stack::Context for Error {}
Expand Down
182 changes: 137 additions & 45 deletions crates/sparrow-session/src/execution.rs
Original file line number Diff line number Diff line change
@@ -1,79 +1,109 @@
use std::borrow::BorrowMut;
use std::ops::DerefMut;

use arrow_array::RecordBatch;
use arrow_schema::SchemaRef;
use futures::future::BoxFuture;
use futures::StreamExt;
use parking_lot::Mutex;

use crate::Error;

pub struct Execution {
/// Tokio handle managing this execution.
handle: tokio::runtime::Handle,
/// Channel to receive output on.
output: tokio_stream::wrappers::ReceiverStream<RecordBatch>,
// Future that resolves to the first error, if one occurred.
status: Status,
status: tokio::sync::Mutex<Status>,
/// Stop signal. Send `true` to stop execution.
stop_signal_tx: tokio::sync::watch::Sender<bool>,
pub schema: SchemaRef,
}

enum Status {
Running(BoxFuture<'static, error_stack::Result<(), Error>>),
/// This execution is still running and/or there is still unconsumed output.
Running {
/// Future that resolves to the first error, if one occurred.
error: BoxFuture<'static, error_stack::Result<(), Error>>,
/// Channel to receive output on.
output: tokio_stream::wrappers::ReceiverStream<RecordBatch>,
},
/// This execution has failed.
Failed,
/// The output of this execution has been completely consumed and
/// no failures happened.
Completed,
}

impl Execution {
pub(super) fn new(
handle: tokio::runtime::Handle,
output_rx: tokio::sync::mpsc::Receiver<RecordBatch>,
future: BoxFuture<'static, error_stack::Result<(), Error>>,
error: BoxFuture<'static, error_stack::Result<(), Error>>,
stop_signal_tx: tokio::sync::watch::Sender<bool>,
schema: SchemaRef,
) -> Self {
let output = tokio_stream::wrappers::ReceiverStream::new(output_rx);

// Constructs a future that resolves to the first error, if one occurred.
let status = Status::Running(future);
let status = Status::Running { error, output };
Self {
handle,
output,
status,
status: tokio::sync::Mutex::new(status),
stop_signal_tx,
schema,
}
}

/// Check the status future.
/// Returns true if the output of this execution has been completely consumed
/// and no failures happened.
///
/// If it has previously completed (successfully or with error) returns
/// accordingly. Otherwise, check to see if the future is ready, and update
/// status (and return) accordingly.
fn is_done(&mut self) -> error_stack::Result<(), Error> {
let result = match &mut self.status {
Status::Running(progress) => {
// Based on the implementation of `FutureExt::now_or_never`:
let noop_waker = futures::task::noop_waker();
let mut cx = std::task::Context::from_waker(&noop_waker);

match progress.as_mut().poll(&mut cx) {
std::task::Poll::Ready(x) => x,
_ => return Ok(()),
}
/// If an error has occurred, sets the Status to Failed and returns the error.
fn is_done(&self) -> error_stack::Result<bool, Error> {
let mut status = self.status.blocking_lock();
match *status {
Status::Running { .. } => {
// handled below
}
Status::Failed => error_stack::bail!(Error::ExecutionFailed),
Status::Completed => return Ok(()),
Status::Completed => return Ok(true),
}

// Replace the status with completed temporarily so we can access the output stream.
// This is safe to do, as we have locked the status and no other thread can access it.
let Status::Running {
mut error,
mut output,
} = std::mem::replace(status.deref_mut(), Status::Completed)
else {
unreachable!("Status changed unexpectedly")
};

match result {
Ok(_) => {
self.status = Status::Completed;
Ok(())
}
Err(e) => {
self.status = Status::Failed;
// Based on the implementation of `FutureExt::now_or_never`:
let noop_waker = futures::task::noop_waker();
let mut cx = std::task::Context::from_waker(&noop_waker);

match error.as_mut().poll(&mut cx) {
std::task::Poll::Ready(Err(e)) => {
*status = Status::Failed;
Err(e)
}
std::task::Poll::Ready(Ok(_)) => {
// The query has completed without error, so check if all outputs have been consumed.
if let std::task::Poll::Ready(None) = output.poll_next_unpin(&mut cx) {
// The stream has terminated; set the status to complete.
*status = Status::Completed;
Ok(true)
} else {
// The stream has not terminated; set the status back to running.
let _ =
std::mem::replace(status.deref_mut(), Status::Running { error, output });
Ok(false)
}
}
_ => {
// Error channel has not resolved, so set the status back to running.
let _ = std::mem::replace(status.deref_mut(), Status::Running { error, output });
Ok(false)
}
}
}

Expand All @@ -87,38 +117,100 @@ impl Execution {
});
}

pub async fn next(&mut self) -> error_stack::Result<Option<RecordBatch>, Error> {
pub async fn next(&self) -> error_stack::Result<Option<RecordBatch>, Error> {
self.is_done()?;
Ok(self.output.next().await)

let mut status = self.status.lock().await;
match *status {
Status::Failed => error_stack::bail!(Error::ExecutionFailed),
Status::Completed => {
// Should this just return None?
error_stack::bail!(Error::AlreadyConsumed);
}
Status::Running { .. } => {
// handled below
}
}

// Replace the status with completed temporarily so we can access the output stream.
// This is safe to do, as we have locked the status and no other thread can access it.
let Status::Running { error, mut output } =
std::mem::replace(status.deref_mut(), Status::Completed)
else {
unreachable!("Status changed unexpectedly")
};

let next = output.next().await;

// Move the original status back
let _ = std::mem::replace(status.deref_mut(), Status::Running { error, output });

Ok(next)
}

pub fn next_blocking(&mut self) -> error_stack::Result<Option<RecordBatch>, Error> {
pub fn next_blocking(&self) -> error_stack::Result<Option<RecordBatch>, Error> {
self.is_done()?;
Ok(self.handle.block_on(self.output.next()))
let mut status = self.status.blocking_lock();
match *status {
Status::Failed => error_stack::bail!(Error::ExecutionFailed),
Status::Completed => {
// Should this just return None?
error_stack::bail!(Error::AlreadyConsumed);
}
Status::Running { .. } => {
// handled below
}
}

// Replace the status with completed temporarily so we can access the output stream.
// This is safe to do, as we have locked the status and no other thread can access it.
let Status::Running { error, mut output } =
std::mem::replace(status.deref_mut(), Status::Completed)
else {
unreachable!("Status changed unexpectedly")
};

let next = self.handle.block_on(output.next());

// Move the original status back
let _ = std::mem::replace(status.deref_mut(), Status::Running { error, output });

Ok(next)
}

pub async fn collect_all(self) -> error_stack::Result<Vec<RecordBatch>, Error> {
let progress = match self.status {
Status::Running(progress) => progress,
pub async fn collect_all(&self) -> error_stack::Result<Vec<RecordBatch>, Error> {
let mut status = self.status.blocking_lock();

match *status {
Status::Failed => error_stack::bail!(Error::ExecutionFailed),
Status::Completed => {
// If the progress channel has completed without error, we know that the output channel
// hasn't filled up, so we can go ahead and collect the output
return Ok(self.output.collect().await);
error_stack::bail!(Error::AlreadyConsumed);
}
Status::Running { .. } => {
// handled below
}
}

// Replace the status with completed (assuming we won't have an error) for future calls.
// This is safe to do, as we have locked the status and no other thread can access it.
let Status::Running { error, output } =
std::mem::replace(status.deref_mut(), Status::Completed)
else {
unreachable!("Status changed unexpectedly")
};

let output = self.output.collect::<Vec<_>>();
let output = output.collect::<Vec<_>>();

let (first_error, output) = futures::join!(progress, output);
let (first_error, output) = futures::join!(error, output);
if let Err(e) = first_error {
*status = Status::Failed;
Err(e)
} else {
Ok(output)
}
}

pub fn collect_all_blocking(self) -> error_stack::Result<Vec<RecordBatch>, Error> {
pub fn collect_all_blocking(&self) -> error_stack::Result<Vec<RecordBatch>, Error> {
// In order to check the running status, we have to enter the runtime regardless,
// so there's no reason to check the status prior to entering the runtime
// here.
Expand Down
20 changes: 18 additions & 2 deletions python/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading