Skip to content

Commit 7e80c85

Browse files
authored
feat(clear-error): add field path into error msg for deserialization (#988)
1 parent 88b7137 commit 7e80c85

File tree

16 files changed, with 83 additions and 36 deletions.

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -144,3 +144,4 @@ azure_storage_blobs = { version = "0.21.0", default-features = false, features =
144144
"enable_reqwest_rustls",
145145
"hmac_rust",
146146
] }
147+
serde_path_to_error = "0.1.17"

src/base/value.rs

Lines changed: 32 additions & 13 deletions
Original file line number | Diff line number | Diff line change
@@ -1,13 +1,12 @@
1+
use crate::prelude::*;
2+
13
use super::schema::*;
24
use crate::base::duration::parse_duration;
3-
use crate::{api_bail, api_error};
4-
use anyhow::Result;
55
use base64::prelude::*;
66
use bytes::Bytes;
77
use chrono::Offset;
88
use log::warn;
99
use serde::{
10-
Deserialize, Serialize,
1110
de::{SeqAccess, Visitor},
1211
ser::{SerializeMap, SerializeSeq, SerializeTuple},
1312
};
@@ -1014,7 +1013,8 @@ where
10141013
Ok(Self {
10151014
fields: fields
10161015
.map(|(s, v)| {
1017-
let value = Value::<VS>::from_json(v, &s.value_type.typ)?;
1016+
let value = Value::<VS>::from_json(v, &s.value_type.typ)
1017+
.with_context(|| format!("while deserializing field `{}`", s.name))?;
10181018
if value.is_null() && !s.value_type.nullable {
10191019
api_bail!("expected non-null value for `{}`", s.name);
10201020
}
@@ -1033,9 +1033,10 @@ where
10331033
fields: fields_schema
10341034
.map(|field| {
10351035
let value = match values.get_mut(&field.name) {
1036-
Some(v) => {
1037-
Value::<VS>::from_json(std::mem::take(v), &field.value_type.typ)?
1038-
}
1036+
Some(v) => Value::<VS>::from_json(std::mem::take(v), &field.value_type.typ)
1037+
.with_context(|| {
1038+
format!("while deserializing field `{}`", field.name)
1039+
})?,
10391040
None => Value::<VS>::default(),
10401041
};
10411042
if value.is_null() && !field.value_type.nullable {
@@ -1137,7 +1138,7 @@ impl BasicValue {
11371138
v.as_f64()
11381139
.ok_or_else(|| anyhow::anyhow!("invalid fp64 value {v}"))?,
11391140
),
1140-
(v, BasicValueType::Range) => BasicValue::Range(serde_json::from_value(v)?),
1141+
(v, BasicValueType::Range) => BasicValue::Range(utils::deser::from_json_value(v)?),
11411142
(serde_json::Value::String(v), BasicValueType::Uuid) => BasicValue::Uuid(v.parse()?),
11421143
(serde_json::Value::String(v), BasicValueType::Date) => BasicValue::Date(v.parse()?),
11431144
(serde_json::Value::String(v), BasicValueType::Time) => BasicValue::Time(v.parse()?),
@@ -1170,7 +1171,11 @@ impl BasicValue {
11701171
) => {
11711172
let vec = v
11721173
.into_iter()
1173-
.map(|v| BasicValue::from_json(v, element_type))
1174+
.enumerate()
1175+
.map(|(i, v)| {
1176+
BasicValue::from_json(v, element_type)
1177+
.with_context(|| format!("while deserializing Vector element #{i}"))
1178+
})
11741179
.collect::<Result<Vec<_>>>()?;
11751180
BasicValue::Vector(Arc::from(vec))
11761181
}
@@ -1267,7 +1272,11 @@ where
12671272
TableKind::UTable => {
12681273
let rows = v
12691274
.into_iter()
1270-
.map(|v| Ok(FieldValues::from_json(v, &s.row.fields)?.into()))
1275+
.map(|v| {
1276+
Ok(FieldValues::from_json(v, &s.row.fields)
1277+
.with_context(|| format!("while deserializing UTable row"))?
1278+
.into())
1279+
})
12711280
.collect::<Result<Vec<_>>>()?;
12721281
Value::LTable(rows)
12731282
}
@@ -1289,10 +1298,13 @@ where
12891298
let mut field_vals_iter = v.into_iter();
12901299
let keys: Box<[KeyPart]> = (0..num_key_parts)
12911300
.map(|_| {
1301+
let field_schema = fields_iter.next().unwrap();
12921302
Self::from_json(
12931303
field_vals_iter.next().unwrap(),
1294-
&fields_iter.next().unwrap().value_type.typ,
1295-
)?
1304+
&field_schema.value_type.typ,
1305+
).with_context(|| {
1306+
format!("while deserializing key part `{}`", field_schema.name)
1307+
})?
12961308
.into_key()
12971309
})
12981310
.collect::<Result<_>>()?;
@@ -1328,7 +1340,14 @@ where
13281340
TableKind::LTable => {
13291341
let rows = v
13301342
.into_iter()
1331-
.map(|v| Ok(FieldValues::from_json(v, &s.row.fields)?.into()))
1343+
.enumerate()
1344+
.map(|(i, v)| {
1345+
Ok(FieldValues::from_json(v, &s.row.fields)
1346+
.with_context(|| {
1347+
format!("while deserializing LTable row #{i}")
1348+
})?
1349+
.into())
1350+
})
13321351
.collect::<Result<Vec<_>>>()?;
13331352
Value::LTable(rows)
13341353
}

src/llm/anthropic.rs

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -138,7 +138,7 @@ impl LlmGenerationClient for Client {
138138
match &mut resp_json["content"][0]["text"] {
139139
serde_json::Value::String(s) => {
140140
// Try strict JSON parsing first
141-
match serde_json::from_str::<serde_json::Value>(s) {
141+
match utils::deser::from_json_str::<serde_json::Value>(s) {
142142
Ok(_) => std::mem::take(s),
143143
Err(e) => {
144144
// Try permissive json5 parsing as fallback

src/llm/gemini.rs

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -276,7 +276,7 @@ impl LlmGenerationClient for VertexAiClient {
276276
generation_config = Some(
277277
GenerationConfig::new()
278278
.set_response_mime_type("application/json".to_string())
279-
.set_response_schema(serde_json::from_value::<Schema>(schema_json)?),
279+
.set_response_schema(utils::deser::from_json_value::<Schema>(schema_json)?),
280280
);
281281
}
282282

@@ -358,7 +358,7 @@ impl LlmEmbeddingClient for VertexAiClient {
358358
.next()
359359
.and_then(|mut e| e.get_mut("embeddings").map(|v| v.take()))
360360
.ok_or_else(|| anyhow::anyhow!("No embeddings in response"))?;
361-
let embedding: ContentEmbedding = serde_json::from_value(embeddings)?;
361+
let embedding: ContentEmbedding = utils::deser::from_json_value(embeddings)?;
362362
Ok(super::LlmEmbeddingResponse {
363363
embedding: embedding.values,
364364
})

src/ops/factory_bases.rs

Lines changed: 11 additions & 11 deletions
Original file line number | Diff line number | Diff line change
@@ -245,7 +245,7 @@ impl<T: SourceFactoryBase> SourceFactory for T {
245245
EnrichedValueType,
246246
BoxFuture<'static, Result<Box<dyn SourceExecutor>>>,
247247
)> {
248-
let spec: T::Spec = serde_json::from_value(spec)
248+
let spec: T::Spec = utils::deser::from_json_value(spec)
249249
.with_context(|| format!("Failed in parsing spec for source `{source_name}`"))?;
250250
let output_schema = self.get_output_schema(&spec, &context).await?;
251251
let source_name = source_name.to_string();
@@ -324,7 +324,7 @@ impl<T: SimpleFunctionFactoryBase> SimpleFunctionFactory for T {
324324
EnrichedValueType,
325325
BoxFuture<'static, Result<Box<dyn SimpleFunctionExecutor>>>,
326326
)> {
327-
let spec: T::Spec = serde_json::from_value(spec)
327+
let spec: T::Spec = utils::deser::from_json_value(spec)
328328
.with_context(|| format!("Failed in parsing spec for function `{}`", self.name()))?;
329329
let mut nonnull_args_idx = vec![];
330330
let mut may_nullify_output = false;
@@ -399,7 +399,7 @@ pub trait TargetFactoryBase: TargetFactory + Send + Sync + 'static {
399399
/// Deserialize the setup key from a JSON value.
400400
/// You can override this method to provide a custom deserialization logic, e.g. to perform backward compatible deserialization.
401401
fn deserialize_setup_key(key: serde_json::Value) -> Result<Self::SetupKey> {
402-
Ok(serde_json::from_value(key)?)
402+
Ok(utils::deser::from_json_value(key)?)
403403
}
404404

405405
/// Will not be called if it's setup by user.
@@ -468,7 +468,7 @@ impl<T: TargetFactoryBase> TargetFactory for T {
468468
.into_iter()
469469
.map(|d| {
470470
anyhow::Ok(TypedExportDataCollectionSpec {
471-
spec: serde_json::from_value(d.spec).with_context(|| {
471+
spec: utils::deser::from_json_value(d.spec).with_context(|| {
472472
format!("Failed in parsing spec for target `{}`", d.name)
473473
})?,
474474
name: d.name,
@@ -480,7 +480,7 @@ impl<T: TargetFactoryBase> TargetFactory for T {
480480
.collect::<Result<Vec<_>>>()?,
481481
declarations
482482
.into_iter()
483-
.map(|d| anyhow::Ok(serde_json::from_value(d)?))
483+
.map(|d| anyhow::Ok(utils::deser::from_json_value(d)?))
484484
.collect::<Result<Vec<_>>>()?,
485485
context,
486486
)
@@ -515,7 +515,7 @@ impl<T: TargetFactoryBase> TargetFactory for T {
515515
) -> Result<Box<dyn setup::ResourceSetupChange>> {
516516
let key: T::SetupKey = Self::deserialize_setup_key(key.clone())?;
517517
let desired_state: Option<T::SetupState> = desired_state
518-
.map(|v| serde_json::from_value(v.clone()))
518+
.map(|v| utils::deser::from_json_value(v.clone()))
519519
.transpose()?;
520520
let existing_states = from_json_combined_state(existing_states)?;
521521
let setup_change = TargetFactoryBase::diff_setup_states(
@@ -546,8 +546,8 @@ impl<T: TargetFactoryBase> TargetFactory for T {
546546
) -> Result<SetupStateCompatibility> {
547547
let result = TargetFactoryBase::check_state_compatibility(
548548
self,
549-
&serde_json::from_value(desired_state.clone())?,
550-
&serde_json::from_value(existing_state.clone())?,
549+
&utils::deser::from_json_value(desired_state.clone())?,
550+
&utils::deser::from_json_value(existing_state.clone())?,
551551
)?;
552552
Ok(result)
553553
}
@@ -600,7 +600,7 @@ impl<T: TargetFactoryBase> TargetFactory for T {
600600
.into_iter()
601601
.map(|item| -> anyhow::Result<_> {
602602
Ok(TypedResourceSetupChangeItem {
603-
key: serde_json::from_value(item.key.clone())?,
603+
key: utils::deser::from_json_value(item.key.clone())?,
604604
setup_change: (item.setup_change as &dyn Any)
605605
.downcast_ref::<T::SetupChange>()
606606
.ok_or_else(invariance_violation)?,
@@ -618,15 +618,15 @@ fn from_json_combined_state<T: Debug + Clone + Serialize + DeserializeOwned>(
618618
Ok(setup::CombinedState {
619619
current: existing_states
620620
.current
621-
.map(|v| serde_json::from_value(v))
621+
.map(|v| utils::deser::from_json_value(v))
622622
.transpose()?,
623623
staging: existing_states
624624
.staging
625625
.into_iter()
626626
.map(|v| {
627627
anyhow::Ok(match v {
628628
setup::StateChange::Upsert(v) => {
629-
setup::StateChange::Upsert(serde_json::from_value(v)?)
629+
setup::StateChange::Upsert(utils::deser::from_json_value(v)?)
630630
}
631631
setup::StateChange::Delete => setup::StateChange::Delete,
632632
})

src/ops/functions/extract_by_llm.rs

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -113,7 +113,7 @@ impl SimpleFunctionExecutor for Executor {
113113
}),
114114
};
115115
let res = self.client.generate(req).await?;
116-
let json_value: serde_json::Value = serde_json::from_str(res.text.as_str())?;
116+
let json_value: serde_json::Value = utils::deser::from_json_str(res.text.as_str())?;
117117
let value = self.value_extractor.extract_value(json_value)?;
118118
Ok(value)
119119
}

src/ops/functions/parse_json.rs

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -29,7 +29,7 @@ fn add_language(
2929
}
3030

3131
fn parse_json(text: &str) -> Result<serde_json::Value> {
32-
Ok(serde_json::from_str(text)?)
32+
Ok(utils::deser::from_json_str(text)?)
3333
}
3434

3535
static PARSE_FN_BY_LANG: LazyLock<HashMap<UniCase<&'static str>, Arc<LanguageConfig>>> =

src/ops/sources/amazon_s3.rs

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -241,7 +241,7 @@ impl Executor {
241241
let mut change_messages = vec![];
242242
for message in messages.into_iter() {
243243
if let Some(body) = message.body {
244-
let notification: S3EventNotification = serde_json::from_str(&body)?;
244+
let notification: S3EventNotification = utils::deser::from_json_str(&body)?;
245245
let mut changes = vec![];
246246
for record in notification.records {
247247
let s3 = if let Some(s3) = record.s3 {

src/ops/sources/postgres.rs

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -600,7 +600,7 @@ impl PostgresSourceExecutor {
600600
}
601601

602602
fn parse_notification_payload(&self, notification: &PgNotification) -> Result<SourceChange> {
603-
let mut payload: serde_json::Value = serde_json::from_str(notification.payload())?;
603+
let mut payload: serde_json::Value = utils::deser::from_json_str(notification.payload())?;
604604
let payload = payload
605605
.as_object_mut()
606606
.ok_or_else(|| anyhow::anyhow!("'fields' field is not an object"))?;

0 commit comments

Comments
 (0)