Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions examples/text_embedding/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,7 @@ def text_to_embedding(
) -> cocoindex.DataSlice[NDArray[np.float32]]:
"""
Embed the text using a SentenceTransformer model.
This is a shared logic between indexing and querying, so extract it as a function.
"""
This is a shared logic between indexing and querying, so extract it as a function."""
# You can also switch to remote embedding model:
# return text.transform(
# cocoindex.functions.EmbedText(
Expand Down
82 changes: 40 additions & 42 deletions src/ops/targets/shared/table_columns.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,56 +53,54 @@ impl<T: Eq + Serialize + DeserializeOwned> TableMainSetupAction<T> {
for<'a> &'a S: Into<Cow<'a, TableColumnsSchema<T>>>,
T: Clone,
{
let desired_cols: Option<Cow<'_, TableColumnsSchema<T>>> =
desired_state.map(|desired| desired.into());
let existing_may_exists = existing.possible_versions().next().is_some();
let possible_existing_cols: Vec<Cow<'_, TableColumnsSchema<T>>> = existing
.possible_versions()
.map(Into::<Cow<'_, TableColumnsSchema<T>>>::into)
.collect();
let drop_existing = desired_cols.as_ref().is_none_or(|desired| {
existing_invalidated
|| possible_existing_cols
.iter()
.any(|v| v.key_columns != desired.key_columns)
});
let Some(desired_state) = desired_state else {
return Self {
drop_existing: existing_may_exists,
table_upsertion: None,
};
};

let table_upsertion = desired_cols
.map(|desired| {
if !existing_invalidated
&& existing.always_exists()
&& possible_existing_cols
.iter()
.all(|v| desired.key_columns == v.key_columns)
{
TableUpsertionAction::Update {
columns_to_delete: possible_existing_cols
.iter()
.flat_map(|v| v.value_columns.keys())
.filter(|column_name| !desired.value_columns.contains_key(*column_name))
.cloned()
.collect(),
columns_to_upsert: desired
.value_columns
let desired_cols: Cow<'_, TableColumnsSchema<T>> = desired_state.into();
let drop_existing = existing_invalidated
|| possible_existing_cols
.iter()
.any(|v| v.key_columns != desired_cols.key_columns)
|| (existing_may_exists && !existing.always_exists());

let table_upsertion = if existing.always_exists() && !drop_existing {
TableUpsertionAction::Update {
columns_to_delete: possible_existing_cols
.iter()
.flat_map(|v| v.value_columns.keys())
.filter(|column_name| !desired_cols.value_columns.contains_key(*column_name))
.cloned()
.collect(),
columns_to_upsert: desired_cols
.value_columns
.iter()
.filter(|(column_name, schema)| {
!possible_existing_cols
.iter()
.filter(|(column_name, schema)| {
!possible_existing_cols
.iter()
.all(|v| v.value_columns.get(*column_name) == Some(schema))
})
.map(|(k, v)| (k.to_owned(), v.to_owned()))
.collect(),
}
} else {
TableUpsertionAction::Create {
keys: desired.key_columns.to_owned(),
values: desired.value_columns.to_owned(),
}
}
})
.filter(|action| !action.is_empty());
.all(|v| v.value_columns.get(*column_name) == Some(schema))
})
.map(|(k, v)| (k.to_owned(), v.to_owned()))
.collect(),
}
} else {
TableUpsertionAction::Create {
keys: desired_cols.key_columns.to_owned(),
values: desired_cols.value_columns.to_owned(),
}
};

Self {
drop_existing,
table_upsertion,
table_upsertion: Some(table_upsertion).filter(|action| !action.is_empty()),
}
}

Expand Down
Loading