Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -89,4 +89,4 @@ rustls = { version = "0.23.25" }
http-body-util = "0.1.3"
yaml-rust2 = "0.10.0"
urlencoding = "2.1.3"
uuid = "1.16.0"
uuid = { version = "1.16.0", features = ["serde", "v4", "v8"] }
58 changes: 55 additions & 3 deletions src/base/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ pub struct CollectionSchema {
pub row: StructSchema,

#[serde(default = "Vec::new", skip_serializing_if = "Vec::is_empty")]
pub collectors: Vec<NamedSpec<StructSchema>>,
pub collectors: Vec<NamedSpec<Arc<CollectorSchema>>>,
}

impl CollectionSchema {
Expand Down Expand Up @@ -157,7 +157,7 @@ impl CollectionSchema {
.iter()
.map(|c| NamedSpec {
name: c.name.clone(),
spec: c.spec.without_attrs(),
spec: Arc::from(c.spec.without_attrs()),
})
.collect(),
}
Expand Down Expand Up @@ -339,13 +339,65 @@ impl std::fmt::Display for FieldSchema {
}
}

/// Schema describing the rows gathered by a single collector.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct CollectorSchema {
    /// Schemas of the collected fields, in declaration order.
    pub fields: Vec<FieldSchema>,
    /// If specified, the collector will have an automatically generated UUID field with the given index.
    pub auto_uuid_field_idx: Option<usize>,
}

impl std::fmt::Display for CollectorSchema {
    /// Renders the schema as `Collector(field1, field2, ...)`, using each
    /// field's own `Display` implementation.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let rendered: Vec<String> = self
            .fields
            .iter()
            .map(|field| field.to_string())
            .collect();
        write!(f, "Collector({})", rendered.join(", "))
    }
}

impl CollectorSchema {
    /// Builds a collector schema from the collected field schemas.
    ///
    /// When `has_auto_uuid_field` is true, a non-nullable `uuid` field of basic
    /// type `Uuid` is prepended to `fields`, and its index (always 0) is
    /// recorded in `auto_uuid_field_idx`; otherwise `auto_uuid_field_idx` is
    /// `None`.
    pub fn from_fields(mut fields: Vec<FieldSchema>, has_auto_uuid_field: bool) -> Self {
        // Take `fields` as `mut` directly instead of rebinding inside the body.
        let auto_uuid_field_idx = if has_auto_uuid_field {
            fields.insert(
                0,
                FieldSchema::new(
                    "uuid".to_string(),
                    EnrichedValueType {
                        typ: ValueType::Basic(BasicValueType::Uuid),
                        nullable: false,
                        attrs: Default::default(),
                    },
                ),
            );
            Some(0)
        } else {
            None
        };
        Self {
            fields,
            auto_uuid_field_idx,
        }
    }

    /// Returns a copy of this schema with all field attributes stripped,
    /// preserving the auto-UUID field index.
    pub fn without_attrs(&self) -> Self {
        Self {
            fields: self.fields.iter().map(|f| f.without_attrs()).collect(),
            auto_uuid_field_idx: self.auto_uuid_field_idx,
        }
    }
}

/// Top-level schema for a flow instance.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DataSchema {
pub schema: StructSchema,

#[serde(default = "Vec::new", skip_serializing_if = "Vec::is_empty")]
pub collectors: Vec<NamedSpec<StructSchema>>,
pub collectors: Vec<NamedSpec<Arc<CollectorSchema>>>,
}

impl Deref for DataSchema {
Expand Down
6 changes: 6 additions & 0 deletions src/base/spec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,9 +181,15 @@ pub struct ForEachOpSpec {
/// Emit data to a given collector at the given scope.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CollectOpSpec {
    /// Field values to be collected.
    pub input: StructMapping,
    /// Scope for the collector.
    pub scope_name: ScopeName,
    /// Name of the collector.
    pub collector_name: FieldName,
    /// If specified, the collector will have an automatically generated UUID field with the given name.
    /// The uuid will remain stable when collected input values remain unchanged.
    pub auto_uuid_field: Option<FieldName>,
}

#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq)]
Expand Down
135 changes: 79 additions & 56 deletions src/builder/analyzer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ fn try_make_common_value_type(
ValueType::Basic(basic_type1.clone())
}
(ValueType::Struct(struct_type1), ValueType::Struct(struct_type2)) => {
let common_schema = try_make_common_struct_schemas(struct_type1, struct_type2)?;
let common_schema = try_merge_struct_schemas(struct_type1, struct_type2)?;
ValueType::Struct(common_schema)
}
(ValueType::Collection(collection_type1), ValueType::Collection(collection_type2)) => {
Expand All @@ -190,7 +190,7 @@ fn try_make_common_value_type(
collection_type2
);
}
let row = try_make_common_struct_schemas(&collection_type1.row, &collection_type2.row)?;
let row = try_merge_struct_schemas(&collection_type1.row, &collection_type2.row)?;

if collection_type1.collectors.len() != collection_type2.collectors.len() {
api_bail!(
Expand All @@ -213,7 +213,7 @@ fn try_make_common_value_type(
}
let collector = NamedSpec {
name: c1.name.clone(),
spec: try_make_common_struct_schemas(&c1.spec, &c2.spec)?,
spec: Arc::new(try_merge_collector_schemas(&c1.spec, &c2.spec)?),
};
Ok(collector)
})
Expand Down Expand Up @@ -258,72 +258,95 @@ fn try_make_common_value_type(
})
}

