Skip to content

Commit d41a69f

Browse files
authored
feat(clear-error): add more context info for flow build time error (#986)
1 parent dca017c commit d41a69f

File tree

3 files changed

+21
-14
lines changed

3 files changed

+21
-14
lines changed

examples/text_embedding/main.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,9 @@ def text_embedding_flow(
6363

6464
doc_embeddings.export(
6565
"doc_embeddings",
66-
cocoindex.targets.Postgres(),
66+
cocoindex.targets.Postgres(
67+
database=cocoindex.setting.DatabaseConnectionSpec(url="123"),
68+
),
6769
primary_key_fields=["filename", "location"],
6870
vector_indexes=[
6971
cocoindex.VectorIndexDef(

src/builder/analyzer.rs

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -674,7 +674,9 @@ impl AnalyzerContext {
674674
let global_concurrency_controller = self.lib_ctx.global_concurrency_controller.clone();
675675
let result_fut = async move {
676676
trace!("Start building executor for source op `{op_name}`");
677-
let executor = executor.await?;
677+
let executor = executor
678+
.await
679+
.with_context(|| format!("Preparing for source op: {op_name}"))?;
678680
trace!("Finished building executor for source op `{op_name}`");
679681
Ok(AnalyzedImportOp {
680682
executor,
@@ -700,10 +702,7 @@ impl AnalyzerContext {
700702
ReactiveOpSpec::Transform(op) => {
701703
let input_field_schemas =
702704
analyze_input_fields(&op.inputs, op_scope).with_context(|| {
703-
format!(
704-
"Failed to analyze inputs for transform op: {}",
705-
reactive_op.name
706-
)
705+
format!("Preparing inputs for transform op: {}", reactive_op.name)
707706
})?;
708707
let spec = serde_json::Value::Object(op.op.spec.clone());
709708

@@ -725,7 +724,7 @@ impl AnalyzerContext {
725724
async move {
726725
trace!("Start building executor for transform op `{op_name}`");
727726
let executor = executor.await.with_context(|| {
728-
format!("Failed to build executor for transform op: {op_name}")
727+
format!("Preparing for transform op: {op_name}")
729728
})?;
730729
let enable_cache = executor.enable_cache();
731730
let behavior_version = executor.behavior_version();
@@ -784,7 +783,7 @@ impl AnalyzerContext {
784783
local_field_ref,
785784
op_scope: analyzed_op_scope_fut
786785
.await
787-
.with_context(|| format!("Analyzing foreach op: {op_name}"))?,
786+
.with_context(|| format!("Preparing for foreach op: {op_name}"))?,
788787
name: op_name,
789788
concurrency_controller: concur_control::ConcurrencyController::new(
790789
&concur_control_options,
@@ -920,7 +919,7 @@ impl AnalyzerContext {
920919
let export_context = data_coll_output
921920
.export_context
922921
.await
923-
.with_context(|| format!("Analyzing export op: {op_name}"))?;
922+
.with_context(|| format!("Preparing for export op: {op_name}"))?;
924923
trace!("Finished building executor for export op `{op_name}`");
925924
Ok(AnalyzedExportOp {
926925
name: op_name,
@@ -1006,7 +1005,8 @@ pub async fn analyze_flow(
10061005
import_ops_futs.push(
10071006
analyzer_ctx
10081007
.analyze_import_op(&root_op_scope, import_op.clone())
1009-
.await?,
1008+
.await
1009+
.with_context(|| format!("Preparing for import op: {}", import_op.name))?,
10101010
);
10111011
}
10121012
let op_scope_fut = analyzer_ctx
@@ -1059,7 +1059,8 @@ pub async fn analyze_flow(
10591059
&mut targets_analyzed_ss,
10601060
&mut declarations_analyzed_ss,
10611061
)
1062-
.await?,
1062+
.await
1063+
.with_context(|| format!("Analyzing export ops for target `{target_kind}`"))?,
10631064
);
10641065
analyzed_target_op_groups.push(analyzed_target_op_group);
10651066
}

src/ops/factory_bases.rs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -245,7 +245,8 @@ impl<T: SourceFactoryBase> SourceFactory for T {
245245
EnrichedValueType,
246246
BoxFuture<'static, Result<Box<dyn SourceExecutor>>>,
247247
)> {
248-
let spec: T::Spec = serde_json::from_value(spec)?;
248+
let spec: T::Spec = serde_json::from_value(spec)
249+
.with_context(|| format!("Failed in parsing spec for source `{source_name}`"))?;
249250
let output_schema = self.get_output_schema(&spec, &context).await?;
250251
let source_name = source_name.to_string();
251252
let executor = async move { self.build_executor(&source_name, spec, context).await };
@@ -323,7 +324,8 @@ impl<T: SimpleFunctionFactoryBase> SimpleFunctionFactory for T {
323324
EnrichedValueType,
324325
BoxFuture<'static, Result<Box<dyn SimpleFunctionExecutor>>>,
325326
)> {
326-
let spec: T::Spec = serde_json::from_value(spec)?;
327+
let spec: T::Spec = serde_json::from_value(spec)
328+
.with_context(|| format!("Failed in parsing spec for function `{}`", self.name()))?;
327329
let mut nonnull_args_idx = vec![];
328330
let mut may_nullify_output = false;
329331
let mut args_resolver = OpArgsResolver::new(
@@ -466,8 +468,10 @@ impl<T: TargetFactoryBase> TargetFactory for T {
466468
.into_iter()
467469
.map(|d| {
468470
anyhow::Ok(TypedExportDataCollectionSpec {
471+
spec: serde_json::from_value(d.spec).with_context(|| {
472+
format!("Failed in parsing spec for target `{}`", d.name)
473+
})?,
469474
name: d.name,
470-
spec: serde_json::from_value(d.spec)?,
471475
key_fields_schema: d.key_fields_schema,
472476
value_fields_schema: d.value_fields_schema,
473477
index_options: d.index_options,

0 commit comments

Comments
 (0)