Skip to content

Commit 5be47e4

Browse files
authored
Rust core support for generating stable automatic UUID in collector (#210)
* Data type changes to add an auto uuid to collector. * Create a separate schema type `CollectorSchema`. * Introduce `auto_uuid_field_idx` field to `CollectorSchema`. * Introduce a `Fingerprinter` for collector fields schema. * Support auto UUID in `StoredMemoizationInfo` and `EvaluationMemory`. * Add the auto UUID field in the evaluator.
1 parent 9cfbced commit 5be47e4

File tree

14 files changed

+441
-243
lines changed

14 files changed

+441
-243
lines changed

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,4 +89,4 @@ rustls = { version = "0.23.25" }
8989
http-body-util = "0.1.3"
9090
yaml-rust2 = "0.10.0"
9191
urlencoding = "2.1.3"
92-
uuid = "1.16.0"
92+
uuid = { version = "1.16.0", features = ["serde", "v4", "v8"] }

src/base/schema.rs

Lines changed: 55 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,7 @@ pub struct CollectionSchema {
125125
pub row: StructSchema,
126126

127127
#[serde(default = "Vec::new", skip_serializing_if = "Vec::is_empty")]
128-
pub collectors: Vec<NamedSpec<StructSchema>>,
128+
pub collectors: Vec<NamedSpec<Arc<CollectorSchema>>>,
129129
}
130130

131131
impl CollectionSchema {
@@ -157,7 +157,7 @@ impl CollectionSchema {
157157
.iter()
158158
.map(|c| NamedSpec {
159159
name: c.name.clone(),
160-
spec: c.spec.without_attrs(),
160+
spec: Arc::from(c.spec.without_attrs()),
161161
})
162162
.collect(),
163163
}
@@ -339,13 +339,65 @@ impl std::fmt::Display for FieldSchema {
339339
}
340340
}
341341

342+
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
343+
pub struct CollectorSchema {
344+
pub fields: Vec<FieldSchema>,
345+
/// If specified, the collector will have an automatically generated UUID field with the given index.
346+
pub auto_uuid_field_idx: Option<usize>,
347+
}
348+
349+
impl std::fmt::Display for CollectorSchema {
350+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
351+
write!(f, "Collector(")?;
352+
for (i, field) in self.fields.iter().enumerate() {
353+
if i > 0 {
354+
write!(f, ", ")?;
355+
}
356+
write!(f, "{}", field)?;
357+
}
358+
write!(f, ")")
359+
}
360+
}
361+
362+
impl CollectorSchema {
363+
pub fn from_fields(fields: Vec<FieldSchema>, has_auto_uuid_field: bool) -> Self {
364+
let mut fields = fields;
365+
let auto_uuid_field_idx = if has_auto_uuid_field {
366+
fields.insert(
367+
0,
368+
FieldSchema::new(
369+
"uuid".to_string(),
370+
EnrichedValueType {
371+
typ: ValueType::Basic(BasicValueType::Uuid),
372+
nullable: false,
373+
attrs: Default::default(),
374+
},
375+
),
376+
);
377+
Some(0)
378+
} else {
379+
None
380+
};
381+
Self {
382+
fields,
383+
auto_uuid_field_idx,
384+
}
385+
}
386+
pub fn without_attrs(&self) -> Self {
387+
Self {
388+
fields: self.fields.iter().map(|f| f.without_attrs()).collect(),
389+
auto_uuid_field_idx: self.auto_uuid_field_idx,
390+
}
391+
}
392+
}
393+
342394
/// Top-level schema for a flow instance.
343395
#[derive(Debug, Clone, Serialize, Deserialize)]
344396
pub struct DataSchema {
345397
pub schema: StructSchema,
346398

347399
#[serde(default = "Vec::new", skip_serializing_if = "Vec::is_empty")]
348-
pub collectors: Vec<NamedSpec<StructSchema>>,
400+
pub collectors: Vec<NamedSpec<Arc<CollectorSchema>>>,
349401
}
350402

