Skip to content

Commit 93e2890

Browse files
authored
fix(data-slice): make data slice types reflect newly added fields (#825)
1 parent cfe277e commit 93e2890

File tree

3 files changed

+66
-79
lines changed

3 files changed

+66
-79
lines changed

src/base/spec.rs

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -161,7 +161,6 @@ impl fmt::Display for StructMapping {
161161
pub enum ValueMapping {
162162
Constant(ConstantMapping),
163163
Field(FieldMapping),
164-
Struct(StructMapping),
165164
// TODO: Add support for collections
166165
}
167166

@@ -189,15 +188,6 @@ impl std::fmt::Display for ValueMapping {
189188
ValueMapping::Field(v) => {
190189
write!(f, "{}.{}", v.scope.as_deref().unwrap_or(""), v.field_path)
191190
}
192-
ValueMapping::Struct(v) => write!(
193-
f,
194-
"Struct({})",
195-
v.fields
196-
.iter()
197-
.map(|f| format!("{}={}", f.name, f.spec))
198-
.collect::<Vec<_>>()
199-
.join(", ")
200-
),
201191
}
202192
}
203193
}

src/builder/analyzer.rs

Lines changed: 5 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -322,6 +322,7 @@ impl DataScopeBuilder {
322322
})
323323
}
324324

325+
/// Must be called on an non-empty field path.
325326
pub fn analyze_field_path<'a>(
326327
&'a self,
327328
field_path: &'_ FieldPath,
@@ -332,6 +333,10 @@ impl DataScopeBuilder {
332333
let mut indices = Vec::with_capacity(field_path.len());
333334
let mut struct_schema = &self.data;
334335

336+
if field_path.is_empty() {
337+
bail!("Field path is empty");
338+
}
339+
335340
let mut i = 0;
336341
let value_type = loop {
337342
let field_name = &field_path[i];
@@ -597,21 +602,6 @@ fn analyze_value_mapping(
597602
EnrichedValueType::from_alternative(value_type)?,
598603
)
599604
}
600-
601-
ValueMapping::Struct(v) => {
602-
let (struct_mapping, field_schemas) = analyze_struct_mapping(v, op_scope)?;
603-
(
604-
AnalyzedValueMapping::Struct(struct_mapping),
605-
EnrichedValueType {
606-
typ: ValueType::Struct(StructSchema {
607-
fields: Arc::new(field_schemas),
608-
description: None,
609-
}),
610-
nullable: false,
611-
attrs: Default::default(),
612-
},
613-
)
614-
}
615605
};
616606
Ok(result)
617607
}

src/builder/flow_builder.rs

Lines changed: 61 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,13 @@
1-
use crate::{prelude::*, py::Pythonized};
1+
use crate::{base::schema::EnrichedValueType, prelude::*, py::Pythonized};
22

33
use pyo3::{exceptions::PyException, prelude::*};
44
use pyo3_async_runtimes::tokio::future_into_py;
55
use std::{collections::btree_map, ops::Deref};
66
use tokio::task::LocalSet;
77

