Skip to content

Commit fbf8069

Browse files
committed
Fix evicting map get lock and fast slow store EOF
1 parent 658dd53 commit fbf8069

File tree

3 files changed

+102
-67
lines changed

3 files changed

+102
-67
lines changed

nativelink-store/src/fast_slow_store.rs

Lines changed: 59 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -396,7 +396,7 @@ impl StoreDriver for FastSlowStore {
396396
return self.slow_store.update(key, reader, size_info).await;
397397
}
398398

399-
let (mut fast_tx, fast_rx) = make_buf_channel_pair();
399+
let (fast_tx, fast_rx) = make_buf_channel_pair();
400400
let (mut slow_tx, slow_rx) = make_buf_channel_pair();
401401

402402
let key_debug = format!("{key:?}");
@@ -408,16 +408,17 @@ impl StoreDriver for FastSlowStore {
408408
let mut bytes_sent: u64 = 0;
409409

410410
let data_stream_fut = async move {
411+
let mut fast_tx = Some(fast_tx);
411412
loop {
412413
let buffer = reader
413414
.recv()
414415
.await
415416
.err_tip(|| "Failed to read buffer in fastslow store")?;
416417
if buffer.is_empty() {
417418
// EOF received.
418-
fast_tx.send_eof().err_tip(
419-
|| "Failed to write eof to fast store in fast_slow store update",
420-
)?;
419+
if let Some(mut ftx) = fast_tx.take() {
420+
drop(ftx.send_eof());
421+
}
421422
slow_tx
422423
.send_eof()
423424
.err_tip(|| "Failed to write eof to writer in fast_slow store update")?;
@@ -429,34 +430,43 @@ impl StoreDriver for FastSlowStore {
429430
}
430431

431432
let chunk_len = buffer.len();
432-
let send_start = std::time::Instant::now();
433-
let (fast_result, slow_result) =
434-
join!(fast_tx.send(buffer.clone()), slow_tx.send(buffer));
435-
let send_elapsed = send_start.elapsed();
436-
if send_elapsed.as_secs() >= 5 {
437-
warn!(
438-
chunk_len,
439-
send_elapsed_ms = send_elapsed.as_millis(),
440-
total_bytes = bytes_sent,
441-
"FastSlowStore::update: channel send stalled (>5s). A downstream store may be hanging",
442-
);
443-
}
444-
bytes_sent += u64::try_from(chunk_len).unwrap_or(u64::MAX);
445-
fast_result
446-
.map_err(|e| {
433+
if let Some(ref mut ftx) = fast_tx {
434+
let send_start = std::time::Instant::now();
435+
let (fast_result, slow_result) =
436+
join!(ftx.send(buffer.clone()), slow_tx.send(buffer));
437+
let send_elapsed = send_start.elapsed();
438+
if send_elapsed.as_secs() >= 5 {
439+
warn!(
440+
chunk_len,
441+
send_elapsed_ms = send_elapsed.as_millis(),
442+
total_bytes = bytes_sent,
443+
"FastSlowStore::update: channel send stalled (>5s). A downstream store may be hanging",
444+
);
445+
}
446+
if fast_result.is_err() {
447+
warn!(
448+
total_bytes = bytes_sent,
449+
"FastSlowStore::update: fast store channel failed, continuing with slow store only",
450+
);
451+
fast_tx = None;
452+
}
453+
slow_result.map_err(|e| {
447454
make_err!(
448455
Code::Internal,
449-
"Failed to send message to fast_store in fast_slow_store {:?}",
456+
"Failed to send message to slow_store in fast_slow store {:?}",
450457
e
451458
)
452-
})
453-
.merge(slow_result.map_err(|e| {
459+
})?;
460+
} else {
461+
slow_tx.send(buffer).await.map_err(|e| {
454462
make_err!(
455463
Code::Internal,
456464
"Failed to send message to slow_store in fast_slow store {:?}",
457465
e
458466
)
459-
}))?;
467+
})?;
468+
}
469+
bytes_sent += u64::try_from(chunk_len).unwrap_or(u64::MAX);
460470
}
461471
};
462472

@@ -483,7 +493,15 @@ impl StoreDriver for FastSlowStore {
483493
"FastSlowStore::update: completed successfully",
484494
);
485495
}
486-
data_stream_res.merge(fast_res).merge(slow_res)?;
496+
// Slow store success is required; fast store failure is tolerated since it's a cache.
497+
data_stream_res.merge(slow_res)?;
498+
if let Err(err) = fast_res {
499+
warn!(
500+
?err,
501+
key = %key_debug,
502+
"FastSlowStore::update: fast store failed during upload; data stored in slow store",
503+
);
504+
}
487505
Ok(())
488506
}
489507

