Skip to content

Commit 6e8244d

Browse files
authored
Make error location caused by functions more clear. (#181)
1 parent 555d328 commit 6e8244d

File tree

3 files changed

+43
-9
lines changed

3 files changed

+43
-9
lines changed

src/base/value.rs

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,28 @@ impl serde::Serialize for KeyValue {
133133
}
134134
}
135135

136+
impl std::fmt::Display for KeyValue {
137+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
138+
match self {
139+
KeyValue::Bytes(v) => write!(f, "{}", BASE64_STANDARD.encode(v)),
140+
KeyValue::Str(v) => write!(f, "\"{}\"", v.escape_default()),
141+
KeyValue::Bool(v) => write!(f, "{}", v),
142+
KeyValue::Int64(v) => write!(f, "{}", v),
143+
KeyValue::Range(v) => write!(f, "[{}, {})", v.start, v.end),
144+
KeyValue::Struct(v) => {
145+
write!(
146+
f,
147+
"[{}]",
148+
v.iter()
149+
.map(|v| v.to_string())
150+
.collect::<Vec<_>>()
151+
.join(", ")
152+
)
153+
}
154+
}
155+
}
156+
}
157+
136158
impl KeyValue {
137159
fn parts_from_str(
138160
values_iter: &mut impl Iterator<Item = String>,

src/execution/evaluator.rs

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use std::sync::{Mutex, OnceLock};
22
use std::{borrow::Cow, collections::BTreeMap};
33

4-
use anyhow::{bail, Ok, Result};
4+
use anyhow::{bail, Context, Ok, Result};
55
use futures::future::try_join_all;
66

77
use crate::builder::{plan::*, AnalyzedTransientFlow};
@@ -298,7 +298,17 @@ async fn evaluate_child_op_scope(
298298
child_scope_entry: ScopeEntry<'_>,
299299
cache: Option<&EvaluationCache>,
300300
) -> Result<()> {
301-
evaluate_op_scope(op_scope, scoped_entries.prepend(&child_scope_entry), cache).await
301+
evaluate_op_scope(op_scope, scoped_entries.prepend(&child_scope_entry), cache)
302+
.await
303+
.with_context(|| {
304+
format!(
305+
"Evaluating in scope with key {}",
306+
match child_scope_entry.key.key() {
307+
Some(k) => k.to_string(),
308+
None => "()".to_string(),
309+
}
310+
)
311+
})
302312
}
303313

304314
async fn evaluate_op_scope(
@@ -331,15 +341,16 @@ async fn evaluate_op_scope(
331341
let output_value = evaluate_with_cell(output_value_cell.as_ref(), move || {
332342
op.executor.evaluate(input_values)
333343
})
334-
.await?;
344+
.await
345+
.with_context(|| format!("Evaluating Transform op `{}`", op.name,))?;
335346
head_scope.define_field(&op.output, &output_value)?;
336347
}
337348

338349
AnalyzedReactiveOp::ForEach(op) => {
339350
let target_field_schema = head_scope.get_field_schema(&op.local_field_ref)?;
340351
let collection_schema = match &target_field_schema.value_type.typ {
341352
schema::ValueType::Collection(cs) => cs,
342-
_ => panic!("Expect target field to be a collection"),
353+
_ => bail!("Expect target field to be a collection"),
343354
};
344355

345356
let target_field = head_scope.get_value_field_builder(&op.local_field_ref);
@@ -391,10 +402,12 @@ async fn evaluate_op_scope(
391402
})
392403
.collect::<Vec<_>>(),
393404
_ => {
394-
panic!("Target field type is expected to be a collection");
405+
bail!("Target field type is expected to be a collection");
395406
}
396407
};
397-
try_join_all(task_futs).await?;
408+
try_join_all(task_futs)
409+
.await
410+
.with_context(|| format!("Evaluating ForEach op `{}`", op.name,))?;
398411
}
399412

400413
AnalyzedReactiveOp::Collect(op) => {

src/execution/indexer.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use anyhow::Result;
22
use futures::future::{join_all, try_join, try_join_all};
3-
use log::{debug, error};
3+
use log::error;
44
use serde::Serialize;
55
use sqlx::PgPool;
66
use std::collections::{HashMap, HashSet};
@@ -597,8 +597,7 @@ async fn update_source(
597597
let num_errors = join_all(all_keys_set.into_iter().map(|key| async move {
598598
let result = update_source_entry(plan, source_op_idx, schema, &key, pool).await;
599599
if let Err(e) = result {
600-
error!("Error indexing source row: {}", e);
601-
debug!("Detailed error: {:?}", e);
600+
error!("{:?}", e.context("Error in indexing a source row"));
602601
1
603602
} else {
604603
0

0 commit comments

Comments
 (0)