Skip to content

Commit 78d9683

Browse files
authored
Refactor ExportTargetFactory::build to make return data more clear. (#264)
1 parent fe3b41f commit 78d9683

File tree

6 files changed

+44
-45
lines changed

6 files changed

+44
-45
lines changed

src/builder/analyzer.rs

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -886,7 +886,7 @@ impl AnalyzerContext<'_> {
886886
}
887887
};
888888

889-
let ((setup_key, desired_state), executor_fut) = export_factory.clone().build(
889+
let setup_output = export_factory.clone().build(
890890
export_op.name.clone(),
891891
spec,
892892
key_fields_schema,
@@ -895,7 +895,7 @@ impl AnalyzerContext<'_> {
895895
self.flow_ctx.clone(),
896896
)?;
897897
let resource_id = ResourceIdentifier {
898-
key: setup_key.clone(),
898+
key: setup_output.setup_key.clone(),
899899
target_kind: export_target.kind.clone(),
900900
};
901901
let existing_target_states = existing_target_states.get(&resource_id);
@@ -904,8 +904,10 @@ impl AnalyzerContext<'_> {
904904
let mut compatible_target_ids = HashSet::<Option<i32>>::new();
905905
let mut reusable_schema_version_ids = HashSet::<Option<i32>>::new();
906906
for existing_state in existing_target_states.iter().flat_map(|v| v.iter()) {
907-
let compatibility = export_factory
908-
.check_state_compatibility(&desired_state, &existing_state.state)?;
907+
let compatibility = export_factory.check_state_compatibility(
908+
&setup_output.desired_setup_state,
909+
&existing_state.state,
910+
)?;
909911
let compatible_target_id =
910912
if compatibility != SetupStateCompatibility::NotCompatible {
911913
reusable_schema_version_ids.insert(
@@ -946,10 +948,7 @@ impl AnalyzerContext<'_> {
946948
} else {
947949
max_schema_version_id + 1
948950
};
949-
match setup_state.targets.entry(ResourceIdentifier {
950-
key: setup_key,
951-
target_kind: export_target.kind.clone(),
952-
}) {
951+
match setup_state.targets.entry(resource_id) {
953952
indexmap::map::Entry::Occupied(entry) => {
954953
api_bail!(
955954
"Target resource already exists: kind = {}, key = {}",
@@ -964,7 +963,7 @@ impl AnalyzerContext<'_> {
964963
schema_version_id,
965964
max_schema_version_id: max_schema_version_id.max(schema_version_id),
966965
},
967-
state: desired_state,
966+
state: setup_output.desired_setup_state,
968967
});
969968
}
970969
}
@@ -980,7 +979,8 @@ impl AnalyzerContext<'_> {
980979
.unwrap_or(false);
981980
Ok(async move {
982981
trace!("Start building executor for export op `{}`", export_op.name);
983-
let (executor, query_target) = executor_fut
982+
let (executor, query_target) = setup_output
983+
.executor
984984
.await
985985
.with_context(|| format!("Analyzing export op: {}", export_op.name))?;
986986
trace!(

src/ops/factory_bases.rs

Lines changed: 17 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,13 @@
1-
use std::collections::HashMap;
1+
use crate::prelude::*;
22
use std::fmt::Debug;
33
use std::hash::Hash;
4-
use std::sync::Arc;
5-
6-
use anyhow::Result;
7-
use axum::async_trait;
8-
use futures::future::BoxFuture;
9-
use serde::de::DeserializeOwned;
10-
use serde::Serialize;
114

125
use super::interface::*;
136
use super::registry::*;
147
use crate::api_bail;
158
use crate::api_error;
169
use crate::base::schema::*;
1710
use crate::base::spec::*;
18-
use crate::base::value;
1911
use crate::builder::plan::AnalyzedValueMapping;
2012
use crate::setup;
2113
// SourceFactoryBase
@@ -272,6 +264,13 @@ impl<T: SimpleFunctionFactoryBase> SimpleFunctionFactory for T {
272264
}
273265
}
274266

267+
pub struct ExportTargetBuildOutput<F: StorageFactoryBase + ?Sized> {
268+
pub executor:
269+
BoxFuture<'static, Result<(Arc<dyn ExportTargetExecutor>, Option<Arc<dyn QueryTarget>>)>>,
270+
pub setup_key: F::Key,
271+
pub desired_setup_state: F::SetupState,
272+
}
273+
275274
pub trait StorageFactoryBase: ExportTargetFactory + Send + Sync + 'static {
276275
type Spec: DeserializeOwned + Send + Sync;
277276
type Key: Debug + Clone + Serialize + DeserializeOwned + Eq + Hash + Send + Sync;
@@ -287,11 +286,9 @@ pub trait StorageFactoryBase: ExportTargetFactory + Send + Sync + 'static {
287286
value_fields_schema: Vec<FieldSchema>,
288287
storage_options: IndexOptions,
289288
context: Arc<FlowInstanceContext>,
290-
) -> Result<(
291-
(Self::Key, Self::SetupState),
292-
BoxFuture<'static, Result<(Arc<dyn ExportTargetExecutor>, Option<Arc<dyn QueryTarget>>)>>,
293-
)>;
289+
) -> Result<ExportTargetBuildOutput<Self>>;
294290

291+
/// This is only called for non-user-setup targets.
295292
fn check_setup_status(
296293
&self,
297294
key: Self::Key,
@@ -387,12 +384,9 @@ impl<T: StorageFactoryBase> ExportTargetFactory for T {
387384
value_fields_schema: Vec<FieldSchema>,
388385
storage_options: IndexOptions,
389386
context: Arc<FlowInstanceContext>,
390-
) -> Result<(
391-
(serde_json::Value, serde_json::Value),
392-
BoxFuture<'static, Result<(Arc<dyn ExportTargetExecutor>, Option<Arc<dyn QueryTarget>>)>>,
393-
)> {
387+
) -> Result<interface::ExportTargetBuildOutput> {
394388
let spec: T::Spec = serde_json::from_value(spec)?;
395-
let ((setup_key, setup_state), executors) = StorageFactoryBase::build(
389+
let build_output = StorageFactoryBase::build(
396390
self,
397391
name,
398392
spec,
@@ -401,13 +395,11 @@ impl<T: StorageFactoryBase> ExportTargetFactory for T {
401395
storage_options,
402396
context,
403397
)?;
404-
Ok((
405-
(
406-
serde_json::to_value(setup_key)?,
407-
serde_json::to_value(setup_state)?,
408-
),
409-
executors,
410-
))
398+
Ok(interface::ExportTargetBuildOutput {
399+
executor: build_output.executor,
400+
setup_key: serde_json::to_value(build_output.setup_key)?,
401+
desired_setup_state: serde_json::to_value(build_output.desired_setup_state)?,
402+
})
411403
}
412404

413405
fn check_setup_status(

src/ops/interface.rs

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,13 @@ pub enum SetupStateCompatibility {
154154
NotCompatible,
155155
}
156156

157+
pub struct ExportTargetBuildOutput {
158+
pub executor:
159+
BoxFuture<'static, Result<(Arc<dyn ExportTargetExecutor>, Option<Arc<dyn QueryTarget>>)>>,
160+
pub setup_key: serde_json::Value,
161+
pub desired_setup_state: serde_json::Value,
162+
}
163+
157164
pub trait ExportTargetFactory {
158165
// The first field of the `input_schema` is the primary key field.
159166
// If it has struct type, it should be converted to composite primary key.
@@ -165,10 +172,7 @@ pub trait ExportTargetFactory {
165172
value_fields_schema: Vec<FieldSchema>,
166173
storage_options: IndexOptions,
167174
context: Arc<FlowInstanceContext>,
168-
) -> Result<(
169-
(serde_json::Value, serde_json::Value),
170-
BoxFuture<'static, Result<(Arc<dyn ExportTargetExecutor>, Option<Arc<dyn QueryTarget>>)>>,
171-
)>;
175+
) -> Result<ExportTargetBuildOutput>;
172176

173177
fn check_setup_status(
174178
&self,

src/ops/sdk.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@ pub use crate::base::schema::*;
1010
pub use crate::base::spec::*;
1111
pub use crate::base::value::*;
1212

13+
// Disambiguate the ExportTargetBuildOutput type.
14+
pub use super::factory_bases::ExportTargetBuildOutput;
1315
/// Defined for all types convertible to ValueType, to ease creation for ValueType in various operation factories.
1416
pub trait TypeCore {
1517
fn into_type(self) -> ValueType;

src/ops/storages/postgres.rs

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -921,10 +921,7 @@ impl StorageFactoryBase for Arc<Factory> {
921921
value_fields_schema: Vec<FieldSchema>,
922922
storage_options: IndexOptions,
923923
context: Arc<FlowInstanceContext>,
924-
) -> Result<(
925-
(TableId, SetupState),
926-
BoxFuture<'static, Result<(Arc<dyn ExportTargetExecutor>, Option<Arc<dyn QueryTarget>>)>>,
927-
)> {
924+
) -> Result<ExportTargetBuildOutput<Self>> {
928925
let table_id = TableId {
929926
database_url: spec.database_url.clone(),
930927
table_name: spec
@@ -951,7 +948,11 @@ impl StorageFactoryBase for Arc<Factory> {
951948
Some(query_target as Arc<dyn QueryTarget>),
952949
))
953950
};
954-
Ok(((table_id, setup_state), executors.boxed()))
951+
Ok(ExportTargetBuildOutput {
952+
executor: executors.boxed(),
953+
setup_key: table_id,
954+
desired_setup_state: setup_state,
955+
})
955956
}
956957

957958
fn check_setup_status(

src/prelude.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ pub(crate) use futures::{future::BoxFuture, prelude::*, stream::BoxStream};
77
pub(crate) use futures::{FutureExt, StreamExt};
88
pub(crate) use indexmap::{IndexMap, IndexSet};
99
pub(crate) use itertools::Itertools;
10-
pub(crate) use serde::{Deserialize, Serialize};
10+
pub(crate) use serde::{de::DeserializeOwned, Deserialize, Serialize};
1111
pub(crate) use std::borrow::Cow;
1212
pub(crate) use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
1313
pub(crate) use std::sync::{Arc, LazyLock, Mutex, OnceLock, RwLock, Weak};

0 commit comments

Comments
 (0)