Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 12 additions & 10 deletions src/ops/factory_bases.rs
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,7 @@ impl<T: SimpleFunctionFactoryBase> SimpleFunctionFactory for T {

pub struct TypedExportDataCollectionBuildOutput<F: TargetFactoryBase + ?Sized> {
pub export_context: BoxFuture<'static, Result<Arc<F::ExportContext>>>,
pub setup_key: F::Key,
pub setup_key: F::SetupKey,
pub desired_setup_state: F::SetupState,
}
pub struct TypedExportDataCollectionSpec<F: TargetFactoryBase + ?Sized> {
Expand All @@ -364,17 +364,19 @@ pub struct TypedExportDataCollectionSpec<F: TargetFactoryBase + ?Sized> {
}

pub struct TypedResourceSetupChangeItem<'a, F: TargetFactoryBase + ?Sized> {
pub key: F::Key,
pub key: F::SetupKey,
pub setup_change: &'a F::SetupChange,
}

#[async_trait]
pub trait TargetFactoryBase: TargetFactory + Send + Sync + 'static {
type Spec: DeserializeOwned + Send + Sync;
type DeclarationSpec: DeserializeOwned + Send + Sync;
type Key: Debug + Clone + Serialize + DeserializeOwned + Eq + Hash + Send + Sync;

type SetupKey: Debug + Clone + Serialize + DeserializeOwned + Eq + Hash + Send + Sync;
type SetupState: Debug + Clone + Serialize + DeserializeOwned + Send + Sync;
type SetupChange: ResourceSetupChange;

type ExportContext: Send + Sync + 'static;

fn name(&self) -> &str;
Expand All @@ -386,20 +388,20 @@ pub trait TargetFactoryBase: TargetFactory + Send + Sync + 'static {
context: Arc<FlowInstanceContext>,
) -> Result<(
Vec<TypedExportDataCollectionBuildOutput<Self>>,
Vec<(Self::Key, Self::SetupState)>,
Vec<(Self::SetupKey, Self::SetupState)>,
)>;

/// Deserialize the setup key from a JSON value.
/// You can override this method to provide a custom deserialization logic, e.g. to perform backward compatible deserialization.
fn deserialize_setup_key(key: serde_json::Value) -> Result<Self::Key> {
fn deserialize_setup_key(key: serde_json::Value) -> Result<Self::SetupKey> {
Ok(serde_json::from_value(key)?)
}

/// Will not be called if it's setup by user.
/// It returns an error if the target only supports setup by user.
async fn diff_setup_states(
&self,
key: Self::Key,
key: Self::SetupKey,
desired_state: Option<Self::SetupState>,
existing_states: setup::CombinedState<Self::SetupState>,
flow_instance_ctx: Arc<FlowInstanceContext>,
Expand All @@ -411,7 +413,7 @@ pub trait TargetFactoryBase: TargetFactory + Send + Sync + 'static {
existing_state: &Self::SetupState,
) -> Result<SetupStateCompatibility>;

fn describe_resource(&self, key: &Self::Key) -> Result<String>;
fn describe_resource(&self, key: &Self::SetupKey) -> Result<String>;

fn extract_additional_key(
&self,
Expand Down Expand Up @@ -504,7 +506,7 @@ impl<T: TargetFactoryBase> TargetFactory for T {
existing_states: setup::CombinedState<serde_json::Value>,
flow_instance_ctx: Arc<FlowInstanceContext>,
) -> Result<Box<dyn setup::ResourceSetupChange>> {
let key: T::Key = Self::deserialize_setup_key(key.clone())?;
let key: T::SetupKey = Self::deserialize_setup_key(key.clone())?;
let desired_state: Option<T::SetupState> = desired_state
.map(|v| serde_json::from_value(v.clone()))
.transpose()?;
Expand All @@ -521,12 +523,12 @@ impl<T: TargetFactoryBase> TargetFactory for T {
}

fn describe_resource(&self, key: &serde_json::Value) -> Result<String> {
let key: T::Key = Self::deserialize_setup_key(key.clone())?;
let key: T::SetupKey = Self::deserialize_setup_key(key.clone())?;
TargetFactoryBase::describe_resource(self, &key)
}

fn normalize_setup_key(&self, key: &serde_json::Value) -> Result<serde_json::Value> {
let key: T::Key = Self::deserialize_setup_key(key.clone())?;
let key: T::SetupKey = Self::deserialize_setup_key(key.clone())?;
Ok(serde_json::to_value(key)?)
}

Expand Down
2 changes: 1 addition & 1 deletion src/ops/targets/kuzu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -745,7 +745,7 @@ impl TargetFactoryBase for Factory {
type SetupState = SetupState;
type SetupChange = GraphElementDataSetupChange;

type Key = KuzuGraphElement;
type SetupKey = KuzuGraphElement;
type ExportContext = ExportContext;

fn name(&self) -> &str {
Expand Down
2 changes: 1 addition & 1 deletion src/ops/targets/neo4j.rs
Original file line number Diff line number Diff line change
Expand Up @@ -926,7 +926,7 @@ impl TargetFactoryBase for Factory {
GraphElementDataSetupChange,
components::SetupChange<SetupComponentOperator>,
);
type Key = Neo4jGraphElement;
type SetupKey = Neo4jGraphElement;
type ExportContext = ExportContext;

fn name(&self) -> &str {
Expand Down
2 changes: 1 addition & 1 deletion src/ops/targets/postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -656,7 +656,7 @@ impl TargetFactoryBase for Factory {
type DeclarationSpec = ();
type SetupState = SetupState;
type SetupChange = SetupChange;
type Key = TableId;
type SetupKey = TableId;
type ExportContext = ExportContext;

fn name(&self) -> &str {
Expand Down
2 changes: 1 addition & 1 deletion src/ops/targets/qdrant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -368,7 +368,7 @@ impl TargetFactoryBase for Factory {
type DeclarationSpec = ();
type SetupState = SetupState;
type SetupChange = SetupChange;
type Key = CollectionKey;
type SetupKey = CollectionKey;
type ExportContext = ExportContext;

fn name(&self) -> &str {
Expand Down
Loading