Skip to content

Commit 99272ca

Browse files
authored
wait for snapshotting request to actually finish (#22)
1 parent cfa5300 commit 99272ca

File tree

1 file changed

+8
-3
lines changed

1 file changed

+8
-3
lines changed

src/workload.rs

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -217,7 +217,11 @@ impl Workload {
217217

218218
// Starts snapshotting process as the data was ingested properly
219219
log::info!("Run: trigger collection snapshot in the background");
220-
let snapshotting_handle = self.trigger_continuous_snapshotting(client);
220+
221+
// Graceful shutdown of the snapshotting task
222+
let is_run_finished = Arc::new(AtomicBool::new(false));
223+
let finish = is_run_finished.clone();
224+
let snapshotting_handle = self.trigger_continuous_snapshotting(client, finish);
221225

222226
log::info!("Run: set payload");
223227
for point_id in 1..self.points_count {
@@ -268,7 +272,7 @@ impl Workload {
268272
let _telemetry = get_telemetry(http_client).await?;
269273

270274
// Stop ongoing snapshotting task
271-
snapshotting_handle.abort();
275+
is_run_finished.store(true, Ordering::Relaxed);
272276
match snapshotting_handle.await {
273277
Ok(Ok(())) => (),
274278
Err(_join_error) => (), // ignore JoinError
@@ -323,12 +327,13 @@ impl Workload {
323327
fn trigger_continuous_snapshotting(
324328
&self,
325329
client: &Qdrant,
330+
finish: Arc<AtomicBool>,
326331
) -> JoinHandle<Result<(), CrasherError>> {
327332
let collection_name = self.collection_name.clone();
328333
let client_snapshot = client.clone();
329334
let stopped = self.stopped.clone();
330335
tokio::spawn(async move {
331-
while !stopped.load(Ordering::Relaxed) {
336+
while !stopped.load(Ordering::Relaxed) && !finish.load(Ordering::Relaxed) {
332337
if let Err(err) =
333338
churn_collection_snapshot(&client_snapshot, &collection_name).await
334339
{

0 commit comments

Comments
 (0)