Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/builder/analyzed_flow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ impl AnalyzedFlow {
}

pub async fn get_execution_plan(&self) -> Result<Arc<plan::ExecutionPlan>> {
let execution_plan = self.execution_plan.clone().await.std_result()?;
let execution_plan = self.execution_plan.clone().await.anyhow_result()?;
Ok(execution_plan)
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/execution/memoization.rs
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ where
async move { fut.await.map_err(SharedError::new) }
})
.await
.std_result()?,
.anyhow_result()?,
),
None => Cow::Owned(compute().await?),
};
Expand Down
2 changes: 1 addition & 1 deletion src/execution/source_indexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -531,7 +531,7 @@ impl SourceIndexingContext {
pending_update_fut
}
};
pending_update_fut.await.std_result()?;
pending_update_fut.await.anyhow_result()?;
Ok(())
}

Expand Down
118 changes: 72 additions & 46 deletions src/service/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,45 +71,91 @@ impl From<ApiError> for PyErr {
}
}

pub struct ResidualErrorData {
message: String,
debug: String,
}

#[derive(Clone)]
pub struct SharedError {
pub err: Arc<anyhow::Error>,
pub struct ResidualError(Arc<ResidualErrorData>);

impl Display for ResidualError {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(f, "{}", self.0.message)
}
}

impl Debug for ResidualError {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(f, "{}", self.0.debug)
}
}

impl Error for ResidualError {}

enum SharedErrorState {
Anyhow(anyhow::Error),
ResidualErrorMessage(ResidualError),
}

/// SharedError allows to be cloned.
/// The original `anyhow::Error` can be extracted once, and later it decays to `ResidualError` which preserves the message and debug information.
#[derive(Clone)]
pub struct SharedError(Arc<Mutex<SharedErrorState>>);

impl SharedError {
pub fn new(err: anyhow::Error) -> Self {
Self { err: Arc::new(err) }
Self(Arc::new(Mutex::new(SharedErrorState::Anyhow(err))))
}

fn extract_anyhow_error(&self) -> anyhow::Error {
let mut state = self.0.lock().unwrap();
let mut_state = &mut *state;

let residual_err = match mut_state {
SharedErrorState::ResidualErrorMessage(err) => {
return anyhow::Error::from(err.clone());
}
SharedErrorState::Anyhow(err) => ResidualError(Arc::new(ResidualErrorData {
message: format!("{}", err),
debug: format!("{:?}", err),
})),
};
let orig_state = std::mem::replace(
mut_state,
SharedErrorState::ResidualErrorMessage(residual_err),
);
let SharedErrorState::Anyhow(err) = orig_state else {
panic!("Expected anyhow error");
};
err
}
}
impl Debug for SharedError {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
Debug::fmt(&self.err, f)
let state = self.0.lock().unwrap();
match &*state {
SharedErrorState::Anyhow(err) => Debug::fmt(err, f),
SharedErrorState::ResidualErrorMessage(err) => Debug::fmt(err, f),
}
}
}

impl Display for SharedError {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
Display::fmt(&self.err, f)
let state = self.0.lock().unwrap();
match &*state {
SharedErrorState::Anyhow(err) => Display::fmt(err, f),
SharedErrorState::ResidualErrorMessage(err) => Display::fmt(err, f),
}
}
}

impl<E: std::error::Error + Send + Sync + 'static> From<E> for SharedError {
fn from(err: E) -> Self {
Self {
err: Arc::new(anyhow::Error::from(err)),
}
}
}

impl AsRef<dyn std::error::Error> for SharedError {
fn as_ref(&self) -> &(dyn std::error::Error + 'static) {
self.err.as_ref().as_ref()
}
}

impl AsRef<dyn std::error::Error + Send + Sync> for SharedError {
fn as_ref(&self) -> &(dyn std::error::Error + Send + Sync + 'static) {
self.err.as_ref().as_ref()
Self(Arc::new(Mutex::new(SharedErrorState::Anyhow(
anyhow::Error::from(err),
))))
}
}

Expand All @@ -119,48 +165,28 @@ pub fn shared_ok<T>(value: T) -> Result<T, SharedError> {

pub type SharedResult<T> = Result<T, SharedError>;

pub struct SharedErrorWrapper(SharedError);

impl Display for SharedErrorWrapper {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
Display::fmt(&self.0, f)
}
}

impl Debug for SharedErrorWrapper {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
Debug::fmt(&self.0, f)
}
}

impl Error for SharedErrorWrapper {
fn source(&self) -> Option<&(dyn Error + 'static)> {
self.0.err.as_ref().source()
}
}

pub trait SharedResultExt<T> {
fn std_result(self) -> Result<T, SharedErrorWrapper>;
fn anyhow_result(self) -> Result<T, anyhow::Error>;
}

impl<T> SharedResultExt<T> for Result<T, SharedError> {
fn std_result(self) -> Result<T, SharedErrorWrapper> {
fn anyhow_result(self) -> Result<T, anyhow::Error> {
match self {
Ok(value) => Ok(value),
Err(err) => Err(SharedErrorWrapper(err)),
Err(err) => Err(err.extract_anyhow_error()),
}
}
}

pub trait SharedResultExtRef<'a, T> {
fn std_result(self) -> Result<&'a T, SharedErrorWrapper>;
fn anyhow_result(self) -> Result<&'a T, anyhow::Error>;
}

impl<'a, T> SharedResultExtRef<'a, T> for &'a Result<T, SharedError> {
fn std_result(self) -> Result<&'a T, SharedErrorWrapper> {
fn anyhow_result(self) -> Result<&'a T, anyhow::Error> {
match self {
Ok(value) => Ok(value),
Err(err) => Err(SharedErrorWrapper(err.clone())),
Err(err) => Err(err.extract_anyhow_error()),
}
}
}
Expand Down
Loading