Skip to content

Commit 3b3efa3

Browse files
authored
feat(declaration): merge states from declarations in analyzer (#373)
1 parent 471fa6b commit 3b3efa3

File tree

1 file changed

+61
-58
lines changed

1 file changed

+61
-58
lines changed

src/builder/analyzer.rs

Lines changed: 61 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -818,32 +818,29 @@ impl AnalyzerContext<'_> {
818818
Ok(result_fut)
819819
}
820820

821-
fn analyze_export_op(
821+
fn merge_export_op_states(
822822
&self,
823-
export_op: &NamedSpec<ExportOpSpec>,
823+
target_kind: String,
824+
setup_key: serde_json::Value,
825+
setup_state: serde_json::Value,
826+
setup_by_user: bool,
824827
export_factory: &dyn ExportTargetFactory,
825-
data_coll_output: ExportDataCollectionBuildOutput,
826-
data_fields_info: ExportDataFieldsInfo,
827828
flow_setup_state: &mut FlowSetupState<DesiredMode>,
828829
existing_target_states: &HashMap<&ResourceIdentifier, Vec<&TargetSetupState>>,
829-
) -> Result<impl Future<Output = Result<AnalyzedExportOp>> + Send> {
830+
) -> Result<i32> {
830831
let resource_id = ResourceIdentifier {
831-
key: data_coll_output.setup_key.clone(),
832-
target_kind: export_op.spec.target.kind.clone(),
832+
key: setup_key,
833+
target_kind,
833834
};
834835
let existing_target_states = existing_target_states.get(&resource_id);
835836
let mut compatible_target_ids = HashSet::<Option<i32>>::new();
836837
let mut reusable_schema_version_ids = HashSet::<Option<i32>>::new();
837838
for existing_state in existing_target_states.iter().flat_map(|v| v.iter()) {
838-
let compatibility =
839-
if export_op.spec.setup_by_user == existing_state.common.setup_by_user {
840-
export_factory.check_state_compatibility(
841-
&data_coll_output.desired_setup_state,
842-
&existing_state.state,
843-
)?
844-
} else {
845-
SetupStateCompatibility::NotCompatible
846-
};
839+
let compatibility = if setup_by_user == existing_state.common.setup_by_user {
840+
export_factory.check_state_compatibility(&setup_state, &existing_state.state)?
841+
} else {
842+
SetupStateCompatibility::NotCompatible
843+
};
847844
let compatible_target_id = if compatibility != SetupStateCompatibility::NotCompatible {
848845
reusable_schema_version_ids.insert(
849846
(compatibility == SetupStateCompatibility::Compatible)
@@ -897,36 +894,18 @@ impl AnalyzerContext<'_> {
897894
target_id,
898895
schema_version_id,
899896
max_schema_version_id: max_schema_version_id.max(schema_version_id),
900-
setup_by_user: export_op.spec.setup_by_user,
897+
setup_by_user,
901898
},
902-
state: data_coll_output.desired_setup_state,
899+
state: setup_state,
903900
});
904901
}
905902
}
906-
let op_name = export_op.name.clone();
907-
Ok(async move {
908-
trace!("Start building executor for export op `{op_name}`");
909-
let executors = data_coll_output
910-
.executors
911-
.await
912-
.with_context(|| format!("Analyzing export op: {op_name}"))?;
913-
trace!("Finished building executor for export op `{op_name}`");
914-
Ok(AnalyzedExportOp {
915-
name: op_name,
916-
target_id,
917-
input: data_fields_info.local_collector_ref,
918-
export_context: executors.export_context,
919-
query_target: executors.query_target,
920-
primary_key_def: data_fields_info.primary_key_def,
921-
primary_key_type: data_fields_info.primary_key_type,
922-
value_fields: data_fields_info.value_fields_idx,
923-
value_stable: data_fields_info.value_stable,
924-
})
925-
})
903+
Ok(target_id)
926904
}
927905

