Skip to content

Commit 5a4b3e8

Browse files
committed
improve span propagation
1 parent aedd5db commit 5a4b3e8

File tree

2 files changed

+74
-7
lines changed

2 files changed

+74
-7
lines changed

src/database.rs

Lines changed: 3 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -991,8 +991,9 @@ impl Database {
991991
// This allows the same cache to be used across all tables
992992
let cache_wrapped =
993993
Arc::new(FoyerObjectStoreCache::new_with_shared_cache(instrumented_store.clone(), shared_cache)) as Arc<dyn object_store::ObjectStore>;
994-
// Instrument the cache layer as well to see cache hits/misses
995-
instrument_object_store(cache_wrapped, "foyer_cache")
994+
// Note: We don't double-instrument with instrument_object_store here since FoyerObjectStoreCache
995+
// already has its own instrumentation that properly propagates parent spans
996+
cache_wrapped
996997
} else {
997998
warn!("Shared Foyer cache not initialized, using uncached object store");
998999
instrumented_store

src/object_store_cache.rs

Lines changed: 71 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -11,7 +11,8 @@ use std::ops::Range;
1111
use std::path::PathBuf;
1212
use std::sync::Arc;
1313
use std::time::{Duration, SystemTime, UNIX_EPOCH};
14-
use tracing::{debug, info};
14+
use tracing::{debug, info, instrument, Instrument};
15+
use tracing::field::Empty;
1516

1617
use foyer::{
1718
BlockEngineBuilder, DeviceBuilder, FsDeviceBuilder, HybridCache, HybridCacheBuilder,
@@ -613,7 +614,17 @@ impl ObjectStore for FoyerObjectStoreCache {
613614
Ok(result)
614615
}
615616

617+
#[instrument(
618+
name = "foyer_cache.get",
619+
skip_all,
620+
fields(
621+
location = %location,
622+
cache_hit = Empty,
623+
is_checkpoint = Self::is_last_checkpoint(location),
624+
)
625+
)]
616626
async fn get(&self, location: &Path) -> ObjectStoreResult<GetResult> {
627+
let span = tracing::Span::current();
617628
let cache_key = Self::make_cache_key(location);
618629

619630
// Try cache first
@@ -626,6 +637,7 @@ impl ObjectStore for FoyerObjectStoreCache {
626637
// Special handling for _last_checkpoint: stale-while-revalidate
627638
if Self::is_last_checkpoint(location) && !value.is_expired(ttl) {
628639
self.update_stats(|s| s.hits += 1).await;
640+
span.record("cache_hit", true);
629641

630642
// Check if older than 5 seconds
631643
let age_millis = current_millis().saturating_sub(value.timestamp_millis);
@@ -698,6 +710,7 @@ impl ObjectStore for FoyerObjectStoreCache {
698710
);
699711
} else {
700712
self.update_stats(|s| s.hits += 1).await;
713+
span.record("cache_hit", true);
701714
let is_parquet = location.as_ref().ends_with(".parquet");
702715
debug!(
703716
"Foyer cache HIT for: {} (avoiding S3 access, parquet={}, TTL={}s, age={}ms, size={} bytes)",
@@ -712,6 +725,7 @@ impl ObjectStore for FoyerObjectStoreCache {
712725
}
713726

714727
// Cache miss - fetch from inner store
728+
span.record("cache_hit", false);
715729
self.update_stats(|s| {
716730
s.misses += 1;
717731
s.inner_gets += 1;
@@ -727,7 +741,10 @@ impl ObjectStore for FoyerObjectStoreCache {
727741
);
728742

729743
let start_time = std::time::Instant::now();
730-
let result = self.inner.get(location).await?;
744+
let inner_span = tracing::trace_span!(parent: &span, "s3.get", location = %location);
745+
let result = self.inner.get(location)
746+
.instrument(inner_span)
747+
.await?;
731748
let duration = start_time.elapsed();
732749

733750
debug!(
@@ -773,7 +790,21 @@ impl ObjectStore for FoyerObjectStoreCache {
773790
self.get(location).await
774791
}
775792

793+
#[instrument(
794+
name = "foyer_cache.get_range",
795+
skip_all,
796+
fields(
797+
location = %location,
798+
range.start = range.start,
799+
range.end = range.end,
800+
range.size = range.end - range.start,
801+
is_parquet = location.as_ref().ends_with(".parquet"),
802+
cache_hit = Empty,
803+
is_metadata = Empty,
804+
)
805+
)]
776806
async fn get_range(&self, location: &Path, range: Range<u64>) -> ObjectStoreResult<Bytes> {
807+
let span = tracing::Span::current();
777808
let is_parquet = location.as_ref().ends_with(".parquet");
778809

779810
// First check if we have the full file cached
@@ -783,6 +814,7 @@ impl ObjectStore for FoyerObjectStoreCache {
783814
let ttl = self.get_ttl_for_path(location);
784815
if !value.is_expired(ttl) && range.end <= value.data.len() as u64 {
785816
self.update_stats(|s| s.hits += 1).await;
817+
span.record("cache_hit", true);
786818
debug!(
787819
"Foyer cache HIT (full file) for range: {} (range: {}..{}, size: {} bytes, parquet={}, age={}ms)",
788820
location,
@@ -812,6 +844,7 @@ impl ObjectStore for FoyerObjectStoreCache {
812844

813845
// Check if this is likely a metadata request (reading from near the end of the file)
814846
let is_metadata_request = range.start >= file_size.saturating_sub(metadata_size_hint);
847+
span.record("is_metadata", is_metadata_request);
815848

816849
if is_metadata_request {
817850
// For metadata requests, use the metadata cache
@@ -823,6 +856,7 @@ impl ObjectStore for FoyerObjectStoreCache {
823856
let ttl = self.config.ttl; // Use unified TTL
824857
if !value.is_expired(ttl) {
825858
self.update_metadata_stats(|s| s.hits += 1).await;
859+
span.record("cache_hit", true);
826860
debug!(
827861
"Metadata cache HIT for: {} (range: {}..{}, size: {} bytes, age={}ms)",
828862
location,
@@ -836,6 +870,7 @@ impl ObjectStore for FoyerObjectStoreCache {
836870
}
837871

838872
// Cache miss for metadata range - fetch just the range
873+
span.record("cache_hit", false);
839874
self.update_metadata_stats(|s| {
840875
s.misses += 1;
841876
s.inner_gets += 1;
@@ -847,7 +882,15 @@ impl ObjectStore for FoyerObjectStoreCache {
847882
);
848883

849884
let start_time = std::time::Instant::now();
850-
let data = self.inner.get_range(location, range.clone()).await?;
885+
let inner_span = tracing::trace_span!(parent: &span, "s3.get_range",
886+
location = %location,
887+
range.start = range.start,
888+
range.end = range.end,
889+
is_metadata = true
890+
);
891+
let data = self.inner.get_range(location, range.clone())
892+
.instrument(inner_span)
893+
.await?;
851894
let duration = start_time.elapsed();
852895

853896
debug!(
@@ -909,6 +952,7 @@ impl ObjectStore for FoyerObjectStoreCache {
909952
}
910953

911954
// Fallback to regular range request for non-parquet files
955+
span.record("cache_hit", false);
912956
self.update_stats(|s| {
913957
s.misses += 1;
914958
s.inner_gets += 1;
@@ -920,7 +964,14 @@ impl ObjectStore for FoyerObjectStoreCache {
920964
);
921965

922966
let start_time = std::time::Instant::now();
923-
let result = self.inner.get_range(location, range.clone()).await?;
967+
let inner_span = tracing::trace_span!(parent: &span, "s3.get_range",
968+
location = %location,
969+
range.start = range.start,
970+
range.end = range.end
971+
);
972+
let result = self.inner.get_range(location, range.clone())
973+
.instrument(inner_span)
974+
.await?;
924975
let duration = start_time.elapsed();
925976

926977
debug!(
@@ -936,17 +987,32 @@ impl ObjectStore for FoyerObjectStoreCache {
936987
Ok(result)
937988
}
938989

990+
#[instrument(
991+
name = "foyer_cache.head",
992+
skip_all,
993+
fields(
994+
location = %location,
995+
cache_hit = Empty,
996+
)
997+
)]
939998
async fn head(&self, location: &Path) -> ObjectStoreResult<ObjectMeta> {
999+
let span = tracing::Span::current();
9401000
let cache_key = Self::make_cache_key(location);
9411001

9421002
if let Ok(Some(entry)) = self.cache.get(&cache_key).await {
9431003
let value = entry.value();
9441004
let ttl = self.get_ttl_for_path(location);
9451005
if !value.is_expired(ttl) {
1006+
span.record("cache_hit", true);
9461007
return Ok(value.meta.clone());
9471008
}
9481009
}
949-
self.inner.head(location).await
1010+
1011+
span.record("cache_hit", false);
1012+
let inner_span = tracing::trace_span!(parent: &span, "s3.head", location = %location);
1013+
self.inner.head(location)
1014+
.instrument(inner_span)
1015+
.await
9501016
}
9511017

9521018
async fn delete(&self, location: &Path) -> ObjectStoreResult<()> {

0 commit comments

Comments
 (0)