41 changes: 33 additions & 8 deletions nativelink-store/src/s3_store.rs
@@ -57,6 +57,10 @@ use tracing::{error, info};
use crate::cas_utils::is_zero_digest;
use crate::common_s3_utils::{BodyWrapper, TlsClient};

// An S3 object cannot be larger than this number. See:
// https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html
const MAX_UPLOAD_SIZE: u64 = 5 * 1024 * 1024 * 1024 * 1024; // 5TB.

// S3 parts cannot be smaller than this number. See:
// https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html
const MIN_MULTIPART_SIZE: u64 = 5 * 1024 * 1024; // 5MB.
@@ -67,7 +71,8 @@ const MAX_MULTIPART_SIZE: u64 = 5 * 1024 * 1024 * 1024; // 5GB.

// S3 parts cannot be more than this number. See:
// https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html
const MAX_UPLOAD_PARTS: usize = 10_000;
// Note: Type `u64` chosen to simplify calculations.
const MAX_UPLOAD_PARTS: u64 = 10_000;

// Default max buffer size for retrying upload requests.
// Note: If you change this, adjust the docs in the config.
@@ -278,6 +283,14 @@ where
UploadSizeInfo::ExactSize(sz) | UploadSizeInfo::MaxSize(sz) => sz,
};

// Sanity check S3 maximum upload size.
if max_size > MAX_UPLOAD_SIZE {
return Err(make_err!(
Code::FailedPrecondition,
"File size exceeds max of {MAX_UPLOAD_SIZE}"
));
}

// Note(aaronmondal) It might be more optimal to use a different
// heuristic here, but for simplicity we use a hard coded value.
// Anything going down this if-statement will have the advantage of only
@@ -386,9 +399,24 @@ where
.await?;

// S3 requires us to upload in parts if the size is greater than 5GB. The part size must be at least
// 5mb (except last part) and can have up to 10,000 parts.
// 5MB (except last part) and can have up to 10,000 parts.

// Calculate the number of chunks needed if we upload in 5MB chunks (the minimum chunk
// size), clamping to 10,000 parts and correcting for lossy integer division. This provides
// the first approximation of the chunk count.
let chunk_count = (max_size / MIN_MULTIPART_SIZE).clamp(0, MAX_UPLOAD_PARTS - 1) + 1;

// Using the clamped first approximation of the number of chunks, calculate the byte count
// of each chunk (excluding the last), clamping to the min/max part sizes of 5MB and 5GB.
let bytes_per_upload_part =
(max_size / (MIN_MULTIPART_SIZE - 1)).clamp(MIN_MULTIPART_SIZE, MAX_MULTIPART_SIZE);
(max_size / chunk_count).clamp(MIN_MULTIPART_SIZE, MAX_MULTIPART_SIZE);

// Sanity check before continuing.
if !(MIN_MULTIPART_SIZE..=MAX_MULTIPART_SIZE).contains(&bytes_per_upload_part) {
return Err(make_err!(
Code::FailedPrecondition,
"Failed to calculate file chuck size (min, max, calc): {MIN_MULTIPART_SIZE}, {MAX_MULTIPART_SIZE}, {bytes_per_upload_part}",
));
}
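
As a reading aid (not part of the diff), here is a minimal standalone sketch of the chunk-sizing arithmetic above, assuming the same constants as the store; the worked example mirrors the `multipart_chunk_size_clamp_min` regression test added below.

```rust
// Sketch of the multipart chunk sizing used above (same constants, not the store's code).
const MIN_MULTIPART_SIZE: u64 = 5 * 1024 * 1024; // 5MB.
const MAX_MULTIPART_SIZE: u64 = 5 * 1024 * 1024 * 1024; // 5GB.
const MAX_UPLOAD_PARTS: u64 = 10_000;

fn chunk_layout(max_size: u64) -> (u64, u64) {
    // First approximation: how many 5MB chunks fit, clamped so the trailing
    // "+ 1" (which corrects for lossy integer division) never exceeds 10,000 parts.
    let chunk_count = (max_size / MIN_MULTIPART_SIZE).clamp(0, MAX_UPLOAD_PARTS - 1) + 1;
    // Spread the bytes over that many chunks, keeping each part within S3's
    // 5MB..=5GB part-size bounds.
    let bytes_per_upload_part =
        (max_size / chunk_count).clamp(MIN_MULTIPART_SIZE, MAX_MULTIPART_SIZE);
    (chunk_count, bytes_per_upload_part)
}

fn main() {
    // Same payload shape as the regression test: three full 5MB parts plus 50 bytes.
    let (chunks, part_size) = chunk_layout(MIN_MULTIPART_SIZE * 3 + 50);
    assert_eq!(chunks, 4);
    assert_eq!(part_size, MIN_MULTIPART_SIZE);
    println!("{chunks} chunks of up to {part_size} bytes each");
}
```

The `+ 1` after the clamp keeps the chunk count non-zero and compensates for the truncating integer division, while the final `clamp` keeps each part inside S3's allowed part-size range.
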

let upload_parts = move || async move {
// This will ensure we only have `multipart_max_concurrent_uploads` * `bytes_per_upload_part`
@@ -455,11 +483,8 @@ where
let mut upload_futures = FuturesUnordered::new();

let mut completed_parts = Vec::with_capacity(
usize::try_from(cmp::min(
MAX_UPLOAD_PARTS as u64,
(max_size / bytes_per_upload_part) + 1,
))
.err_tip(|| "Could not convert u64 to usize")?,
usize::try_from(cmp::min(MAX_UPLOAD_PARTS, chunk_count))
.err_tip(|| "Could not convert u64 to usize")?,
);
tokio::pin!(read_stream_fut);
loop {
146 changes: 146 additions & 0 deletions nativelink-store/tests/s3_store_test.rs
@@ -781,3 +781,149 @@ async fn has_with_expired_result() -> Result<(), Error> {

Ok(())
}

// Regression test for the multipart upload chunk size calculation.
// Verifies that the chunk_count calculation, `(max_size / MIN_MULTIPART_SIZE).clamp(0, MAX_UPLOAD_PARTS - 1) + 1`,
// correctly handles files that result in the minimum chunk size being used.
#[nativelink_test]
async fn multipart_chunk_size_clamp_min() -> Result<(), Error> {
// Same as in s3_store.
const MIN_MULTIPART_SIZE: usize = 5 * 1024 * 1024; // 5MB.
// Use 4 chunks to test chunk calculation without excessive memory allocation.
const AC_ENTRY_SIZE: usize = MIN_MULTIPART_SIZE * 3 + 50;

let mut send_data: Vec<u8> = Vec::with_capacity(AC_ENTRY_SIZE);
for i in 0..send_data.capacity() {
send_data.push(u8::try_from((i * 3) % 256).expect("modulo 256 always fits in u8"));
}
let digest = DigestInfo::try_new(VALID_HASH1, send_data.len())?;

let mock_client = StaticReplayClient::new(vec![
ReplayEvent::new(
http::Request::builder()
.uri(format!(
"https://{BUCKET_NAME}.s3.{REGION}.amazonaws.com/{VALID_HASH1}-{AC_ENTRY_SIZE}?uploads",
))
.method("POST")
.body(SdkBody::empty())
.unwrap(),
http::Response::builder()
.status(StatusCode::OK)
.body(SdkBody::from(
r#"
<InitiateMultipartUploadResult xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
<UploadId>Dummy-uploadid</UploadId>
</InitiateMultipartUploadResult>"#
.as_bytes(),
))
.unwrap(),
),
ReplayEvent::new(
http::Request::builder()
.uri(format!(
"https://{BUCKET_NAME}.s3.{REGION}.amazonaws.com/{VALID_HASH1}-{AC_ENTRY_SIZE}?x-id=UploadPart&partNumber=1&uploadId=Dummy-uploadid",
))
.method("PUT")
.header("content-type", "application/octet-stream")
.header("content-length", "5242880")
.body(SdkBody::from(&send_data[0..MIN_MULTIPART_SIZE]))
.unwrap(),
http::Response::builder()
.status(StatusCode::OK)
.body(SdkBody::empty())
.unwrap(),
),
ReplayEvent::new(
http::Request::builder()
.uri(format!(
"https://{BUCKET_NAME}.s3.{REGION}.amazonaws.com/{VALID_HASH1}-{AC_ENTRY_SIZE}?x-id=UploadPart&partNumber=2&uploadId=Dummy-uploadid",
))
.method("PUT")
.header("content-type", "application/octet-stream")
.header("content-length", "5242880")
.body(SdkBody::from(&send_data[MIN_MULTIPART_SIZE..MIN_MULTIPART_SIZE * 2]))
.unwrap(),
http::Response::builder()
.status(StatusCode::OK)
.body(SdkBody::empty())
.unwrap(),
),
ReplayEvent::new(
http::Request::builder()
.uri(format!(
"https://{BUCKET_NAME}.s3.{REGION}.amazonaws.com/{VALID_HASH1}-{AC_ENTRY_SIZE}?x-id=UploadPart&partNumber=3&uploadId=Dummy-uploadid",
))
.method("PUT")
.header("content-type", "application/octet-stream")
.header("content-length", "5242880")
.body(SdkBody::from(&send_data[MIN_MULTIPART_SIZE * 2..MIN_MULTIPART_SIZE * 3]))
.unwrap(),
http::Response::builder()
.status(StatusCode::OK)
.body(SdkBody::empty())
.unwrap(),
),
ReplayEvent::new(
http::Request::builder()
.uri(format!(
"https://{BUCKET_NAME}.s3.{REGION}.amazonaws.com/{VALID_HASH1}-{AC_ENTRY_SIZE}?x-id=UploadPart&partNumber=4&uploadId=Dummy-uploadid",
))
.method("PUT")
.header("content-type", "application/octet-stream")
.header("content-length", "50")
.body(SdkBody::from(
&send_data[MIN_MULTIPART_SIZE * 3..MIN_MULTIPART_SIZE * 3 + 50],
))
.unwrap(),
http::Response::builder()
.status(StatusCode::OK)
.body(SdkBody::empty())
.unwrap(),
),
ReplayEvent::new(
http::Request::builder()
.uri(format!(
"https://{BUCKET_NAME}.s3.{REGION}.amazonaws.com/{VALID_HASH1}-{AC_ENTRY_SIZE}?uploadId=Dummy-uploadid",
))
.method("POST")
.header("content-length", "255")
.body(SdkBody::from(concat!(
r#"<CompleteMultipartUpload xmlns="http://s3.amazonaws.com/doc/2006-03-01/">"#,
"<Part><PartNumber>1</PartNumber></Part>",
"<Part><PartNumber>2</PartNumber></Part>",
"<Part><PartNumber>3</PartNumber></Part>",
"<Part><PartNumber>4</PartNumber></Part>",
"</CompleteMultipartUpload>",
)))
.unwrap(),
http::Response::builder()
.status(StatusCode::OK)
.body(SdkBody::from(concat!(
"<CompleteMultipartUploadResult>",
"</CompleteMultipartUploadResult>",
)))
.unwrap(),
),
]);
let test_config = Builder::new()
.behavior_version(BehaviorVersion::v2025_08_07())
.region(Region::from_static(REGION))
.http_client(mock_client.clone())
.build();
let s3_client = aws_sdk_s3::Client::from_conf(test_config);
let store = S3Store::new_with_client_and_jitter(
&ExperimentalAwsSpec {
bucket: BUCKET_NAME.to_string(),
..Default::default()
},
s3_client,
Arc::new(move |_delay| Duration::from_secs(0)),
MockInstantWrapped::default,
)?;
store
.update_oneshot(digest, send_data.clone().into())
.await
.unwrap();
mock_client.assert_requests_match(&[]);
Ok(())
}