Skip to content

Commit 794ec2f

Browse files
authored
Serializing/deserializing struct to JSON object for Postgres. (#65)
1 parent 8395a4b commit 794ec2f

File tree

3 files changed

+155
-36
lines changed

3 files changed

+155
-36
lines changed

src/base/value.rs

Lines changed: 150 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use anyhow::Result;
55
use base64::prelude::*;
66
use serde::{
77
de::{SeqAccess, Visitor},
8-
ser::{SerializeSeq, SerializeTuple},
8+
ser::{SerializeMap, SerializeSeq, SerializeTuple},
99
Deserialize, Serialize,
1010
};
1111
use std::{collections::BTreeMap, ops::Deref, sync::Arc};
@@ -577,30 +577,58 @@ where
577577
Self { fields }
578578
}
579579

580-
pub fn from_json_values(
581-
values: impl Iterator<Item = serde_json::Value>,
582-
schema: &[FieldSchema],
580+
fn from_json_values<'a>(
581+
fields: impl Iterator<Item = (&'a FieldSchema, serde_json::Value)>,
583582
) -> Result<Self> {
584-
let fields = values
585-
.zip(schema)
586-
.map(|(v, s)| {
587-
let value = Value::<VS>::from_json(v, &s.value_type.typ)?;
588-
if value.is_null() && !s.value_type.nullable {
589-
api_bail!("expected non-null value for `{}`", s.name);
590-
}
591-
Ok(value)
592-
})
593-
.collect::<Result<Vec<_>>>()?;
594-
Ok(Self { fields })
583+
Ok(Self {
584+
fields: fields
585+
.map(|(s, v)| {
586+
let value = Value::<VS>::from_json(v, &s.value_type.typ)?;
587+
if value.is_null() && !s.value_type.nullable {
588+
api_bail!("expected non-null value for `{}`", s.name);
589+
}
590+
Ok(value)
591+
})
592+
.collect::<Result<Vec<_>>>()?,
593+
})
594+
}
595+
596+
fn from_json_object<'a>(
597+
values: serde_json::Map<String, serde_json::Value>,
598+
fields_schema: impl Iterator<Item = &'a FieldSchema>,
599+
) -> Result<Self> {
600+
let mut values = values;
601+
Ok(Self {
602+
fields: fields_schema
603+
.map(|field| {
604+
let value = match values.get_mut(&field.name) {
605+
Some(v) => {
606+
Value::<VS>::from_json(std::mem::take(v), &field.value_type.typ)?
607+
}
608+
None => Value::<VS>::default(),
609+
};
610+
if value.is_null() && !field.value_type.nullable {
611+
api_bail!("expected non-null value for `{}`", field.name);
612+
}
613+
Ok(value)
614+
})
615+
.collect::<Result<Vec<_>>>()?,
616+
})
595617
}
596618

