Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion python/cocoindex/flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -437,7 +437,10 @@ def export(
target_name,
_spec_kind(target_spec),
dump_engine_object(target_spec),
dump_engine_object(attachments),
[
{"kind": _spec_kind(att), **dump_engine_object(att)}
for att in attachments
],
dump_engine_object(index_options),
self._engine_data_collector,
setup_by_user,
Expand Down
8 changes: 8 additions & 0 deletions python/cocoindex/targets/_engine_builtin_specs.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,14 @@ class Postgres(op.TargetSpec):
table_name: str | None = None


class PostgresSqlAttachment(op.TargetAttachmentSpec):
    """Attachment to execute specified SQL statements for Postgres targets."""

    # Name of this attachment.
    # NOTE(review): presumably used by the engine as the attachment's setup key — confirm.
    name: str
    # SQL statement to execute when the attachment is set up.
    setup_sql: str
    # Optional SQL statement to execute when the attachment is torn down;
    # defaults to None (no teardown statement).
    teardown_sql: str | None = None


@dataclass
class QdrantConnection:
"""Connection spec for Qdrant."""
Expand Down
48 changes: 35 additions & 13 deletions src/ops/factory_bases.rs
Original file line number Diff line number Diff line change
Expand Up @@ -642,32 +642,48 @@ pub struct TypedTargetAttachmentState<F: TargetSpecificAttachmentFactoryBase + ?
}

/// A factory for target-specific attachments.
#[async_trait]
pub trait TargetSpecificAttachmentFactoryBase: Send + Sync + 'static {
type TargetKey: Debug + Clone + Serialize + DeserializeOwned + Eq + Hash + Send + Sync;
type TargetSpec: DeserializeOwned + Send + Sync;
type Spec: DeserializeOwned + Send + Sync;
type SetupKey: Debug + Clone + Serialize + DeserializeOwned + Eq + Hash + Send + Sync;
type SetupState: Debug + Clone + Serialize + DeserializeOwned + Send + Sync;
type SetupChange: interface::AttachmentSetupChange + Send + Sync;

fn name(&self) -> &str;

fn get_state(
&self,
target_name: &str,
target_spec: &Self::TargetSpec,
attachment_spec: Self::Spec,
) -> Result<TypedTargetAttachmentState<Self>>;

fn diff_setup_states(
async fn diff_setup_states(
&self,
key: &serde_json::Value,
new_state: Option<serde_json::Value>,
existing_states: setup::CombinedState<serde_json::Value>,
target_key: &Self::TargetKey,
attachment_key: &Self::SetupKey,
new_state: Option<Self::SetupState>,
existing_states: setup::CombinedState<Self::SetupState>,
context: &interface::FlowInstanceContext,
) -> Result<Option<Self::SetupChange>>;

/// Deserializes the setup key from a JSON value.
///
/// The default implementation delegates to `utils::deser::from_json_value`.
/// Override this method to provide custom deserialization logic, e.g. to keep
/// accepting keys that were serialized in an older format.
///
/// # Errors
///
/// Returns an error if `key` cannot be deserialized into `Self::SetupKey`.
fn deserialize_setup_key(key: serde_json::Value) -> Result<Self::SetupKey> {
    Ok(utils::deser::from_json_value(key)?)
}

/// Registers this attachment factory in `registry` under the name returned by
/// [`Self::name`], wrapped as an `ExecutorFactory::TargetAttachment`.
///
/// # Errors
///
/// Propagates whatever error `registry.register` reports.
fn register(self, registry: &mut ExecutorFactoryRegistry) -> Result<()>
where
    Self: Sized,
{
    // Capture the name first: `self` is moved into the `Arc` right after.
    let kind = self.name().to_string();
    let factory = ExecutorFactory::TargetAttachment(Arc::new(self));
    registry.register(kind, factory)
}
}

