Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 61 additions & 28 deletions nativelink-store/src/fast_slow_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -396,7 +396,7 @@ impl StoreDriver for FastSlowStore {
return self.slow_store.update(key, reader, size_info).await;
}

let (mut fast_tx, fast_rx) = make_buf_channel_pair();
let (fast_tx, fast_rx) = make_buf_channel_pair();
let (mut slow_tx, slow_rx) = make_buf_channel_pair();

let key_debug = format!("{key:?}");
Expand All @@ -408,16 +408,17 @@ impl StoreDriver for FastSlowStore {
let mut bytes_sent: u64 = 0;

let data_stream_fut = async move {
let mut fast_tx = Some(fast_tx);
loop {
let buffer = reader
.recv()
.await
.err_tip(|| "Failed to read buffer in fastslow store")?;
if buffer.is_empty() {
// EOF received.
fast_tx.send_eof().err_tip(
|| "Failed to write eof to fast store in fast_slow store update",
)?;
if let Some(mut ftx) = fast_tx.take() {
drop(ftx.send_eof());
}
slow_tx
.send_eof()
.err_tip(|| "Failed to write eof to writer in fast_slow store update")?;
Expand All @@ -429,34 +430,43 @@ impl StoreDriver for FastSlowStore {
}

let chunk_len = buffer.len();
let send_start = std::time::Instant::now();
let (fast_result, slow_result) =
join!(fast_tx.send(buffer.clone()), slow_tx.send(buffer));
let send_elapsed = send_start.elapsed();
if send_elapsed.as_secs() >= 5 {
warn!(
chunk_len,
send_elapsed_ms = send_elapsed.as_millis(),
total_bytes = bytes_sent,
"FastSlowStore::update: channel send stalled (>5s). A downstream store may be hanging",
);
}
bytes_sent += u64::try_from(chunk_len).unwrap_or(u64::MAX);
fast_result
.map_err(|e| {
if let Some(ref mut ftx) = fast_tx {
let send_start = std::time::Instant::now();
let (fast_result, slow_result) =
join!(ftx.send(buffer.clone()), slow_tx.send(buffer));
let send_elapsed = send_start.elapsed();
if send_elapsed.as_secs() >= 5 {
warn!(
chunk_len,
send_elapsed_ms = send_elapsed.as_millis(),
total_bytes = bytes_sent,
"FastSlowStore::update: channel send stalled (>5s). A downstream store may be hanging",
);
}
if fast_result.is_err() {
warn!(
total_bytes = bytes_sent,
"FastSlowStore::update: fast store channel failed, continuing with slow store only",
);
fast_tx = None;
}
slow_result.map_err(|e| {
make_err!(
Code::Internal,
"Failed to send message to fast_store in fast_slow_store {:?}",
"Failed to send message to slow_store in fast_slow store {:?}",
e
)
})
.merge(slow_result.map_err(|e| {
})?;
} else {
slow_tx.send(buffer).await.map_err(|e| {
make_err!(
Code::Internal,
"Failed to send message to slow_store in fast_slow store {:?}",
e
)
}))?;
})?;
}
bytes_sent += u64::try_from(chunk_len).unwrap_or(u64::MAX);
}
};

Expand All @@ -483,7 +493,15 @@ impl StoreDriver for FastSlowStore {
"FastSlowStore::update: completed successfully",
);
}
data_stream_res.merge(fast_res).merge(slow_res)?;
// Slow store success is required; fast store failure is tolerated since it's a cache.
data_stream_res.merge(slow_res)?;
if let Err(err) = fast_res {
warn!(
?err,
key = %key_debug,
"FastSlowStore::update: fast store failed during upload; data stored in slow store",
);
}
Ok(())
}

Expand Down Expand Up @@ -538,10 +556,20 @@ impl StoreDriver for FastSlowStore {
{
return Ok(Some(file));
}
return self
return match self
.fast_store
.update_with_whole_file(key, path, file, upload_size)
.await;
.await
{
Ok(file_slot) => Ok(file_slot),
Err(err) => {
warn!(
?err,
"FastSlowStore::update_with_whole_file: fast store failed; data stored in slow store",
);
Ok(None)
}
};
}

