
Commit 25ad99d

nuno-faria and alamb authored
feat: Add the ability to review the contents of the Metadata Cache (#17126)
* feat: Add the ability to review the contents of the Metadata Cache
* Remove e_tag from test_metadata_cache
* Add entry in the user doc about this function
* Change type to UInt64
* Fix prettier

---------

Co-authored-by: Andrew Lamb <[email protected]>
1 parent f603d34 commit 25ad99d

File tree: 9 files changed (+665 -70 lines changed)


datafusion-cli/src/functions.rs

Lines changed: 121 additions & 2 deletions
@@ -22,15 +22,16 @@ use std::fs::File;
 use std::str::FromStr;
 use std::sync::Arc;

-use arrow::array::{Int64Array, StringArray};
-use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
+use arrow::array::{Int64Array, StringArray, TimestampMillisecondArray, UInt64Array};
+use arrow::datatypes::{DataType, Field, Schema, SchemaRef, TimeUnit};
 use arrow::record_batch::RecordBatch;
 use arrow::util::pretty::pretty_format_batches;
 use datafusion::catalog::{Session, TableFunctionImpl};
 use datafusion::common::{plan_err, Column};
 use datafusion::datasource::memory::MemorySourceConfig;
 use datafusion::datasource::TableProvider;
 use datafusion::error::Result;
+use datafusion::execution::cache::cache_manager::CacheManager;
 use datafusion::logical_expr::Expr;
 use datafusion::physical_plan::ExecutionPlan;
 use datafusion::scalar::ScalarValue;
@@ -460,3 +461,121 @@ impl TableFunctionImpl for ParquetMetadataFunc {
         Ok(Arc::new(parquet_metadata))
     }
 }
+
+/// METADATA_CACHE table function
+#[derive(Debug)]
+struct MetadataCacheTable {
+    schema: SchemaRef,
+    batch: RecordBatch,
+}
+
+#[async_trait]
+impl TableProvider for MetadataCacheTable {
+    fn as_any(&self) -> &dyn std::any::Any {
+        self
+    }
+
+    fn schema(&self) -> arrow::datatypes::SchemaRef {
+        self.schema.clone()
+    }
+
+    fn table_type(&self) -> datafusion::logical_expr::TableType {
+        datafusion::logical_expr::TableType::Base
+    }
+
+    async fn scan(
+        &self,
+        _state: &dyn Session,
+        projection: Option<&Vec<usize>>,
+        _filters: &[Expr],
+        _limit: Option<usize>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        Ok(MemorySourceConfig::try_new_exec(
+            &[vec![self.batch.clone()]],
+            TableProvider::schema(self),
+            projection.cloned(),
+        )?)
+    }
+}
+
+#[derive(Debug)]
+pub struct MetadataCacheFunc {
+    cache_manager: Arc<CacheManager>,
+}
+
+impl MetadataCacheFunc {
+    pub fn new(cache_manager: Arc<CacheManager>) -> Self {
+        Self { cache_manager }
+    }
+}
+
+impl TableFunctionImpl for MetadataCacheFunc {
+    fn call(&self, exprs: &[Expr]) -> Result<Arc<dyn TableProvider>> {
+        if !exprs.is_empty() {
+            return plan_err!("metadata_cache should have no arguments");
+        }
+
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("path", DataType::Utf8, false),
+            Field::new(
+                "file_modified",
+                DataType::Timestamp(TimeUnit::Millisecond, None),
+                false,
+            ),
+            Field::new("file_size_bytes", DataType::UInt64, false),
+            Field::new("e_tag", DataType::Utf8, true),
+            Field::new("version", DataType::Utf8, true),
+            Field::new("metadata_size_bytes", DataType::UInt64, false),
+            Field::new("hits", DataType::UInt64, false),
+            Field::new("extra", DataType::Utf8, true),
+        ]));
+
+        // construct record batch from metadata
+        let mut path_arr = vec![];
+        let mut file_modified_arr = vec![];
+        let mut file_size_bytes_arr = vec![];
+        let mut e_tag_arr = vec![];
+        let mut version_arr = vec![];
+        let mut metadata_size_bytes = vec![];
+        let mut hits_arr = vec![];
+        let mut extra_arr = vec![];
+
+        let cached_entries = self.cache_manager.get_file_metadata_cache().list_entries();
+
+        for (path, entry) in cached_entries {
+            path_arr.push(path.to_string());
+            file_modified_arr
+                .push(Some(entry.object_meta.last_modified.timestamp_millis()));
+            file_size_bytes_arr.push(entry.object_meta.size);
+            e_tag_arr.push(entry.object_meta.e_tag);
+            version_arr.push(entry.object_meta.version);
+            metadata_size_bytes.push(entry.size_bytes as u64);
+            hits_arr.push(entry.hits as u64);
+
+            let mut extra = entry
+                .extra
+                .iter()
+                .map(|(k, v)| format!("{k}={v}"))
+                .collect::<Vec<_>>();
+            extra.sort();
+            extra_arr.push(extra.join(" "));
+        }
+
+        let batch = RecordBatch::try_new(
+            schema.clone(),
+            vec![
+                Arc::new(StringArray::from(path_arr)),
+                Arc::new(TimestampMillisecondArray::from(file_modified_arr)),
+                Arc::new(UInt64Array::from(file_size_bytes_arr)),
+                Arc::new(StringArray::from(e_tag_arr)),
+                Arc::new(StringArray::from(version_arr)),
+                Arc::new(UInt64Array::from(metadata_size_bytes)),
+                Arc::new(UInt64Array::from(hits_arr)),
+                Arc::new(StringArray::from(extra_arr)),
+            ],
+        )?;

+        let metadata_cache = MetadataCacheTable { schema, batch };
+        Ok(Arc::new(metadata_cache))
+    }
+}
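The new `MetadataCacheFunc` above is exported from `datafusion_cli::functions`, so an embedder can wire it up the same way datafusion-cli does in main.rs below. A minimal sketch (not part of this diff) of registering and querying the table function from application code, assuming datafusion-cli is used as a library dependency:

use std::sync::Arc;

use datafusion::error::Result;
use datafusion::prelude::SessionContext;
use datafusion_cli::functions::MetadataCacheFunc;

/// Registers `metadata_cache()` and prints the currently cached entries.
async fn show_metadata_cache(ctx: &SessionContext) -> Result<()> {
    // Same registration as added to main.rs in this commit: the function
    // reads from the session's shared CacheManager.
    ctx.register_udtf(
        "metadata_cache",
        Arc::new(MetadataCacheFunc::new(
            ctx.task_ctx().runtime_env().cache_manager.clone(),
        )),
    );

    // Each call to metadata_cache() snapshots the cache contents at that moment.
    ctx.sql("SELECT path, metadata_size_bytes, hits FROM metadata_cache()")
        .await?
        .show()
        .await
}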

datafusion-cli/src/main.rs

Lines changed: 103 additions & 2 deletions
@@ -30,7 +30,7 @@ use datafusion::execution::memory_pool::{
 use datafusion::execution::runtime_env::RuntimeEnvBuilder;
 use datafusion::prelude::SessionContext;
 use datafusion_cli::catalog::DynamicObjectStoreCatalog;
-use datafusion_cli::functions::ParquetMetadataFunc;
+use datafusion_cli::functions::{MetadataCacheFunc, ParquetMetadataFunc};
 use datafusion_cli::{
     exec,
     pool_type::PoolType,
@@ -219,6 +219,14 @@ async fn main_inner() -> Result<()> {
     // register `parquet_metadata` table function to get metadata from parquet files
     ctx.register_udtf("parquet_metadata", Arc::new(ParquetMetadataFunc {}));

+    // register `metadata_cache` table function to get the contents of the file metadata cache
+    ctx.register_udtf(
+        "metadata_cache",
+        Arc::new(MetadataCacheFunc::new(
+            ctx.task_ctx().runtime_env().cache_manager.clone(),
+        )),
+    );
+
     let mut print_options = PrintOptions {
         format: args.format,
         quiet: args.quiet,
@@ -397,7 +405,7 @@ pub fn extract_disk_limit(size: &str) -> Result<usize, String> {
 #[cfg(test)]
 mod tests {
     use super::*;
-    use datafusion::common::test_util::batches_to_string;
+    use datafusion::{common::test_util::batches_to_string, prelude::ParquetReadOptions};
     use insta::assert_snapshot;

     fn assert_conversion(input: &str, expected: Result<usize, String>) {
@@ -512,4 +520,97 @@ mod tests {

         Ok(())
     }
+
+    #[tokio::test]
+    async fn test_metadata_cache() -> Result<(), DataFusionError> {
+        let ctx = SessionContext::new();
+        ctx.register_udtf(
+            "metadata_cache",
+            Arc::new(MetadataCacheFunc::new(
+                ctx.task_ctx().runtime_env().cache_manager.clone(),
+            )),
+        );
+
+        ctx.register_parquet(
+            "alltypes_plain",
+            "../parquet-testing/data/alltypes_plain.parquet",
+            ParquetReadOptions::new(),
+        )
+        .await?;
+
+        ctx.register_parquet(
+            "alltypes_tiny_pages",
+            "../parquet-testing/data/alltypes_tiny_pages.parquet",
+            ParquetReadOptions::new(),
+        )
+        .await?;
+
+        ctx.register_parquet(
+            "lz4_raw_compressed_larger",
+            "../parquet-testing/data/lz4_raw_compressed_larger.parquet",
+            ParquetReadOptions::new(),
+        )
+        .await?;
+
+        ctx.sql("select * from alltypes_plain")
+            .await?
+            .collect()
+            .await?;
+        ctx.sql("select * from alltypes_tiny_pages")
+            .await?
+            .collect()
+            .await?;
+        ctx.sql("select * from lz4_raw_compressed_larger")
+            .await?
+            .collect()
+            .await?;
+
+        // initial state
+        let sql = "SELECT split_part(path, '/', -1) as filename, file_size_bytes, metadata_size_bytes, hits, extra from metadata_cache() order by filename";
+        let df = ctx.sql(sql).await?;
+        let rbs = df.collect().await?;
+
+        assert_snapshot!(batches_to_string(&rbs),@r#"
+        +-----------------------------------+-----------------+---------------------+------+------------------+
+        | filename                          | file_size_bytes | metadata_size_bytes | hits | extra            |
+        +-----------------------------------+-----------------+---------------------+------+------------------+
+        | alltypes_plain.parquet            | 1851            | 10181               | 2    | page_index=false |
+        | alltypes_tiny_pages.parquet       | 454233          | 881634              | 2    | page_index=true  |
+        | lz4_raw_compressed_larger.parquet | 380836          | 2939                | 2    | page_index=false |
+        +-----------------------------------+-----------------+---------------------+------+------------------+
+        "#);
+
+        // increase the number of hits
+        ctx.sql("select * from alltypes_plain")
+            .await?
+            .collect()
+            .await?;
+        ctx.sql("select * from alltypes_plain")
+            .await?
+            .collect()
+            .await?;
+        ctx.sql("select * from alltypes_plain")
+            .await?
+            .collect()
+            .await?;
+        ctx.sql("select * from lz4_raw_compressed_larger")
+            .await?
+            .collect()
+            .await?;
+        let sql = "select split_part(path, '/', -1) as filename, file_size_bytes, metadata_size_bytes, hits, extra from metadata_cache() order by filename";
+        let df = ctx.sql(sql).await?;
+        let rbs = df.collect().await?;
+
+        assert_snapshot!(batches_to_string(&rbs),@r#"
+        +-----------------------------------+-----------------+---------------------+------+------------------+
+        | filename                          | file_size_bytes | metadata_size_bytes | hits | extra            |
+        +-----------------------------------+-----------------+---------------------+------+------------------+
+        | alltypes_plain.parquet            | 1851            | 10181               | 5    | page_index=false |
+        | alltypes_tiny_pages.parquet       | 454233          | 881634              | 2    | page_index=true  |
+        | lz4_raw_compressed_larger.parquet | 380836          | 2939                | 3    | page_index=false |
+        +-----------------------------------+-----------------+---------------------+------+------------------+
+        "#);
+
+        Ok(())
+    }
 }

datafusion/datasource-parquet/src/reader.rs

Lines changed: 7 additions & 0 deletions
@@ -30,6 +30,7 @@ use parquet::arrow::arrow_reader::ArrowReaderOptions;
 use parquet::arrow::async_reader::{AsyncFileReader, ParquetObjectReader};
 use parquet::file::metadata::ParquetMetaData;
 use std::any::Any;
+use std::collections::HashMap;
 use std::fmt::Debug;
 use std::ops::Range;
 use std::sync::Arc;
@@ -296,4 +297,10 @@ impl FileMetadata for CachedParquetMetaData {
     fn memory_size(&self) -> usize {
         self.0.memory_size()
     }
+
+    fn extra_info(&self) -> HashMap<String, String> {
+        let page_index =
+            self.0.column_index().is_some() && self.0.offset_index().is_some();
+        HashMap::from([("page_index".to_owned(), page_index.to_string())])
+    }
 }
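Other cache entry types can surface their own details through the new `extra_info` hook in the same way the Parquet reader reports `page_index` above. A hedged sketch, assuming the `FileMetadata` trait also requires an `as_any` accessor alongside `memory_size` (not shown in this hunk); the `MetadataBlob` type is hypothetical:

use std::any::Any;
use std::collections::HashMap;

use datafusion::execution::cache::cache_manager::FileMetadata;

/// Hypothetical cached metadata for a non-Parquet file format.
struct MetadataBlob {
    bytes: Vec<u8>,
    compressed: bool,
}

impl FileMetadata for MetadataBlob {
    // Assumed trait method, mirroring the existing CachedParquetMetaData impl.
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn memory_size(&self) -> usize {
        self.bytes.len()
    }

    // New in this commit: key/value pairs that end up in the `extra`
    // column of metadata_cache(), e.g. "compressed=true".
    fn extra_info(&self) -> HashMap<String, String> {
        HashMap::from([("compressed".to_owned(), self.compressed.to_string())])
    }
}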

datafusion/execution/src/cache/cache_manager.rs

Lines changed: 22 additions & 2 deletions
@@ -21,6 +21,7 @@ use datafusion_common::{Result, Statistics};
 use object_store::path::Path;
 use object_store::ObjectMeta;
 use std::any::Any;
+use std::collections::HashMap;
 use std::fmt::{Debug, Formatter};
 use std::sync::Arc;

@@ -42,17 +43,36 @@ pub trait FileMetadata: Any + Send + Sync {

     /// Returns the size of the metadata in bytes.
     fn memory_size(&self) -> usize;
+
+    /// Returns extra information about this entry (used by [`FileMetadataCache::list_entries`]).
+    fn extra_info(&self) -> HashMap<String, String>;
 }

 /// Cache to store file-embedded metadata.
 pub trait FileMetadataCache:
     CacheAccessor<ObjectMeta, Arc<dyn FileMetadata>, Extra = ObjectMeta>
 {
-    // Returns the cache's memory limit in bytes.
+    /// Returns the cache's memory limit in bytes.
     fn cache_limit(&self) -> usize;

-    // Updates the cache with a new memory limit in bytes.
+    /// Updates the cache with a new memory limit in bytes.
     fn update_cache_limit(&self, limit: usize);
+
+    /// Retrieves the information about the entries currently cached.
+    fn list_entries(&self) -> HashMap<Path, FileMetadataCacheEntry>;
+}
+
+#[derive(Debug, Clone, PartialEq, Eq)]
+/// Represents information about a cached metadata entry.
+/// This is used to expose the metadata cache contents to outside modules.
+pub struct FileMetadataCacheEntry {
+    pub object_meta: ObjectMeta,
+    /// Size of the cached metadata, in bytes.
+    pub size_bytes: usize,
+    /// Number of times this entry was retrieved.
+    pub hits: usize,
+    /// Additional object-specific information.
+    pub extra: HashMap<String, String>,
 }

 impl Debug for dyn CacheAccessor<Path, Arc<Statistics>, Extra = ObjectMeta> {
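Implementations of `FileMetadataCache` now have to report their contents through `list_entries`. A hedged sketch of how a cache might assemble the new `FileMetadataCacheEntry` values; the `entries` map with a per-entry hit counter is a hypothetical internal representation, not taken from this commit:

use std::collections::HashMap;
use std::sync::Arc;

use datafusion::execution::cache::cache_manager::{FileMetadata, FileMetadataCacheEntry};
use object_store::path::Path;
use object_store::ObjectMeta;

/// Builds the per-path summaries that metadata_cache() ultimately renders,
/// from a hypothetical internal map of (object metadata, cached metadata, hits).
fn list_entries(
    entries: &HashMap<Path, (ObjectMeta, Arc<dyn FileMetadata>, usize)>,
) -> HashMap<Path, FileMetadataCacheEntry> {
    entries
        .iter()
        .map(|(path, (object_meta, metadata, hits))| {
            (
                path.clone(),
                FileMetadataCacheEntry {
                    object_meta: object_meta.clone(),
                    // Size and extra details come from the cached FileMetadata itself.
                    size_bytes: metadata.memory_size(),
                    hits: *hits,
                    extra: metadata.extra_info(),
                },
            )
        })
        .collect()
}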
