Skip to content

Commit b89650d

Browse files
committed
[ENH]: SPANN - Delete empty PLs
1 parent 091f8bd commit b89650d

File tree

1 file changed

+133
-28
lines changed

1 file changed

+133
-28
lines changed

rust/index/src/spann/types.rs

Lines changed: 133 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -766,6 +766,58 @@ impl SpannIndexWriter {
766766
Ok(false)
767767
}
768768

769+
async fn try_delete_posting_list(&self, head_id: u32) -> Result<(), SpannIndexWriterError> {
770+
let _write_guard = self.posting_list_partitioned_mutex.lock(&head_id).await;
771+
if self.is_head_deleted(head_id as usize).await? {
772+
return Ok(());
773+
}
774+
let result = self
775+
.posting_list_writer
776+
.get_owned::<u32, &SpannPostingList<'_>>("", head_id)
777+
.await;
778+
// If the error is posting list not found, then return ok.
779+
match result {
780+
Ok(Some((doc_offset_ids, doc_versions, _))) => {
781+
let mut outdated_count = 0;
782+
for (doc_offset_id, doc_version) in doc_offset_ids.iter().zip(doc_versions.iter()) {
783+
if self.is_outdated(*doc_offset_id, *doc_version).await? {
784+
outdated_count += 1;
785+
}
786+
}
787+
if outdated_count == doc_offset_ids.len() {
788+
{
789+
let hnsw_write_guard = self.hnsw_index.inner.write();
790+
hnsw_write_guard
791+
.hnsw_index
792+
.delete(head_id as usize)
793+
.map_err(|e| {
794+
tracing::error!(
795+
"Error deleting head {} from hnsw index: {}",
796+
head_id,
797+
e
798+
);
799+
SpannIndexWriterError::HnswIndexMutateError(e)
800+
})?;
801+
}
802+
self.posting_list_writer
803+
.delete::<u32, &SpannPostingList<'_>>("", head_id)
804+
.await
805+
.map_err(|e| {
806+
tracing::error!(
807+
"Error deleting posting list for head {}: {}",
808+
head_id,
809+
e
810+
);
811+
SpannIndexWriterError::PostingListSetError(e)
812+
})?;
813+
}
814+
}
815+
Ok(None) => {}
816+
Err(_) => {}
817+
}
818+
Ok(())
819+
}
820+
769821
#[allow(clippy::too_many_arguments)]
770822
async fn collect_and_reassign_split_points(
771823
&self,
@@ -814,6 +866,8 @@ impl SpannIndexWriter {
814866
.await?;
815867
}
816868
}
869+
// Delete head if all points were moved out.
870+
self.try_delete_posting_list(new_head_ids[k] as u32).await?;
817871
}
818872
Ok(assigned_ids)
819873
}
@@ -946,17 +1000,20 @@ impl SpannIndexWriter {
9461000
let doc_versions;
9471001
let doc_embeddings;
9481002
{
949-
// TODO(Sanket): Check if head is deleted, can happen if another concurrent thread
950-
// deletes it.
951-
(doc_offset_ids, doc_versions, doc_embeddings) = self
1003+
let result = self
9521004
.posting_list_writer
9531005
.get_owned::<u32, &SpannPostingList<'_>>("", head_id as u32)
954-
.await
955-
.map_err(|e| {
956-
tracing::error!("Error getting posting list for head {}: {}", head_id, e);
957-
SpannIndexWriterError::PostingListGetError(e)
958-
})?
959-
.ok_or(SpannIndexWriterError::PostingListNotFound)?;
1006+
.await;
1007+
match result {
1008+
Ok(Some((offset_ids, versions, embeddings))) => {
1009+
doc_offset_ids = offset_ids;
1010+
doc_versions = versions;
1011+
doc_embeddings = embeddings;
1012+
}
1013+
// Posting list can be concurrent deleted so bail out early if not found.
1014+
Ok(None) => return Ok(()),
1015+
Err(e) => return Err(SpannIndexWriterError::PostingListGetError(e)),
1016+
}
9601017
}
9611018
for (index, doc_offset_id) in doc_offset_ids.iter().enumerate() {
9621019
if assigned_ids.contains(doc_offset_id)
@@ -1004,6 +1061,8 @@ impl SpannIndexWriter {
10041061
)
10051062
.await?;
10061063
}
1064+
// Delete head if all points were moved out.
1065+
self.try_delete_posting_list(head_id as u32).await?;
10071066
Ok(())
10081067
}
10091068