fn try_make_common_struct_schemas(
schema1: &StructSchema,
schema2: &StructSchema,
) -> Result<StructSchema> {
if schema1.fields.len() != schema2.fields.len() {
fn try_merge_fields_schemas(
schema1: &[FieldSchema],
schema2: &[FieldSchema],
) -> Result<Vec<FieldSchema>> {
if schema1.len() != schema2.len() {
api_bail!(
"Structs are not compatible as they have different fields count:\n {}\n {}\n",
schema1,
"Fields are not compatible as they have different fields count:\n ({})\n ({})\n",
schema1
.iter()
.map(|f| f.to_string())
.collect::<Vec<_>>()
.join(", "),
schema2
.iter()
.map(|f| f.to_string())
.collect::<Vec<_>>()
.join(", ")
);
}
let mut result_fields = Vec::with_capacity(schema1.fields.len());
for (field1, field2) in schema1.fields.iter().zip(schema2.fields.iter()) {
let mut result_fields = Vec::with_capacity(schema1.len());
for (field1, field2) in schema1.iter().zip(schema2.iter()) {
if field1.name != field2.name {
api_bail!(
"Structs are not compatible as they have incompatible field names `{}` vs `{}`:\n {}\n {}\n",
"Structs are not compatible as they have incompatible field names `{}` vs `{}`",
field1.name,
field2.name,
schema1,
schema2
field2.name
);
}
result_fields.push(FieldSchema {
name: field1.name.clone(),
value_type: try_make_common_value_type(&field1.value_type, &field2.value_type)?,
});
}
Ok(result_fields)
}

/// Merges two struct schemas field-by-field, taking the first non-empty
/// description. Fails if the field lists are not compatible.
fn try_merge_struct_schemas(
    schema1: &StructSchema,
    schema2: &StructSchema,
) -> Result<StructSchema> {
    let fields = try_merge_fields_schemas(&schema1.fields, &schema2.fields)?;
    Ok(StructSchema {
        fields: Arc::new(fields),
        // Prefer the first schema's description; fall back to the second's.
        description: schema1
            .description
            .clone()
            .or_else(|| schema2.description.clone()),
    })
}

/// Merges two collector schemas: fields are merged pairwise, and the
/// auto-UUID field index is kept only when both sides agree on it.
fn try_merge_collector_schemas(
    schema1: &CollectorSchema,
    schema2: &CollectorSchema,
) -> Result<CollectorSchema> {
    let merged_fields = try_merge_fields_schemas(&schema1.fields, &schema2.fields)?;
    // `filter` keeps the index only when the second schema has the same one;
    // any disagreement (including one side missing it) yields `None`.
    let auto_uuid_field_idx = schema1
        .auto_uuid_field_idx
        .filter(|idx| schema2.auto_uuid_field_idx == Some(*idx));
    Ok(CollectorSchema {
        fields: merged_fields,
        auto_uuid_field_idx,
    })
}

/// Accumulates the schema of a collector while `collect` ops targeting it
/// are being analyzed.
#[derive(Debug)]
pub(super) struct CollectorBuilder {
    /// Schema merged from all collect ops seen so far.
    pub schema: Arc<CollectorSchema>,
    /// Whether the schema has been consumed; once used, further merges are rejected.
    pub is_used: bool,
}

impl CollectorBuilder {
pub fn new(schema: StructSchema) -> Self {
pub fn new(schema: Arc<CollectorSchema>) -> Self {
Self {
schema,
is_used: false,
}
}

pub fn merge_schema(&mut self, schema: &StructSchema) -> Result<()> {
pub fn merge_schema(&mut self, schema: &CollectorSchema) -> Result<()> {
if self.is_used {
api_bail!("Collector is already used");
}
let common_schema =
try_make_common_struct_schemas(&self.schema, schema).with_context(|| {
format!(
"Collectors are sent with entries in incompatible schemas:\n {}\n {}\n",
self.schema, schema
)
})?;
self.schema = common_schema;
let existing_schema = Arc::make_mut(&mut self.schema);
*existing_schema = try_merge_collector_schemas(existing_schema, schema)?;
Ok(())
}

pub fn use_schema(&mut self) -> StructSchema {
pub fn use_schema(&mut self) -> Arc<CollectorSchema> {
self.is_used = true;
self.schema.clone()
}
Expand Down Expand Up @@ -401,7 +424,7 @@ impl DataScopeBuilder {
pub fn consume_collector(
&self,
collector_name: &FieldName,
) -> Result<(AnalyzedLocalCollectorReference, StructSchema)> {
) -> Result<(AnalyzedLocalCollectorReference, Arc<CollectorSchema>)> {
let mut collectors = self.collectors.lock().unwrap();
let (collector_idx, _, collector) = collectors
.get_full_mut(collector_name)
Expand All @@ -417,7 +440,7 @@ impl DataScopeBuilder {
pub fn add_collector(
&self,
collector_name: FieldName,
schema: StructSchema,
schema: CollectorSchema,
) -> Result<AnalyzedLocalCollectorReference> {
let mut collectors = self.collectors.lock().unwrap();
let collector_idx = collectors.len() as u32;
Expand All @@ -426,7 +449,7 @@ impl DataScopeBuilder {
entry.get_mut().merge_schema(&schema)?;
}
indexmap::map::Entry::Vacant(entry) => {
entry.insert(CollectorBuilder::new(schema));
entry.insert(CollectorBuilder::new(Arc::new(schema)));
}
}
Ok(AnalyzedLocalCollectorReference { collector_idx })
Expand Down Expand Up @@ -474,7 +497,7 @@ fn find_scope<'a>(
fn analyze_struct_mapping(
mapping: &StructMapping,
scopes: RefList<'_, &'_ ExecutionScope<'_>>,
) -> Result<(AnalyzedStructMapping, StructSchema)> {
) -> Result<(AnalyzedStructMapping, Vec<FieldSchema>)> {
let mut field_mappings = Vec::with_capacity(mapping.fields.len());
let mut field_schemas = Vec::with_capacity(mapping.fields.len());
for field in mapping.fields.iter() {
Expand All @@ -489,10 +512,7 @@ fn analyze_struct_mapping(
AnalyzedStructMapping {
fields: field_mappings,
},
StructSchema {
fields: Arc::new(field_schemas),
description: None,
},
field_schemas,
))
}

Expand Down Expand Up @@ -523,11 +543,14 @@ fn analyze_value_mapping(
}

ValueMapping::Struct(v) => {
let (struct_mapping, struct_schema) = analyze_struct_mapping(v, scopes)?;
let (struct_mapping, field_schemas) = analyze_struct_mapping(v, scopes)?;
(
AnalyzedValueMapping::Struct(struct_mapping),
EnrichedValueType {
typ: ValueType::Struct(struct_schema),
typ: ValueType::Struct(StructSchema {
fields: Arc::new(field_schemas),
description: None,
}),
nullable: false,
attrs: Default::default(),
},
Expand Down Expand Up @@ -556,7 +579,7 @@ fn analyze_input_fields(
fn add_collector(
scope_name: &ScopeName,
collector_name: FieldName,
schema: StructSchema,
schema: CollectorSchema,
scopes: RefList<'_, &'_ ExecutionScope<'_>>,
) -> Result<AnalyzedCollectorReference> {
let (scope_up_level, scope) = find_scope(scope_name, scopes)?;
Expand Down Expand Up @@ -766,22 +789,22 @@ impl AnalyzerContext<'_> {

ReactiveOpSpec::Collect(op) => {
let scopes = parent_scopes.prepend(scope);
let (struct_mapping, struct_schema) = analyze_struct_mapping(&op.input, scopes)?;
let collector_ref = add_collector(
&op.scope_name,
op.collector_name.clone(),
struct_schema,
scopes,
)?;
let op_name = reactive_op.name.clone();
async move {
Ok(AnalyzedReactiveOp::Collect(AnalyzedCollectOp {
name: op_name,
input: struct_mapping,
collector_ref,
}))
}
.boxed()
let (struct_mapping, fields_schema) = analyze_struct_mapping(&op.input, scopes)?;
let has_auto_uuid_field = op.auto_uuid_field.is_some();
let fingerprinter = Fingerprinter::default().with(&fields_schema)?;
let collect_op = AnalyzedReactiveOp::Collect(AnalyzedCollectOp {
name: reactive_op.name.clone(),
has_auto_uuid_field,
input: struct_mapping,
collector_ref: add_collector(
&op.scope_name,
op.collector_name.clone(),
CollectorSchema::from_fields(fields_schema, has_auto_uuid_field),
scopes,
)?,
fingerprinter,
});
async move { Ok(collect_op) }.boxed()
}
};
Ok(result_fut)
Expand Down
Loading