Skip to content

Commit 62c33c8

Browse files
authored
[support bundle] Ensure that the transfer buffers are flushed (#8708)
Fixes #8681
1 parent 49da865 commit 62c33c8

File tree

2 files changed

+51
-30
lines changed

2 files changed

+51
-30
lines changed

nexus/src/app/background/tasks/support_bundle_collector.rs

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -592,11 +592,12 @@ impl BundleCollection {
592592
})?;
593593

594594
// Only stream at most "transfer_chunk_size" bytes at once
595-
let remaining = std::cmp::min(
595+
let chunk_size = std::cmp::min(
596596
self.transfer_chunk_size.get(),
597597
total_len - offset,
598598
);
599-
let limited_file = file.take(remaining);
599+
600+
let limited_file = file.take(chunk_size);
600601
let stream = tokio_util::io::ReaderStream::new(limited_file);
601602
let body = reqwest::Body::wrap_stream(stream);
602603

@@ -605,16 +606,16 @@ impl BundleCollection {
605606
"Streaming bundle chunk";
606607
"bundle" => %self.bundle.id,
607608
"offset" => offset,
608-
"length" => remaining,
609+
"length" => chunk_size,
609610
);
610611

611612
sled_client.support_bundle_transfer(
612613
&zpool, &dataset, &support_bundle, offset, body
613614
).await.with_context(|| {
614-
format!("Failed to transfer bundle: {remaining}@{offset} of {total_len} to sled")
615+
format!("Failed to transfer bundle: {chunk_size}@{offset} of {total_len} to sled")
615616
})?;
616617

617-
offset += self.transfer_chunk_size.get();
618+
offset += chunk_size;
618619
}
619620

620621
sled_client
@@ -1561,7 +1562,7 @@ mod test {
15611562
.expect("Bundle should definitely be in db by this point");
15621563
assert_eq!(observed_bundle.state, SupportBundleState::Active);
15631564

1564-
// Download a file from the bundle, to verify that it was trasnferred
1565+
// Download a file from the bundle, to verify that it was transferred
15651566
// successfully.
15661567
let head = false;
15671568
let range = None;

sled-agent/src/support_bundle/storage.rs

Lines changed: 44 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -591,7 +591,12 @@ impl<'a> SupportBundleManager<'a> {
591591
return Ok(digest.as_slice() == expected.as_ref());
592592
}
593593

594-
// A helper function which streams the contents of a bundle to a file.
594+
// A helper function which streams the contents of a bundle to a file,
595+
// and flushes it to the filesystem.
596+
//
597+
// Note that this doesn't necessarily "sync" it to the underlying disk,
598+
// but that's fine -- we just want to make sure the next call to "finalize"
599+
// will reliably see what we last wrote.
595600
async fn stream_bundle(
596601
mut tmp_file: tokio::fs::File,
597602
stream: impl Stream<Item = Result<Bytes, HttpError>>,
@@ -603,6 +608,18 @@ impl<'a> SupportBundleManager<'a> {
603608
let chunk = chunk?;
604609
tmp_file.write_all(&chunk).await?;
605610
}
611+
612+
// From the tokio docs:
613+
//
614+
// > A file will not be closed immediately when it goes out of scope if
615+
// > there are any IO operations that have not yet completed. To ensure
616+
// > that a file is closed immediately when it is dropped, you should
617+
// > call flush before dropping it.
618+
//
619+
// It is possible, although uncommon, for us to write to this file,
620+
// drop the handle to it, and for it to have not been fully written to
621+
// storage.
622+
tmp_file.flush().await?;
606623
Ok(())
607624
}
608625

@@ -726,9 +743,7 @@ impl<'a> SupportBundleManager<'a> {
726743
.open(&support_bundle_path_tmp)
727744
.await?;
728745

729-
tmp_file
730-
.seek(tokio::io::SeekFrom::Current(i64::try_from(offset)?))
731-
.await?;
746+
tmp_file.seek(tokio::io::SeekFrom::Start(offset)).await?;
732747

733748
if let Err(err) = Self::stream_bundle(tmp_file, stream).await {
734749
warn!(log, "Failed to write bundle to storage"; "error" => ?err);
@@ -743,7 +758,7 @@ impl<'a> SupportBundleManager<'a> {
743758
info!(log, "Bundle written successfully");
744759
let metadata = SupportBundleMetadata {
745760
support_bundle_id,
746-
state: SupportBundleState::Complete,
761+
state: SupportBundleState::Incomplete,
747762
};
748763
Ok(metadata)
749764
}
@@ -1551,26 +1566,31 @@ mod tests {
15511566
.expect("Should have started creation");
15521567

15531568
// Split the zipfile into halves, so we can transfer it in two chunks
1554-
let len1 = zipfile_data.len() / 2;
1555-
let stream1 = stream::once(async {
1556-
Ok(Bytes::copy_from_slice(&zipfile_data.as_slice()[..len1]))
1557-
});
1558-
let stream2 = stream::once(async {
1559-
Ok(Bytes::copy_from_slice(&zipfile_data.as_slice()[len1..]))
1560-
});
1561-
1562-
mgr.transfer(zpool_id, dataset_id, support_bundle_id, 0, stream1)
1569+
let total_len = zipfile_data.len();
1570+
let chunk_size = total_len / 2;
1571+
1572+
let mut offset = 0;
1573+
while offset < total_len {
1574+
let end_offset = std::cmp::min(offset + chunk_size, total_len);
1575+
let chunk = &zipfile_data[offset..end_offset];
1576+
1577+
let stream =
1578+
stream::once(async move { Ok(Bytes::copy_from_slice(chunk)) });
1579+
1580+
mgr.transfer(
1581+
zpool_id,
1582+
dataset_id,
1583+
support_bundle_id,
1584+
offset as u64,
1585+
stream,
1586+
)
15631587
.await
1564-
.expect("Should have transferred bundle (part1)");
1565-
mgr.transfer(
1566-
zpool_id,
1567-
dataset_id,
1568-
support_bundle_id,
1569-
len1 as u64,
1570-
stream2,
1571-
)
1572-
.await
1573-
.expect("Should have transferred bundle (part2)");
1588+
.unwrap_or_else(|_| {
1589+
panic!("Should have transferred chunk at offset {}", offset)
1590+
});
1591+
1592+
offset = end_offset;
1593+
}
15741594
let bundle = mgr
15751595
.finalize(zpool_id, dataset_id, support_bundle_id, hash)
15761596
.await

0 commit comments

Comments
 (0)