diff --git a/python/cocoindex/flow.py b/python/cocoindex/flow.py index b9f0ab587..297aa56f6 100644 --- a/python/cocoindex/flow.py +++ b/python/cocoindex/flow.py @@ -437,7 +437,10 @@ def export( target_name, _spec_kind(target_spec), dump_engine_object(target_spec), - dump_engine_object(attachments), + [ + {"kind": _spec_kind(att), **dump_engine_object(att)} + for att in attachments + ], dump_engine_object(index_options), self._engine_data_collector, setup_by_user, diff --git a/python/cocoindex/targets/_engine_builtin_specs.py b/python/cocoindex/targets/_engine_builtin_specs.py index baec18b34..91a6be156 100644 --- a/python/cocoindex/targets/_engine_builtin_specs.py +++ b/python/cocoindex/targets/_engine_builtin_specs.py @@ -16,6 +16,14 @@ class Postgres(op.TargetSpec): table_name: str | None = None +class PostgresSqlAttachment(op.TargetAttachmentSpec): + """Attachment to execute specified SQL statements for Postgres targets.""" + + name: str + setup_sql: str + teardown_sql: str | None = None + + @dataclass class QdrantConnection: """Connection spec for Qdrant.""" diff --git a/src/ops/factory_bases.rs b/src/ops/factory_bases.rs index 2b0b19ae4..6deb89cb7 100644 --- a/src/ops/factory_bases.rs +++ b/src/ops/factory_bases.rs @@ -642,13 +642,17 @@ pub struct TypedTargetAttachmentState &str; + fn get_state( &self, target_name: &str, @@ -656,11 +660,13 @@ pub trait TargetSpecificAttachmentFactoryBase: Send + Sync + 'static { attachment_spec: Self::Spec, ) -> Result>; - fn diff_setup_states( + async fn diff_setup_states( &self, - key: &serde_json::Value, - new_state: Option, - existing_states: setup::CombinedState, + target_key: &Self::TargetKey, + attachment_key: &Self::SetupKey, + new_state: Option, + existing_states: setup::CombinedState, + context: &interface::FlowInstanceContext, ) -> Result>; /// Deserialize the setup key from a JSON value. @@ -668,6 +674,16 @@ pub trait TargetSpecificAttachmentFactoryBase: Send + Sync + 'static { fn deserialize_setup_key(key: serde_json::Value) -> Result { Ok(utils::deser::from_json_value(key)?) } + + fn register(self, registry: &mut ExecutorFactoryRegistry) -> Result<()> + where + Self: Sized, + { + registry.register( + self.name().to_string(), + ExecutorFactory::TargetAttachment(Arc::new(self)), + ) + } } #[async_trait] @@ -695,19 +711,25 @@ impl TargetAttachmentFactory for T { }) } - fn diff_setup_states( + async fn diff_setup_states( &self, - key: &serde_json::Value, + target_key: &serde_json::Value, + attachment_key: &serde_json::Value, new_state: Option, existing_states: setup::CombinedState, + context: &interface::FlowInstanceContext, ) -> Result>> { - let setup_change = self.diff_setup_states( - &utils::deser::from_json_value(key.clone())?, - new_state - .map(|v| utils::deser::from_json_value(v)) - .transpose()?, - from_json_combined_state(existing_states)?, - )?; + let setup_change = self + .diff_setup_states( + &utils::deser::from_json_value(target_key.clone())?, + &utils::deser::from_json_value(attachment_key.clone())?, + new_state + .map(|v| utils::deser::from_json_value(v)) + .transpose()?, + from_json_combined_state(existing_states)?, + context, + ) + .await?; Ok(setup_change.map(|s| Box::new(s) as Box)) } } diff --git a/src/ops/interface.rs b/src/ops/interface.rs index 709a71ecc..b93f47a6a 100644 --- a/src/ops/interface.rs +++ b/src/ops/interface.rs @@ -323,11 +323,12 @@ pub struct TargetAttachmentState { #[async_trait] pub trait AttachmentSetupChange { - fn describe_change(&self) -> String; + fn describe_changes(&self) -> Vec; async fn apply_change(&self) -> Result<()>; } +#[async_trait] pub trait TargetAttachmentFactory: Send + Sync { /// Normalize the key. e.g. the JSON format may change (after code change, e.g. new optional field or field ordering), even if the underlying value is not changed. /// This should always return the canonical serialized form. @@ -341,11 +342,13 @@ pub trait TargetAttachmentFactory: Send + Sync { ) -> Result; /// Should return Some if and only if any changes are needed. - fn diff_setup_states( + async fn diff_setup_states( &self, - key: &serde_json::Value, + target_key: &serde_json::Value, + attachment_key: &serde_json::Value, new_state: Option, existing_states: setup::CombinedState, + context: &interface::FlowInstanceContext, ) -> Result>>; } diff --git a/src/ops/registration.rs b/src/ops/registration.rs index f91ee1c62..29034db69 100644 --- a/src/ops/registration.rs +++ b/src/ops/registration.rs @@ -20,7 +20,7 @@ fn register_executor_factories(registry: &mut ExecutorFactoryRegistry) -> Result functions::embed_text::register(registry)?; functions::split_by_separators::register(registry)?; - targets::postgres::Factory::default().register(registry)?; + targets::postgres::register(registry)?; targets::qdrant::register(registry)?; targets::kuzu::register(registry, reqwest_client)?; diff --git a/src/ops/targets/postgres.rs b/src/ops/targets/postgres.rs index 100812e32..03010d2a2 100644 --- a/src/ops/targets/postgres.rs +++ b/src/ops/targets/postgres.rs @@ -248,8 +248,7 @@ impl ExportContext { } } -#[derive(Default)] -pub struct Factory {} +struct TargetFactory; #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)] pub struct TableId { @@ -614,7 +613,7 @@ impl SetupChange { } #[async_trait] -impl TargetFactoryBase for Factory { +impl TargetFactoryBase for TargetFactory { type Spec = Spec; type DeclarationSpec = (); type SetupState = SetupState; @@ -752,3 +751,123 @@ impl TargetFactoryBase for Factory { Ok(()) } } + +//////////////////////////////////////////////////////////// +// Attachment Factory +//////////////////////////////////////////////////////////// + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct SqlStatementAttachmentSpec { + name: String, + setup_sql: String, + teardown_sql: Option, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct SqlStatementAttachmentState { + setup_sql: String, + teardown_sql: Option, +} + +pub struct SqlStatementAttachmentSetupChange { + db_pool: PgPool, + setup_sql_to_run: Option, + teardown_sql_to_run: IndexSet, +} + +#[async_trait] +impl AttachmentSetupChange for SqlStatementAttachmentSetupChange { + fn describe_changes(&self) -> Vec { + let mut result = vec![]; + for teardown_sql in self.teardown_sql_to_run.iter() { + result.push(format!("Run teardown SQL: {}", teardown_sql)); + } + if let Some(setup_sql) = &self.setup_sql_to_run { + result.push(format!("Run setup SQL: {}", setup_sql)); + } + result + } + + async fn apply_change(&self) -> Result<()> { + for teardown_sql in self.teardown_sql_to_run.iter() { + sqlx::query(teardown_sql).execute(&self.db_pool).await?; + } + if let Some(setup_sql) = &self.setup_sql_to_run { + sqlx::query(setup_sql).execute(&self.db_pool).await?; + } + Ok(()) + } +} + +struct SqlAttachmentFactory; + +#[async_trait] +impl TargetSpecificAttachmentFactoryBase for SqlAttachmentFactory { + type TargetKey = TableId; + type TargetSpec = Spec; + type Spec = SqlStatementAttachmentSpec; + type SetupKey = String; + type SetupState = SqlStatementAttachmentState; + type SetupChange = SqlStatementAttachmentSetupChange; + + fn name(&self) -> &str { + "PostgresSqlAttachment" + } + + fn get_state( + &self, + _target_name: &str, + _target_spec: &Spec, + attachment_spec: SqlStatementAttachmentSpec, + ) -> Result> { + Ok(TypedTargetAttachmentState { + setup_key: attachment_spec.name, + setup_state: SqlStatementAttachmentState { + setup_sql: attachment_spec.setup_sql, + teardown_sql: attachment_spec.teardown_sql, + }, + }) + } + + async fn diff_setup_states( + &self, + target_key: &TableId, + _attachment_key: &String, + new_state: Option, + existing_states: setup::CombinedState, + context: &interface::FlowInstanceContext, + ) -> Result> { + let teardown_sql_to_run: IndexSet = if new_state.is_none() { + existing_states + .possible_versions() + .filter_map(|s| s.teardown_sql.clone()) + .collect() + } else { + IndexSet::new() + }; + let setup_sql_to_run = if let Some(new_state) = new_state + && !existing_states.always_exists_and(|s| s.setup_sql == new_state.setup_sql) + { + Some(new_state.setup_sql) + } else { + None + }; + let change = if setup_sql_to_run.is_some() || !teardown_sql_to_run.is_empty() { + let db_pool = get_db_pool(target_key.database.as_ref(), &context.auth_registry).await?; + Some(SqlStatementAttachmentSetupChange { + db_pool, + setup_sql_to_run, + teardown_sql_to_run, + }) + } else { + None + }; + Ok(change) + } +} + +pub fn register(registry: &mut ExecutorFactoryRegistry) -> Result<()> { + TargetFactory.register(registry)?; + SqlAttachmentFactory.register(registry)?; + Ok(()) +} diff --git a/src/setup/driver.rs b/src/setup/driver.rs index 6ea254c33..9b71a7109 100644 --- a/src/setup/driver.rs +++ b/src/setup/driver.rs @@ -256,9 +256,11 @@ fn group_states, existing: &CombinedState, + context: &interface::FlowInstanceContext, ) -> Result { let existing_current_attachments = existing .current @@ -309,8 +311,15 @@ fn collect_attachments_setup_change( for (AttachmentSetupKey(kind, key), setup_state) in grouped_attachment_states.into_iter() { let factory = get_attachment_factory(&kind)?; let is_upsertion = setup_state.desired.is_some(); - if let Some(action) = - factory.diff_setup_states(&key, setup_state.desired, setup_state.existing)? + if let Some(action) = factory + .diff_setup_states( + &target_key, + &key, + setup_state.desired, + setup_state.existing, + context, + ) + .await? { if is_upsertion { attachments_change.upserts.push(action); @@ -411,9 +420,12 @@ pub async fn diff_flow_setup_states( }; let attachments_change = collect_attachments_setup_change( + &resource_id.key, target_states_group.desired.as_ref(), &target_states_group.existing, - )?; + &flow_instance_ctx, + ) + .await?; let desired_state = target_states_group.desired.clone(); let target_state = target_states_group diff --git a/src/setup/states.rs b/src/setup/states.rs index 8db1d6bbd..defd4034b 100644 --- a/src/setup/states.rs +++ b/src/setup/states.rs @@ -91,6 +91,10 @@ impl CombinedState { self.current.is_some() && self.staging.iter().all(|s| !s.is_delete()) } + pub fn always_exists_and(&self, predicate: impl Fn(&T) -> bool) -> bool { + self.always_exists() && self.possible_versions().all(predicate) + } + pub fn legacy_values &V>( &self, desired: Option<&T>, @@ -420,19 +424,17 @@ pub struct TargetSetupChange { impl ResourceSetupChange for TargetSetupChange { fn describe_changes(&self) -> Vec { let mut result = vec![]; - result.extend( - self.attachments_change - .deletes - .iter() - .map(|a| ChangeDescription::Action(a.describe_change())), - ); + self.attachments_change + .deletes + .iter() + .flat_map(|a| a.describe_changes().into_iter()) + .for_each(|change| result.push(ChangeDescription::Action(change))); result.extend(self.target_change.describe_changes()); - result.extend( - self.attachments_change - .upserts - .iter() - .map(|a| ChangeDescription::Action(a.describe_change())), - ); + self.attachments_change + .upserts + .iter() + .flat_map(|a| a.describe_changes().into_iter()) + .for_each(|change| result.push(ChangeDescription::Action(change))); result }