@@ -591,16 +609,23 @@ impl StoreDriver for FastSlowStore {
591609
// TODO(palfrey) Investigate if we should maybe ignore errors here instead of
592610
// forwarding them up.
593611
if self.fast_store.has(key.borrow()).await?.is_some() {
594-
self.metrics
595-
.fast_store_hit_count
596-
.fetch_add(1, Ordering::Acquire);
597-
self.fast_store
598-
.get_part(key, writer.borrow_mut(), offset, length)
599-
.await?;
600-
self.metrics
601-
.fast_store_downloaded_bytes
602-
.fetch_add(writer.get_bytes_written(), Ordering::Acquire);
603-
return Ok(());
612+
match self
613+
.fast_store
614+
.get_part(key.borrow(), writer.borrow_mut(), offset, length)
615+
.await
616+
{
617+
Ok(()) => {
618+
self.metrics
619+
.fast_store_hit_count
620+
.fetch_add(1, Ordering::Acquire);
621+
self.metrics
622+
.fast_store_downloaded_bytes
623+
.fetch_add(writer.get_bytes_written(), Ordering::Acquire);
624+
return Ok(());
625+
}
626+
Err(err) if err.code == Code::NotFound => {}
627+
Err(err) => return Err(err),
628+
}
604629
}
605630

606631
// If the fast store is noop or read only or update only then bypass it.

nativelink-store/src/filesystem_store.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -816,7 +816,8 @@ impl<Fe: FileEntry> FilesystemStore<Fe> {
816816
// it still exists in there. But first, get the lock...
817817
let mut encoded_file_path = entry.get_encoded_file_path().write().await;
818818
// Then check it's still in there...
819-
if evicting_map.get(&key).await.is_none() {
819+
// Use size_for_key instead of get() to avoid triggering bulk eviction of other entries.
820+
if evicting_map.size_for_key(&key).await.is_none() {
820821
info!(%key, "Got eviction while emplacing, dropping");
821822
return Ok(());
822823
}

nativelink-util/src/evicting_map.rs

Lines changed: 41 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -435,41 +435,50 @@ where
435435
}
436436

437437
pub async fn get(&self, key: &Q) -> Option<T> {
438-
// Fast path: Check if we need eviction before acquiring lock for eviction
439-
let needs_eviction = {
440-
let state = self.state.lock();
441-
if let Some((_, peek_entry)) = state.lru.peek_lru() {
442-
self.should_evict(
443-
state.lru.len(),
444-
peek_entry,
445-
state.sum_store_size,
446-
self.max_bytes,
447-
)
438+
let (result, items_to_unref, removal_futures) = {
439+
let mut state = self.state.lock();
440+
// Check if the requested item is expired before promoting it.
441+
let result = if let Some(entry) = state.lru.peek(key.borrow()) {
442+
if self.should_evict(state.lru.len(), entry, 0, u64::MAX) {
443+
// Item is expired, remove it.
444+
if let Some((k, eviction_item)) = state.lru.pop_entry(key.borrow()) {
445+
let (data, futures) = state.remove(k.borrow(), &eviction_item, false);
446+
let (mut items, mut removals) = self.evict_items(&mut *state);
447+
items.push(data);
448+
removals.extend(futures);
449+
return {
450+
Self::unref_items(items, removals).await;
451+
None
452+
};
453+
}
454+
None
455+
} else {
456+
// Item is valid. Promote it in LRU so it's safe from eviction.
457+
state.lru.get_mut(key.borrow()).map(|entry| {
458+
entry.seconds_since_anchor =
459+
i32::try_from(self.anchor_time.elapsed().as_secs()).unwrap_or(i32::MAX);
460+
entry.data.clone()
461+
})
462+
}
448463
} else {
449-
false
450-
}
451-
};
452-
453-
// Perform eviction if needed
454-
if needs_eviction {
455-
let (items_to_unref, removal_futures) = {
456-
let mut state = self.state.lock();
457-
self.evict_items(&mut *state)
464+
None
458465
};
459-
// Unref items outside of lock
460-
let mut callbacks: FuturesUnordered<_> = removal_futures.into_iter().collect();
461-
while callbacks.next().await.is_some() {}
462-
let mut callbacks: FuturesUnordered<_> =
463-
items_to_unref.iter().map(LenEntry::unref).collect();
464-
while callbacks.next().await.is_some() {}
465-
}
466+
// Evict other items if needed
467+
let (items_to_unref, removal_futures) = self.evict_items(&mut *state);
468+
(result, items_to_unref, removal_futures)
469+
};
470+
// Unref items outside of lock
471+
Self::unref_items(items_to_unref, removal_futures).await;
472+
result
473+
}
466474

467-
// Now get the item
468-
let mut state = self.state.lock();
469-
let entry = state.lru.get_mut(key.borrow())?;
470-
entry.seconds_since_anchor =
471-
i32::try_from(self.anchor_time.elapsed().as_secs()).unwrap_or(i32::MAX);
472-
Some(entry.data.clone())
475+
/// Helper to unref evicted items outside of lock.
476+
async fn unref_items(items_to_unref: Vec<T>, removal_futures: Vec<RemoveFuture>) {
477+
let mut callbacks: FuturesUnordered<_> = removal_futures.into_iter().collect();
478+
while callbacks.next().await.is_some() {}
479+
let mut callbacks: FuturesUnordered<_> =
480+
items_to_unref.iter().map(LenEntry::unref).collect();
481+
while callbacks.next().await.is_some() {}
473482
}
474483

475484
/// Returns the replaced item if any.

0 commit comments

Comments
 (0)