diff --git a/examples/text_embedding/main.py b/examples/text_embedding/main.py index af99585a4..a4b3f254d 100644 --- a/examples/text_embedding/main.py +++ b/examples/text_embedding/main.py @@ -15,8 +15,7 @@ def text_to_embedding( ) -> cocoindex.DataSlice[NDArray[np.float32]]: """ Embed the text using a SentenceTransformer model. - This is a shared logic between indexing and querying, so extract it as a function. - """ + This is a shared logic between indexing and querying, so extract it as a function.""" # You can also switch to remote embedding model: # return text.transform( # cocoindex.functions.EmbedText( diff --git a/src/ops/targets/shared/table_columns.rs b/src/ops/targets/shared/table_columns.rs index 22c499500..af2fc768a 100644 --- a/src/ops/targets/shared/table_columns.rs +++ b/src/ops/targets/shared/table_columns.rs @@ -53,56 +53,54 @@ impl TableMainSetupAction { for<'a> &'a S: Into>>, T: Clone, { - let desired_cols: Option>> = - desired_state.map(|desired| desired.into()); + let existing_may_exists = existing.possible_versions().next().is_some(); let possible_existing_cols: Vec>> = existing .possible_versions() .map(Into::>>::into) .collect(); - let drop_existing = desired_cols.as_ref().is_none_or(|desired| { - existing_invalidated - || possible_existing_cols - .iter() - .any(|v| v.key_columns != desired.key_columns) - }); + let Some(desired_state) = desired_state else { + return Self { + drop_existing: existing_may_exists, + table_upsertion: None, + }; + }; - let table_upsertion = desired_cols - .map(|desired| { - if !existing_invalidated - && existing.always_exists() - && possible_existing_cols - .iter() - .all(|v| desired.key_columns == v.key_columns) - { - TableUpsertionAction::Update { - columns_to_delete: possible_existing_cols - .iter() - .flat_map(|v| v.value_columns.keys()) - .filter(|column_name| !desired.value_columns.contains_key(*column_name)) - .cloned() - .collect(), - columns_to_upsert: desired - .value_columns + let desired_cols: Cow<'_, TableColumnsSchema> = desired_state.into(); + let drop_existing = existing_invalidated + || possible_existing_cols + .iter() + .any(|v| v.key_columns != desired_cols.key_columns) + || (existing_may_exists && !existing.always_exists()); + + let table_upsertion = if existing.always_exists() && !drop_existing { + TableUpsertionAction::Update { + columns_to_delete: possible_existing_cols + .iter() + .flat_map(|v| v.value_columns.keys()) + .filter(|column_name| !desired_cols.value_columns.contains_key(*column_name)) + .cloned() + .collect(), + columns_to_upsert: desired_cols + .value_columns + .iter() + .filter(|(column_name, schema)| { + !possible_existing_cols .iter() - .filter(|(column_name, schema)| { - !possible_existing_cols - .iter() - .all(|v| v.value_columns.get(*column_name) == Some(schema)) - }) - .map(|(k, v)| (k.to_owned(), v.to_owned())) - .collect(), - } - } else { - TableUpsertionAction::Create { - keys: desired.key_columns.to_owned(), - values: desired.value_columns.to_owned(), - } - } - }) - .filter(|action| !action.is_empty()); + .all(|v| v.value_columns.get(*column_name) == Some(schema)) + }) + .map(|(k, v)| (k.to_owned(), v.to_owned())) + .collect(), + } + } else { + TableUpsertionAction::Create { + keys: desired_cols.key_columns.to_owned(), + values: desired_cols.value_columns.to_owned(), + } + }; + Self { drop_existing, - table_upsertion, + table_upsertion: Some(table_upsertion).filter(|action| !action.is_empty()), } }