Skip to content

Commit d69709a

Browse files
dyro and Dylan Ross authored
feat(sdk): falls back to fs for large intermediate streams (#1341)
Co-authored-by: Dylan Ross <[email protected]>
1 parent 3383392 commit d69709a

File tree

5 files changed

+96
-26
lines changed

5 files changed

+96
-26
lines changed

Cargo.lock

Lines changed: 10 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

sdk/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -177,7 +177,7 @@ serde_json = { version = "1.0.117", features = ["preserve_order"] }
177177
serde_with = "3.11.0"
178178
serde-transcode = "1.1.1"
179179
sha1 = "0.10.6"
180-
sha2 = "0.10.6"
180+
sha2 = { version = "0.10.6", features = ["asm"] }
181181
spki = { version = "0.7.3", optional = true }
182182
static-iref = "3.0"
183183
tempfile = "=3.15.0"

sdk/src/settings/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -173,6 +173,7 @@ pub(crate) struct Core {
173173
compress_manifests: bool,
174174
#[serde(skip_serializing_if = "Option::is_none")]
175175
max_memory_usage: Option<u64>,
176+
backing_store_memory_threshold_in_mb: usize,
176177
// TODO: pending https://github.com/contentauth/c2pa-rs/pull/1180
177178
// prefer_update_manifests: bool,
178179
}
@@ -189,6 +190,7 @@ impl Default for Core {
189190
merkle_tree_max_proofs: 5,
190191
compress_manifests: true,
191192
max_memory_usage: None,
193+
backing_store_memory_threshold_in_mb: 512,
192194
// prefer_update_manifests: true,
193195
}
194196
}

sdk/src/store.rs

