Skip to content

Commit b89c1f8

Browse files
authored
Buffer more data in object store writer (#4619)
This change makes more sense for the following reasons: 1. The buffer is now bigger than the chunk size, so we don't submit a part immediately when we pass the chunk size, which should give us some more concurrency. 2. We upload parts in sizes that are aligned to 16MB, which the AWS performance guide seems to indicate is a good idea for reading the data back. 3. It also fixes two typos. Signed-off-by: Adam Gutglick <[email protected]>
1 parent 7eafc3e commit b89c1f8

File tree

1 file changed

+15
-11
lines changed

1 file changed

+15
-11
lines changed

vortex-io/src/object_store.rs

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -13,23 +13,24 @@ use vortex_error::VortexResult;
1313

1414
use crate::{IoBuf, VortexWrite};
1515

16-
/// Adapter type to write data through a [`ObjectStore`] instace.
16+
/// Adapter type to write data through a [`ObjectStore`] instance.
1717
///
18-
/// After writing, the caller must make sure to call `shutdonw`, in order to ensure the data is actually persisted.
18+
/// After writing, the caller must make sure to call `shutdown`, in order to ensure the data is actually persisted.
1919
pub struct ObjectStoreWriter {
2020
upload: Box<dyn MultipartUpload>,
2121
buffer: BytesMut,
2222
put_result: Option<PutResult>,
2323
}
2424

25-
const CHUNKS_SIZE: usize = 25 * 1024 * 1024;
25+
const CHUNK_SIZE: usize = 16 * 1024 * 1024;
26+
const BUFFER_SIZE: usize = 128 * 1024 * 1024;
2627

2728
impl ObjectStoreWriter {
2829
pub async fn new(object_store: Arc<dyn ObjectStore>, location: &Path) -> VortexResult<Self> {
2930
let upload = object_store.put_multipart(location).await?;
3031
Ok(Self {
3132
upload,
32-
buffer: BytesMut::with_capacity(CHUNKS_SIZE),
33+
buffer: BytesMut::with_capacity(CHUNK_SIZE),
3334
put_result: None,
3435
})
3536
}
@@ -44,12 +45,15 @@ impl VortexWrite for ObjectStoreWriter {
4445
self.buffer.extend_from_slice(buffer.as_slice());
4546
let parts = FuturesUnordered::new();
4647

47-
// Split off chunks while buffer is larger than CHUNKS_SIZE
48-
while self.buffer.len() > CHUNKS_SIZE {
49-
let payload = self.buffer.split_to(CHUNKS_SIZE).freeze();
50-
let part_fut = self.upload.put_part(PutPayload::from_bytes(payload));
48+
// Only start uploading parts once the buffer has grown past BUFFER_SIZE
49+
if self.buffer.len() > BUFFER_SIZE {
50+
// Split off chunks while buffer is larger than CHUNK_SIZE
51+
while self.buffer.len() > CHUNK_SIZE {
52+
let payload = self.buffer.split_to(CHUNK_SIZE).freeze();
53+
let part_fut = self.upload.put_part(PutPayload::from_bytes(payload));
5154

52-
parts.push(part_fut);
55+
parts.push(part_fut);
56+
}
5357
}
5458

5559
parts.try_collect::<Vec<_>>().await?;
@@ -60,8 +64,8 @@ impl VortexWrite for ObjectStoreWriter {
6064
async fn flush(&mut self) -> io::Result<()> {
6165
let parts = FuturesUnordered::new();
6266

63-
while self.buffer.len() > CHUNKS_SIZE {
64-
let payload = self.buffer.split_to(CHUNKS_SIZE).freeze();
67+
while self.buffer.len() > CHUNK_SIZE {
68+
let payload = self.buffer.split_to(CHUNK_SIZE).freeze();
6569
let part_fut = self.upload.put_part(PutPayload::from_bytes(payload));
6670

6771
parts.push(part_fut);

0 commit comments

Comments
 (0)