Skip to content

Commit 391059e

Browse files
committed
feat: added with_context to user-configured operations (source, function, target) to hydrate error propagation
1 parent b8344f6 commit 391059e

File tree

1 file changed

+99
-22
lines changed

1 file changed

+99
-22
lines changed

src/ops/py_factory.rs

Lines changed: 99 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,8 @@ impl PyFunctionExecutor {
8080
.transpose()?
8181
.as_ref(),
8282
)
83-
.to_result_with_py_trace(py)?;
83+
.to_result_with_py_trace(py)
84+
.with_context(|| format!("while calling user-configured function"))?;
8485
Ok(result.into_bound(py))
8586
}
8687
}
@@ -221,7 +222,8 @@ impl interface::SimpleFunctionFactory for PyFunctionFactory {
221222
PyTuple::new(py, args.into_iter())?,
222223
Some(&kwargs.into_py_dict(py)?),
223224
)
224-
.to_result_with_py_trace(py)?;
225+
.to_result_with_py_trace(py)
226+
.with_context(|| format!("while building user-configured function"))?;
225227
let (result_type, executor) = result
226228
.extract::<(crate::py::Pythonized<schema::EnrichedValueType>, Py<PyAny>)>(py)?;
227229
Ok((
@@ -244,7 +246,8 @@ impl interface::SimpleFunctionFactory for PyFunctionFactory {
244246
Python::with_gil(|py| -> anyhow::Result<_> {
245247
let prepare_coro = executor
246248
.call_method(py, "prepare", (), None)
247-
.to_result_with_py_trace(py)?;
249+
.to_result_with_py_trace(py)
250+
.with_context(|| format!("while preparing user-configured function"))?;
248251
let prepare_fut = from_py_future(
249252
py,
250253
&pyo3_async_runtimes::TaskLocals::new(
@@ -337,6 +340,7 @@ impl interface::SourceExecutor for PySourceExecutor {
337340
py_source_executor
338341
.call_method(py, "list_async", (pythonize(py, options)?,), None)
339342
.to_result_with_py_trace(py)
343+
.with_context(|| format!("while listing user-configured source"))
340344
})?;
341345

342346
// Create a stream that pulls from the Python async iterator one item at a time
@@ -376,7 +380,13 @@ impl interface::SourceExecutor for PySourceExecutor {
376380
),
377381
None,
378382
)
379-
.to_result_with_py_trace(py)?;
383+
.to_result_with_py_trace(py)
384+
.with_context(|| {
385+
format!(
386+
"while fetching user-configured source for key: {:?}",
387+
&key_clone
388+
)
389+
})?;
380390
let task_locals =
381391
pyo3_async_runtimes::TaskLocals::new(py_exec_ctx.event_loop.bind(py).clone());
382392
Ok(from_py_future(
@@ -416,7 +426,8 @@ impl PySourceExecutor {
416426
let next_item_coro = Python::with_gil(|py| -> Result<_> {
417427
let coro = py_async_iter
418428
.call_method0(py, "__anext__")
419-
.to_result_with_py_trace(py)?;
429+
.to_result_with_py_trace(py)
430+
.with_context(|| format!("while iterating over user-configured source"))?;
420431
let task_locals =
421432
pyo3_async_runtimes::TaskLocals::new(py_exec_ctx.event_loop.bind(py).clone());
422433
Ok(from_py_future(py, &task_locals, coro.into_bound(py))?)
@@ -549,7 +560,7 @@ impl PySourceExecutor {
549560
impl interface::SourceFactory for PySourceConnectorFactory {
550561
async fn build(
551562
self: Arc<Self>,
552-
_source_name: &str,
563+
source_name: &str,
553564
spec: serde_json::Value,
554565
context: Arc<interface::FlowInstanceContext>,
555566
) -> Result<(
@@ -567,7 +578,13 @@ impl interface::SourceFactory for PySourceConnectorFactory {
567578
let value_type_result = self
568579
.py_source_connector
569580
.call_method(py, "get_table_type", (), None)
570-
.to_result_with_py_trace(py)?;
581+
.to_result_with_py_trace(py)
582+
.with_context(|| {
583+
format!(
584+
"while fetching table type from user-configured source `{}`",
585+
source_name
586+
)
587+
})?;
571588
let table_type: schema::EnrichedValueType =
572589
depythonize(&value_type_result.into_bound(py))?;
573590
Ok(table_type)
@@ -596,14 +613,20 @@ impl interface::SourceFactory for PySourceConnectorFactory {
596613
table_type.typ
597614
),
598615
};
599-
616+
let source_name = source_name.to_string();
600617
let executor_fut = async move {
601618
// Create the executor using the async create_executor method
602619
let create_future = Python::with_gil(|py| -> Result<_> {
603620
let create_coro = self
604621
.py_source_connector
605622
.call_method(py, "create_executor", (pythonize(py, &spec)?,), None)
606-
.to_result_with_py_trace(py)?;
623+
.to_result_with_py_trace(py)
624+
.with_context(|| {
625+
format!(
626+
"while constructing executor for user-configured source `{}`",
627+
source_name
628+
)
629+
})?;
607630
let task_locals =
608631
pyo3_async_runtimes::TaskLocals::new(py_exec_ctx.event_loop.bind(py).clone());
609632
let create_future = from_py_future(py, &task_locals, create_coro.into_bound(py))?;
@@ -614,13 +637,25 @@ impl interface::SourceFactory for PySourceConnectorFactory {
614637

615638
let (py_source_executor_context, provides_ordinal) =
616639
Python::with_gil(|py| -> Result<_> {
617-
let executor_context =
618-
py_executor_context_result.to_result_with_py_trace(py)?;
640+
let executor_context = py_executor_context_result
641+
.to_result_with_py_trace(py)
642+
.with_context(|| {
643+
format!(
644+
"while getting executor context for user-configured source `{}`",
645+
source_name
646+
)
647+
})?;
619648

620649
// Get provides_ordinal from the executor context
621650
let provides_ordinal = executor_context
622651
.call_method(py, "provides_ordinal", (), None)
623-
.to_result_with_py_trace(py)?
652+
.to_result_with_py_trace(py)
653+
.with_context(|| {
654+
format!(
655+
"while calling provides_ordinal for user-configured source `{}`",
656+
source_name
657+
)
658+
})?
624659
.extract::<bool>(py)?;
625660

626661
Ok((executor_context, provides_ordinal))
@@ -720,20 +755,38 @@ impl interface::TargetFactory for PyExportTargetFactory {
720755
),
721756
None,
722757
)
723-
.to_result_with_py_trace(py)?;
758+
.to_result_with_py_trace(py)
759+
.with_context(|| {
760+
format!(
761+
"while setting up export context for user-configured target `{}`",
762+
&data_collection.name
763+
)
764+
})?;
724765

725766
// Call the `get_persistent_key` method to get the persistent key.
726767
let persistent_key = self
727768
.py_target_connector
728769
.call_method(py, "get_persistent_key", (&py_export_ctx,), None)
729-
.to_result_with_py_trace(py)?;
770+
.to_result_with_py_trace(py)
771+
.with_context(|| {
772+
format!(
773+
"while getting persistent key for user-configured target `{}`",
774+
&data_collection.name
775+
)
776+
})?;
730777
let persistent_key: serde_json::Value =
731778
depythonize(&persistent_key.into_bound(py))?;
732779

733780
let setup_state = self
734781
.py_target_connector
735782
.call_method(py, "get_setup_state", (&py_export_ctx,), None)
736-
.to_result_with_py_trace(py)?;
783+
.to_result_with_py_trace(py)
784+
.with_context(|| {
785+
format!(
786+
"while getting setup state for user-configured target `{}`",
787+
&data_collection.name
788+
)
789+
})?;
737790
let setup_state: serde_json::Value = depythonize(&setup_state.into_bound(py))?;
738791

739792
anyhow::Ok((py_export_ctx, persistent_key, setup_state))
@@ -747,7 +800,13 @@ impl interface::TargetFactory for PyExportTargetFactory {
747800
let prepare_coro = factory
748801
.py_target_connector
749802
.call_method(py, "prepare_async", (&py_export_ctx,), None)
750-
.to_result_with_py_trace(py)?;
803+
.to_result_with_py_trace(py)
804+
.with_context(|| {
805+
format!(
806+
"while preparing user-configured target `{}`",
807+
&data_collection.name
808+
)
809+
})?;
751810
let task_locals = pyo3_async_runtimes::TaskLocals::new(
752811
py_exec_ctx.event_loop.bind(py).clone(),
753812
);
@@ -816,7 +875,10 @@ impl interface::TargetFactory for PyExportTargetFactory {
816875
),
817876
None,
818877
)
819-
.to_result_with_py_trace(py)?;
878+
.to_result_with_py_trace(py)
879+
.with_context(|| {
880+
format!("while calling check_state_compatibility in user-configured target")
881+
})?;
820882
let compatibility: SetupStateCompatibility = depythonize(&result.into_bound(py))?;
821883
Ok(compatibility)
822884
})?;
@@ -828,7 +890,10 @@ impl interface::TargetFactory for PyExportTargetFactory {
828890
let result = self
829891
.py_target_connector
830892
.call_method(py, "describe_resource", (pythonize(py, key)?,), None)
831-
.to_result_with_py_trace(py)?;
893+
.to_result_with_py_trace(py)
894+
.with_context(|| {
895+
format!("while calling describe_resource in user-configured target")
896+
})?;
832897
let description = result.extract::<String>(py)?;
833898
Ok(description)
834899
})
@@ -885,7 +950,10 @@ impl interface::TargetFactory for PyExportTargetFactory {
885950
(pythonize(py, &setup_changes)?,),
886951
None,
887952
)
888-
.to_result_with_py_trace(py)?;
953+
.to_result_with_py_trace(py)
954+
.with_context(|| {
955+
format!("while calling apply_setup_changes_async in user-configured target")
956+
})?;
889957
let task_locals =
890958
pyo3_async_runtimes::TaskLocals::new(py_exec_ctx.event_loop.bind(py).clone());
891959
Ok(from_py_future(
@@ -895,7 +963,11 @@ impl interface::TargetFactory for PyExportTargetFactory {
895963
)?)
896964
})?
897965
.await;
898-
Python::with_gil(move |py| py_result.to_result_with_py_trace(py))?;
966+
Python::with_gil(move |py| {
967+
py_result
968+
.to_result_with_py_trace(py)
969+
.with_context(|| format!("while applying setup changes in user-configured target"))
970+
})?;
899971

900972
Ok(())
901973
}
@@ -946,7 +1018,8 @@ impl interface::TargetFactory for PyExportTargetFactory {
9461018
let result_coro = self
9471019
.py_target_connector
9481020
.call_method(py, "mutate_async", (py_args,), None)
949-
.to_result_with_py_trace(py)?;
1021+
.to_result_with_py_trace(py)
1022+
.with_context(|| format!("while calling mutate_async in user-configured target"))?;
9501023
let task_locals =
9511024
pyo3_async_runtimes::TaskLocals::new(py_exec_ctx.event_loop.bind(py).clone());
9521025
Ok(from_py_future(
@@ -957,7 +1030,11 @@ impl interface::TargetFactory for PyExportTargetFactory {
9571030
})?
9581031
.await;
9591032

960-
Python::with_gil(move |py| py_result.to_result_with_py_trace(py))?;
1033+
Python::with_gil(move |py| {
1034+
py_result
1035+
.to_result_with_py_trace(py)
1036+
.with_context(|| format!("while applying mutations in user-configured target"))
1037+
})?;
9611038
Ok(())
9621039
}
9631040
}

0 commit comments

Comments
 (0)