Skip to content

Commit bac81a5

Browse files
authored
feat(op-scope): expose op type to FlowSchema & analyzer/builder cleanup (#429)
1 parent d925438 commit bac81a5

File tree

10 files changed

+548
-634
lines changed

10 files changed

+548
-634
lines changed

src/base/schema.rs

Lines changed: 19 additions & 30 deletions
Original file line number | Diff line number | Diff line change
@@ -1,9 +1,7 @@
1-
use crate::builder::plan::AnalyzedValueMapping;
1+
use crate::prelude::*;
22

33
use super::spec::*;
4-
use anyhow::Result;
5-
use serde::{Deserialize, Serialize};
6-
use std::{collections::BTreeMap, ops::Deref, sync::Arc};
4+
use crate::builder::plan::AnalyzedValueMapping;
75

86
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
97
pub struct VectorTypeSchema {
@@ -141,9 +139,6 @@ impl std::fmt::Display for TableKind {
141139
pub struct TableSchema {
142140
pub kind: TableKind,
143141
pub row: StructSchema,
144-
145-
#[serde(default = "Vec::new", skip_serializing_if = "Vec::is_empty")]
146-
pub collectors: Vec<NamedSpec<Arc<CollectorSchema>>>,
147142
}
148143

149144
impl TableSchema {
@@ -170,36 +165,19 @@ impl TableSchema {
170165
Self {
171166
kind: self.kind,
172167
row: self.row.without_attrs(),
173-
collectors: self
174-
.collectors
175-
.iter()
176-
.map(|c| NamedSpec {
177-
name: c.name.clone(),
178-
spec: Arc::from(c.spec.without_attrs()),
179-
})
180-
.collect(),
181168
}
182169
}
183170
}
184171

185172
impl std::fmt::Display for TableSchema {
186173
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
187-
write!(f, "{}({}", self.kind, self.row)?;
188-
for collector in self.collectors.iter() {
189-
write!(f, "; COLLECTOR {} ({})", collector.name, collector.spec)?;
190-
}
191-
write!(f, ")")?;
192-
Ok(())
174+
write!(f, "{}({})", self.kind, self.row)
193175
}
194176
}
195177

196178
impl TableSchema {
197179
pub fn new(kind: TableKind, row: StructSchema) -> Self {
198-
Self {
199-
kind,
200-
row,
201-
collectors: Default::default(),
202-
}
180+
Self { kind, row }
203181
}
204182

205183
pub fn key_field(&self) -> Option<&FieldSchema> {
@@ -409,16 +387,27 @@ impl CollectorSchema {
409387
}
410388
}
411389

390+
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
391+
pub struct OpScopeSchema {
392+
/// Output schema for transform ops.
393+
pub op_output_types: HashMap<FieldName, EnrichedValueType>,
394+
395+
/// Child op scope for foreach ops.
396+
pub op_scopes: HashMap<String, Arc<OpScopeSchema>>,
397+
398+
/// Collectors for the current scope.
399+
pub collectors: Vec<NamedSpec<Arc<CollectorSchema>>>,
400+
}
401+
412402
/// Top-level schema for a flow instance.
413403
#[derive(Debug, Clone, Serialize, Deserialize)]
414-
pub struct DataSchema {
404+
pub struct FlowSchema {
415405
pub schema: StructSchema,
416406

417-
#[serde(default = "Vec::new", skip_serializing_if = "Vec::is_empty")]
418-
pub collectors: Vec<NamedSpec<Arc<CollectorSchema>>>,
407+
pub root_op_scope: OpScopeSchema,
419408
}
420409

421-
impl Deref for DataSchema {
410+
impl std::ops::Deref for FlowSchema {
422411
type Target = StructSchema;
423412

424413
fn deref(&self) -> &Self::Target {

src/builder/analyzed_flow.rs

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -9,7 +9,7 @@ use crate::{
99

1010
pub struct AnalyzedFlow {
1111
pub flow_instance: spec::FlowInstanceSpec,
12-
pub data_schema: schema::DataSchema,
12+
pub data_schema: schema::FlowSchema,
1313
pub desired_state: setup::FlowSetupState<setup::DesiredMode>,
1414
/// It's None if the flow is not up to date
1515
pub execution_plan:
@@ -67,7 +67,7 @@ impl AnalyzedFlow {
6767

6868
pub struct AnalyzedTransientFlow {
6969
pub transient_flow_instance: spec::TransientFlowSpec,
70-
pub data_schema: schema::DataSchema,
70+
pub data_schema: schema::FlowSchema,
7171
pub execution_plan: plan::TransientExecutionPlan,
7272
pub output_type: schema::EnrichedValueType,
7373
}

0 commit comments

Comments
 (0)