Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 19 additions & 30 deletions src/base/schema.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
use crate::builder::plan::AnalyzedValueMapping;
use crate::prelude::*;

use super::spec::*;
use anyhow::Result;
use serde::{Deserialize, Serialize};
use std::{collections::BTreeMap, ops::Deref, sync::Arc};
use crate::builder::plan::AnalyzedValueMapping;

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct VectorTypeSchema {
Expand Down Expand Up @@ -141,9 +139,6 @@ impl std::fmt::Display for TableKind {
pub struct TableSchema {
pub kind: TableKind,
pub row: StructSchema,

#[serde(default = "Vec::new", skip_serializing_if = "Vec::is_empty")]
pub collectors: Vec<NamedSpec<Arc<CollectorSchema>>>,
}

impl TableSchema {
Expand All @@ -170,36 +165,19 @@ impl TableSchema {
Self {
kind: self.kind,
row: self.row.without_attrs(),
collectors: self
.collectors
.iter()
.map(|c| NamedSpec {
name: c.name.clone(),
spec: Arc::from(c.spec.without_attrs()),
})
.collect(),
}
}
}

impl std::fmt::Display for TableSchema {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}({}", self.kind, self.row)?;
for collector in self.collectors.iter() {
write!(f, "; COLLECTOR {} ({})", collector.name, collector.spec)?;
}
write!(f, ")")?;
Ok(())
write!(f, "{}({})", self.kind, self.row)
}
}

impl TableSchema {
pub fn new(kind: TableKind, row: StructSchema) -> Self {
Self {
kind,
row,
collectors: Default::default(),
}
Self { kind, row }
}

pub fn key_field(&self) -> Option<&FieldSchema> {
Expand Down Expand Up @@ -409,16 +387,27 @@ impl CollectorSchema {
}
}

#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct OpScopeSchema {
/// Output schema for transform ops.
pub op_output_types: HashMap<FieldName, EnrichedValueType>,

/// Child op scope for foreach ops.
pub op_scopes: HashMap<String, Arc<OpScopeSchema>>,

/// Collectors for the current scope.
pub collectors: Vec<NamedSpec<Arc<CollectorSchema>>>,
}

/// Top-level schema for a flow instance.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DataSchema {
pub struct FlowSchema {
pub schema: StructSchema,

#[serde(default = "Vec::new", skip_serializing_if = "Vec::is_empty")]
pub collectors: Vec<NamedSpec<Arc<CollectorSchema>>>,
pub root_op_scope: OpScopeSchema,
}

impl Deref for DataSchema {
impl std::ops::Deref for FlowSchema {
type Target = StructSchema;

fn deref(&self) -> &Self::Target {
Expand Down
4 changes: 2 additions & 2 deletions src/builder/analyzed_flow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use crate::{

pub struct AnalyzedFlow {
pub flow_instance: spec::FlowInstanceSpec,
pub data_schema: schema::DataSchema,
pub data_schema: schema::FlowSchema,
pub desired_state: setup::FlowSetupState<setup::DesiredMode>,
/// It's None if the flow is not up to date
pub execution_plan:
Expand Down Expand Up @@ -67,7 +67,7 @@ impl AnalyzedFlow {

pub struct AnalyzedTransientFlow {
pub transient_flow_instance: spec::TransientFlowSpec,
pub data_schema: schema::DataSchema,
pub data_schema: schema::FlowSchema,
pub execution_plan: plan::TransientExecutionPlan,
pub output_type: schema::EnrichedValueType,
}
Expand Down
Loading