if self
Expand All @@ -555,14 +583,19 @@ impl StoreDriver for FastSlowStore {
|| self.fast_direction == StoreDirection::ReadOnly
|| self.fast_direction == StoreDirection::Get;
if !ignore_fast {
slow_update_store_with_file(
if let Err(err) = slow_update_store_with_file(
self.fast_store.as_store_driver_pin(),
key.borrow(),
&mut file,
upload_size,
)
.await
.err_tip(|| "In FastSlowStore::update_with_whole_file fast_store")?;
{
warn!(
?err,
"FastSlowStore::update_with_whole_file: fast store failed; continuing with slow store",
);
}
}
let ignore_slow = self.slow_direction == StoreDirection::ReadOnly
|| self.slow_direction == StoreDirection::Get;
Expand Down
3 changes: 2 additions & 1 deletion nativelink-store/src/filesystem_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -816,7 +816,8 @@ impl<Fe: FileEntry> FilesystemStore<Fe> {
// it still exists in there. But first, get the lock...
let mut encoded_file_path = entry.get_encoded_file_path().write().await;
// Then check it's still in there...
if evicting_map.get(&key).await.is_none() {
// Use size_for_key instead of get() to avoid triggering bulk eviction of other entries.
if evicting_map.size_for_key(&key).await.is_none() {
info!(%key, "Got eviction while emplacing, dropping");
return Ok(());
}
Expand Down
70 changes: 39 additions & 31 deletions nativelink-util/src/evicting_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -435,41 +435,49 @@ where
}

pub async fn get(&self, key: &Q) -> Option<T> {
// Fast path: Check if we need eviction before acquiring lock for eviction
let needs_eviction = {
let state = self.state.lock();
if let Some((_, peek_entry)) = state.lru.peek_lru() {
self.should_evict(
state.lru.len(),
peek_entry,
state.sum_store_size,
self.max_bytes,
)
let (result, items_to_unref, removal_futures) = {
let mut state = self.state.lock();
// Check if the requested item is expired before promoting it.
if let Some(entry) = state.lru.peek(key.borrow()) {
if self.should_evict(state.lru.len(), entry, 0, u64::MAX) {
// Item is expired, remove it.
if let Some((k, eviction_item)) = state.lru.pop_entry(key.borrow()) {
let (data, futures) = state.remove(k.borrow(), &eviction_item, false);
let (mut items, mut removals) = self.evict_items(&mut *state);
items.push(data);
removals.extend(futures);
(None, items, removals)
} else {
let (items, removals) = self.evict_items(&mut *state);
(None, items, removals)
}
} else {
// Item is valid. Promote it in LRU so it's safe from eviction.
let data = state.lru.get_mut(key.borrow()).map(|entry| {
entry.seconds_since_anchor =
i32::try_from(self.anchor_time.elapsed().as_secs()).unwrap_or(i32::MAX);
entry.data.clone()
});
let (items, removals) = self.evict_items(&mut *state);
(data, items, removals)
}
} else {
false
let (items, removals) = self.evict_items(&mut *state);
(None, items, removals)
}
};
// Unref items outside of lock — lock is guaranteed dropped here.
Self::unref_items(items_to_unref, removal_futures).await;
result
}

// Perform eviction if needed
if needs_eviction {
let (items_to_unref, removal_futures) = {
let mut state = self.state.lock();
self.evict_items(&mut *state)
};
// Unref items outside of lock
let mut callbacks: FuturesUnordered<_> = removal_futures.into_iter().collect();
while callbacks.next().await.is_some() {}
let mut callbacks: FuturesUnordered<_> =
items_to_unref.iter().map(LenEntry::unref).collect();
while callbacks.next().await.is_some() {}
}

// Now get the item
let mut state = self.state.lock();
let entry = state.lru.get_mut(key.borrow())?;
entry.seconds_since_anchor =
i32::try_from(self.anchor_time.elapsed().as_secs()).unwrap_or(i32::MAX);
Some(entry.data.clone())
/// Drains the cleanup work produced by an eviction pass.
///
/// First drives every future in `removal_futures` to completion, then calls
/// [`LenEntry::unref`] on each entry of `items_to_unref` and drives those
/// futures to completion as well. This is an associated function without a
/// `self` receiver, so it never touches the map state itself; callers are
/// expected to invoke it only after the state lock guard has been dropped.
async fn unref_items(items_to_unref: Vec<T>, removal_futures: Vec<RemoveFuture>) {
    // Phase 1: run the removal callbacks concurrently until none are left.
    let mut pending_removals = removal_futures
        .into_iter()
        .collect::<FuturesUnordered<_>>();
    while pending_removals.next().await.is_some() {}

    // Phase 2: unref every evicted item; the futures borrow from
    // `items_to_unref`, which stays alive until the end of this function.
    let mut pending_unrefs = items_to_unref
        .iter()
        .map(LenEntry::unref)
        .collect::<FuturesUnordered<_>>();
    while pending_unrefs.next().await.is_some() {}
}

/// Returns the replaced item if any.
Expand Down
2 changes: 1 addition & 1 deletion nativelink-util/src/instant_wrapper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ impl InstantWrapper for MockInstantWrapped {
let baseline = self.0.elapsed();
loop {
tokio::task::yield_now().await;
if self.0.elapsed() - baseline >= duration {
if self.0.elapsed().saturating_sub(baseline) >= duration {
break;
}
}
Expand Down
33 changes: 28 additions & 5 deletions nativelink-worker/src/local_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -369,11 +369,34 @@ impl<'a, T: WorkerApiClientTrait + 'static, U: RunningActionsManager> LocalWorke
.err_tip(|| "Error while calling execution_response")?;
},
Err(e) => {
grpc_client.execution_response(ExecuteResult{
instance_name,
operation_id,
result: Some(execute_result::Result::InternalError(e.into())),
}).await.err_tip(|| "Error calling execution_response with error")?;
let is_cas_blob_missing = e.code == Code::NotFound
&& e.message_string().contains("not found in either fast or slow store");
if is_cas_blob_missing {
warn!(
?e,
"Missing CAS inputs during prepare_action, returning FAILED_PRECONDITION"
);
let action_result = ActionResult {
error: Some(make_err!(
Code::FailedPrecondition,
"{}",
e.message_string()
)),
..ActionResult::default()
};
let action_stage = ActionStage::Completed(action_result);
grpc_client.execution_response(ExecuteResult{
instance_name,
operation_id,
result: Some(execute_result::Result::ExecuteResponse(action_stage.into())),
}).await.err_tip(|| "Error calling execution_response with missing inputs")?;
} else {
grpc_client.execution_response(ExecuteResult{
instance_name,
operation_id,
result: Some(execute_result::Result::InternalError(e.into())),
}).await.err_tip(|| "Error calling execution_response with error")?;
}
},
}
Ok(())
Expand Down
Loading