928906
fn analyze_export_op_group(
929907
&self,
908+
target_kind: String,
930909
scope: &mut DataScopeBuilder,
931910
flow_inst: &FlowInstanceSpec,
932911
export_op_group: &AnalyzedExportTargetOpGroup,
@@ -1006,38 +985,61 @@ impl AnalyzerContext<'_> {
1006985
});
1007986
data_fields_infos.push(data_collection_info);
1008987
}
1009-
let (data_collections_output, _) = export_op_group.target_factory.clone().build(
1010-
collection_specs,
1011-
declarations,
1012-
self.flow_ctx.clone(),
1013-
)?;
1014-
if data_collections_output.len() != data_fields_infos.len() {
1015-
api_bail!(
1016-
"Data collection output length mismatch: expect {}, got {}",
1017-
data_fields_infos.len(),
1018-
data_collections_output.len()
1019-
);
1020-
}
1021-
1022-
let result = export_op_group
988+
let (data_collections_output, declarations_output) = export_op_group
989+
.target_factory
990+
.clone()
991+
.build(collection_specs, declarations, self.flow_ctx.clone())?;
992+
let analyzed_export_ops = export_op_group
1023993
.op_idx
1024994
.iter()
1025995
.zip(data_collections_output.into_iter())
1026996
.zip(data_fields_infos.into_iter())
1027997
.map(|((idx, data_coll_output), data_fields_info)| {
1028998
let export_op = &flow_inst.export_ops[*idx];
1029-
Ok(self.analyze_export_op(
1030-
export_op,
999+
let target_id = self.merge_export_op_states(
1000+
export_op.spec.target.kind.clone(),
1001+
data_coll_output.setup_key,
1002+
data_coll_output.desired_setup_state,
1003+
export_op.spec.setup_by_user,
10311004
export_op_group.target_factory.as_ref(),
1032-
data_coll_output,
1033-
data_fields_info,
10341005
flow_setup_state,
10351006
existing_target_states,
1036-
)?)
1007+
)?;
1008+
let op_name = export_op.name.clone();
1009+
Ok(async move {
1010+
trace!("Start building executor for export op `{op_name}`");
1011+
let executors = data_coll_output
1012+
.executors
1013+
.await
1014+
.with_context(|| format!("Analyzing export op: {op_name}"))?;
1015+
trace!("Finished building executor for export op `{op_name}`");
1016+
Ok(AnalyzedExportOp {
1017+
name: op_name,
1018+
target_id,
1019+
input: data_fields_info.local_collector_ref,
1020+
export_context: executors.export_context,
1021+
query_target: executors.query_target,
1022+
primary_key_def: data_fields_info.primary_key_def,
1023+
primary_key_type: data_fields_info.primary_key_type,
1024+
value_fields: data_fields_info.value_fields_idx,
1025+
value_stable: data_fields_info.value_stable,
1026+
})
1027+
})
10371028
})
10381029
.collect::<Result<Vec<_>>>()?;
1030+
for (setup_key, setup_state) in declarations_output.into_iter() {
1031+
self.merge_export_op_states(
1032+
target_kind.clone(),
1033+
setup_key,
1034+
setup_state,
1035+
/*setup_by_user=*/ false,
1036+
export_op_group.target_factory.as_ref(),
1037+
flow_setup_state,
1038+
existing_target_states,
1039+
)?;
1040+
}
10391041

1040-
Ok(result)
1042+
Ok(analyzed_export_ops)
10411043
}
10421044

10431045
fn analyze_op_scope(
@@ -1198,6 +1200,7 @@ pub fn analyze_flow(
11981200
op_idx: op_ids.export_op_ids,
11991201
};
12001202
export_ops_futs.extend(analyzer_ctx.analyze_export_op_group(
1203+
target_kind,
12011204
root_exec_scope.data,
12021205
flow_inst,
12031206
&analyzed_target_op_group,

0 commit comments

Comments
 (0)