Lines changed: 6 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,8 @@ use crate::{
8484
status_tracker::{ErrorBehavior, StatusTracker},
8585
utils::{
8686
hash_utils::HashRange,
87-
io_utils::{insert_data_at, safe_vec, stream_len},
87+
io_utils,
88+
io_utils::{insert_data_at, stream_len},
8889
is_zero,
8990
patch::patch_bytes,
9091
},
@@ -98,7 +99,6 @@ use crate::{
9899
use crate::{external_manifest::ManifestPatchCallback, RemoteSigner};
99100

100101
const MANIFEST_STORE_EXT: &str = "c2pa"; // file extension for external manifests
101-
const MANIFEST_RESERVE_SIZE: usize = 10 * 1024 * 1024; // 10MB reserve size for manifest
102102

103103
pub(crate) struct ManifestHashes {
104104
pub manifest_box_hash: Vec<u8>,
@@ -2788,11 +2788,7 @@ impl Store {
27882788
.await?
27892789
};
27902790

2791-
let intermediate_output: Vec<u8> = safe_vec(
2792-
stream_len(input_stream)? + MANIFEST_RESERVE_SIZE as u64,
2793-
None,
2794-
)?;
2795-
let mut intermediate_stream = Cursor::new(intermediate_output);
2791+
let mut intermediate_stream = io_utils::stream_with_fs_fallback(None)?;
27962792

27972793
#[allow(unused_mut)] // Not mutable in the non-async case.
27982794
let mut jumbf_bytes = self.start_save_stream(
@@ -3314,12 +3310,7 @@ impl Store {
33143310
output_stream: &mut dyn CAIReadWrite,
33153311
reserve_size: usize,
33163312
) -> Result<Vec<u8>> {
3317-
// make sure we can hold the intermediate stream before attempting
3318-
let intermediate_output: Vec<u8> = safe_vec(
3319-
stream_len(input_stream)? + MANIFEST_RESERVE_SIZE as u64,
3320-
None,
3321-
)?;
3322-
let mut intermediate_stream = Cursor::new(intermediate_output);
3313+
let mut intermediate_stream = io_utils::stream_with_fs_fallback(None)?;
33233314

33243315
let pc = self.provenance_claim_mut().ok_or(Error::ClaimEncoding)?;
33253316

@@ -3345,11 +3336,7 @@ impl Store {
33453336
.get_writer(format)
33463337
.ok_or(Error::UnsupportedType)?;
33473338

3348-
let tmp_output: Vec<u8> = safe_vec(
3349-
stream_len(input_stream)? + MANIFEST_RESERVE_SIZE as u64,
3350-
None,
3351-
)?;
3352-
let mut tmp_stream = Cursor::new(tmp_output);
3339+
let mut tmp_stream = io_utils::stream_with_fs_fallback(None)?;
33533340
manifest_writer.remove_cai_store_from_stream(input_stream, &mut tmp_stream)?;
33543341

33553342
// add external ref if possible
@@ -3393,11 +3380,7 @@ impl Store {
33933380

33943381
// insert UUID boxes at the correct location if required
33953382
if let Some(merkle_uuid_boxes) = &bmff_hash.merkle_uuid_boxes {
3396-
let temp_data: Vec<u8> = safe_vec(
3397-
stream_len(&mut intermediate_stream)? + MANIFEST_RESERVE_SIZE as u64,
3398-
None,
3399-
)?;
3400-
let mut temp_stream = Cursor::new(temp_data);
3383+
let mut temp_stream = io_utils::stream_with_fs_fallback(None)?;
34013384

34023385
insert_data_at(
34033386
&mut intermediate_stream,

sdk/src/utils/io_utils.rs

Lines changed: 77 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,9 @@ use std::{
2020
};
2121

2222
#[allow(unused)] // different code path for WASI
23-
use tempfile::{tempdir, Builder, NamedTempFile, TempDir};
23+
use tempfile::{tempdir, Builder, NamedTempFile, SpooledTempFile, TempDir};
2424

25-
use crate::{asset_io::rename_or_move, Error, Result};
25+
use crate::{asset_io::rename_or_move, settings::get_settings_value, Error, Result};
2626
// Replace data at arbitrary location and len in a file.
2727
// start_location is where the replacement data will start
2828
// replace_len is how many bytes from source to replaced starting a start_location
@@ -118,6 +118,44 @@ pub(crate) fn stream_len<R: Read + Seek + ?Sized>(reader: &mut R) -> Result<u64>
118118
Ok(len)
119119
}
120120

121+
/// In-memory stream used on `wasm32` targets.
///
/// The `_threshold_override` argument is ignored; the returned cursor wraps a new,
/// empty `Vec<u8>`.
#[cfg(target_arch = "wasm32")]
fn stream_with_fs_fallback_wasm(
    _threshold_override: Option<usize>,
) -> Result<std::io::Cursor<Vec<u8>>> {
    let buffer: Vec<u8> = Vec::new();
    Ok(std::io::Cursor::new(buffer))
}
127+
128+
#[cfg(not(target_arch = "wasm32"))]
129+
fn stream_with_fs_fallback_file_io(threshold_override: Option<usize>) -> Result<SpooledTempFile> {
130+
let threshold = threshold_override.unwrap_or(get_settings_value::<usize>(
131+
"core.backing_store_memory_threshold_in_mb",
132+
)?);
133+
134+
Ok(SpooledTempFile::new(threshold))
135+
}
136+
137+
/// Will create a [Read], [Write], and [Seek] capable stream that will stay in memory
138+
/// as long as the threshold is not exceeded. The threshold is specified in MB in the
139+
/// settings under ""core.backing_store_memory_threshold_in_mb"
140+
///
141+
/// # Parameters
142+
/// - `threshold_override`: Optional override for the threshold value in MB. If provided, this
143+
/// value will be used instead of the one from settings.
144+
///
145+
/// # Errors
146+
/// - Returns an error if the threshold value from settings is not valid.
147+
///
148+
/// # Note
149+
/// This will return a an in-memory stream when the compilation target doesn't support file I/O.
150+
pub(crate) fn stream_with_fs_fallback(
151+
threshold_override: Option<usize>,
152+
) -> Result<impl Read + Write + Seek> {
153+
#[cfg(target_arch = "wasm32")]
154+
return stream_with_fs_fallback_wasm(threshold_override);
155+
#[cfg(not(target_arch = "wasm32"))]
156+
return stream_with_fs_fallback_file_io(threshold_override);
157+
}
158+
121159
// Returns a new Vec first making sure it can hold the desired capacity. Fill
122160
// with default value if provided
123161
pub(crate) fn safe_vec<T: Clone>(item_cnt: u64, init_with: Option<T>) -> Result<Vec<T>> {
@@ -386,4 +424,41 @@ mod tests {
386424
)
387425
.is_err());
388426
}
427+
428+
/// With an explicit threshold of 10, a 5-byte write must stay in memory and a further,
/// longer write must cause the stream to report that it has rolled over.
#[cfg(not(target_arch = "wasm32"))]
#[test]
fn test_safe_stream_threshold_behavior() {
    let mut spooled = stream_with_fs_fallback_file_io(Some(10)).unwrap();

    // 5 bytes: below the threshold, so nothing should have rolled over yet.
    spooled.write_all(b"small").unwrap();
    assert!(!spooled.is_rolled(), "data still in memory");

    // The additional bytes push the total past the threshold.
    spooled
        .write_all(b"this is larger than 10 bytes total")
        .unwrap();
    assert!(spooled.is_rolled(), "data moved to disk");
}
443+
444+
/// Without an override, a 5-byte write must stay in memory; after writing one 1MB chunk
/// per unit of the configured threshold, the stream must report that it has rolled over.
#[cfg(not(target_arch = "wasm32"))]
#[test]
fn test_safe_stream_no_threshold_behavior() {
    let mut spooled = stream_with_fs_fallback_file_io(None).unwrap();

    // 5 bytes: not enough to leave memory.
    spooled.write_all(b"small").unwrap();
    assert!(!spooled.is_rolled(), "data still in memory");

    // Number of 1MB chunks to write, taken from the configured threshold value.
    let chunk_count =
        get_settings_value::<usize>("core.backing_store_memory_threshold_in_mb").unwrap();
    let one_mb_chunk = vec![0u8; 1024 * 1024];

    (0..chunk_count).for_each(|_| spooled.write_all(&one_mb_chunk).unwrap());

    assert!(spooled.is_rolled(), "data moved to disk");
}
389464
}

0 commit comments

Comments
 (0)