Skip to content

Commit fc11b0b

Browse files
AnonymousKaya Balta
authored andcommitted
Add macOS and Linux read prefetch hints
Add macOS F_RDADVISE in advise_sequential for file-level readahead. Add advise_willneed method with 3-way cfg (F_RDADVISE on macOS, POSIX_FADV_WILLNEED on Linux, no-op elsewhere). Add micro-prefetch in read_file_to_channel read loop. Thread start_offset parameter through read_file_to_channel callers.
1 parent 28231f0 commit fc11b0b

File tree

3 files changed

+52
-6
lines changed

3 files changed

+52
-6
lines changed

nativelink-store/src/filesystem_store.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ use nativelink_util::store_trait::{
4040
};
4141
use tokio::sync::Semaphore;
4242
use tokio_stream::wrappers::ReadDirStream;
43-
use tracing::{debug, error, trace, warn};
43+
use tracing::{debug, error, info, trace, warn};
4444

4545
use crate::callback_utils::ItemCallbackHolder;
4646
use crate::cas_utils::is_zero_digest;
@@ -903,7 +903,7 @@ impl<Fe: FileEntry> FilesystemStore<Fe> {
903903
let mut encoded_file_path = entry.get_encoded_file_path().write().await;
904904
// Then check it's still in there...
905905
if evicting_map.get(&key).await.is_none() {
906-
debug!(%key, "Got eviction while emplacing, dropping");
906+
info!(%key, "Got eviction while emplacing, dropping");
907907
return Ok(());
908908
}
909909

@@ -1256,7 +1256,7 @@ impl<Fe: FileEntry> StoreDriver for FilesystemStore<Fe> {
12561256
// The same blobs are frequently read by multiple workers within
12571257
// seconds of each other — keeping them in page cache avoids
12581258
// redundant disk I/O (measured: 76% of read I/O is re-reads).
1259-
fs::read_file_to_channel(temp_file, writer, read_limit, self.read_buffer_size)
1259+
fs::read_file_to_channel(temp_file, writer, read_limit, self.read_buffer_size, offset)
12601260
.await
12611261
.err_tip(|| "Failed to read data in filesystem store")?;
12621262
writer

nativelink-util/src/fs.rs

Lines changed: 48 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -91,8 +91,48 @@ impl FileSlot {
9191
}
9292
}
9393

94-
#[cfg(not(target_os = "linux"))]
94+
#[cfg(target_os = "macos")]
95+
pub fn advise_sequential(&self) {
96+
// F_RDADVISE hints that we'll read a range soon — use a 4MB initial
97+
// window to kick off readahead similar to Linux POSIX_FADV_SEQUENTIAL.
98+
self.advise_willneed(0, 4 * 1024 * 1024);
99+
}
100+
101+
#[cfg(not(any(target_os = "linux", target_os = "macos")))]
95102
pub const fn advise_sequential(&self) {}
103+
104+
/// Advise the kernel that we will soon need data at [offset, offset+len).
105+
/// Best-effort: errors are silently ignored.
106+
#[cfg(target_os = "linux")]
107+
pub fn advise_willneed(&self, offset: u64, len: usize) {
108+
use std::os::unix::io::AsRawFd;
109+
let fd = self.inner.as_raw_fd();
110+
unsafe {
111+
libc::posix_fadvise(fd, offset as i64, len as i64, libc::POSIX_FADV_WILLNEED);
112+
}
113+
}
114+
115+
#[cfg(target_os = "macos")]
116+
pub fn advise_willneed(&self, offset: u64, len: usize) {
117+
use std::os::unix::io::AsRawFd;
118+
const F_RDADVISE: libc::c_int = 44;
119+
#[repr(C)]
120+
struct radvisory {
121+
ra_offset: libc::off_t, // i64
122+
ra_count: libc::c_int, // i32
123+
}
124+
let ra = radvisory {
125+
ra_offset: offset as libc::off_t,
126+
ra_count: len.min(i32::MAX as usize) as libc::c_int,
127+
};
128+
let fd = self.inner.as_raw_fd();
129+
unsafe {
130+
libc::fcntl(fd, F_RDADVISE, &ra);
131+
}
132+
}
133+
134+
#[cfg(not(any(target_os = "linux", target_os = "macos")))]
135+
pub const fn advise_willneed(&self, _offset: u64, _len: usize) {}
96136
}
97137

98138
// Note: If the default changes make sure you update the documentation in
@@ -232,20 +272,23 @@ pub async fn create_file(path: impl AsRef<Path>) -> Result<FileSlot, Error> {
232272
}
233273

234274
/// Read from `file` in a blocking thread, sending chunks to `writer`.
235-
/// Reads up to `limit` bytes starting from the current file position.
275+
/// Reads up to `limit` bytes starting from `start_offset`.
236276
/// `read_buffer_size` controls the chunk size (typically 256 KiB).
277+
/// After each read, prefetches the next 2 chunks via `advise_willneed`.
237278
/// Returns the `FileSlot` so the caller can reuse or drop it.
238279
pub async fn read_file_to_channel(
239280
file: FileSlot,
240281
writer: &mut DropCloserWriteHalf,
241282
limit: u64,
242283
read_buffer_size: usize,
284+
start_offset: u64,
243285
) -> Result<FileSlot, Error> {
244286
let (sync_tx, mut async_rx) = tokio::sync::mpsc::channel::<Result<Bytes, Error>>(4);
245287

246288
let read_task = spawn_blocking!("fs_read_file", move || {
247289
let mut f = file;
248290
let mut remaining = limit;
291+
let mut current_offset = start_offset;
249292
loop {
250293
let to_read = read_buffer_size.min(remaining as usize);
251294
if to_read == 0 {
@@ -256,7 +299,10 @@ pub async fn read_file_to_channel(
256299
Ok(0) => break,
257300
Ok(n) => {
258301
buf.truncate(n);
302+
current_offset += n as u64;
259303
remaining -= n as u64;
304+
// Prefetch next 2 chunks while this one travels over the network.
305+
f.advise_willneed(current_offset, read_buffer_size * 2);
260306
if sync_tx.blocking_send(Ok(buf.freeze())).is_err() {
261307
break; // reader dropped
262308
}

nativelink-util/src/store_trait.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@ pub async fn slow_update_store_with_file<S: StoreDriver + ?Sized>(
106106
.update(digest.into(), rx, upload_size)
107107
.map(|r| r.err_tip(|| "Could not upload data to store in upload_file_to_store"));
108108
let read_data_fut = async move {
109-
let file = fs::read_file_to_channel(file, &mut tx, u64::MAX, fs::DEFAULT_READ_BUFF_SIZE)
109+
let file = fs::read_file_to_channel(file, &mut tx, u64::MAX, fs::DEFAULT_READ_BUFF_SIZE, 0)
110110
.await
111111
.err_tip(|| "Failed to read in upload_file_to_store")?;
112112
tx.send_eof()

0 commit comments

Comments
 (0)