Skip to content

Commit 158de85

Browse files
authored
fix(setup): drop existing table as long as not always exists (#741)
1 parent 1b0ac88 commit 158de85

File tree

2 files changed

+41
-44
lines changed

2 files changed

+41
-44
lines changed

examples/text_embedding/main.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,7 @@ def text_to_embedding(
1515
) -> cocoindex.DataSlice[NDArray[np.float32]]:
1616
"""
1717
Embed the text using a SentenceTransformer model.
18-
This is a shared logic between indexing and querying, so extract it as a function.
19-
"""
18+
This is a shared logic between indexing and querying, so extract it as a function."""
2019
# You can also switch to remote embedding model:
2120
# return text.transform(
2221
# cocoindex.functions.EmbedText(

src/ops/targets/shared/table_columns.rs

Lines changed: 40 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -53,56 +53,54 @@ impl<T: Eq + Serialize + DeserializeOwned> TableMainSetupAction<T> {
5353
for<'a> &'a S: Into<Cow<'a, TableColumnsSchema<T>>>,
5454
T: Clone,
5555
{
56-
let desired_cols: Option<Cow<'_, TableColumnsSchema<T>>> =
57-
desired_state.map(|desired| desired.into());
56+
let existing_may_exists = existing.possible_versions().next().is_some();
5857
let possible_existing_cols: Vec<Cow<'_, TableColumnsSchema<T>>> = existing
5958
.possible_versions()
6059
.map(Into::<Cow<'_, TableColumnsSchema<T>>>::into)
6160
.collect();
62-
let drop_existing = desired_cols.as_ref().is_none_or(|desired| {
63-
existing_invalidated
64-
|| possible_existing_cols
65-
.iter()
66-
.any(|v| v.key_columns != desired.key_columns)
67-
});
61+
let Some(desired_state) = desired_state else {
62+
return Self {
63+
drop_existing: existing_may_exists,
64+
table_upsertion: None,
65+
};
66+
};
6867

69-
let table_upsertion = desired_cols
70-
.map(|desired| {
71-
if !existing_invalidated
72-
&& existing.always_exists()
73-
&& possible_existing_cols
74-
.iter()
75-
.all(|v| desired.key_columns == v.key_columns)
76-
{
77-
TableUpsertionAction::Update {
78-
columns_to_delete: possible_existing_cols
79-
.iter()
80-
.flat_map(|v| v.value_columns.keys())
81-
.filter(|column_name| !desired.value_columns.contains_key(*column_name))
82-
.cloned()
83-
.collect(),
84-
columns_to_upsert: desired
85-
.value_columns
68+
let desired_cols: Cow<'_, TableColumnsSchema<T>> = desired_state.into();
69+
let drop_existing = existing_invalidated
70+
|| possible_existing_cols
71+
.iter()
72+
.any(|v| v.key_columns != desired_cols.key_columns)
73+
|| (existing_may_exists && !existing.always_exists());
74+
75+
let table_upsertion = if existing.always_exists() && !drop_existing {
76+
TableUpsertionAction::Update {
77+
columns_to_delete: possible_existing_cols
78+
.iter()
79+
.flat_map(|v| v.value_columns.keys())
80+
.filter(|column_name| !desired_cols.value_columns.contains_key(*column_name))
81+
.cloned()
82+
.collect(),
83+
columns_to_upsert: desired_cols
84+
.value_columns
85+
.iter()
86+
.filter(|(column_name, schema)| {
87+
!possible_existing_cols
8688
.iter()
87-
.filter(|(column_name, schema)| {
88-
!possible_existing_cols
89-
.iter()
90-
.all(|v| v.value_columns.get(*column_name) == Some(schema))
91-
})
92-
.map(|(k, v)| (k.to_owned(), v.to_owned()))
93-
.collect(),
94-
}
95-
} else {
96-
TableUpsertionAction::Create {
97-
keys: desired.key_columns.to_owned(),
98-
values: desired.value_columns.to_owned(),
99-
}
100-
}
101-
})
102-
.filter(|action| !action.is_empty());
89+
.all(|v| v.value_columns.get(*column_name) == Some(schema))
90+
})
91+
.map(|(k, v)| (k.to_owned(), v.to_owned()))
92+
.collect(),
93+
}
94+
} else {
95+
TableUpsertionAction::Create {
96+
keys: desired_cols.key_columns.to_owned(),
97+
values: desired_cols.value_columns.to_owned(),
98+
}
99+
};
100+
103101
Self {
104102
drop_existing,
105-
table_upsertion,
103+
table_upsertion: Some(table_upsertion).filter(|action| !action.is_empty()),
106104
}
107105
}
108106

0 commit comments

Comments
 (0)