#[async_trait]
Expand Down Expand Up @@ -695,19 +711,25 @@ impl<T: TargetSpecificAttachmentFactoryBase> TargetAttachmentFactory for T {
})
}

fn diff_setup_states(
async fn diff_setup_states(
&self,
key: &serde_json::Value,
target_key: &serde_json::Value,
attachment_key: &serde_json::Value,
new_state: Option<serde_json::Value>,
existing_states: setup::CombinedState<serde_json::Value>,
context: &interface::FlowInstanceContext,
) -> Result<Option<Box<dyn AttachmentSetupChange + Send + Sync>>> {
let setup_change = self.diff_setup_states(
&utils::deser::from_json_value(key.clone())?,
new_state
.map(|v| utils::deser::from_json_value(v))
.transpose()?,
from_json_combined_state(existing_states)?,
)?;
let setup_change = self
.diff_setup_states(
&utils::deser::from_json_value(target_key.clone())?,
&utils::deser::from_json_value(attachment_key.clone())?,
new_state
.map(|v| utils::deser::from_json_value(v))
.transpose()?,
from_json_combined_state(existing_states)?,
context,
)
.await?;
Ok(setup_change.map(|s| Box::new(s) as Box<dyn AttachmentSetupChange + Send + Sync>))
}
}
9 changes: 6 additions & 3 deletions src/ops/interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -323,11 +323,12 @@ pub struct TargetAttachmentState {

#[async_trait]
pub trait AttachmentSetupChange {
fn describe_change(&self) -> String;
fn describe_changes(&self) -> Vec<String>;

async fn apply_change(&self) -> Result<()>;
}

#[async_trait]
pub trait TargetAttachmentFactory: Send + Sync {
/// Normalize the key. e.g. the JSON format may change (after code change, e.g. new optional field or field ordering), even if the underlying value is not changed.
/// This should always return the canonical serialized form.
Expand All @@ -341,11 +342,13 @@ pub trait TargetAttachmentFactory: Send + Sync {
) -> Result<TargetAttachmentState>;

/// Should return Some if and only if any changes are needed.
fn diff_setup_states(
async fn diff_setup_states(
&self,
key: &serde_json::Value,
target_key: &serde_json::Value,
attachment_key: &serde_json::Value,
new_state: Option<serde_json::Value>,
existing_states: setup::CombinedState<serde_json::Value>,
context: &interface::FlowInstanceContext,
) -> Result<Option<Box<dyn AttachmentSetupChange + Send + Sync>>>;
}

Expand Down
2 changes: 1 addition & 1 deletion src/ops/registration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ fn register_executor_factories(registry: &mut ExecutorFactoryRegistry) -> Result
functions::embed_text::register(registry)?;
functions::split_by_separators::register(registry)?;

targets::postgres::Factory::default().register(registry)?;
targets::postgres::register(registry)?;
targets::qdrant::register(registry)?;
targets::kuzu::register(registry, reqwest_client)?;

Expand Down
125 changes: 122 additions & 3 deletions src/ops/targets/postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -248,8 +248,7 @@ impl ExportContext {
}
}

#[derive(Default)]
pub struct Factory {}
struct TargetFactory;

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
pub struct TableId {
Expand Down Expand Up @@ -614,7 +613,7 @@ impl SetupChange {
}

#[async_trait]
impl TargetFactoryBase for Factory {
impl TargetFactoryBase for TargetFactory {
type Spec = Spec;
type DeclarationSpec = ();
type SetupState = SetupState;
Expand Down Expand Up @@ -752,3 +751,123 @@ impl TargetFactoryBase for Factory {
Ok(())
}
}

////////////////////////////////////////////////////////////
// Attachment Factory
////////////////////////////////////////////////////////////

/// User-provided spec of a SQL statement attachment for a Postgres target.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SqlStatementAttachmentSpec {
    /// Name of the attachment; it becomes the attachment's setup key.
    name: String,
    /// SQL statement to run when the attachment is set up.
    setup_sql: String,
    /// Optional SQL statement to run when the attachment is removed.
    teardown_sql: Option<String>,
}

/// Setup state tracked for a SQL statement attachment.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SqlStatementAttachmentState {
    /// Setup SQL of this state; compared against the desired setup SQL to
    /// decide whether the setup statement has to be run.
    setup_sql: String,
    /// Optional teardown SQL, collected and run when the attachment is removed.
    teardown_sql: Option<String>,
}

/// A pending setup change for a SQL statement attachment.
pub struct SqlStatementAttachmentSetupChange {
    /// Connection pool the SQL statements are executed against.
    db_pool: PgPool,
    /// Setup SQL to run, if any. It is run after all teardown statements.
    setup_sql_to_run: Option<String>,
    /// Teardown SQL statements to run, in insertion order.
    teardown_sql_to_run: IndexSet<String>,
}

#[async_trait]
impl AttachmentSetupChange for SqlStatementAttachmentSetupChange {
    /// Lists the pending statements as human-readable lines: every teardown
    /// statement first, then the setup statement (if any).
    fn describe_changes(&self) -> Vec<String> {
        let teardown_lines = self
            .teardown_sql_to_run
            .iter()
            .map(|sql| format!("Run teardown SQL: {}", sql));
        // `Option::iter` yields zero or one item, so the setup line is optional.
        let setup_line = self
            .setup_sql_to_run
            .iter()
            .map(|sql| format!("Run setup SQL: {}", sql));
        teardown_lines.chain(setup_line).collect()
    }

    /// Executes the pending statements against the pool one by one, in the
    /// same order as reported by `describe_changes`.
    ///
    /// # Errors
    ///
    /// Stops at, and returns, the first statement that fails to execute.
    async fn apply_change(&self) -> Result<()> {
        let statements = self
            .teardown_sql_to_run
            .iter()
            .chain(self.setup_sql_to_run.iter());
        for sql in statements {
            sqlx::query(sql).execute(&self.db_pool).await?;
        }
        Ok(())
    }
}

struct SqlAttachmentFactory;

#[async_trait]
impl TargetSpecificAttachmentFactoryBase for SqlAttachmentFactory {
type TargetKey = TableId;
type TargetSpec = Spec;
type Spec = SqlStatementAttachmentSpec;
type SetupKey = String;
type SetupState = SqlStatementAttachmentState;
type SetupChange = SqlStatementAttachmentSetupChange;

fn name(&self) -> &str {
"PostgresSqlAttachment"
}

fn get_state(
&self,
_target_name: &str,
_target_spec: &Spec,
attachment_spec: SqlStatementAttachmentSpec,
) -> Result<TypedTargetAttachmentState<Self>> {
Ok(TypedTargetAttachmentState {
setup_key: attachment_spec.name,
setup_state: SqlStatementAttachmentState {
setup_sql: attachment_spec.setup_sql,
teardown_sql: attachment_spec.teardown_sql,
},
})
}

async fn diff_setup_states(
&self,
target_key: &TableId,
_attachment_key: &String,
new_state: Option<SqlStatementAttachmentState>,
existing_states: setup::CombinedState<SqlStatementAttachmentState>,
context: &interface::FlowInstanceContext,
) -> Result<Option<SqlStatementAttachmentSetupChange>> {
let teardown_sql_to_run: IndexSet<String> = if new_state.is_none() {
existing_states
.possible_versions()
.filter_map(|s| s.teardown_sql.clone())
.collect()
} else {
IndexSet::new()
};
let setup_sql_to_run = if let Some(new_state) = new_state
&& !existing_states.always_exists_and(|s| s.setup_sql == new_state.setup_sql)
{
Some(new_state.setup_sql)
} else {
None
};
let change = if setup_sql_to_run.is_some() || !teardown_sql_to_run.is_empty() {
let db_pool = get_db_pool(target_key.database.as_ref(), &context.auth_registry).await?;
Some(SqlStatementAttachmentSetupChange {
db_pool,
setup_sql_to_run,
teardown_sql_to_run,
})
} else {
None
};
Ok(change)
}
}

/// Registers the Postgres target factory and the Postgres SQL attachment
/// factory in `registry`.
///
/// # Errors
///
/// Returns the first registration error; if the target factory fails to
/// register, the attachment factory is not registered.
pub fn register(registry: &mut ExecutorFactoryRegistry) -> Result<()> {
    TargetFactory.register(registry)?;
    SqlAttachmentFactory.register(registry)?;
    Ok(())
}
20 changes: 16 additions & 4 deletions src/setup/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -256,9 +256,11 @@ fn group_states<K: Hash + Eq + std::fmt::Display + std::fmt::Debug + Clone, S: D
Ok(grouped)
}

