Skip to content

Commit 09b3101

Browse files
authored
Abort multipart uploads on error (#149)
1 parent ee6ad25 commit 09b3101

File tree

1 file changed

+24
-8
lines changed

1 file changed

+24
-8
lines changed

obstore/src/put.rs

Lines changed: 24 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -445,8 +445,24 @@ async fn put_multipart_inner(
445445
}
446446

447447
let upload = store.put_multipart_opts(path, opts).await?;
448-
let mut write = WriteMultipart::new(upload);
448+
let mut writer = WriteMultipart::new_with_chunk_size(upload, chunk_size);
449+
450+
// Make sure to call abort if the multipart upload failed for any reason
451+
match write_multipart(&mut writer, reader, chunk_size, max_concurrency).await {
452+
Ok(()) => Ok(PyPutResult(writer.finish().await?)),
453+
Err(err) => {
454+
writer.abort().await?;
455+
Err(err)
456+
}
457+
}
458+
}
449459

460+
async fn write_multipart(
461+
writer: &mut WriteMultipart,
462+
reader: PutInput,
463+
chunk_size: usize,
464+
max_concurrency: usize,
465+
) -> PyObjectStoreResult<()> {
450466
// Match across pull, push, async push
451467
match reader {
452468
PutInput::Pull(mut pull_reader) => loop {
@@ -455,24 +471,24 @@ async fn put_multipart_inner(
455471
if read_size == 0 {
456472
break;
457473
} else {
458-
write.wait_for_capacity(max_concurrency).await?;
459-
write.write(&scratch_buffer[0..read_size]);
474+
writer.wait_for_capacity(max_concurrency).await?;
475+
writer.write(&scratch_buffer[0..read_size]);
460476
}
461477
},
462478
PutInput::SyncPush(push_reader) => {
463479
for buf in push_reader {
464-
write.wait_for_capacity(max_concurrency).await?;
465-
write.write(&buf?);
480+
writer.wait_for_capacity(max_concurrency).await?;
481+
writer.put(buf?);
466482
}
467483
}
468484
PutInput::AsyncPush(mut push_reader) => {
469485
// Note: I believe that only one __anext__ call can happen at a time
470486
while let Some(buf) = push_reader.next_chunk().await? {
471-
write.wait_for_capacity(max_concurrency).await?;
472-
write.write(&buf);
487+
writer.wait_for_capacity(max_concurrency).await?;
488+
writer.put(buf);
473489
}
474490
}
475491
}
476492

477-
Ok(PyPutResult(write.finish().await?))
493+
Ok(())
478494
}

0 commit comments

Comments
 (0)