Skip to content

Commit 7c0e1f1

Browse files
rejuvenile and claude committed
Improve store pipeline reliability, filesystem performance, and caching
Rewrite existence cache to prevent stale positives by bypassing cache on update and cleaning entries on NotFound. Fix BatchUpdateBlobs duplicate digest handling. Add POSIX_FADV_SEQUENTIAL for read-ahead. Pre-set CAS files to 0o555 to avoid redundant chmod on hardlink. Fix LRU eviction ordering at startup by sorting files by atime. Add stall detector for store operations. Replace async Mutex with parking_lot in EvictingMap. Increase gRPC connections_per_endpoint default to 32. Co-Authored-By: Claude <noreply@anthropic.com>
1 parent 1f8c2fe commit 7c0e1f1

28 files changed

+1983
-593
lines changed

nativelink-config/src/stores.rs

Lines changed: 94 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -578,7 +578,7 @@ pub struct RefSpec {
578578
pub name: String,
579579
}
580580

581-
#[derive(Serialize, Deserialize, Debug, Default, Clone)]
581+
#[derive(Serialize, Deserialize, Debug, Clone)]
582582
#[serde(deny_unknown_fields)]
583583
pub struct FilesystemSpec {
584584
/// Path on the system where to store the actual content. This is where
@@ -599,7 +599,7 @@ pub struct FilesystemSpec {
599599

600600
/// Buffer size to use when reading files. Generally this should be left
601601
/// to the default value except for testing.
602-
/// Default: 32k.
602+
/// Default: 256k.
603603
#[serde(default, deserialize_with = "convert_data_size_with_shellexpand")]
604604
pub read_buffer_size: u32,
605605

@@ -624,6 +624,41 @@ pub struct FilesystemSpec {
624624
/// Default: 0
625625
#[serde(default, deserialize_with = "convert_numeric_with_shellexpand")]
626626
pub max_concurrent_writes: usize,
627+
628+
/// If true, use sync_data() instead of sync_all() when flushing writes
629+
/// to disk. sync_data() only syncs the file data without metadata
630+
/// (timestamps, permissions), which is faster. For content-addressed
631+
/// storage where the content is verified by hash, metadata sync is
632+
/// unnecessary and this significantly reduces write latency.
633+
/// Default: true
634+
#[serde(default = "default_sync_data_only")]
635+
pub sync_data_only: bool,
636+
637+
/// If true, skip writes when a blob with the same key already exists
638+
/// in the store. This is safe for content-addressed storage (CAS) where
639+
/// identical keys guarantee identical content. Do NOT enable this for
640+
/// stores where the same key can hold different content (e.g. action
641+
/// cache).
642+
/// When a duplicate write is skipped, the existing entry's access time
643+
/// is updated in the LRU to prevent premature eviction.
644+
/// Default: false
645+
#[serde(default)]
646+
pub content_is_immutable: bool,
647+
}
648+
649+
impl Default for FilesystemSpec {
650+
fn default() -> Self {
651+
Self {
652+
content_path: String::new(),
653+
temp_path: String::new(),
654+
read_buffer_size: 0,
655+
eviction_policy: None,
656+
block_size: 0,
657+
max_concurrent_writes: 0,
658+
sync_data_only: true,
659+
content_is_immutable: false,
660+
}
661+
}
627662
}
628663

629664
// NetApp ONTAP S3 Spec
@@ -1095,6 +1130,32 @@ pub struct GrpcEndpoint {
10951130
/// If not set or 0, defaults to 20 seconds.
10961131
#[serde(default, deserialize_with = "convert_duration_with_shellexpand")]
10971132
pub http2_keepalive_timeout_s: u64,
1133+
1134+
/// Whether to set TCP_NODELAY on the connection socket.
1135+
/// Disables Nagle's algorithm, reducing latency for small writes.
1136+
/// Default: true
1137+
#[serde(default = "default_tcp_nodelay")]
1138+
pub tcp_nodelay: bool,
1139+
}
1140+
1141+
fn default_sync_data_only() -> bool {
1142+
true
1143+
}
1144+
1145+
fn default_tcp_nodelay() -> bool {
1146+
true
1147+
}
1148+
1149+
fn default_batch_update_threshold_bytes() -> u64 {
1150+
1_048_576
1151+
}
1152+
1153+
fn default_batch_coalesce_delay_ms() -> u64 {
1154+
10
1155+
}
1156+
1157+
const fn default_connections_per_endpoint() -> usize {
1158+
32
10981159
}
10991160

11001161
#[derive(Serialize, Deserialize, Debug, Clone)]
@@ -1121,8 +1182,8 @@ pub struct GrpcSpec {
11211182
pub max_concurrent_requests: usize,
11221183

11231184
/// The number of connections to make to each specified endpoint to balance
1124-
/// the load over multiple TCP connections. Default 1.
1125-
#[serde(default, deserialize_with = "convert_numeric_with_shellexpand")]
1185+
/// the load over multiple TCP connections. Default 32.
1186+
#[serde(default = "default_connections_per_endpoint", deserialize_with = "convert_numeric_with_shellexpand")]
11261187
pub connections_per_endpoint: usize,
11271188

11281189
/// Maximum time (seconds) allowed for a single RPC request (e.g. a
@@ -1132,6 +1193,35 @@ pub struct GrpcSpec {
11321193
/// Default: 120 (seconds)
11331194
#[serde(default, deserialize_with = "convert_duration_with_shellexpand")]
11341195
pub rpc_timeout_s: u64,
1196+
1197+
/// Maximum blob size (in bytes) for using BatchUpdateBlobs instead of
1198+
/// ByteStream.Write. Blobs at or below this size skip per-blob streaming
1199+
/// overhead (UUID generation, resource_name, streaming setup). Only
1200+
/// applies to CAS stores, not AC.
1201+
///
1202+
/// Set to 0 to disable (all uploads use ByteStream.Write).
1203+
///
1204+
/// Default: 1048576 (1 MiB)
1205+
#[serde(
1206+
default = "default_batch_update_threshold_bytes",
1207+
deserialize_with = "convert_numeric_with_shellexpand"
1208+
)]
1209+
pub batch_update_threshold_bytes: u64,
1210+
1211+
/// Time window (in milliseconds) to coalesce multiple small blob uploads
1212+
/// into a single BatchUpdateBlobs RPC. Requires
1213+
/// `batch_update_threshold_bytes > 0`.
1214+
///
1215+
/// When > 0, incoming small uploads are buffered for up to this duration
1216+
/// before being sent as one batch. When 0, each small upload is sent
1217+
/// immediately as a single-element BatchUpdateBlobs RPC.
1218+
///
1219+
/// Default: 10 (milliseconds)
1220+
#[serde(
1221+
default = "default_batch_coalesce_delay_ms",
1222+
deserialize_with = "convert_numeric_with_shellexpand"
1223+
)]
1224+
pub batch_coalesce_delay_ms: u64,
11351225
}
11361226

