Skip to content

Commit 5f6660a

Browse files
committed
feat: added with_context to user-configured operations (source, function, target) to hydrate error propagation
1 parent ad10f8f commit 5f6660a

File tree

1 file changed

+99
-22
lines changed

1 file changed

+99
-22
lines changed

rust/cocoindex/src/ops/py_factory.rs

Lines changed: 99 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,8 @@ impl PyFunctionExecutor {
8181
.transpose()?
8282
.as_ref(),
8383
)
84-
.to_result_with_py_trace(py)?;
84+
.to_result_with_py_trace(py)
85+
.with_context(|| format!("while calling user-configured function"))?;
8586
Ok(result.into_bound(py))
8687
}
8788
}
@@ -230,7 +231,8 @@ impl interface::SimpleFunctionFactory for PyFunctionFactory {
230231
PyTuple::new(py, args.into_iter())?,
231232
Some(&kwargs.into_py_dict(py)?),
232233
)
233-
.to_result_with_py_trace(py)?;
234+
.to_result_with_py_trace(py)
235+
.with_context(|| format!("while building user-configured function"))?;
234236
let (result_type, executor) = result
235237
.extract::<(crate::py::Pythonized<schema::EnrichedValueType>, Py<PyAny>)>(py)?;
236238
Ok((
@@ -253,7 +255,8 @@ impl interface::SimpleFunctionFactory for PyFunctionFactory {
253255
Python::with_gil(|py| -> anyhow::Result<_> {
254256
let prepare_coro = executor
255257
.call_method(py, "prepare", (), None)
256-
.to_result_with_py_trace(py)?;
258+
.to_result_with_py_trace(py)
259+
.with_context(|| format!("while preparing user-configured function"))?;
257260
let prepare_fut = from_py_future(
258261
py,
259262
&pyo3_async_runtimes::TaskLocals::new(
@@ -360,6 +363,7 @@ impl interface::SourceExecutor for PySourceExecutor {
360363
py_source_executor
361364
.call_method(py, "list_async", (pythonize(py, options)?,), None)
362365
.to_result_with_py_trace(py)
366+
.with_context(|| format!("while listing user-configured source"))
363367
})?;
364368

365369
// Create a stream that pulls from the Python async iterator one item at a time
@@ -399,7 +403,13 @@ impl interface::SourceExecutor for PySourceExecutor {
399403
),
400404
None,
401405
)
402-
.to_result_with_py_trace(py)?;
406+
.to_result_with_py_trace(py)
407+
.with_context(|| {
408+
format!(
409+
"while fetching user-configured source for key: {:?}",
410+
&key_clone
411+
)
412+
})?;
403413
let task_locals =
404414
pyo3_async_runtimes::TaskLocals::new(py_exec_ctx.event_loop.bind(py).clone());
405415
Ok(from_py_future(
@@ -439,7 +449,8 @@ impl PySourceExecutor {
439449
let next_item_coro = Python::with_gil(|py| -> Result<_> {
440450
let coro = py_async_iter
441451
.call_method0(py, "__anext__")
442-
.to_result_with_py_trace(py)?;
452+
.to_result_with_py_trace(py)
453+
.with_context(|| format!("while iterating over user-configured source"))?;
443454
let task_locals =
444455
pyo3_async_runtimes::TaskLocals::new(py_exec_ctx.event_loop.bind(py).clone());
445456
Ok(from_py_future(py, &task_locals, coro.into_bound(py))?)
@@ -572,7 +583,7 @@ impl PySourceExecutor {
572583
impl interface::SourceFactory for PySourceConnectorFactory {
573584
async fn build(
574585
self: Arc<Self>,
575-
_source_name: &str,
586+
source_name: &str,
576587
spec: serde_json::Value,
577588
context: Arc<interface::FlowInstanceContext>,
578589
) -> Result<(
@@ -590,7 +601,13 @@ impl interface::SourceFactory for PySourceConnectorFactory {
590601
let value_type_result = self
591602
.py_source_connector
592603
.call_method(py, "get_table_type", (), None)
593-
.to_result_with_py_trace(py)?;
604+
.to_result_with_py_trace(py)
605+
.with_context(|| {
606+
format!(
607+
"while fetching table type from user-configured source `{}`",
608+
source_name
609+
)
610+
})?;
594611
let table_type: schema::EnrichedValueType =
595612
depythonize(&value_type_result.into_bound(py))?;
596613
Ok(table_type)
@@ -619,14 +636,20 @@ impl interface::SourceFactory for PySourceConnectorFactory {
619636
table_type.typ
620637
),
621638
};
622-
639+
let source_name = source_name.to_string();
623640
let executor_fut = async move {
624641
// Create the executor using the async create_executor method
625642
let create_future = Python::with_gil(|py| -> Result<_> {
626643
let create_coro = self
627644
.py_source_connector
628645
.call_method(py, "create_executor", (pythonize(py, &spec)?,), None)
629-
.to_result_with_py_trace(py)?;
646+
.to_result_with_py_trace(py)
647+
.with_context(|| {
648+
format!(
649+
"while constructing executor for user-configured source `{}`",
650+
source_name
651+
)
652+
})?;
630653
let task_locals =
631654
pyo3_async_runtimes::TaskLocals::new(py_exec_ctx.event_loop.bind(py).clone());
632655
let create_future = from_py_future(py, &task_locals, create_coro.into_bound(py))?;
@@ -637,13 +660,25 @@ impl interface::SourceFactory for PySourceConnectorFactory {
637660

638661
let (py_source_executor_context, provides_ordinal) =
639662
Python::with_gil(|py| -> Result<_> {
640-
let executor_context =
641-
py_executor_context_result.to_result_with_py_trace(py)?;
663+
let executor_context = py_executor_context_result
664+
.to_result_with_py_trace(py)
665+
.with_context(|| {
666+
format!(
667+
"while getting executor context for user-configured source `{}`",
668+
source_name
669+
)
670+
})?;
642671

643672
// Get provides_ordinal from the executor context
644673
let provides_ordinal = executor_context
645674
.call_method(py, "provides_ordinal", (), None)
646-
.to_result_with_py_trace(py)?
675+
.to_result_with_py_trace(py)
676+
.with_context(|| {
677+
format!(
678+
"while calling provides_ordinal for user-configured source `{}`",
679+
source_name
680+
)
681+
})?
647682
.extract::<bool>(py)?;
648683

649684
Ok((executor_context, provides_ordinal))
@@ -743,20 +778,38 @@ impl interface::TargetFactory for PyExportTargetFactory {
743778
),
744779
None,
745780
)
746-
.to_result_with_py_trace(py)?;
781+
.to_result_with_py_trace(py)
782+
.with_context(|| {
783+
format!(
784+
"while setting up export context for user-configured target `{}`",
785+
&data_collection.name
786+
)
787+
})?;
747788

748789
// Call the `get_persistent_key` method to get the persistent key.
749790
let persistent_key = self
750791
.py_target_connector
751792
.call_method(py, "get_persistent_key", (&py_export_ctx,), None)
752-
.to_result_with_py_trace(py)?;
793+
.to_result_with_py_trace(py)
794+
.with_context(|| {
795+
format!(
796+
"while getting persistent key for user-configured target `{}`",
797+
&data_collection.name
798+
)
799+
})?;
753800
let persistent_key: serde_json::Value =
754801
depythonize(&persistent_key.into_bound(py))?;
755802

756803
let setup_state = self
757804
.py_target_connector
758805
.call_method(py, "get_setup_state", (&py_export_ctx,), None)
759-
.to_result_with_py_trace(py)?;
806+
.to_result_with_py_trace(py)
807+
.with_context(|| {
808+
format!(
809+
"while getting setup state for user-configured target `{}`",
810+
&data_collection.name
811+
)
812+
})?;
760813
let setup_state: serde_json::Value = depythonize(&setup_state.into_bound(py))?;
761814

762815
anyhow::Ok((py_export_ctx, persistent_key, setup_state))
@@ -770,7 +823,13 @@ impl interface::TargetFactory for PyExportTargetFactory {
770823
let prepare_coro = factory
771824
.py_target_connector
772825
.call_method(py, "prepare_async", (&py_export_ctx,), None)
773-
.to_result_with_py_trace(py)?;
826+
.to_result_with_py_trace(py)
827+
.with_context(|| {
828+
format!(
829+
"while preparing user-configured target `{}`",
830+
&data_collection.name
831+
)
832+
})?;
774833
let task_locals = pyo3_async_runtimes::TaskLocals::new(
775834
py_exec_ctx.event_loop.bind(py).clone(),
776835
);
@@ -839,7 +898,10 @@ impl interface::TargetFactory for PyExportTargetFactory {
839898
),
840899
None,
841900
)
842-
.to_result_with_py_trace(py)?;
901+
.to_result_with_py_trace(py)
902+
.with_context(|| {
903+
format!("while calling check_state_compatibility in user-configured target")
904+
})?;
843905
let compatibility: SetupStateCompatibility = depythonize(&result.into_bound(py))?;
844906
Ok(compatibility)
845907
})?;
@@ -851,7 +913,10 @@ impl interface::TargetFactory for PyExportTargetFactory {
851913
let result = self
852914
.py_target_connector
853915
.call_method(py, "describe_resource", (pythonize(py, key)?,), None)
854-
.to_result_with_py_trace(py)?;
916+
.to_result_with_py_trace(py)
917+
.with_context(|| {
918+
format!("while calling describe_resource in user-configured target")
919+
})?;
855920
let description = result.extract::<String>(py)?;
856921
Ok(description)
857922
})
@@ -908,7 +973,10 @@ impl interface::TargetFactory for PyExportTargetFactory {
908973
(pythonize(py, &setup_changes)?,),
909974
None,
910975
)
911-
.to_result_with_py_trace(py)?;
976+
.to_result_with_py_trace(py)
977+
.with_context(|| {
978+
format!("while calling apply_setup_changes_async in user-configured target")
979+
})?;
912980
let task_locals =
913981
pyo3_async_runtimes::TaskLocals::new(py_exec_ctx.event_loop.bind(py).clone());
914982
Ok(from_py_future(
@@ -918,7 +986,11 @@ impl interface::TargetFactory for PyExportTargetFactory {
918986
)?)
919987
})?
920988
.await;
921-
Python::with_gil(move |py| py_result.to_result_with_py_trace(py))?;
989+
Python::with_gil(move |py| {
990+
py_result
991+
.to_result_with_py_trace(py)
992+
.with_context(|| format!("while applying setup changes in user-configured target"))
993+
})?;
922994

923995
Ok(())
924996
}
@@ -969,7 +1041,8 @@ impl interface::TargetFactory for PyExportTargetFactory {
9691041
let result_coro = self
9701042
.py_target_connector
9711043
.call_method(py, "mutate_async", (py_args,), None)
972-
.to_result_with_py_trace(py)?;
1044+
.to_result_with_py_trace(py)
1045+
.with_context(|| format!("while calling mutate_async in user-configured target"))?;
9731046
let task_locals =
9741047
pyo3_async_runtimes::TaskLocals::new(py_exec_ctx.event_loop.bind(py).clone());
9751048
Ok(from_py_future(
@@ -980,7 +1053,11 @@ impl interface::TargetFactory for PyExportTargetFactory {
9801053
})?
9811054
.await;
9821055

983-
Python::with_gil(move |py| py_result.to_result_with_py_trace(py))?;
1056+
Python::with_gil(move |py| {
1057+
py_result
1058+
.to_result_with_py_trace(py)
1059+
.with_context(|| format!("while applying mutations in user-configured target"))
1060+
})?;
9841061
Ok(())
9851062
}
9861063
}

0 commit comments

Comments
 (0)