Skip to content

Commit 9f09062

Browse files
authored
feat: added with_context to user-configured operations (source, funct… (#1275)
feat: added with_context to user-configured operations (source, function, target) to hydrate error propagation
1 parent 0efd99a commit 9f09062

File tree

1 file changed

+99
-22
lines changed

1 file changed

+99
-22
lines changed

rust/cocoindex/src/ops/py_factory.rs

Lines changed: 99 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,8 @@ impl PyFunctionExecutor {
8080
.transpose()?
8181
.as_ref(),
8282
)
83-
.to_result_with_py_trace(py)?;
83+
.to_result_with_py_trace(py)
84+
.with_context(|| format!("while calling user-configured function"))?;
8485
Ok(result.into_bound(py))
8586
}
8687
}
@@ -218,7 +219,8 @@ impl interface::SimpleFunctionFactory for PyFunctionFactory {
218219
PyTuple::new(py, args.into_iter())?,
219220
Some(&kwargs.into_py_dict(py)?),
220221
)
221-
.to_result_with_py_trace(py)?;
222+
.to_result_with_py_trace(py)
223+
.with_context(|| format!("while building user-configured function"))?;
222224
let (result_type, executor) = result
223225
.extract::<(crate::py::Pythonized<schema::EnrichedValueType>, Py<PyAny>)>(py)?;
224226
let behavior_version = executor
@@ -246,7 +248,8 @@ impl interface::SimpleFunctionFactory for PyFunctionFactory {
246248
Python::with_gil(|py| -> anyhow::Result<_> {
247249
let prepare_coro = executor
248250
.call_method(py, "prepare", (), None)
249-
.to_result_with_py_trace(py)?;
251+
.to_result_with_py_trace(py)
252+
.with_context(|| format!("while preparing user-configured function"))?;
250253
let prepare_fut = from_py_future(
251254
py,
252255
&pyo3_async_runtimes::TaskLocals::new(
@@ -345,6 +348,7 @@ impl interface::SourceExecutor for PySourceExecutor {
345348
py_source_executor
346349
.call_method(py, "list_async", (pythonize(py, options)?,), None)
347350
.to_result_with_py_trace(py)
351+
.with_context(|| format!("while listing user-configured source"))
348352
})?;
349353

350354
// Create a stream that pulls from the Python async iterator one item at a time
@@ -384,7 +388,13 @@ impl interface::SourceExecutor for PySourceExecutor {
384388
),
385389
None,
386390
)
387-
.to_result_with_py_trace(py)?;
391+
.to_result_with_py_trace(py)
392+
.with_context(|| {
393+
format!(
394+
"while fetching user-configured source for key: {:?}",
395+
&key_clone
396+
)
397+
})?;
388398
let task_locals =
389399
pyo3_async_runtimes::TaskLocals::new(py_exec_ctx.event_loop.bind(py).clone());
390400
Ok(from_py_future(
@@ -424,7 +434,8 @@ impl PySourceExecutor {
424434
let next_item_coro = Python::with_gil(|py| -> Result<_> {
425435
let coro = py_async_iter
426436
.call_method0(py, "__anext__")
427-
.to_result_with_py_trace(py)?;
437+
.to_result_with_py_trace(py)
438+
.with_context(|| format!("while iterating over user-configured source"))?;
428439
let task_locals =
429440
pyo3_async_runtimes::TaskLocals::new(py_exec_ctx.event_loop.bind(py).clone());
430441
Ok(from_py_future(py, &task_locals, coro.into_bound(py))?)
@@ -557,7 +568,7 @@ impl PySourceExecutor {
557568
impl interface::SourceFactory for PySourceConnectorFactory {
558569
async fn build(
559570
self: Arc<Self>,
560-
_source_name: &str,
571+
source_name: &str,
561572
spec: serde_json::Value,
562573
context: Arc<interface::FlowInstanceContext>,
563574
) -> Result<(
@@ -575,7 +586,13 @@ impl interface::SourceFactory for PySourceConnectorFactory {
575586
let value_type_result = self
576587
.py_source_connector
577588
.call_method(py, "get_table_type", (), None)
578-
.to_result_with_py_trace(py)?;
589+
.to_result_with_py_trace(py)
590+
.with_context(|| {
591+
format!(
592+
"while fetching table type from user-configured source `{}`",
593+
source_name
594+
)
595+
})?;
579596
let table_type: schema::EnrichedValueType =
580597
depythonize(&value_type_result.into_bound(py))?;
581598
Ok(table_type)
@@ -604,14 +621,20 @@ impl interface::SourceFactory for PySourceConnectorFactory {
604621
table_type.typ
605622
),
606623
};
607-
624+
let source_name = source_name.to_string();
608625
let executor_fut = async move {
609626
// Create the executor using the async create_executor method
610627
let create_future = Python::with_gil(|py| -> Result<_> {
611628
let create_coro = self
612629
.py_source_connector
613630
.call_method(py, "create_executor", (pythonize(py, &spec)?,), None)
614-
.to_result_with_py_trace(py)?;
631+
.to_result_with_py_trace(py)
632+
.with_context(|| {
633+
format!(
634+
"while constructing executor for user-configured source `{}`",
635+
source_name
636+
)
637+
})?;
615638
let task_locals =
616639
pyo3_async_runtimes::TaskLocals::new(py_exec_ctx.event_loop.bind(py).clone());
617640
let create_future = from_py_future(py, &task_locals, create_coro.into_bound(py))?;
@@ -622,13 +645,25 @@ impl interface::SourceFactory for PySourceConnectorFactory {
622645

623646
let (py_source_executor_context, provides_ordinal) =
624647
Python::with_gil(|py| -> Result<_> {
625-
let executor_context =
626-
py_executor_context_result.to_result_with_py_trace(py)?;
648+
let executor_context = py_executor_context_result
649+
.to_result_with_py_trace(py)
650+
.with_context(|| {
651+
format!(
652+
"while getting executor context for user-configured source `{}`",
653+
source_name
654+
)
655+
})?;
627656

628657
// Get provides_ordinal from the executor context
629658
let provides_ordinal = executor_context
630659
.call_method(py, "provides_ordinal", (), None)
631-
.to_result_with_py_trace(py)?
660+
.to_result_with_py_trace(py)
661+
.with_context(|| {
662+
format!(
663+
"while calling provides_ordinal for user-configured source `{}`",
664+
source_name
665+
)
666+
})?
632667
.extract::<bool>(py)?;
633668

634669
Ok((executor_context, provides_ordinal))
@@ -728,20 +763,38 @@ impl interface::TargetFactory for PyExportTargetFactory {
728763
),
729764
None,
730765
)
731-
.to_result_with_py_trace(py)?;
766+
.to_result_with_py_trace(py)
767+
.with_context(|| {
768+
format!(
769+
"while setting up export context for user-configured target `{}`",
770+
&data_collection.name
771+
)
772+
})?;
732773

733774
// Call the `get_persistent_key` method to get the persistent key.
734775
let persistent_key = self
735776
.py_target_connector
736777
.call_method(py, "get_persistent_key", (&py_export_ctx,), None)
737-
.to_result_with_py_trace(py)?;
778+
.to_result_with_py_trace(py)
779+
.with_context(|| {
780+
format!(
781+
"while getting persistent key for user-configured target `{}`",
782+
&data_collection.name
783+
)
784+
})?;
738785
let persistent_key: serde_json::Value =
739786
depythonize(&persistent_key.into_bound(py))?;
740787

741788
let setup_state = self
742789
.py_target_connector
743790
.call_method(py, "get_setup_state", (&py_export_ctx,), None)
744-
.to_result_with_py_trace(py)?;
791+
.to_result_with_py_trace(py)
792+
.with_context(|| {
793+
format!(
794+
"while getting setup state for user-configured target `{}`",
795+
&data_collection.name
796+
)
797+
})?;
745798
let setup_state: serde_json::Value = depythonize(&setup_state.into_bound(py))?;
746799

747800
anyhow::Ok((py_export_ctx, persistent_key, setup_state))
@@ -755,7 +808,13 @@ impl interface::TargetFactory for PyExportTargetFactory {
755808
let prepare_coro = factory
756809
.py_target_connector
757810
.call_method(py, "prepare_async", (&py_export_ctx,), None)
758-
.to_result_with_py_trace(py)?;
811+
.to_result_with_py_trace(py)
812+
.with_context(|| {
813+
format!(
814+
"while preparing user-configured target `{}`",
815+
&data_collection.name
816+
)
817+
})?;
759818
let task_locals = pyo3_async_runtimes::TaskLocals::new(
760819
py_exec_ctx.event_loop.bind(py).clone(),
761820
);
@@ -824,7 +883,10 @@ impl interface::TargetFactory for PyExportTargetFactory {
824883
),
825884
None,
826885
)
827-
.to_result_with_py_trace(py)?;
886+
.to_result_with_py_trace(py)
887+
.with_context(|| {
888+
format!("while calling check_state_compatibility in user-configured target")
889+
})?;
828890
let compatibility: SetupStateCompatibility = depythonize(&result.into_bound(py))?;
829891
Ok(compatibility)
830892
})?;
@@ -836,7 +898,10 @@ impl interface::TargetFactory for PyExportTargetFactory {
836898
let result = self
837899
.py_target_connector
838900
.call_method(py, "describe_resource", (pythonize(py, key)?,), None)
839-
.to_result_with_py_trace(py)?;
901+
.to_result_with_py_trace(py)
902+
.with_context(|| {
903+
format!("while calling describe_resource in user-configured target")
904+
})?;
840905
let description = result.extract::<String>(py)?;
841906
Ok(description)
842907
})
@@ -893,7 +958,10 @@ impl interface::TargetFactory for PyExportTargetFactory {
893958
(pythonize(py, &setup_changes)?,),
894959
None,
895960
)
896-
.to_result_with_py_trace(py)?;
961+
.to_result_with_py_trace(py)
962+
.with_context(|| {
963+
format!("while calling apply_setup_changes_async in user-configured target")
964+
})?;
897965
let task_locals =
898966
pyo3_async_runtimes::TaskLocals::new(py_exec_ctx.event_loop.bind(py).clone());
899967
Ok(from_py_future(
@@ -903,7 +971,11 @@ impl interface::TargetFactory for PyExportTargetFactory {
903971
)?)
904972
})?
905973
.await;
906-
Python::with_gil(move |py| py_result.to_result_with_py_trace(py))?;
974+
Python::with_gil(move |py| {
975+
py_result
976+
.to_result_with_py_trace(py)
977+
.with_context(|| format!("while applying setup changes in user-configured target"))
978+
})?;
907979

908980
Ok(())
909981
}
@@ -954,7 +1026,8 @@ impl interface::TargetFactory for PyExportTargetFactory {
9541026
let result_coro = self
9551027
.py_target_connector
9561028
.call_method(py, "mutate_async", (py_args,), None)
957-
.to_result_with_py_trace(py)?;
1029+
.to_result_with_py_trace(py)
1030+
.with_context(|| format!("while calling mutate_async in user-configured target"))?;
9581031
let task_locals =
9591032
pyo3_async_runtimes::TaskLocals::new(py_exec_ctx.event_loop.bind(py).clone());
9601033
Ok(from_py_future(
@@ -965,7 +1038,11 @@ impl interface::TargetFactory for PyExportTargetFactory {
9651038
})?
9661039
.await;
9671040

968-
Python::with_gil(move |py| py_result.to_result_with_py_trace(py))?;
1041+
Python::with_gil(move |py| {
1042+
py_result
1043+
.to_result_with_py_trace(py)
1044+
.with_context(|| format!("while applying mutations in user-configured target"))
1045+
})?;
9691046
Ok(())
9701047
}
9711048
}

0 commit comments

Comments
 (0)