Commit d5d3b79

feat: delete chunk groups in batch instead of one big query + lint the error formatting in dataset_operator.rs
1 parent 35b84aa commit d5d3b79
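
This commit replaces a single unbounded pass over `chunk_group` and `chunk_group_bookmarks` with a keyset-paginated loop: fetch at most `DELETE_CHUNK_BATCH_SIZE` group ids greater than the last id seen (ordered by id), delete that batch, advance the cursor, and stop when a fetch comes back empty. The rest of the diff mechanically rewrites positional format arguments (`"{}", err`) as inline captures (`{err}`).

Below is a minimal, dependency-free sketch of the batching pattern. `FakeStore`, `fetch_batch`, and `BATCH_SIZE` are illustrative stand-ins for the Postgres tables and diesel queries; only the loop shape mirrors the real code.

// A sketch of keyset-paginated deletion, assuming an in-memory stand-in
// for the database tables touched by this commit.
const BATCH_SIZE: usize = 5_000;

struct FakeStore {
    ids: Vec<u64>, // kept sorted, like an indexed primary key
}

impl FakeStore {
    // Mirrors the diesel query: WHERE id > last_id ORDER BY id LIMIT BATCH_SIZE
    fn fetch_batch(&self, last_id: u64) -> Vec<u64> {
        self.ids
            .iter()
            .copied()
            .filter(|&id| id > last_id)
            .take(BATCH_SIZE)
            .collect()
    }

    fn delete(&mut self, batch: &[u64]) {
        self.ids.retain(|id| !batch.contains(id));
    }
}

fn main() {
    let mut store = FakeStore {
        ids: (1..=12_345).collect(),
    };
    let mut last_id = 0_u64; // the real code starts from uuid::Uuid::nil()

    loop {
        let batch = store.fetch_batch(last_id);
        if batch.is_empty() {
            break; // nothing left past the cursor: done
        }
        store.delete(&batch);
        println!("Deleted {} ids", batch.len());
        // Safe to unwrap: the batch was just checked for emptiness.
        last_id = *batch.last().unwrap();
    }
}

Bounding each statement's working set is the point: one giant DELETE against a large dataset holds locks and bloats the transaction, while fixed-size batches make steady progress.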

File tree: 1 file changed, +81 −74 lines changed

server/src/operators/dataset_operator.rs

Lines changed: 81 additions & 74 deletions
@@ -87,7 +87,7 @@ pub async fn create_datasets_query(
         .get_results::<Dataset>(&mut conn)
         .await
         .map_err(|err| {
-            log::error!("Could not create dataset batch: {}", err);
+            log::error!("Could not create dataset batch: {err}");
             ServiceError::BadRequest(
                 "Could not create dataset batch due to pg error".to_string(),
             )
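
The error-formatting lint is the mechanical half of the commit: inline format captures, stable since Rust 1.58, replace positional arguments wherever the argument is a bare identifier. A quick illustration, not taken from the diff:

fn main() {
    let err = "connection refused";
    // Positional argument, the style this commit removes:
    println!("Could not create dataset batch: {}", err);
    // Inline capture, the style it moves to; {err:?} is the Debug form:
    println!("Could not create dataset batch: {err}");
    // Captures only accept bare identifiers: expressions such as a method
    // call still need a positional argument, which is why the new
    // log::info! in clear_dataset_query keeps the "{}" form.
    let chunk_group_ids = vec![1, 2, 3];
    println!("Deleted {} chunk groups", chunk_group_ids.len());
}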
@@ -99,7 +99,7 @@ pub async fn create_datasets_query(
         .get_results::<Dataset>(&mut conn)
         .await
         .map_err(|err| {
-            log::error!("Could not create dataset batch: {}", err);
+            log::error!("Could not create dataset batch: {err}");
             ServiceError::BadRequest(
                 "Could not create dataset batch due to pg error".to_string(),
             )
@@ -428,13 +428,12 @@ pub async fn soft_delete_dataset_by_id_query(
     }
 
     diesel::sql_query(format!(
-        "UPDATE datasets SET deleted = 1, tracking_id = NULL WHERE id = '{}'::uuid",
-        id
+        "UPDATE datasets SET deleted = 1, tracking_id = NULL WHERE id = '{id}'::uuid",
     ))
     .execute(&mut conn)
     .await
     .map_err(|err| {
-        log::error!("Could not delete dataset: {}", err);
+        log::error!("Could not delete dataset: {err}");
         ServiceError::BadRequest("Could not delete dataset".to_string())
     })?;
 
@@ -524,40 +523,60 @@ pub async fn clear_dataset_query(
 
     let qdrant_collection = get_qdrant_collection_from_dataset_config(&dataset_config);
 
-    let chunk_groups = chunk_group::chunk_group
-        .filter(chunk_group::dataset_id.eq(id))
-        .filter(chunk_group::created_at.le(deleted_at))
-        .select(chunk_group::id)
-        .load::<uuid::Uuid>(&mut conn)
+    // Delete chunk groups and bookmarks in batches using offset
+    let mut last_group_offset_id = uuid::Uuid::nil();
+    loop {
+        let chunk_group_ids: Vec<uuid::Uuid> = chunk_group::chunk_group
+            .filter(chunk_group::dataset_id.eq(id))
+            .filter(chunk_group::created_at.le(deleted_at))
+            .filter(chunk_group::id.gt(last_group_offset_id))
+            .select(chunk_group::id)
+            .order(chunk_group::id)
+            .limit(
+                option_env!("DELETE_CHUNK_BATCH_SIZE")
+                    .unwrap_or("5000")
+                    .parse::<i64>()
+                    .unwrap_or(5000),
+            )
+            .load::<uuid::Uuid>(&mut conn)
+            .await
+            .map_err(|err| {
+                log::error!("Could not fetch chunk group ids: {err}");
+                ServiceError::BadRequest("Could not fetch chunk group ids".to_string())
+            })?;
+
+        if chunk_group_ids.is_empty() {
+            break;
+        }
+
+        diesel::delete(
+            chunk_group_bookmarks_columns::chunk_group_bookmarks
+                .filter(chunk_group_bookmarks_columns::group_id.eq_any(&chunk_group_ids))
+                .filter(chunk_group_bookmarks_columns::created_at.le(deleted_at)),
+        )
+        .execute(&mut conn)
         .await
         .map_err(|err| {
-            log::error!("Could not fetch groups: {}", err);
-            ServiceError::BadRequest("Could not fetch groups".to_string())
+            log::error!("Could not delete chunk_group_bookmarks: {err}");
+            ServiceError::BadRequest("Could not delete chunk_group_bookmarks".to_string())
         })?;
 
-    diesel::delete(
-        chunk_group_bookmarks_columns::chunk_group_bookmarks
-            .filter(chunk_group_bookmarks_columns::group_id.eq_any(chunk_groups))
-            .filter(chunk_group_bookmarks_columns::created_at.le(deleted_at)),
-    )
-    .execute(&mut conn)
-    .await
-    .map_err(|err| {
-        log::error!("Could not delete chunk_group_bookmarks: {}", err);
-        ServiceError::BadRequest("Could not delete chunk_group_bookmarks".to_string())
-    })?;
+        diesel::delete(chunk_group::chunk_group.filter(chunk_group::id.eq_any(&chunk_group_ids)))
+            .execute(&mut conn)
+            .await
+            .map_err(|err| {
+                log::error!("Could not delete chunk groups: {err}");
+                ServiceError::BadRequest("Could not delete chunk groups".to_string())
+            })?;
 
-    diesel::delete(
-        chunk_group::chunk_group
-            .filter(chunk_group::dataset_id.eq(id))
-            .filter(chunk_group::created_at.le(deleted_at)),
-    )
-    .execute(&mut conn)
-    .await
-    .map_err(|err| {
-        log::error!("Could not delete groups: {}", err);
-        ServiceError::BadRequest("Could not delete groups".to_string())
-    })?;
+        log::info!(
+            "Deleted {} chunk groups for dataset {}",
+            chunk_group_ids.len(),
+            id
+        );
+
+        last_group_offset_id = *chunk_group_ids.last().unwrap();
+    }
 
     diesel::delete(
         files_column::files
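
Two details of the new loop are easy to miss. `*chunk_group_ids.last().unwrap()` cannot panic, because the `is_empty` check above it breaks out of the loop first. And `option_env!` is resolved at compile time, so `DELETE_CHUNK_BATCH_SIZE` only takes effect if it is set when the binary is built; a runtime-configurable variant (an assumption about intent, not what this commit does) would read the process environment instead:

// Hypothetical runtime alternative to option_env!: reads the environment on
// every call instead of baking the value into the binary at compile time.
fn delete_chunk_batch_size() -> i64 {
    std::env::var("DELETE_CHUNK_BATCH_SIZE")
        .ok()
        .and_then(|v| v.parse::<i64>().ok())
        .unwrap_or(5000)
}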
@@ -567,7 +586,7 @@ pub async fn clear_dataset_query(
     .execute(&mut conn)
     .await
     .map_err(|err| {
-        log::error!("Could not delete files: {}", err);
+        log::error!("Could not delete files: {err}");
         ServiceError::BadRequest("Could not delete files".to_string())
     })?;
 
@@ -593,7 +612,7 @@ pub async fn clear_dataset_query(
         .load::<(uuid::Uuid, uuid::Uuid)>(&mut conn)
         .await
         .map_err(|err| {
-            log::error!("Could not fetch chunk IDs: {}", err);
+            log::error!("Could not fetch chunk IDs: {err}");
             ServiceError::BadRequest("Could not fetch chunk IDs to delete".to_string())
         })?;
 
@@ -607,7 +626,7 @@ pub async fn clear_dataset_query(
             .collect::<Vec<uuid::Uuid>>();
 
         if chunk_ids.is_empty() {
-            log::info!("No more chunks to delete for dataset {}", id);
+            log::info!("No more chunks to delete for dataset {id}");
             break;
         }
 
@@ -618,20 +637,16 @@ pub async fn clear_dataset_query(
         .execute(&mut conn)
         .await
         .map_err(|err| {
-            log::error!("Could not delete chunks in current batch: {}", err);
+            log::error!("Could not delete chunks in current batch: {err}");
             ServiceError::BadRequest("Could not delete chunks in current batch".to_string())
         })?;
 
         delete_points_from_qdrant(qdrant_point_ids, qdrant_collection.clone())
             .await
             .map_err(|err| {
-                log::error!(
-                    "Could not delete points in current batch from qdrant: {}",
-                    err
-                );
+                log::error!("Could not delete points in current batch from qdrant: {err}");
                 ServiceError::BadRequest(format!(
-                    "Could not delete points in current batch from qdrant: {}",
-                    err
+                    "Could not delete points in current batch from qdrant: {err}"
                 ))
             })?;
 
@@ -677,11 +692,11 @@ pub async fn delete_dataset_by_id_query(
         bulk_delete_chunks_query(None, deleted_at, id, dataset_config.clone(), pool.clone())
             .await
             .map_err(|err| {
-                log::error!("Failed to bulk delete chunks: {:?}", err);
+                log::error!("Failed to bulk delete chunks: {err:?}");
                 err
             })?;
 
-        log::info!("Bulk deleted chunks for dataset: {:?}", id);
+        log::info!("Bulk deleted chunks for dataset: {id:?}");
     } else {
         clear_dataset_query(
             id,
@@ -698,7 +713,7 @@ pub async fn delete_dataset_by_id_query(
     .get_result(&mut conn)
     .await
     .map_err(|err| {
-        log::error!("Could not delete dataset: {}", err);
+        log::error!("Could not delete dataset: {err}");
         ServiceError::BadRequest("Could not delete dataset".to_string())
     })?;
 
@@ -849,16 +864,14 @@ pub async fn get_tags_in_dataset_query(
         .offset((page - 1) * page_size)
         .load(&mut conn)
         .await
-        .map_err(|err| {
-            ServiceError::BadRequest(format!("Failed to get items with tags {}", err))
-        })?;
+        .map_err(|err| ServiceError::BadRequest(format!("Failed to get items with tags {err}")))?;
 
     let total_count = dataset_tags_columns::dataset_tags
         .select(count(dataset_tags_columns::tag))
         .filter(dataset_tags_columns::dataset_id.eq(dataset_id))
         .first::<i64>(&mut conn)
         .await
-        .map_err(|err| ServiceError::BadRequest(format!("Failed to get count of tags {}", err)))?;
+        .map_err(|err| ServiceError::BadRequest(format!("Failed to get count of tags {err}")))?;
 
     Ok((items, total_count))
 }
@@ -906,20 +919,20 @@ pub async fn add_words_to_dataset(
         .collect_vec();
 
     let mut words_inserter = clickhouse_client.insert("words_datasets").map_err(|e| {
-        log::error!("Error inserting words_datasets: {:?}", e);
-        ServiceError::InternalServerError(format!("Error inserting words_datasets: {:?}", e))
+        log::error!("Error inserting words_datasets: {e:?}");
+        ServiceError::InternalServerError(format!("Error inserting words_datasets: {e:?}"))
     })?;
 
     for row in rows {
         words_inserter.write(&row).await.map_err(|e| {
-            log::error!("Error inserting words_datasets: {:?}", e);
-            ServiceError::InternalServerError(format!("Error inserting words_datasets: {:?}", e))
+            log::error!("Error inserting words_datasets: {e:?}");
+            ServiceError::InternalServerError(format!("Error inserting words_datasets: {e:?}"))
         })?;
     }
 
     words_inserter.end().await.map_err(|e| {
-        log::error!("Error inserting words_datasets: {:?}", e);
-        ServiceError::InternalServerError(format!("Error inserting words_datasets: {:?}", e))
+        log::error!("Error inserting words_datasets: {e:?}");
+        ServiceError::InternalServerError(format!("Error inserting words_datasets: {e:?}"))
     })?;
 
     Ok(())
@@ -948,9 +961,8 @@ pub async fn scroll_words_from_dataset(
             word,
             count,
         FROM words_datasets
-        WHERE dataset_id = '{}' AND id > '{}'
-        ",
-        dataset_id, offset,
+        WHERE dataset_id = '{dataset_id}' AND id > '{offset}'
+        "
     );
 
     if let Some(last_processed) = last_processed {
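
Inline captures also work inside multi-line string literals, which is what the ClickHouse query hunks rely on: `format!` scans the entire literal for `{...}` placeholders, so `'{dataset_id}'` interpolates the value's Display output wherever it appears. A toy rendering, with string literals standing in for the real `uuid::Uuid` values:

fn main() {
    // Stand-ins for the uuid::Uuid values used in scroll_words_from_dataset.
    let dataset_id = "550e8400-e29b-41d4-a716-446655440000";
    let offset = "00000000-0000-0000-0000-000000000000";
    let query = format!(
        "SELECT word, count FROM words_datasets
        WHERE dataset_id = '{dataset_id}' AND id > '{offset}'
        "
    );
    println!("{query}");
}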
@@ -963,24 +975,23 @@ pub async fn scroll_words_from_dataset(
                 .unwrap()
         )
         .map_err(|e| {
-            log::error!("Error formatting last processed time: {:?}", e);
+            log::error!("Error formatting last processed time: {e:?}");
             ServiceError::InternalServerError(format!(
-                "Error formatting last processed time: {:?}",
-                e
+                "Error formatting last processed time: {e:?}"
             ))
         })?
         );
     }
 
-    query = format!("{} ORDER BY id LIMIT {}", query, limit);
+    query = format!("{query} ORDER BY id LIMIT {limit}");
 
     let words = clickhouse_client
         .query(&query)
         .fetch_all::<WordDatasetCount>()
         .await
         .map_err(|e| {
-            log::error!("Error fetching words from dataset: {:?}", e);
-            ServiceError::InternalServerError(format!("Error fetching words from dataset: {:?}", e))
+            log::error!("Error fetching words from dataset: {e:?}");
+            ServiceError::InternalServerError(format!("Error fetching words from dataset: {e:?}"))
         })?;
 
     if words.is_empty() {
@@ -998,21 +1009,17 @@ pub async fn update_dataset_last_processed_query(
     let query = format!(
         "
         INSERT INTO dataset_words_last_processed (dataset_id, last_processed)
-        VALUES ('{}', now())
-        ",
-        dataset_id
+        VALUES ('{dataset_id}', now())
+        "
     );
 
     clickhouse_client
         .query(&query)
         .execute()
         .await
         .map_err(|e| {
-            log::error!("Error updating last processed time: {:?}", e);
-            ServiceError::InternalServerError(format!(
-                "Error updating last processed time: {:?}",
-                e
-            ))
+            log::error!("Error updating last processed time: {e:?}");
+            ServiceError::InternalServerError(format!("Error updating last processed time: {e:?}"))
         })?;
 
     Ok(())