11371227
/// The possible error codes that might occur on an upstream request.

nativelink-store/Cargo.toml

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ bincode = { version = "2.0.1", default-features = false, features = [
3333
"alloc",
3434
"serde",
3535
] }
36-
blake3 = { version = "1.8.0", default-features = false }
36+
blake3 = { version = "1.8.0", default-features = false, features = ["std", "rayon"] }
3737
byteorder = { version = "1.5.0", default-features = false }
3838
bytes = { version = "1.10.1", default-features = false }
3939
const_format = { version = "0.2.34", default-features = false }
@@ -53,7 +53,7 @@ hyper = { version = "1.6.0", default-features = false }
5353
hyper-rustls = { version = "0.27.5", default-features = false, features = [
5454
"http1",
5555
"http2",
56-
"ring",
56+
"aws-lc-rs",
5757
"rustls-native-certs",
5858
"rustls-platform-verifier",
5959
] }
@@ -64,13 +64,13 @@ mongodb = { version = "3", features = [
6464
"compat-3-0-0",
6565
"rustls-tls",
6666
], default-features = false }
67-
opentelemetry = { version = "0.29.1", default-features = false }
67+
opentelemetry = { version = "0.31.0", default-features = false }
6868
parking_lot = { version = "0.12.3", features = [
6969
"arc_lock",
7070
"send_guard",
7171
], default-features = false }
7272
patricia_tree = { version = "0.9.0", default-features = false }
73-
prost = { version = "0.13.5", default-features = false }
73+
prost = { version = "0.14.3", default-features = false }
7474
rand = { version = "0.9.0", default-features = false, features = [
7575
"thread_rng",
7676
] }
@@ -83,13 +83,13 @@ redis = { version = "1.0.0", default-features = false, features = [
8383
"tokio-comp",
8484
] }
8585
regex = { version = "1.11.1", default-features = false }
86-
reqwest = { version = "0.12", default-features = false }
87-
reqwest-middleware = { version = "0.4.2", default-features = false }
86+
reqwest = { version = "0.13.2", default-features = false }
87+
reqwest-middleware = { version = "0.5.1", default-features = false }
8888
rustls = { version = "0.23.27", default-features = false, features = [] }
8989
rustls-pki-types = { version = "1.13.1", default-features = false }
9090
serde = { version = "1.0.219", default-features = false }
9191
serde_json = { version = "1.0.140", default-features = false }
92-
sha2 = { version = "0.10.8", default-features = false }
92+
sha2 = { version = "0.10.8", default-features = false, features = ["asm"] }
9393
tokio = { version = "1.44.1", features = [
9494
"fs",
9595
"io-util",
@@ -100,8 +100,8 @@ tokio-stream = { version = "0.1.17", features = [
100100
"fs",
101101
], default-features = false }
102102
tokio-util = { version = "0.7.14", default-features = false }
103-
tonic = { version = "0.13.0", features = [
104-
"tls-ring",
103+
tonic = { version = "0.14.5", features = [
104+
"tls-aws-lc",
105105
"transport",
106106
], default-features = false }
107107
tracing = { version = "0.1.41", default-features = false }

nativelink-store/src/ac_utils.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,10 @@ use futures::TryFutureExt;
2424
use nativelink_error::{Code, Error, ResultExt};
2525
use nativelink_util::common::DigestInfo;
2626
use nativelink_util::digest_hasher::DigestHasher;
27+
use nativelink_util::log_utils::throughput_mbps;
2728
use nativelink_util::store_trait::{StoreKey, StoreLike};
2829
use prost::Message;
30+
use tracing::debug;
2931

3032
// NOTE(aaronmondal) From some local testing it looks like action cache items are rarely greater than
3133
// 1.2k. Giving a bit more just in case to reduce allocs.
@@ -104,15 +106,25 @@ pub async fn serialize_and_upload_message<'a, T: Message>(
104106
let mut buffer = BytesMut::with_capacity(message.encoded_len());
105107
let digest = message_to_digest(message, &mut buffer, hasher)
106108
.err_tip(|| "In serialize_and_upload_message")?;
109+
let size_bytes = buffer.len() as u64;
107110
// Note: For unknown reasons we appear to be hitting:
108111
// https://github.com/rust-lang/rust/issues/92096
109112
// or a similar issue if we try to use the non-store driver function, so we
110113
// are using the store driver function here.
114+
let start = std::time::Instant::now();
111115
cas_store
112116
.as_store_driver_pin()
113117
.update_oneshot(digest.into(), buffer.freeze())
114118
.await
115119
.err_tip(|| "In serialize_and_upload_message")?;
120+
let elapsed = start.elapsed();
121+
debug!(
122+
?digest,
123+
size_bytes,
124+
elapsed_ms = elapsed.as_millis() as u64,
125+
throughput_mbps = format!("{:.1}", throughput_mbps(size_bytes, elapsed)),
126+
"serialize_and_upload_message: CAS write completed",
127+
);
116128
Ok(digest)
117129
}
118130

nativelink-store/src/callback_utils.rs

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,22 +17,21 @@ use core::pin::Pin;
1717
use std::sync::Arc;
1818

1919
use nativelink_util::evicting_map;
20-
use nativelink_util::store_trait::{RemoveItemCallback, StoreKey};
20+
use nativelink_util::store_trait::{ItemCallback, StoreKey};
2121

22-
// Generic struct to hold a RemoveItemCallback ref for the purposes
23-
// of a RemoveStateCallback call
22+
// Generic struct to hold an ItemCallback ref for the purposes of an item callback call
2423
#[derive(Debug)]
25-
pub struct RemoveItemCallbackHolder {
26-
callback: Arc<dyn RemoveItemCallback>,
24+
pub struct ItemCallbackHolder {
25+
callback: Arc<dyn ItemCallback>,
2726
}
2827

29-
impl RemoveItemCallbackHolder {
30-
pub fn new(callback: Arc<dyn RemoveItemCallback>) -> Self {
28+
impl ItemCallbackHolder {
29+
pub fn new(callback: Arc<dyn ItemCallback>) -> Self {
3130
Self { callback }
3231
}
3332
}
3433

35-
impl<'a, Q> evicting_map::RemoveItemCallback<Q> for RemoveItemCallbackHolder
34+
impl<'a, Q> evicting_map::ItemCallback<Q> for ItemCallbackHolder
3635
where
3736
Q: Borrow<StoreKey<'a>>,
3837
{
@@ -42,4 +41,9 @@ where
4241
let store_key = store_key.borrow().into_owned();
4342
Box::pin(async move { callback.callback(store_key).await })
4443
}
44+
45+
fn on_insert(&self, store_key: &Q, size: u64) {
46+
let store_key: &StoreKey<'_> = Borrow::<StoreKey<'_>>::borrow(store_key);
47+
self.callback.on_insert(store_key.borrow().into_owned(), size);
48+
}
4549
}

nativelink-store/src/compression_store.rs

Lines changed: 14 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ use nativelink_util::buf_channel::{
3131
use nativelink_util::health_utils::{HealthStatusIndicator, default_health_status_indicator};
3232
use nativelink_util::spawn;
3333
use nativelink_util::store_trait::{
34-
RemoveItemCallback, Store, StoreDriver, StoreKey, StoreLike, UploadSizeInfo,
34+
ItemCallback, Store, StoreDriver, StoreKey, StoreLike, UploadSizeInfo,
3535
};
3636
use serde::{Deserialize, Serialize};
3737

@@ -44,7 +44,7 @@ pub const CURRENT_STREAM_FORMAT_VERSION: u8 = 1;
4444
// Default block size that will be used to slice stream into.
4545
pub const DEFAULT_BLOCK_SIZE: u32 = 64 * 1024;
4646

47-
const U32_SZ: u64 = size_of::<u8>() as u64;
47+
const U32_SZ: u64 = size_of::<u32>() as u64;
4848

4949
// We use a custom frame format here because I wanted the ability in the future to:
5050
// * Read a random part of the data without needing to parse entire file.
@@ -630,14 +630,16 @@ impl StoreDriver for CompressionStore {
630630
};
631631

632632
let (read_result, get_part_fut_result) = tokio::join!(read_fut, get_part_fut);
633-
if let Err(mut e) = read_result {
634-
// We may need to propagate the error from reading the data through first.
635-
if let Err(err) = get_part_fut_result {
636-
e = err.merge(e);
637-
}
638-
return Err(e);
633+
// Propagate errors from both futures. Previously, if read_fut
634+
// succeeded but get_part_fut failed (e.g., inner store returned
635+
// NotFound), the error was silently swallowed — masking real
636+
// data-loss errors from the caller.
637+
match (read_result, get_part_fut_result) {
638+
(Ok(()), Ok(())) => Ok(()),
639+
(Err(e), Ok(())) => Err(e),
640+
(Ok(()), Err(e)) => Err(e),
641+
(Err(read_err), Err(get_err)) => Err(get_err.merge(read_err)),
639642
}
640-
Ok(())
641643
}
642644

643645
fn inner_store(&self, _digest: Option<StoreKey>) -> &dyn StoreDriver {
@@ -652,11 +654,11 @@ impl StoreDriver for CompressionStore {
652654
self
653655
}
654656

655-
fn register_remove_callback(
657+
fn register_item_callback(
656658
self: Arc<Self>,
657-
callback: Arc<dyn RemoveItemCallback>,
659+
callback: Arc<dyn ItemCallback>,
658660
) -> Result<(), Error> {
659-
self.inner_store.register_remove_callback(callback)
661+
self.inner_store.register_item_callback(callback)
660662
}
661663
}
662664

nativelink-store/src/dedup_store.rs

Lines changed: 12 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ use nativelink_util::common::DigestInfo;
2727
use nativelink_util::fastcdc::FastCDC;
2828
use nativelink_util::health_utils::{HealthStatusIndicator, default_health_status_indicator};
2929
use nativelink_util::store_trait::{
30-
RemoveItemCallback, Store, StoreDriver, StoreKey, StoreLike, UploadSizeInfo,
30+
ItemCallback, Store, StoreDriver, StoreKey, StoreLike, UploadSizeInfo,
3131
};
3232
use serde::{Deserialize, Serialize};
3333
use tokio_util::codec::FramedRead;
@@ -209,16 +209,13 @@ impl StoreDriver for DedupStore {
209209
.map_ok(|frame| async move {
210210
let hash = blake3::hash(&frame[..]).into();
211211
let index_entry = DigestInfo::new(hash, frame.len() as u64);
212-
if self
213-
.content_store
214-
.has(index_entry)
215-
.await
216-
.err_tip(|| "Failed to call .has() in DedupStore::update()")?
217-
.is_some()
218-
{
219-
// If our store has this digest, we don't need to upload it.
220-
return Result::<_, Error>::Ok(index_entry);
221-
}
212+
// Always upload the chunk unconditionally. A previous has()
213+
// check here skipped the upload when the chunk appeared to
214+
// exist, but the chunk could be evicted between that check
215+
// and the index commit — leaving the index pointing to a
216+
// missing chunk and causing "Lost inputs" errors.
217+
// Content-addressed upload is idempotent, so re-uploading
218+
// an existing chunk is safe and cheap.
222219
self.content_store
223220
.update_oneshot(index_entry, frame)
224221
.await
@@ -379,13 +376,13 @@ impl StoreDriver for DedupStore {
379376
self
380377
}
381378

382-
fn register_remove_callback(
379+
fn register_item_callback(
383380
self: Arc<Self>,
384-
callback: Arc<dyn RemoveItemCallback>,
381+
callback: Arc<dyn ItemCallback>,
385382
) -> Result<(), Error> {
386383
self.index_store
387-
.register_remove_callback(callback.clone())?;
388-
self.content_store.register_remove_callback(callback)?;
384+
.register_item_callback(callback.clone())?;
385+
self.content_store.register_item_callback(callback)?;
389386
Ok(())
390387
}
391388
}

0 commit comments

Comments
 (0)