351403
impl Deref for DataSchema {

src/base/spec.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -181,9 +181,15 @@ pub struct ForEachOpSpec {
181181
/// Emit data to a given collector at the given scope.
182182
#[derive(Debug, Clone, Serialize, Deserialize)]
183183
pub struct CollectOpSpec {
184+
/// Field values to be collected.
184185
pub input: StructMapping,
186+
/// Scope for the collector.
185187
pub scope_name: ScopeName,
188+
/// Name of the collector.
186189
pub collector_name: FieldName,
190+
/// If specified, the collector will have an automatically generated UUID field with the given name.
191+
/// The uuid will remain stable when collected input values remain unchanged.
192+
pub auto_uuid_field: Option<FieldName>,
187193
}
188194

189195
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq)]

src/builder/analyzer.rs

Lines changed: 79 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -179,7 +179,7 @@ fn try_make_common_value_type(
179179
ValueType::Basic(basic_type1.clone())
180180
}
181181
(ValueType::Struct(struct_type1), ValueType::Struct(struct_type2)) => {
182-
let common_schema = try_make_common_struct_schemas(struct_type1, struct_type2)?;
182+
let common_schema = try_merge_struct_schemas(struct_type1, struct_type2)?;
183183
ValueType::Struct(common_schema)
184184
}
185185
(ValueType::Collection(collection_type1), ValueType::Collection(collection_type2)) => {
@@ -190,7 +190,7 @@ fn try_make_common_value_type(
190190
collection_type2
191191
);
192192
}
193-
let row = try_make_common_struct_schemas(&collection_type1.row, &collection_type2.row)?;
193+
let row = try_merge_struct_schemas(&collection_type1.row, &collection_type2.row)?;
194194

195195
if collection_type1.collectors.len() != collection_type2.collectors.len() {
196196
api_bail!(
@@ -213,7 +213,7 @@ fn try_make_common_value_type(
213213
}
214214
let collector = NamedSpec {
215215
name: c1.name.clone(),
216-
spec: try_make_common_struct_schemas(&c1.spec, &c2.spec)?,
216+
spec: Arc::new(try_merge_collector_schemas(&c1.spec, &c2.spec)?),
217217
};
218218
Ok(collector)
219219
})
@@ -258,72 +258,95 @@ fn try_make_common_value_type(
258258
})
259259
}
260260