@@ -1264,6 +1323,7 @@ impl SpannIndexWriter {
12641323
if !same_head
12651324
&& distance_function
12661325
.distance(&clustering_output.cluster_centers[k], &head_embedding)
1326+
.abs()
12671327
< 1e-6
12681328
{
12691329
same_head = true;
@@ -1350,17 +1410,32 @@ impl SpannIndexWriter {
13501410
}
13511411
if !same_head {
13521412
// Delete the old head
1353-
let hnsw_write_guard = self.hnsw_index.inner.write();
1354-
hnsw_write_guard
1355-
.hnsw_index
1356-
.delete(head_id as usize)
1413+
// First delete from hnsw then from postings list. This order
1414+
// ensures that the head is never dangling.
1415+
{
1416+
let hnsw_write_guard = self.hnsw_index.inner.write();
1417+
hnsw_write_guard
1418+
.hnsw_index
1419+
.delete(head_id as usize)
1420+
.map_err(|e| {
1421+
tracing::error!(
1422+
"Error deleting head {} from hnsw index: {}",
1423+
head_id,
1424+
e
1425+
);
1426+
SpannIndexWriterError::HnswIndexMutateError(e)
1427+
})?;
1428+
}
1429+
self.posting_list_writer
1430+
.delete::<u32, &SpannPostingList<'_>>("", head_id)
1431+
.await
13571432
.map_err(|e| {
13581433
tracing::error!(
1359-
"Error deleting head {} from hnsw index: {}",
1434+
"Error deleting posting list for head {}: {}",
13601435
head_id,
13611436
e
13621437
);
1363-
SpannIndexWriterError::HnswIndexMutateError(e)
1438+
SpannIndexWriterError::PostingListSetError(e)
13641439
})?;
13651440
self.stats
13661441
.num_heads_deleted
@@ -1755,12 +1830,29 @@ impl SpannIndexWriter {
17551830
self.stats
17561831
.num_pl_modified
17571832
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
1758-
// Delete from hnsw.
1759-
let hnsw_write_guard = self.hnsw_index.inner.write();
1760-
hnsw_write_guard.hnsw_index.delete(head_id).map_err(|e| {
1761-
tracing::error!("Error deleting head {} from hnsw index: {}", head_id, e);
1762-
SpannIndexWriterError::HnswIndexMutateError(e)
1763-
})?;
1833+
{
1834+
// Delete from hnsw.
1835+
let hnsw_write_guard = self.hnsw_index.inner.write();
1836+
hnsw_write_guard.hnsw_index.delete(head_id).map_err(|e| {
1837+
tracing::error!(
1838+
"Error deleting head {} from hnsw index: {}",
1839+
head_id,
1840+
e
1841+
);
1842+
SpannIndexWriterError::HnswIndexMutateError(e)
1843+
})?;
1844+
}
1845+
self.posting_list_writer
1846+
.delete::<u32, &SpannPostingList<'_>>("", head_id as u32)
1847+
.await
1848+
.map_err(|e| {
1849+
tracing::error!(
1850+
"Error deleting posting list for head {}: {}",
1851+
head_id,
1852+
e
1853+
);
1854+
SpannIndexWriterError::PostingListSetError(e)
1855+
})?;
17641856
self.stats
17651857
.num_heads_deleted
17661858
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
@@ -1779,18 +1871,31 @@ impl SpannIndexWriter {
17791871
self.stats
17801872
.num_pl_modified
17811873
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
1782-
// Delete from hnsw.
1783-
let hnsw_write_guard = self.hnsw_index.inner.write();
1784-
hnsw_write_guard
1785-
.hnsw_index
1786-
.delete(nearest_head_id)
1874+
{
1875+
// Delete from hnsw.
1876+
let hnsw_write_guard = self.hnsw_index.inner.write();
1877+
hnsw_write_guard
1878+
.hnsw_index
1879+
.delete(nearest_head_id)
1880+
.map_err(|e| {
1881+
tracing::error!(
1882+
"Error deleting head {} from hnsw index: {}",
1883+
nearest_head_id,
1884+
e
1885+
);
1886+
SpannIndexWriterError::HnswIndexMutateError(e)
1887+
})?;
1888+
}
1889+
self.posting_list_writer
1890+
.delete::<u32, &SpannPostingList<'_>>("", nearest_head_id as u32)
1891+
.await
17871892
.map_err(|e| {
17881893
tracing::error!(
1789-
"Error deleting head {} from hnsw index: {}",
1894+
"Error deleting posting list for head {}: {}",
17901895
nearest_head_id,
17911896
e
17921897
);
1793-
SpannIndexWriterError::HnswIndexMutateError(e)
1898+
SpannIndexWriterError::PostingListSetError(e)
17941899
})?;
17951900
self.stats
17961901
.num_heads_deleted

0 commit comments

Comments
 (0)