597-
pub fn from_json(value: serde_json::Value, schema: &[FieldSchema]) -> Result<Self> {
619+
pub fn from_json<'a>(value: serde_json::Value, fields_schema: &[FieldSchema]) -> Result<Self> {
598620
match value {
599621
serde_json::Value::Array(v) => {
600-
if v.len() != schema.len() {
622+
if v.len() != fields_schema.len() {
601623
api_bail!("unmatched value length");
602624
}
603-
Self::from_json_values(v.into_iter(), &schema)
625+
Self::from_json_values(fields_schema.iter().zip(v.into_iter()))
626+
}
627+
serde_json::Value::Object(v) => {
628+
if v.len() != fields_schema.len() {
629+
api_bail!("unmatched value length");
630+
}
631+
Self::from_json_object(v, fields_schema.iter())
604632
}
605633
_ => api_bail!("invalid value type"),
606634
}
@@ -738,22 +766,45 @@ where
738766
CollectionKind::Table => {
739767
let rows = v
740768
.into_iter()
741-
.map(|v| match v {
742-
serde_json::Value::Array(v) => {
743-
let mut fields_iter = v.into_iter();
744-
let key = Self::from_json(
745-
fields_iter
746-
.next()
747-
.ok_or_else(|| api_error!("Empty struct field values"))?,
748-
&s.row.fields[0].value_type.typ,
749-
)?
750-
.to_key()?;
751-
let values =
752-
FieldValues::from_json_values(fields_iter, &s.row.fields[1..])?
753-
.into();
754-
Ok((key, values))
769+
.map(|v| {
770+
let mut fields_iter = s.row.fields.iter();
771+
let key_field = fields_iter
772+
.next()
773+
.ok_or_else(|| api_error!("Empty struct field values"))?;
774+
775+
match v {
776+
serde_json::Value::Array(v) => {
777+
let mut field_vals_iter = v.into_iter();
778+
let key = Self::from_json(
779+
field_vals_iter.next().ok_or_else(|| {
780+
api_error!("Empty struct field values")
781+
})?,
782+
&key_field.value_type.typ,
783+
)?
784+
.to_key()?;
785+
let values = FieldValues::from_json_values(
786+
fields_iter.zip(field_vals_iter),
787+
)?;
788+
Ok((key, values.into()))
789+
}
790+
serde_json::Value::Object(mut v) => {
791+
let key = Self::from_json(
792+
std::mem::take(v.get_mut(&key_field.name).ok_or_else(
793+
|| {
794+
api_error!(
795+
"key field `{}` doesn't exist in value",
796+
key_field.name
797+
)
798+
},
799+
)?),
800+
&key_field.value_type.typ,
801+
)?
802+
.to_key()?;
803+
let values = FieldValues::from_json_object(v, fields_iter)?;
804+
Ok((key, values.into()))
805+
}
806+
_ => api_bail!("Table value must be a JSON array or object"),
755807
}
756-
_ => api_bail!("Table value must be a JSON array"),
757808
})
758809
.collect::<Result<BTreeMap<_, _>>>()?;
759810
Value::Table(rows)
@@ -773,3 +824,69 @@ where
773824
Ok(result)
774825
}
775826
}
827+
828+
#[derive(Debug, Clone, Copy)]
829+
pub struct TypedValue<'a> {
830+
pub t: &'a ValueType,
831+
pub v: &'a Value,
832+
}
833+
834+
impl<'a> Serialize for TypedValue<'a> {
835+
fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
836+
match (self.t, self.v) {
837+
(ValueType::Basic(_), v) => v.serialize(serializer),
838+
(ValueType::Struct(s), Value::Struct(field_values)) => TypedFieldsValue {
839+
schema: s,
840+
values_iter: field_values.fields.iter(),
841+
}
842+
.serialize(serializer),
843+
(ValueType::Collection(c), Value::Collection(rows) | Value::List(rows)) => {
844+
let mut seq = serializer.serialize_seq(Some(rows.len()))?;
845+
for row in rows {
846+
seq.serialize_element(&TypedFieldsValue {
847+
schema: &c.row,
848+
values_iter: row.fields.iter(),
849+
})?;
850+
}
851+
seq.end()
852+
}
853+
(ValueType::Collection(c), Value::Table(rows)) => {
854+
let mut seq = serializer.serialize_seq(Some(rows.len()))?;
855+
for (k, v) in rows {
856+
seq.serialize_element(&TypedFieldsValue {
857+
schema: &c.row,
858+
values_iter: std::iter::once(&Value::from(k.clone()))
859+
.chain(v.fields.iter()),
860+
})?;
861+
}
862+
seq.end()
863+
}
864+
_ => Err(serde::ser::Error::custom(format!(
865+
"Incompatible value type: {:?} {:?}",
866+
self.t, self.v
867+
))),
868+
}
869+
}
870+
}
871+
872+
pub struct TypedFieldsValue<'a, I: Iterator<Item = &'a Value> + Clone> {
873+
schema: &'a StructSchema,
874+
values_iter: I,
875+
}
876+
877+
impl<'a, I: Iterator<Item = &'a Value> + Clone> Serialize for TypedFieldsValue<'a, I> {
878+
fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
879+
let mut map = serializer.serialize_map(Some(self.schema.fields.len()))?;
880+
let values_iter = self.values_iter.clone();
881+
for (field, value) in self.schema.fields.iter().zip(values_iter) {
882+
map.serialize_entry(
883+
&field.name,
884+
&TypedValue {
885+
t: &field.value_type.typ,
886+
v: value,
887+
},
888+
)?;
889+
}
890+
map.end()
891+
}
892+
}

src/ops/py_factory.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ use std::{collections::BTreeMap, sync::Arc};
33
use axum::async_trait;
44
use blocking::unblock;
55
use futures::FutureExt;
6-
use log::warn;
76
use pyo3::{
87
exceptions::PyException,
98
pyclass, pymethods,

src/ops/storages/postgres.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ fn bind_key_field<'arg>(
8989

9090
fn bind_value_field<'arg>(
9191
builder: &mut sqlx::QueryBuilder<'arg, sqlx::Postgres>,
92-
field_schema: &FieldSchema,
92+
field_schema: &'arg FieldSchema,
9393
value: &'arg Value,
9494
) -> Result<()> {
9595
match &value {
@@ -145,7 +145,10 @@ fn bind_value_field<'arg>(
145145
builder.push("NULL");
146146
}
147147
v => {
148-
builder.push_bind(sqlx::types::Json(*v));
148+
builder.push_bind(sqlx::types::Json(TypedValue {
149+
t: &field_schema.value_type.typ,
150+
v,
151+
}));
149152
}
150153
};
151154
Ok(())

0 commit comments

Comments
 (0)