Skip to content

Commit f530811

Browse files
authored
refactor(target-factory): batch all specs for build() for target (#361)
1 parent 51e532c commit f530811

File tree

8 files changed

+469
-372
lines changed

8 files changed

+469
-372
lines changed

src/builder/analyzer.rs

Lines changed: 223 additions & 185 deletions
Large diffs are not rendered by default.

src/ops/factory_bases.rs

Lines changed: 60 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -269,15 +269,23 @@ pub struct TypedExportTargetExecutors<F: StorageFactoryBase + ?Sized> {
269269
pub query_target: Option<Arc<dyn QueryTarget>>,
270270
}
271271

272-
pub struct TypedExportTargetBuildOutput<F: StorageFactoryBase + ?Sized> {
272+
pub struct TypedExportDataCollectionBuildOutput<F: StorageFactoryBase + ?Sized> {
273273
pub executors: BoxFuture<'static, Result<TypedExportTargetExecutors<F>>>,
274274
pub setup_key: F::Key,
275275
pub desired_setup_state: F::SetupState,
276276
}
277+
pub struct TypedExportDataCollectionSpec<F: StorageFactoryBase + ?Sized> {
278+
pub name: String,
279+
pub spec: F::Spec,
280+
pub key_fields_schema: Vec<FieldSchema>,
281+
pub value_fields_schema: Vec<FieldSchema>,
282+
pub index_options: IndexOptions,
283+
}
277284

278285
#[async_trait]
279286
pub trait StorageFactoryBase: ExportTargetFactory + Send + Sync + 'static {
280287
type Spec: DeserializeOwned + Send + Sync;
288+
type DeclarationSpec: DeserializeOwned + Send + Sync;
281289
type Key: Debug + Clone + Serialize + DeserializeOwned + Eq + Hash + Send + Sync;
282290
type SetupState: Debug + Clone + Serialize + DeserializeOwned + Send + Sync;
283291
type ExportContext: Send + Sync + 'static;
@@ -286,13 +294,13 @@ pub trait StorageFactoryBase: ExportTargetFactory + Send + Sync + 'static {
286294

287295
fn build(
288296
self: Arc<Self>,
289-
name: String,
290-
spec: Self::Spec,
291-
key_fields_schema: Vec<FieldSchema>,
292-
value_fields_schema: Vec<FieldSchema>,
293-
storage_options: IndexOptions,
297+
data_collections: Vec<TypedExportDataCollectionSpec<Self>>,
298+
declarations: Vec<Self::DeclarationSpec>,
294299
context: Arc<FlowInstanceContext>,
295-
) -> Result<TypedExportTargetBuildOutput<Self>>;
300+
) -> Result<(
301+
Vec<TypedExportDataCollectionBuildOutput<Self>>,
302+
Vec<(Self::Key, Self::SetupState)>,
303+
)>;
296304

297305
/// Will not be called if it's setup by user.
298306
/// It returns an error if the target only supports setup by user.
@@ -332,35 +340,56 @@ pub trait StorageFactoryBase: ExportTargetFactory + Send + Sync + 'static {
332340
impl<T: StorageFactoryBase> ExportTargetFactory for T {
333341
fn build(
334342
self: Arc<Self>,
335-
name: String,
336-
spec: serde_json::Value,
337-
key_fields_schema: Vec<FieldSchema>,
338-
value_fields_schema: Vec<FieldSchema>,
339-
storage_options: IndexOptions,
343+
data_collections: Vec<interface::ExportDataCollectionSpec>,
344+
declarations: Vec<serde_json::Value>,
340345
context: Arc<FlowInstanceContext>,
341-
) -> Result<interface::ExportTargetBuildOutput> {
342-
let spec: T::Spec = serde_json::from_value(spec)?;
343-
let build_output = StorageFactoryBase::build(
346+
) -> Result<(
347+
Vec<interface::ExportDataCollectionBuildOutput>,
348+
Vec<(serde_json::Value, serde_json::Value)>,
349+
)> {
350+
let (data_coll_output, decl_output) = StorageFactoryBase::build(
344351
self,
345-
name,
346-
spec,
347-
key_fields_schema,
348-
value_fields_schema,
349-
storage_options,
352+
data_collections
353+
.into_iter()
354+
.map(|d| {
355+
anyhow::Ok(TypedExportDataCollectionSpec {
356+
name: d.name,
357+
spec: serde_json::from_value(d.spec)?,
358+
key_fields_schema: d.key_fields_schema,
359+
value_fields_schema: d.value_fields_schema,
360+
index_options: d.index_options,
361+
})
362+
})
363+
.collect::<Result<Vec<_>>>()?,
364+
declarations
365+
.into_iter()
366+
.map(|d| anyhow::Ok(serde_json::from_value(d)?))
367+
.collect::<Result<Vec<_>>>()?,
350368
context,
351369
)?;
352-
let executors = async move {
353-
let executors = build_output.executors.await?;
354-
Ok(interface::ExportTargetExecutors {
355-
export_context: executors.export_context,
356-
query_target: executors.query_target,
370+
371+
let data_coll_output = data_coll_output
372+
.into_iter()
373+
.map(|d| {
374+
Ok(interface::ExportDataCollectionBuildOutput {
375+
executors: async move {
376+
let executors = d.executors.await?;
377+
Ok(interface::ExportTargetExecutors {
378+
export_context: executors.export_context,
379+
query_target: executors.query_target,
380+
})
381+
}
382+
.boxed(),
383+
setup_key: serde_json::to_value(d.setup_key)?,
384+
desired_setup_state: serde_json::to_value(d.desired_setup_state)?,
385+
})
357386
})
358-
};
359-
Ok(interface::ExportTargetBuildOutput {
360-
setup_key: serde_json::to_value(build_output.setup_key)?,
361-
desired_setup_state: serde_json::to_value(build_output.desired_setup_state)?,
362-
executors: executors.boxed(),
363-
})
387+
.collect::<Result<Vec<_>>>()?;
388+
let decl_output = decl_output
389+
.into_iter()
390+
.map(|(key, state)| Ok((serde_json::to_value(key)?, serde_json::to_value(state)?)))
391+
.collect::<Result<Vec<_>>>()?;
392+
Ok((data_coll_output, decl_output))
364393
}
365394

366395
fn check_setup_status(

src/ops/interface.rs

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -162,23 +162,31 @@ pub struct ExportTargetExecutors {
162162
pub export_context: Arc<dyn Any + Send + Sync>,
163163
pub query_target: Option<Arc<dyn QueryTarget>>,
164164
}
165-
pub struct ExportTargetBuildOutput {
165+
pub struct ExportDataCollectionBuildOutput {
166166
pub executors: BoxFuture<'static, Result<ExportTargetExecutors>>,
167167
pub setup_key: serde_json::Value,
168168
pub desired_setup_state: serde_json::Value,
169169
}
170170

171+
pub struct ExportDataCollectionSpec {
172+
pub name: String,
173+
pub spec: serde_json::Value,
174+
pub key_fields_schema: Vec<FieldSchema>,
175+
pub value_fields_schema: Vec<FieldSchema>,
176+
pub index_options: IndexOptions,
177+
}
178+
171179
#[async_trait]
172180
pub trait ExportTargetFactory: Send + Sync {
173181
fn build(
174182
self: Arc<Self>,
175-
name: String,
176-
spec: serde_json::Value,
177-
key_fields_schema: Vec<FieldSchema>,
178-
value_fields_schema: Vec<FieldSchema>,
179-
storage_options: IndexOptions,
183+
data_collections: Vec<ExportDataCollectionSpec>,
184+
declarations: Vec<serde_json::Value>,
180185
context: Arc<FlowInstanceContext>,
181-
) -> Result<ExportTargetBuildOutput>;
186+
) -> Result<(
187+
Vec<ExportDataCollectionBuildOutput>,
188+
Vec<(serde_json::Value, serde_json::Value)>,
189+
)>;
182190

183191
/// Will not be called if it's setup by user.
184192
/// It returns an error if the target only supports setup by user.

src/ops/sdk.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ pub use crate::base::spec::*;
1111
pub use crate::base::value::*;
1212

1313
// Disambiguate the ExportTargetBuildOutput type.
14-
pub use super::factory_bases::TypedExportTargetBuildOutput;
14+
pub use super::factory_bases::TypedExportDataCollectionBuildOutput;
1515
/// Defined for all types convertible to ValueType, to ease creation for ValueType in various operation factories.
1616
pub trait TypeCore {
1717
fn into_type(self) -> ValueType;

src/ops/storages/neo4j.rs

Lines changed: 80 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -1122,6 +1122,7 @@ impl<'a> DependentNodeLabelAnalyzer<'a> {
11221122
#[async_trait]
11231123
impl StorageFactoryBase for Factory {
11241124
type Spec = Spec;
1125+
type DeclarationSpec = ();
11251126
type SetupState = RelationshipSetupState;
11261127
type Key = GraphElement;
11271128
type ExportContext = ExportContext;
@@ -1132,82 +1133,89 @@ impl StorageFactoryBase for Factory {
11321133

11331134
fn build(
11341135
self: Arc<Self>,
1135-
_name: String,
1136-
spec: Spec,
1137-
key_fields_schema: Vec<FieldSchema>,
1138-
value_fields_schema: Vec<FieldSchema>,
1139-
index_options: IndexOptions,
1136+
data_collections: Vec<TypedExportDataCollectionSpec<Self>>,
1137+
_declarations: Vec<()>,
11401138
context: Arc<FlowInstanceContext>,
1141-
) -> Result<TypedExportTargetBuildOutput<Self>> {
1142-
let setup_key = GraphElement::from_spec(&spec);
1143-
1144-
let (value_fields_info, rel_end_label_info) = match &spec.mapping {
1145-
GraphElementMapping::Node(_) => (
1146-
value_fields_schema
1147-
.into_iter()
1148-
.enumerate()
1149-
.map(|(field_idx, field_schema)| AnalyzedGraphFieldMapping {
1150-
field_idx,
1151-
field_name: field_schema.name.clone(),
1152-
value_type: field_schema.value_type.typ.clone(),
1153-
})
1154-
.collect(),
1155-
None,
1156-
),
1157-
GraphElementMapping::Relationship(rel_spec) => {
1158-
let mut src_label_analyzer =
1159-
DependentNodeLabelAnalyzer::new(&rel_spec, &rel_spec.source)?;
1160-
let mut tgt_label_analyzer =
1161-
DependentNodeLabelAnalyzer::new(&rel_spec, &rel_spec.target)?;
1162-
let mut value_fields_info = vec![];
1163-
for (field_idx, field_schema) in value_fields_schema.iter().enumerate() {
1164-
if !src_label_analyzer.process_field(field_idx, field_schema)
1165-
&& !tgt_label_analyzer.process_field(field_idx, field_schema)
1166-
{
1167-
value_fields_info.push(AnalyzedGraphFieldMapping {
1168-
field_idx,
1169-
field_name: field_schema.name.clone(),
1170-
value_type: field_schema.value_type.typ.clone(),
1171-
});
1139+
) -> Result<(
1140+
Vec<TypedExportDataCollectionBuildOutput<Self>>,
1141+
Vec<(GraphElement, RelationshipSetupState)>,
1142+
)> {
1143+
let data_coll_output = data_collections
1144+
.into_iter()
1145+
.map(|d| {
1146+
let setup_key = GraphElement::from_spec(&d.spec);
1147+
1148+
let (value_fields_info, rel_end_label_info) = match &d.spec.mapping {
1149+
GraphElementMapping::Node(_) => (
1150+
d.value_fields_schema
1151+
.into_iter()
1152+
.enumerate()
1153+
.map(|(field_idx, field_schema)| AnalyzedGraphFieldMapping {
1154+
field_idx,
1155+
field_name: field_schema.name.clone(),
1156+
value_type: field_schema.value_type.typ.clone(),
1157+
})
1158+
.collect(),
1159+
None,
1160+
),
1161+
GraphElementMapping::Relationship(rel_spec) => {
1162+
let mut src_label_analyzer =
1163+
DependentNodeLabelAnalyzer::new(&rel_spec, &rel_spec.source)?;
1164+
let mut tgt_label_analyzer =
1165+
DependentNodeLabelAnalyzer::new(&rel_spec, &rel_spec.target)?;
1166+
let mut value_fields_info = vec![];
1167+
for (field_idx, field_schema) in d.value_fields_schema.iter().enumerate() {
1168+
if !src_label_analyzer.process_field(field_idx, field_schema)
1169+
&& !tgt_label_analyzer.process_field(field_idx, field_schema)
1170+
{
1171+
value_fields_info.push(AnalyzedGraphFieldMapping {
1172+
field_idx,
1173+
field_name: field_schema.name.clone(),
1174+
value_type: field_schema.value_type.typ.clone(),
1175+
});
1176+
}
1177+
}
1178+
let src_label_info = src_label_analyzer.build()?;
1179+
let tgt_label_info = tgt_label_analyzer.build()?;
1180+
(value_fields_info, Some((src_label_info, tgt_label_info)))
11721181
}
1173-
}
1174-
let src_label_info = src_label_analyzer.build()?;
1175-
let tgt_label_info = tgt_label_analyzer.build()?;
1176-
(value_fields_info, Some((src_label_info, tgt_label_info)))
1177-
}
1178-
};
1179-
1180-
let desired_setup_state = RelationshipSetupState::new(
1181-
&spec,
1182-
key_fields_schema.iter().map(|f| f.name.clone()).collect(),
1183-
&index_options,
1184-
&value_fields_info,
1185-
rel_end_label_info.as_ref(),
1186-
)?;
1182+
};
11871183

1188-
let conn_spec = context
1189-
.auth_registry
1190-
.get::<ConnectionSpec>(&spec.connection)?;
1191-
let executors = async move {
1192-
let graph = self.graph_pool.get_graph(&conn_spec).await?;
1193-
let executor = Arc::new(ExportContext::new(
1194-
graph,
1195-
spec,
1196-
key_fields_schema,
1197-
value_fields_info,
1198-
rel_end_label_info,
1199-
)?);
1200-
Ok(TypedExportTargetExecutors {
1201-
export_context: executor,
1202-
query_target: None,
1184+
let desired_setup_state = RelationshipSetupState::new(
1185+
&d.spec,
1186+
d.key_fields_schema.iter().map(|f| f.name.clone()).collect(),
1187+
&d.index_options,
1188+
&value_fields_info,
1189+
rel_end_label_info.as_ref(),
1190+
)?;
1191+
1192+
let conn_spec = context
1193+
.auth_registry
1194+
.get::<ConnectionSpec>(&d.spec.connection)?;
1195+
let factory = self.clone();
1196+
let executors = async move {
1197+
let graph = factory.graph_pool.get_graph(&conn_spec).await?;
1198+
let executor = Arc::new(ExportContext::new(
1199+
graph,
1200+
d.spec,
1201+
d.key_fields_schema,
1202+
value_fields_info,
1203+
rel_end_label_info,
1204+
)?);
1205+
Ok(TypedExportTargetExecutors {
1206+
export_context: executor,
1207+
query_target: None,
1208+
})
1209+
}
1210+
.boxed();
1211+
Ok(TypedExportDataCollectionBuildOutput {
1212+
executors,
1213+
setup_key,
1214+
desired_setup_state,
1215+
})
12031216
})
1204-
}
1205-
.boxed();
1206-
Ok(TypedExportTargetBuildOutput {
1207-
executors,
1208-
setup_key,
1209-
desired_setup_state,
1210-
})
1217+
.collect::<Result<Vec<_>>>()?;
1218+
Ok((data_coll_output, vec![]))
12111219
}
12121220

12131221
fn check_setup_status(

0 commit comments

Comments
 (0)