Skip to content

Commit 7528351

Browse files
committed
dbeug cache hits and misses
1 parent 5f4d912 commit 7528351

File tree

1 file changed

+64
-5
lines changed

1 file changed

+64
-5
lines changed

src/object_store_cache.rs

Lines changed: 64 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -476,8 +476,28 @@ impl ObjectStore for FoyerObjectStoreCache {
476476
async fn put(&self, location: &Path, payload: PutPayload) -> ObjectStoreResult<PutResult> {
477477
self.update_stats(|s| s.inner_puts += 1).await;
478478

479+
let payload_size = payload.content_length();
480+
let is_parquet = location.as_ref().ends_with(".parquet");
481+
482+
debug!(
483+
"S3 PUT request starting: {} (size: {} bytes, parquet: {})",
484+
location,
485+
payload_size,
486+
is_parquet
487+
);
488+
479489
// Write to S3 first without removing from cache (to avoid cache stampede)
490+
let start_time = std::time::Instant::now();
480491
let result = self.inner.put(location, payload).await?;
492+
let duration = start_time.elapsed();
493+
494+
debug!(
495+
"S3 PUT request completed: {} (size: {} bytes, duration: {}ms, parquet: {})",
496+
location,
497+
payload_size,
498+
duration.as_millis(),
499+
is_parquet
500+
);
481501

482502
// After successful write, update the cache with the new data
483503
self.update_stats(|s| s.inner_gets += 1).await;
@@ -641,11 +661,12 @@ impl ObjectStore for FoyerObjectStoreCache {
641661
self.update_stats(|s| s.hits += 1).await;
642662
let is_parquet = location.as_ref().ends_with(".parquet");
643663
debug!(
644-
"Foyer cache HIT for: {} (avoiding S3 access, parquet={}, TTL={}s, age={}ms)",
664+
"Foyer cache HIT for: {} (avoiding S3 access, parquet={}, TTL={}s, age={}ms, size={} bytes)",
645665
location,
646666
is_parquet,
647667
ttl.as_secs(),
648-
current_millis().saturating_sub(value.timestamp_millis)
668+
current_millis().saturating_sub(value.timestamp_millis),
669+
value.data.len()
649670
);
650671
return Ok(Self::make_get_result(Bytes::from(value.data.clone()), value.meta.clone()));
651672
}
@@ -666,7 +687,17 @@ impl ObjectStore for FoyerObjectStoreCache {
666687
ttl.as_secs()
667688
);
668689

690+
let start_time = std::time::Instant::now();
669691
let result = self.inner.get(location).await?;
692+
let duration = start_time.elapsed();
693+
694+
debug!(
695+
"S3 GET request: {} (size: {} bytes, duration: {}ms, parquet: {})",
696+
location,
697+
result.meta.size,
698+
duration.as_millis(),
699+
is_parquet
700+
);
670701

671702
// Collect payload for caching
672703
use futures::TryStreamExt;
@@ -714,10 +745,11 @@ impl ObjectStore for FoyerObjectStoreCache {
714745
if !value.is_expired(ttl) && range.end <= value.data.len() as u64 {
715746
self.update_stats(|s| s.hits += 1).await;
716747
debug!(
717-
"Foyer cache HIT (full file) for range: {} (range: {}..{}, parquet={}, age={}ms)",
748+
"Foyer cache HIT (full file) for range: {} (range: {}..{}, size: {} bytes, parquet={}, age={}ms)",
718749
location,
719750
range.start,
720751
range.end,
752+
range.end - range.start,
721753
is_parquet,
722754
current_millis().saturating_sub(value.timestamp_millis)
723755
);
@@ -753,10 +785,11 @@ impl ObjectStore for FoyerObjectStoreCache {
753785
if !value.is_expired(ttl) {
754786
self.update_metadata_stats(|s| s.hits += 1).await;
755787
debug!(
756-
"Metadata cache HIT for: {} (range: {}..{}, age={}ms)",
788+
"Metadata cache HIT for: {} (range: {}..{}, size: {} bytes, age={}ms)",
757789
location,
758790
range.start,
759791
range.end,
792+
value.data.len(),
760793
current_millis().saturating_sub(value.timestamp_millis)
761794
);
762795
return Ok(Bytes::from(value.data.clone()));
@@ -774,7 +807,18 @@ impl ObjectStore for FoyerObjectStoreCache {
774807
location, range.start, range.end, file_size
775808
);
776809

810+
let start_time = std::time::Instant::now();
777811
let data = self.inner.get_range(location, range.clone()).await?;
812+
let duration = start_time.elapsed();
813+
814+
debug!(
815+
"S3 GET_RANGE request (metadata): {} (range: {}..{}, size: {} bytes, duration: {}ms)",
816+
location,
817+
range.start,
818+
range.end,
819+
data.len(),
820+
duration.as_millis()
821+
);
778822

779823
// Cache the metadata range in the metadata cache
780824
let range_meta = ObjectMeta {
@@ -835,7 +879,22 @@ impl ObjectStore for FoyerObjectStoreCache {
835879
"get_range request for: {} (range: {}..{}, parquet={})",
836880
location, range.start, range.end, is_parquet
837881
);
838-
self.inner.get_range(location, range).await
882+
883+
let start_time = std::time::Instant::now();
884+
let result = self.inner.get_range(location, range.clone()).await?;
885+
let duration = start_time.elapsed();
886+
887+
debug!(
888+
"S3 GET_RANGE request: {} (range: {}..{}, size: {} bytes, duration: {}ms, parquet: {})",
889+
location,
890+
range.start,
891+
range.end,
892+
range.end - range.start,
893+
duration.as_millis(),
894+
is_parquet
895+
);
896+
897+
Ok(result)
839898
}
840899

841900
async fn head(&self, location: &Path) -> ObjectStoreResult<ObjectMeta> {

0 commit comments

Comments
 (0)