Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion src/base/spec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,11 @@ pub struct OpSpec {
pub spec: serde_json::Map<String, serde_json::Value>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ImportOpSpec {
pub source: OpSpec,
}

/// Transform data using a given operator.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TransformOpSpec {
Expand Down Expand Up @@ -244,7 +249,7 @@ pub struct FlowInstanceSpec {
pub name: String,

#[serde(default = "Vec::new", skip_serializing_if = "Vec::is_empty")]
pub source_ops: Vec<NamedSpec<OpSpec>>,
pub import_ops: Vec<NamedSpec<ImportOpSpec>>,

#[serde(default = "Vec::new", skip_serializing_if = "Vec::is_empty")]
pub reactive_ops: Vec<NamedSpec<ReactiveOpSpec>>,
Expand Down
38 changes: 19 additions & 19 deletions src/builder/analyzer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -591,25 +591,25 @@ fn add_collector(
}

impl AnalyzerContext<'_> {
pub(super) fn analyze_source_op(
pub(super) fn analyze_import_op(
&self,
scope: &mut DataScopeBuilder,
source_op: NamedSpec<OpSpec>,
import_op: NamedSpec<ImportOpSpec>,
metadata: Option<&mut FlowSetupMetadata>,
existing_source_states: Option<&Vec<&SourceSetupState>>,
) -> Result<impl Future<Output = Result<AnalyzedSourceOp>> + Send> {
let factory = self.registry.get(&source_op.spec.kind);
) -> Result<impl Future<Output = Result<AnalyzedImportOp>> + Send> {
let factory = self.registry.get(&import_op.spec.source.kind);
let source_factory = match factory {
Some(ExecutorFactory::Source(source_executor)) => source_executor.clone(),
_ => {
return Err(anyhow::anyhow!(
"Source executor not found for kind: {}",
source_op.spec.kind
import_op.spec.source.kind
))
}
};
let (output_type, executor) = source_factory.build(
serde_json::Value::Object(source_op.spec.spec),
serde_json::Value::Object(import_op.spec.source.spec),
self.flow_ctx.clone(),
)?;

Expand Down Expand Up @@ -642,7 +642,7 @@ impl AnalyzerContext<'_> {
metadata.last_source_id
};
metadata.sources.insert(
source_op.name.clone(),
import_op.name.clone(),
SourceSetupState {
source_id,
key_schema: key_schema_no_attrs,
Expand All @@ -651,13 +651,13 @@ impl AnalyzerContext<'_> {
source_id
});

let op_name = source_op.name.clone();
let output = scope.add_field(source_op.name, &output_type)?;
let op_name = import_op.name.clone();
let output = scope.add_field(import_op.name, &output_type)?;
let result_fut = async move {
trace!("Start building executor for source op `{}`", op_name);
let executor = executor.await?;
trace!("Finished building executor for source op `{}`", op_name);
Ok(AnalyzedSourceOp {
Ok(AnalyzedImportOp {
source_id: source_id.unwrap_or_default(),
executor,
output,
Expand Down Expand Up @@ -1100,14 +1100,14 @@ pub fn analyze_flow(
name: ROOT_SCOPE_NAME,
data: &mut root_data_scope,
};
let source_ops_futs = flow_inst
.source_ops
let import_ops_futs = flow_inst
.import_ops
.iter()
.map(|source_op| {
let existing_source_states = source_states_by_name.get(source_op.name.as_str());
analyzer_ctx.analyze_source_op(
.map(|import_op| {
let existing_source_states = source_states_by_name.get(import_op.name.as_str());
analyzer_ctx.analyze_import_op(
root_exec_scope.data,
source_op.clone(),
import_op.clone(),
Some(&mut setup_state.metadata),
existing_source_states,
)
Expand Down Expand Up @@ -1138,8 +1138,8 @@ pub fn analyze_flow(
.with(&data_schema)?
.into_fingerprint();
let plan_fut = async move {
let (source_ops, op_scope, export_ops) = try_join3(
try_join_all(source_ops_futs),
let (import_ops, op_scope, export_ops) = try_join3(
try_join_all(import_ops_futs),
op_scope_fut,
try_join_all(export_ops_futs),
)
Expand All @@ -1148,7 +1148,7 @@ pub fn analyze_flow(
Ok(ExecutionPlan {
tracking_table_setup,
logic_fingerprint,
source_ops,
import_ops,
op_scope,
export_ops,
})
Expand Down
22 changes: 12 additions & 10 deletions src/builder/flow_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,7 @@ pub struct FlowBuilder {
direct_input_fields: Vec<FieldSchema>,
direct_output_value: Option<spec::ValueMapping>,

source_ops: Vec<NamedSpec<spec::OpSpec>>,
import_ops: Vec<NamedSpec<spec::ImportOpSpec>>,
export_ops: Vec<NamedSpec<spec::ExportOpSpec>>,

next_generated_op_id: usize,
Expand Down Expand Up @@ -365,7 +365,7 @@ impl FlowBuilder {

reactive_ops: vec![],

source_ops: vec![],
import_ops: vec![],
export_ops: vec![],

direct_input_fields: vec![],
Expand Down Expand Up @@ -395,11 +395,13 @@ impl FlowBuilder {
));
}
}
let source_op = spec::NamedSpec {
let import_op = spec::NamedSpec {
name,
spec: spec::OpSpec {
kind,
spec: op_spec.into_inner(),
spec: spec::ImportOpSpec {
source: spec::OpSpec {
kind,
spec: op_spec.into_inner(),
},
},
};
let analyzer_ctx = AnalyzerContext {
Expand All @@ -409,14 +411,14 @@ impl FlowBuilder {
let mut root_data_scope = self.root_data_scope.lock().unwrap();

let analyzed = analyzer_ctx
.analyze_source_op(&mut root_data_scope, source_op.clone(), None, None)
.analyze_import_op(&mut root_data_scope, import_op.clone(), None, None)
.into_py_result()?;
std::mem::drop(analyzed);

let result =
Self::last_field_to_data_slice(&root_data_scope, self.root_data_scope_ref.clone())
.into_py_result()?;
self.source_ops.push(source_op);
self.import_ops.push(import_op);
Ok(result)
}

Expand Down Expand Up @@ -633,7 +635,7 @@ impl FlowBuilder {
pub fn build_flow(&self, py: Python<'_>) -> PyResult<py::Flow> {
let spec = spec::FlowInstanceSpec {
name: self.flow_instance_name.clone(),
source_ops: self.source_ops.clone(),
import_ops: self.import_ops.clone(),
reactive_ops: self.reactive_ops.clone(),
export_ops: self.export_ops.clone(),
};
Expand Down Expand Up @@ -705,7 +707,7 @@ impl FlowBuilder {
impl std::fmt::Display for FlowBuilder {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "Flow instance name: {}\n\n", self.flow_instance_name)?;
for op in self.source_ops.iter() {
for op in self.import_ops.iter() {
write!(
f,
"Source op {}\n{}\n",
Expand Down
4 changes: 2 additions & 2 deletions src/builder/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ pub struct AnalyzedOpOutput {
pub field_idx: u32,
}

pub struct AnalyzedSourceOp {
pub struct AnalyzedImportOp {
pub name: String,
pub source_id: i32,
pub executor: Box<dyn SourceExecutor>,
Expand Down Expand Up @@ -128,7 +128,7 @@ pub struct ExecutionPlan {
pub tracking_table_setup: db_tracking_setup::TrackingTableSetupState,
pub logic_fingerprint: Fingerprint,

pub source_ops: Vec<AnalyzedSourceOp>,
pub import_ops: Vec<AnalyzedImportOp>,
pub op_scope: AnalyzedOpScope,
pub export_ops: Vec<AnalyzedExportOp>,
}
Expand Down
26 changes: 13 additions & 13 deletions src/execution/dumper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use yaml_rust2::YamlEmitter;
use super::memoization::EvaluationMemoryOptions;
use super::row_indexer;
use crate::base::{schema, value};
use crate::builder::plan::{AnalyzedSourceOp, ExecutionPlan};
use crate::builder::plan::{AnalyzedImportOp, ExecutionPlan};
use crate::ops::interface::SourceExecutorListOptions;
use crate::utils::yaml_ser::YamlSerializer;

Expand Down Expand Up @@ -69,7 +69,7 @@ struct Dumper<'a> {
impl<'a> Dumper<'a> {
async fn evaluate_source_entry<'b>(
&'a self,
source_op: &'a AnalyzedSourceOp,
import_op: &'a AnalyzedImportOp,
key: &value::KeyValue,
collected_values_buffer: &'b mut Vec<Vec<value::FieldValues>>,
) -> Result<Option<IndexMap<&'b str, TargetExportData<'b>>>>
Expand All @@ -78,7 +78,7 @@ impl<'a> Dumper<'a> {
{
let data_builder = row_indexer::evaluate_source_entry_with_memory(
self.plan,
source_op,
import_op,
self.schema,
key,
EvaluationMemoryOptions {
Expand Down Expand Up @@ -130,13 +130,13 @@ impl<'a> Dumper<'a> {

async fn evaluate_and_dump_source_entry(
&self,
source_op: &AnalyzedSourceOp,
import_op: &AnalyzedImportOp,
key: value::KeyValue,
file_path: PathBuf,
) -> Result<()> {
let mut collected_values_buffer = Vec::new();
let (exports, error) = match self
.evaluate_source_entry(source_op, &key, &mut collected_values_buffer)
.evaluate_source_entry(import_op, &key, &mut collected_values_buffer)
.await
{
Ok(exports) => (exports, None),
Expand All @@ -145,7 +145,7 @@ impl<'a> Dumper<'a> {
let key_value = value::Value::from(key);
let file_data = SourceOutputData {
key: value::TypedValue {
t: &source_op.primary_key_type,
t: &import_op.primary_key_type,
v: &key_value,
},
exports,
Expand All @@ -166,10 +166,10 @@ impl<'a> Dumper<'a> {
Ok(())
}

async fn evaluate_and_dump_for_source_op(&self, source_op: &AnalyzedSourceOp) -> Result<()> {
async fn evaluate_and_dump_for_source(&self, import_op: &AnalyzedImportOp) -> Result<()> {
let mut keys_by_filename_prefix: IndexMap<String, Vec<value::KeyValue>> = IndexMap::new();

let mut rows_stream = source_op.executor.list(SourceExecutorListOptions {
let mut rows_stream = import_op.executor.list(SourceExecutorListOptions {
include_ordinal: false,
});
while let Some(rows) = rows_stream.next().await {
Expand All @@ -181,7 +181,7 @@ impl<'a> Dumper<'a> {
.map(|s| urlencoding::encode(&s).into_owned())
.join(":");
s.truncate(
(0..(FILENAME_PREFIX_MAX_LENGTH - source_op.name.as_str().len()))
(0..(FILENAME_PREFIX_MAX_LENGTH - import_op.name.as_str().len()))
.rev()
.find(|i| s.is_char_boundary(*i))
.unwrap_or(0),
Expand All @@ -202,9 +202,9 @@ impl<'a> Dumper<'a> {
Cow::Borrowed("")
};
let file_name =
format!("{}@{}{}.yaml", source_op.name, filename_prefix, extra_id);
format!("{}@{}{}.yaml", import_op.name, filename_prefix, extra_id);
let file_path = output_dir.join(Path::new(&file_name));
self.evaluate_and_dump_source_entry(source_op, key, file_path)
self.evaluate_and_dump_source_entry(import_op, key, file_path)
})
});
try_join_all(evaluate_futs).await?;
Expand All @@ -214,9 +214,9 @@ impl<'a> Dumper<'a> {
async fn evaluate_and_dump(&self) -> Result<()> {
try_join_all(
self.plan
.source_ops
.import_ops
.iter()
.map(|source_op| self.evaluate_and_dump_for_source_op(source_op)),
.map(|import_op| self.evaluate_and_dump_for_source(import_op)),
)
.await?;
Ok(())
Expand Down
6 changes: 3 additions & 3 deletions src/execution/evaluator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -440,7 +440,7 @@ async fn evaluate_op_scope(

pub async fn evaluate_source_entry(
plan: &ExecutionPlan,
source_op: &AnalyzedSourceOp,
import_op: &AnalyzedImportOp,
schema: &schema::DataSchema,
key: &value::KeyValue,
source_value: value::FieldValues,
Expand All @@ -455,7 +455,7 @@ pub async fn evaluate_source_entry(
schema: root_schema,
};

let collection_schema = match &root_schema.fields[source_op.output.field_idx as usize]
let collection_schema = match &root_schema.fields[import_op.output.field_idx as usize]
.value_type
.typ
{
Expand All @@ -468,7 +468,7 @@ pub async fn evaluate_source_entry(
let scope_value =
ScopeValueBuilder::augmented_from(&value::ScopeValue(source_value), collection_schema)?;
root_scope_entry.define_field_w_builder(
&source_op.output,
&import_op.output,
value::Value::Table(BTreeMap::from([(key.clone(), scope_value)])),
);

Expand Down
Loading