88
use super::analyzer::{
9-
AnalyzerContext, CollectorBuilder, DataScopeBuilder, OpScope, build_flow_instance_context,
9+
AnalyzerContext, CollectorBuilder, DataScopeBuilder, OpScope, ValueTypeBuilder,
10+
build_flow_instance_context,
1011
};
1112
use crate::{
1213
base::{
@@ -95,13 +96,12 @@ impl DataType {
9596
pub struct DataSlice {
9697
scope: Arc<OpScope>,
9798
value: Arc<spec::ValueMapping>,
98-
data_type: DataType,
9999
}
100100

101101
#[pymethods]
102102
impl DataSlice {
103-
pub fn data_type(&self) -> DataType {
104-
self.data_type.clone()
103+
pub fn data_type(&self) -> PyResult<DataType> {
104+
Ok(DataType::from(self.value_type().into_py_result()?))
105105
}
106106

107107
pub fn __str__(&self) -> String {
@@ -113,36 +113,32 @@ impl DataSlice {
113113
}
114114

115115
pub fn field(&self, field_name: &str) -> PyResult<Option<DataSlice>> {
116-
let field_schema = match &self.data_type.schema.typ {
117-
schema::ValueType::Struct(struct_type) => {
118-
match struct_type.fields.iter().find(|f| f.name == field_name) {
119-
Some(field) => field,
120-
None => return Ok(None),
116+
let value_mapping = match self.value.as_ref() {
117+
spec::ValueMapping::Field(spec::FieldMapping { scope, field_path }) => {
118+
let data_scope_builder = self.scope.data.lock().unwrap();
119+
let struct_schema = {
120+
let (_, val_type) = data_scope_builder
121+
.analyze_field_path(field_path)
122+
.into_py_result()?;
123+
match &val_type.typ {
124+
ValueTypeBuilder::Struct(struct_type) => struct_type,
125+
_ => return Err(PyException::new_err("expect struct type in field path")),
126+
}
127+
};
128+
if struct_schema.find_field(field_name).is_none() {
129+
return Ok(None);
121130
}
131+
spec::ValueMapping::Field(spec::FieldMapping {
132+
scope: scope.clone(),
133+
field_path: spec::FieldPath(
134+
field_path
135+
.iter()
136+
.cloned()
137+
.chain([field_name.to_string()])
138+
.collect(),
139+
),
140+
})
122141
}
123-
_ => return Err(PyException::new_err("expect struct type")),
124-
};
125-
let value_mapping = match self.value.as_ref() {
126-
spec::ValueMapping::Field(spec::FieldMapping {
127-
scope,
128-
field_path: spec::FieldPath(field_path),
129-
}) => spec::ValueMapping::Field(spec::FieldMapping {
130-
scope: scope.clone(),
131-
field_path: spec::FieldPath(
132-
field_path
133-
.iter()
134-
.cloned()
135-
.chain([field_name.to_string()])
136-
.collect(),
137-
),
138-
}),
139-
140-
spec::ValueMapping::Struct(v) => v
141-
.fields
142-
.iter()
143-
.find(|f| f.name == field_name)
144-
.map(|f| f.spec.clone())
145-
.ok_or_else(|| PyException::new_err(format!("field {field_name} not found")))?,
146142

147143
spec::ValueMapping::Constant { .. } => {
148144
return Err(PyException::new_err(
@@ -153,7 +149,6 @@ impl DataSlice {
153149
Ok(Some(DataSlice {
154150
scope: self.scope.clone(),
155151
value: Arc::new(value_mapping),
156-
data_type: field_schema.value_type.clone().into(),
157152
}))
158153
}
159154
}
@@ -168,15 +163,28 @@ impl DataSlice {
168163
v => v.clone(),
169164
}
170165
}
166+
167+
fn value_type(&self) -> Result<schema::EnrichedValueType> {
168+
let result = match self.value.as_ref() {
169+
spec::ValueMapping::Constant(c) => c.schema.clone(),
170+
spec::ValueMapping::Field(v) => {
171+
let data_scope_builder = self.scope.data.lock().unwrap();
172+
let (_, val_type) = data_scope_builder.analyze_field_path(&v.field_path)?;
173+
EnrichedValueType::from_alternative(val_type)?
174+
}
175+
};
176+
Ok(result)
177+
}
171178
}
172179

173180
impl std::fmt::Display for DataSlice {
174181
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
175-
write!(
176-
f,
177-
"DataSlice({}; {} {}) ",
178-
self.data_type.schema, self.scope, self.value
179-
)?;
182+
write!(f, "DataSlice(")?;
183+
match self.value_type() {
184+
Ok(value_type) => write!(f, "{value_type}")?,
185+
Err(e) => write!(f, "<error: {}>", e)?,
186+
}
187+
write!(f, "; {} {}) ", self.scope, self.value)?;
180188
Ok(())
181189
}
182190
}
@@ -333,7 +341,6 @@ impl FlowBuilder {
333341
schema: schema.clone(),
334342
value: serde_json::to_value(value).into_py_result()?,
335343
})),
336-
data_type: schema.into(),
337344
};
338345
Ok(slice)
339346
}
@@ -510,11 +517,14 @@ impl FlowBuilder {
510517
let collector_schema = CollectorSchema::from_fields(
511518
fields
512519
.into_iter()
513-
.map(|(name, ds)| FieldSchema {
514-
name,
515-
value_type: ds.data_type.schema,
520+
.map(|(name, ds)| {
521+
Ok(FieldSchema {
522+
name,
523+
value_type: ds.value_type()?,
524+
})
516525
})
517-
.collect(),
526+
.collect::<Result<Vec<FieldSchema>>>()
527+
.into_py_result()?,
518528
auto_uuid_field,
519529
);
520530
{
@@ -567,22 +577,20 @@ impl FlowBuilder {
567577
}
568578

569579
pub fn scope_field(&self, scope: OpScopeRef, field_name: &str) -> PyResult<Option<DataSlice>> {
570-
let field_type = {
580+
{
571581
let scope_builder = scope.0.data.lock().unwrap();
572-
let (_, field_schema) = scope_builder
573-
.data
574-
.find_field(field_name)
575-
.ok_or_else(|| PyException::new_err(format!("field {field_name} not found")))?;
576-
schema::EnrichedValueType::from_alternative(&field_schema.value_type)
577-
.into_py_result()?
578-
};
582+
if scope_builder.data.find_field(field_name).is_none() {
583+
return Err(PyException::new_err(format!(
584+
"field {field_name} not found"
585+
)));
586+
}
587+
}
579588
Ok(Some(DataSlice {
580589
scope: scope.0,
581590
value: Arc::new(spec::ValueMapping::Field(spec::FieldMapping {
582591
scope: None,
583592
field_path: spec::FieldPath(vec![field_name.to_string()]),
584593
})),
585-
data_type: DataType { schema: field_type },
586594
}))
587595
}
588596

@@ -730,7 +738,6 @@ impl FlowBuilder {
730738
scope: None,
731739
field_path: spec::FieldPath(vec![last_field.name.clone()]),
732740
})),
733-
data_type: schema::EnrichedValueType::from_alternative(&last_field.value_type)?.into(),
734741
};
735742
Ok(result)
736743
}

0 commit comments

Comments
 (0)