fn collect_attachments_setup_change(
async fn collect_attachments_setup_change(
target_key: &serde_json::Value,
desired: Option<&TargetSetupState>,
existing: &CombinedState<TargetSetupState>,
context: &interface::FlowInstanceContext,
) -> Result<AttachmentsSetupChange> {
let existing_current_attachments = existing
.current
Expand Down Expand Up @@ -309,8 +311,15 @@ fn collect_attachments_setup_change(
for (AttachmentSetupKey(kind, key), setup_state) in grouped_attachment_states.into_iter() {
let factory = get_attachment_factory(&kind)?;
let is_upsertion = setup_state.desired.is_some();
if let Some(action) =
factory.diff_setup_states(&key, setup_state.desired, setup_state.existing)?
if let Some(action) = factory
.diff_setup_states(
&target_key,
&key,
setup_state.desired,
setup_state.existing,
context,
)
.await?
{
if is_upsertion {
attachments_change.upserts.push(action);
Expand Down Expand Up @@ -411,9 +420,12 @@ pub async fn diff_flow_setup_states(
};

let attachments_change = collect_attachments_setup_change(
&resource_id.key,
target_states_group.desired.as_ref(),
&target_states_group.existing,
)?;
&flow_instance_ctx,
)
.await?;

let desired_state = target_states_group.desired.clone();
let target_state = target_states_group
Expand Down
26 changes: 14 additions & 12 deletions src/setup/states.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,10 @@ impl<T> CombinedState<T> {
self.current.is_some() && self.staging.iter().all(|s| !s.is_delete())
}

/// Returns `true` only if the state always exists (see `always_exists`) and
/// every possible version satisfies `predicate`.
///
/// `predicate` is not evaluated at all when the state does not always exist.
pub fn always_exists_and(&self, predicate: impl Fn(&T) -> bool) -> bool {
    if !self.always_exists() {
        return false;
    }
    self.possible_versions().all(predicate)
}

pub fn legacy_values<V: Ord + Eq, F: Fn(&T) -> &V>(
&self,
desired: Option<&T>,
Expand Down Expand Up @@ -420,19 +424,17 @@ pub struct TargetSetupChange {
impl ResourceSetupChange for TargetSetupChange {
fn describe_changes(&self) -> Vec<ChangeDescription> {
let mut result = vec![];
result.extend(
self.attachments_change
.deletes
.iter()
.map(|a| ChangeDescription::Action(a.describe_change())),
);
self.attachments_change
.deletes
.iter()
.flat_map(|a| a.describe_changes().into_iter())
.for_each(|change| result.push(ChangeDescription::Action(change)));
result.extend(self.target_change.describe_changes());
result.extend(
self.attachments_change
.upserts
.iter()
.map(|a| ChangeDescription::Action(a.describe_change())),
);
self.attachments_change
.upserts
.iter()
.flat_map(|a| a.describe_changes().into_iter())
.for_each(|change| result.push(ChangeDescription::Action(change)));
result
}

Expand Down