261-
fn try_make_common_struct_schemas(
262-
schema1: &StructSchema,
263-
schema2: &StructSchema,
264-
) -> Result<StructSchema> {
265-
if schema1.fields.len() != schema2.fields.len() {
261+
fn try_merge_fields_schemas(
262+
schema1: &[FieldSchema],
263+
schema2: &[FieldSchema],
264+
) -> Result<Vec<FieldSchema>> {
265+
if schema1.len() != schema2.len() {
266266
api_bail!(
267-
"Structs are not compatible as they have different fields count:\n {}\n {}\n",
268-
schema1,
267+
"Fields are not compatible as they have different fields count:\n ({})\n ({})\n",
268+
schema1
269+
.iter()
270+
.map(|f| f.to_string())
271+
.collect::<Vec<_>>()
272+
.join(", "),
269273
schema2
274+
.iter()
275+
.map(|f| f.to_string())
276+
.collect::<Vec<_>>()
277+
.join(", ")
270278
);
271279
}
272-
let mut result_fields = Vec::with_capacity(schema1.fields.len());
273-
for (field1, field2) in schema1.fields.iter().zip(schema2.fields.iter()) {
280+
let mut result_fields = Vec::with_capacity(schema1.len());
281+
for (field1, field2) in schema1.iter().zip(schema2.iter()) {
274282
if field1.name != field2.name {
275283
api_bail!(
276-
"Structs are not compatible as they have incompatible field names `{}` vs `{}`:\n {}\n {}\n",
284+
"Structs are not compatible as they have incompatible field names `{}` vs `{}`",
277285
field1.name,
278-
field2.name,
279-
schema1,
280-
schema2
286+
field2.name
281287
);
282288
}
283289
result_fields.push(FieldSchema {
284290
name: field1.name.clone(),
285291
value_type: try_make_common_value_type(&field1.value_type, &field2.value_type)?,
286292
});
287293
}
294+
Ok(result_fields)
295+
}
296+
297+
fn try_merge_struct_schemas(
298+
schema1: &StructSchema,
299+
schema2: &StructSchema,
300+
) -> Result<StructSchema> {
301+
let fields = try_merge_fields_schemas(&schema1.fields, &schema2.fields)?;
288302
Ok(StructSchema {
289-
fields: Arc::new(result_fields),
303+
fields: Arc::new(fields),
290304
description: schema1
291305
.description
292306
.clone()
293307
.or_else(|| schema2.description.clone()),
294308
})
295309
}
296310

311+
fn try_merge_collector_schemas(
312+
schema1: &CollectorSchema,
313+
schema2: &CollectorSchema,
314+
) -> Result<CollectorSchema> {
315+
let fields = try_merge_fields_schemas(&schema1.fields, &schema2.fields)?;
316+
Ok(CollectorSchema {
317+
fields,
318+
auto_uuid_field_idx: if schema1.auto_uuid_field_idx == schema2.auto_uuid_field_idx {
319+
schema1.auto_uuid_field_idx
320+
} else {
321+
None
322+
},
323+
})
324+
}
325+
297326
#[derive(Debug)]
298327
pub(super) struct CollectorBuilder {
299-
pub schema: StructSchema,
328+
pub schema: Arc<CollectorSchema>,
300329
pub is_used: bool,
301330
}
302331

303332
impl CollectorBuilder {
304-
pub fn new(schema: StructSchema) -> Self {
333+
pub fn new(schema: Arc<CollectorSchema>) -> Self {
305334
Self {
306335
schema,
307336
is_used: false,
308337
}
309338
}
310339

311-
pub fn merge_schema(&mut self, schema: &StructSchema) -> Result<()> {
340+
pub fn merge_schema(&mut self, schema: &CollectorSchema) -> Result<()> {
312341
if self.is_used {
313342
api_bail!("Collector is already used");
314343
}
315-
let common_schema =
316-
try_make_common_struct_schemas(&self.schema, schema).with_context(|| {
317-
format!(
318-
"Collectors are sent with entries in incompatible schemas:\n {}\n {}\n",
319-
self.schema, schema
320-
)
321-
})?;
322-
self.schema = common_schema;
344+
let existing_schema = Arc::make_mut(&mut self.schema);
345+
*existing_schema = try_merge_collector_schemas(existing_schema, schema)?;
323346
Ok(())
324347
}
325348

326-
pub fn use_schema(&mut self) -> StructSchema {
349+
pub fn use_schema(&mut self) -> Arc<CollectorSchema> {
327350
self.is_used = true;
328351
self.schema.clone()
329352
}
@@ -401,7 +424,7 @@ impl DataScopeBuilder {
401424
pub fn consume_collector(
402425
&self,
403426
collector_name: &FieldName,
404-
) -> Result<(AnalyzedLocalCollectorReference, StructSchema)> {
427+
) -> Result<(AnalyzedLocalCollectorReference, Arc<CollectorSchema>)> {
405428
let mut collectors = self.collectors.lock().unwrap();
406429
let (collector_idx, _, collector) = collectors
407430
.get_full_mut(collector_name)
@@ -417,7 +440,7 @@ impl DataScopeBuilder {
417440
pub fn add_collector(
418441
&self,
419442
collector_name: FieldName,
420-
schema: StructSchema,
443+
schema: CollectorSchema,
421444
) -> Result<AnalyzedLocalCollectorReference> {
422445
let mut collectors = self.collectors.lock().unwrap();
423446
let collector_idx = collectors.len() as u32;
@@ -426,7 +449,7 @@ impl DataScopeBuilder {
426449
entry.get_mut().merge_schema(&schema)?;
427450
}
428451
indexmap::map::Entry::Vacant(entry) => {
429-
entry.insert(CollectorBuilder::new(schema));
452+
entry.insert(CollectorBuilder::new(Arc::new(schema)));
430453
}
431454
}
432455
Ok(AnalyzedLocalCollectorReference { collector_idx })
@@ -474,7 +497,7 @@ fn find_scope<'a>(
474497
fn analyze_struct_mapping(
475498
mapping: &StructMapping,
476499
scopes: RefList<'_, &'_ ExecutionScope<'_>>,
477-
) -> Result<(AnalyzedStructMapping, StructSchema)> {
500+
) -> Result<(AnalyzedStructMapping, Vec<FieldSchema>)> {
478501
let mut field_mappings = Vec::with_capacity(mapping.fields.len());
479502
let mut field_schemas = Vec::with_capacity(mapping.fields.len());
480503
for field in mapping.fields.iter() {
@@ -489,10 +512,7 @@ fn analyze_struct_mapping(
489512
AnalyzedStructMapping {
490513
fields: field_mappings,
491514
},
492-
StructSchema {
493-
fields: Arc::new(field_schemas),
494-
description: None,
495-
},
515+
field_schemas,
496516
))
497517
}
498518

@@ -523,11 +543,14 @@ fn analyze_value_mapping(
523543
}
524544

525545
ValueMapping::Struct(v) => {
526-
let (struct_mapping, struct_schema) = analyze_struct_mapping(v, scopes)?;
546+
let (struct_mapping, field_schemas) = analyze_struct_mapping(v, scopes)?;
527547
(
528548
AnalyzedValueMapping::Struct(struct_mapping),
529549
EnrichedValueType {
530-
typ: ValueType::Struct(struct_schema),
550+
typ: ValueType::Struct(StructSchema {
551+
fields: Arc::new(field_schemas),
552+
description: None,
553+
}),
531554
nullable: false,
532555
attrs: Default::default(),
533556
},
@@ -556,7 +579,7 @@ fn analyze_input_fields(
556579
fn add_collector(
557580
scope_name: &ScopeName,
558581
collector_name: FieldName,
559-
schema: StructSchema,
582+
schema: CollectorSchema,
560583
scopes: RefList<'_, &'_ ExecutionScope<'_>>,
561584
) -> Result<AnalyzedCollectorReference> {
562585
let (scope_up_level, scope) = find_scope(scope_name, scopes)?;
@@ -766,22 +789,22 @@ impl AnalyzerContext<'_> {
766789

767790
ReactiveOpSpec::Collect(op) => {
768791
let scopes = parent_scopes.prepend(scope);
769-
let (struct_mapping, struct_schema) = analyze_struct_mapping(&op.input, scopes)?;
770-
let collector_ref = add_collector(
771-
&op.scope_name,
772-
op.collector_name.clone(),
773-
struct_schema,
774-
scopes,
775-
)?;
776-
let op_name = reactive_op.name.clone();
777-
async move {
778-
Ok(AnalyzedReactiveOp::Collect(AnalyzedCollectOp {
779-
name: op_name,
780-
input: struct_mapping,
781-
collector_ref,
782-
}))
783-
}
784-
.boxed()
792+
let (struct_mapping, fields_schema) = analyze_struct_mapping(&op.input, scopes)?;
793+
let has_auto_uuid_field = op.auto_uuid_field.is_some();
794+
let fingerprinter = Fingerprinter::default().with(&fields_schema)?;
795+
let collect_op = AnalyzedReactiveOp::Collect(AnalyzedCollectOp {
796+
name: reactive_op.name.clone(),
797+
has_auto_uuid_field,
798+
input: struct_mapping,
799+
collector_ref: add_collector(
800+
&op.scope_name,
801+
op.collector_name.clone(),
802+
CollectorSchema::from_fields(fields_schema, has_auto_uuid_field),
803+
scopes,
804+
)?,
805+
fingerprinter,
806+
});
807+
async move { Ok(collect_op) }.boxed()
785808
}
786809
};
787810
Ok(result_fut)

0 commit comments

Comments
 (0)